還能再漲23%!AI寵兒NVIDIA成大摩明年首選AMD FSR 4.0將與RX 9070 XT顯卡同步登場羅永浩細(xì)紅線最新進展,暫別AR,迎來AI Jarvis構(gòu)建堅實數(shù)據(jù)地基,南京打造可信數(shù)據(jù)空間引領(lǐng)數(shù)字城市建設(shè)下單前先比價不花冤枉錢 同款圖書京東價低于抖音6折日媒感慨中國電動汽車/智駕遙遙領(lǐng)先:本田、日產(chǎn)、三菱合并也沒戲消委會吹風(fēng)機品質(zhì)檢測結(jié)果揭曉 徠芬獨占鰲頭 共話新質(zhì)營銷力,2024梅花數(shù)據(jù)峰會圓滿落幕索尼影像專業(yè)服務(wù) PRO Support 升級,成為會員至少需注冊 2 臺 α 全畫幅相機、3 支 G 大師鏡頭消息稱vivo加碼電池軍備競賽:6500mAh 旗艦機+7500mAh中端機寶馬M8雙門轎跑車明年年初將停產(chǎn),后續(xù)無2026款車型比亞迪:2025 款漢家族車型城市領(lǐng)航智駕功能開啟內(nèi)測雷神預(yù)告2025年首次出席CES 將發(fā)布三款不同技術(shù)原理智能眼鏡realme真我全球首發(fā)聯(lián)發(fā)科天璣 8400 耐玩戰(zhàn)神共創(chuàng)計劃iQOO Z9 Turbo長續(xù)航版手機被曝電池加大到6400mAh,搭驍龍 8s Gen 3處理器普及放緩 銷量大跌:曝保時捷將重新評估電動汽車計劃來京東參與榮耀Magic7 RSR 保時捷設(shè)計預(yù)售 享365天只換不修國補期間電視迎來換機潮,最暢銷MiniLED品牌花落誰家?美團旗下微信社群團購業(yè)務(wù)“團買買”宣布年底停運消息稱微軟正與第三方廠商洽談,試圖合作推出Xbox游戲掌機設(shè)備
  • 首頁 > 數(shù)據(jù)存儲頻道 > 數(shù)據(jù)庫頻道 > 操作系統(tǒng)與開源

    開源項目:用環(huán)信MQTT實現(xiàn)"世界頻道"只需5分鐘【附源碼】

    2022年04月27日 16:09:14   來源:中文科技資訊

      說到“世界頻道”想必大家都不陌生,常見的如王者榮耀的世界廣播搖人組隊以及最近興起的Discord社區(qū)交友等等。究其目的就是在應(yīng)用內(nèi)讓海量用戶可以實時互動。有些開發(fā)者為了實現(xiàn)這種場景會選擇聊天室方案來實現(xiàn),但是這種方式存在一定的局限性,比如聊天室人數(shù)上限、海量消息處理等各種情況。

      當(dāng)然如果有錢有顏,可以直接選擇云廠商產(chǎn)品(比如環(huán)信的聊天室方案和超級社區(qū)),如果有才有time,也可以選擇平替版MQTT實現(xiàn)方案。今天小猿將介紹用環(huán)信MQTT消息云實現(xiàn)應(yīng)用內(nèi)的世界頻道,滿滿干貨,不要錯過~~

      使用MQTT實現(xiàn)世界頻道-Demo效果演示

      協(xié)議優(yōu)勢:

      在介紹具體方案之前,我們先嘮一嘮為啥選擇MQTT協(xié)議。

      輕量級:MQTT本身是物聯(lián)網(wǎng)的連接協(xié)議,專為受限設(shè)備和低帶寬場景使用。所以其代碼占用空間較小,同樣適用于注重SDK大小的移動應(yīng)用領(lǐng)域(比如:游戲領(lǐng)域)。

      易集成:MQTT作為標(biāo)準(zhǔn)開放的消息協(xié)議,經(jīng)過多年演進,已支持30多種開發(fā)語言,10余種SDK,無論何種開發(fā)環(huán)境,都可以快速找到開源SDK。

      高并發(fā):MQTT是輕量級的消息傳輸協(xié)議,2字節(jié)心跳報文,最小化傳輸和連接成本,云廠商broker產(chǎn)品都可支持千萬級并發(fā)接入,適用于高并發(fā)連接場景。

      低成本:MQTT是基于客戶端-服務(wù)器的訂閱/發(fā)布模型,通過服務(wù)器中間件實現(xiàn)消息分發(fā),減少消息復(fù)制成本,快速實現(xiàn)一對多在線推送。

      靈活性:MQTT協(xié)議支持多種消息特性,包括:topic主題層級、消息分級(QoS0,1,2)、遺囑消息、保留消息等,可以靈活實現(xiàn)多種業(yè)務(wù)場景。

      衍生功能:隨著MQTT云服務(wù)的發(fā)展,部分服務(wù)器廠商已支持消息存儲、獲取在線設(shè)備列表、查看歷史消息等衍生功能,降低開發(fā)工作量與消息存儲成本。

      實現(xiàn)方案:

      言歸正傳,上干貨。本次技術(shù)實現(xiàn)方案包含:移動客戶端(Android)、后端服務(wù)(Java)以及MQTT服務(wù)器。這里提一下,MQTT服務(wù)器使用環(huán)信MQTT消息云,使用三方云服務(wù)比較省心,既節(jié)省開發(fā)時間,產(chǎn)品性能也不需要擔(dān)心,現(xiàn)在注冊可以直接使用環(huán)信MQTT消息云超高額度的免費版:每月100并發(fā)連接、300萬消息,完全滿足功能開發(fā)使用。

      客戶端實現(xiàn):

      客戶端實現(xiàn)主要包含以下兩部分:

      底層MQTT業(yè)務(wù)集成:包含引入SDK、MQTT方法封裝、業(yè)務(wù)交互(消息收發(fā))。

      APP上層交互:在APP首頁提供世界頻道入口,實現(xiàn)心情彈幕飄窗(接收)和發(fā)送。

      接下來上底層MQTT業(yè)務(wù)集成代碼。

      引入SDK:

      這一步環(huán)信官方文檔比較明確,就是根據(jù)自己的平臺引入相應(yīng)的mqtt客戶端sdk,這里簡單貼一下AndroidStudio的引入配置

      1// 在根目錄 build.gradle repositories 下加入配置

      2maven { url "https://repo.eclipse.org/content/repositories/paho-snapshots/" }

      3...

      4// 然后加入 MQTT 依賴

      5// MQTT sdk https://docs-im.easemob.com/mqtt/qsandroidsdk

      6implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'

      7implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'

      方法封裝

      這里貼一下對mqtt相關(guān)方法的簡單封裝,代碼在vmmqtt模塊兒的MQTTHelper類下:

      1 /**

      2 * Create by lzan13 on 2022/3/22

      3 * 描述:MQTT 幫助類

      4 */

      5 object MQTTHelper {

      6

      7    private var mqttClient: MqttAndroidClient? = null

      8

      9    // 緩存主題集合

      10    private val topicList = mutableListOf()

      11

      12    /**

      13     * 鏈接MQTT

      14     * @param id 用戶 Id

      15     * @param token 用戶鏈接 MQTT 的 Token

      16     * @param topic 需要訂閱的主題,不為空就會在連接成功后進行訂閱

      17     */

      18    fun connect(id: String, token: String, topic: String = "") {

      19        // 處理訂閱主題

      20        if (topic.isNotEmpty()) topicList.add(topic)

      21

      22        // 拼接鏈接地址

      23        val url = "tcp://${MQTTConstants.mqttHost()}:${MQTTConstants.mqttPort()}"

      24        // 拼接 clientId

      25        val clientId = "${id}@${MQTTConstants.mqttAppId()}"

      26        mqttClient = MqttAndroidClient(VMTools.context, url, clientId)

      27

      28        //連接參數(shù)

      29        val options = MqttConnectOptions()

      30        options.isAutomaticReconnect = true //設(shè)置自動重連

      31        options.isCleanSession = true // 緩存

      32        options.connectionTimeout = CConstants.timeMinute.toInt() // 設(shè)置超時時間,單位:秒

      33        options.keepAliveInterval = CConstants.timeMinute.toInt() // 心跳包發(fā)送間隔,單位:秒

      34        options.userName = id // 用戶名

      35        options.password = token.toCharArray() // 密碼

      36        options.mqttVersion = MqttConnectOptions.MQTT_VERSION_3_1_1;

      37        // 設(shè)置MQTT監(jiān)聽

      38        mqttClient?.setCallback(object : MqttCallback {

      39            override fun connectionLost(t: Throwable) {

      40                // 通知鏈接斷開

      41                VMLog.d("MQTT 鏈接斷開 $t")

      42            }

      43

      44            @Throws(Exception::class)

      45            override fun messageArrived(topic: String, message: MqttMessage) {

      46                // 通知收到消息

      47                VMLog.d("MQTT 收到消息:$message")

      48                // 如果未訂閱則直接丟棄

      49                if (!topicList.contains(topic)) return

      50                notifyEvent(topic, String(message.payload))

      51            }

      52

      53            override fun deliveryComplete(token: IMqttDeliveryToken) {}

      54        })

      55        //進行連接

      56        mqttClient?.connect(options, null, object : IMqttActionListener {

      57            override fun onSuccess(token: IMqttToken) {

      58                VMLog.d("MQTT 鏈接成功")

      59                // 鏈接成功,循環(huán)訂閱緩存的主題

      60                topicList.forEach { subscribe(it) }

      61            }

      62

      63            override fun onFailure(token: IMqttToken, t: Throwable) {

      64                VMLog.d("MQTT 鏈接失敗 $t")

      65            }

      66        })

      67    }

      68

      69    /**

      70     * 訂閱主題

      71     * @param topic 主題

      72     */

      73    fun subscribe(topic: String) {

      74        if (!topicList.contains(topic)) {

      75            topicList.add(topic)

      76        }

      77        try {

      78            //連接成功后訂閱主題

      79            mqttClient?.subscribe(topic, 0, null, object : IMqttActionListener {

      80                override fun onSuccess(token: IMqttToken) {

      81                    VMLog.d("MQTT 訂閱成功 $topic")

      82                }

      83

      84                override fun onFailure(token: IMqttToken, t: Throwable) {

      85                    VMLog.d("MQTT 訂閱失敗 $topic $t")

      86                }

      87            })

      88        } catch (e: MqttException) {

      89            e.printStackTrace()

      90        }

      91    }

      92

      93    /**

      94     * 取消訂閱

      95     * @param topic 主題

      96     */

      97    fun unsubscribe(topic: String) {

      98        if (topicList.contains(topic)) {

      99            topicList.remove(topic)

      100        }

      101        try {

      102            mqttClient?.unsubscribe(topic)

      103        } catch (e: MqttException) {

      104            e.printStackTrace()

      105        }

      106    }

      107

      108    /**

      109     * 發(fā)送 MQTT 消息

      110     * @param topic 主題

      111     * @param content 內(nèi)容

      112     */

      113    fun sendMsg(topic: String, content: String) {

      114        val msg = MqttMessage()

      115        msg.payload = content.encodeToByteArray() // 設(shè)置消息內(nèi)容

      116        msg.qos = 0 //設(shè)置消息發(fā)送質(zhì)量,可為0,1,2.

      117        // 設(shè)置消息的topic,并發(fā)送。

      118        mqttClient?.publish(topic, msg, null, object : IMqttActionListener {

      119            override fun onSuccess(asyncActionToken: IMqttToken) {

      120                VMLog.d("MQTT 消息發(fā)送成功")

      121            }

      122

      123            override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {

      124                VMLog.d("MQTT 消息發(fā)送失敗 ${exception.message}")

      125            }

      126        })

      127    }

      128

      129    /**

      130     * 通知 MQTT 事件

      131     */

      132    private fun notifyEvent(topic: String, data: String) {

      133        LDEventBus.post(topic, data)

      134    }

      135 }

      業(yè)務(wù)交互

      和業(yè)務(wù)相關(guān)的就是在啟動APP后,使用后端服務(wù)器返回的鑒權(quán)token信息及連接封裝接口登錄環(huán)信通MQTT服務(wù)器,登錄成功后訂閱主題并監(jiān)聽消息。

      1// 請求 token 成功后,調(diào)用MQTTHelper.connect()鏈接 MQTT 服務(wù)器,這里會同時傳遞監(jiān)聽的主題

      2MQTTHelper.connect(mUser.id, token, MQTTConstants.Topic.newMatchInfo)

      3

      4/**

      5 * 發(fā)送匹配信息

      6 */

      7private fun sendMatchInfo() {

      8    if (selfMatch.user.nickname.isEmpty()) return

      9    // 提交自己的匹配信息到服務(wù)器

      10    mViewModel.submitMatch(selfMatch)

      11    val json = JSONObject()

      12    json.put("content", selfMatch.content)

      13    json.put("emotion", selfMatch.emotion)

      14    json.put("gender", selfMatch.gender)

      15    json.put("type", selfMatch.type)

      16    val jsonUser = JSONObject()

      17    jsonUser.put("avatar", mUser.avatar)

      18    jsonUser.put("id", mUser.id)

      19    jsonUser.put("nickname", mUser.nickname)

      20    jsonUser.put("username", mUser.username)

      21    json.put("user", jsonUser)

      22    MQTTHelper.sendMsg(MQTTConstants.Topic.newMatchInfo, json.toString())

      23}

      24

      25// 監(jiān)聽消息這里使用了一個事件總線進行通知,在上邊封裝 MQTTHelper 發(fā)送消息也使用了這個,

      26// 訂閱 MQTT 事件

      27LDEventBus.observe(this, MQTTConstants.Topic.newMatchInfo, String::class.java) {

      28        val match = JsonUtils.fromJson(it, Match::class.java)

      29        // 這里收到匹配信息之后就增加一條彈幕

      30    addBarrage(match)

      31}

      后端服務(wù)實現(xiàn)

      接下來介紹后端服務(wù)實現(xiàn),主要包含以下兩部分:

      配置連接信息:配置環(huán)信MQTT消息云連接信息。

      獲取鑒權(quán)信息:獲取客戶端連接需要的鑒權(quán)信息。

      配置連接信息

      配置部分只需要按照環(huán)信后臺配置信息進行替換就好,配置在config目錄下的config.xxx.json文件內(nèi)

      1/**

      2 * Easemob MQTT 配置 https://console.easemob.com/app/generalizeMsg/overviewService

      3 */

      4config.mqtt = {

      5    host: 'mqtt host', // MQTT 鏈接地址

      6  appId: 'appId', // MQTT AppId

      7  port: [ 1883, 1884, 80, 443 ], // MQTT 端口 1883(mqtt),1884(mqtts),80(ws),443(wss)

      8  restHost: 'https://api.cn1.mqtt.chat/app/8igtc0', // MQTT 服務(wù) API 地址

      9  clientId: 'client id', // 替換環(huán)信后臺 clientId

      10  clientSecret: 'client secret', // 替換環(huán)信后臺 clientSecret

      11};

      獲取鑒權(quán)信息

      這里主要是獲取客戶端連接所需要的鑒權(quán)信息token,為了安全token肯定是要放在服務(wù)器端生成的,廢話不多說,上代碼:

      1/**

      2 * Create by lzan13 on 2022/3/22

      3 * 描述:MQTT 幫助類

      4 */

      5object MQTTHelper {

      6

      7    private var mqttClient: MqttAndroidClient? = null

      8

      9    // 緩存主題集合

      10    private val topicList = mutableListOf()

      11

      12    /**

      13     * 鏈接MQTT

      14     * @param id 用戶 Id

      15     * @param token 用戶鏈接 MQTT 的 Token

      16     * @param topic 需要訂閱的主題,不為空就會在連接成功后進行訂閱

      17     */

      18    fun connect(id: String, token: String, topic: String = "") {

      19        // 處理訂閱主題

      20        if (topic.isNotEmpty()) topicList.add(topic)

      21

      22        // 拼接鏈接地址

      23        val url = "tcp://${MQTTConstants.mqttHost()}:${MQTTConstants.mqttPort()}"

      24        // 拼接 clientId

      25        val clientId = "${id}@${MQTTConstants.mqttAppId()}"

      26        mqttClient = MqttAndroidClient(VMTools.context, url, clientId)

      27

      28        //連接參數(shù)

      29        val options = MqttConnectOptions()

      30        options.isAutomaticReconnect = true //設(shè)置自動重連

      31        options.isCleanSession = true // 緩存

      32        options.connectionTimeout = CConstants.timeMinute.toInt() // 設(shè)置超時時間,單位:秒

      33        options.keepAliveInterval = CConstants.timeMinute.toInt() // 心跳包發(fā)送間隔,單位:秒

      34        options.userName = id // 用戶名

      35        options.password = token.toCharArray() // 密碼

      36        options.mqttVersion = MqttConnectOptions.MQTT_VERSION_3_1_1;

      37        // 設(shè)置MQTT監(jiān)聽

      38        mqttClient?.setCallback(object : MqttCallback {

      39            override fun connectionLost(t: Throwable) {

      40                // 通知鏈接斷開

      41                VMLog.d("MQTT 鏈接斷開 $t")

      42            }

      43

      44            @Throws(Exception::class)

      45            override fun messageArrived(topic: String, message: MqttMessage) {

      46                // 通知收到消息

      47                VMLog.d("MQTT 收到消息:$message")

      48                // 如果未訂閱則直接丟棄

      49                if (!topicList.contains(topic)) return

      50                notifyEvent(topic, String(message.payload))

      51            }

      52

      53            override fun deliveryComplete(token: IMqttDeliveryToken) {}

      54        })

      55        //進行連接

      56        mqttClient?.connect(options, null, object : IMqttActionListener {

      57            override fun onSuccess(token: IMqttToken) {

      58                VMLog.d("MQTT 鏈接成功")

      59                // 鏈接成功,循環(huán)訂閱緩存的主題

      60                topicList.forEach { subscribe(it) }

      61            }

      62

      63            override fun onFailure(token: IMqttToken, t: Throwable) {

      64                VMLog.d("MQTT 鏈接失敗 $t")

      65            }

      66        })

      67    }

      68

      69    /**

      70     * 訂閱主題

      71     * @param topic 主題

      72     */

      73    fun subscribe(topic: String) {

      74        if (!topicList.contains(topic)) {

      75            topicList.add(topic)

      76        }

      77        try {

      78            //連接成功后訂閱主題

      79            mqttClient?.subscribe(topic, 0, null, object : IMqttActionListener {

      80                override fun onSuccess(token: IMqttToken) {

      81                    VMLog.d("MQTT 訂閱成功 $topic")

      82                }

      83

      84                override fun onFailure(token: IMqttToken, t: Throwable) {

      85                    VMLog.d("MQTT 訂閱失敗 $topic $t")

      86                }

      87            })

      88        } catch (e: MqttException) {

      89            e.printStackTrace()

      90        }

      91    }

      92

      93    /**

      94     * 取消訂閱

      95     * @param topic 主題

      96     */

      97    fun unsubscribe(topic: String) {

      98        if (topicList.contains(topic)) {

      99            topicList.remove(topic)

      100        }

      101        try {

      102            mqttClient?.unsubscribe(topic)

      103        } catch (e: MqttException) {

      104            e.printStackTrace()

      105        }

      106    }

      107

      108    /**

      109     * 發(fā)送 MQTT 消息

      110     * @param topic 主題

      111     * @param content 內(nèi)容

      112     */

      113    fun sendMsg(topic: String, content: String) {

      114        val msg = MqttMessage()

      115        msg.payload = content.encodeToByteArray() // 設(shè)置消息內(nèi)容

      116        msg.qos = 0 //設(shè)置消息發(fā)送質(zhì)量,可為0,1,2.

      117        // 設(shè)置消息的topic,并發(fā)送。

      118        mqttClient?.publish(topic, msg, null, object : IMqttActionListener {

      119            override fun onSuccess(asyncActionToken: IMqttToken) {

      120                VMLog.d("MQTT 消息發(fā)送成功")

      121            }

      122

      123            override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {

      124                VMLog.d("MQTT 消息發(fā)送失敗 ${exception.message}")

      125            }

      126        })

      127    }

      128

      129    /**

      130     * 通知 MQTT 事件

      131     */

      132    private fun notifyEvent(topic: String, data: String) {

      133        LDEventBus.post(topic, data)

      134    }

      135}

      源碼地址

      核心代碼就這么多,不超過500行,這里沒有直接調(diào)用環(huán)信歷史消息接口獲取消息存儲記錄,后續(xù)可以在進行改良,簡化實現(xiàn)流程。源碼鏈接附上,配合使用效果更佳。

      服務(wù)端github源碼:

      https://github.com/lzan13/vmtemplateserver

      客戶端github源碼:

      https://gitee.com/lzan13/VMTemplateAndroid

      寫在最后

      MQTT協(xié)議資源占用小,并發(fā)連接高,集成簡單,特別適用于高頻數(shù)據(jù)交互場景,比如:游戲的世界廣場、視頻平臺彈幕等等等等,歡迎各位小伙伴集思廣益,基于MQTT服務(wù)實現(xiàn)更多的業(yè)務(wù)場景,享受技術(shù)帶來的便利與快樂。

      文章內(nèi)容僅供閱讀,不構(gòu)成投資建議,請謹(jǐn)慎對待。投資者據(jù)此操作,風(fēng)險自擔(dān)。

    [No. X016-1]
    分享到微信

    即時

    新聞

    明火炊具市場:三季度健康屬性貫穿全類目

    奧維云網(wǎng)(AVC)推總數(shù)據(jù)顯示,2024年1-9月明火炊具線上零售額94.2億元,同比增加3.1%,其中抖音渠道表現(xiàn)優(yōu)異,同比有14%的漲幅,傳統(tǒng)電商略有下滑,同比降低2.3%。

    企業(yè)IT

    重慶創(chuàng)新公積金應(yīng)用,“區(qū)塊鏈+政務(wù)服務(wù)”顯成效

    “以前都要去窗口辦,一套流程下來都要半個月了,現(xiàn)在方便多了!”打開“重慶公積金”微信小程序,按照提示流程提交相關(guān)材料,僅幾秒鐘,重慶市民曾某的賬戶就打進了21600元。

    3C消費

    華碩ProArt創(chuàng)藝27 Pro PA279CRV顯示器,高能實力,創(chuàng)

    華碩ProArt創(chuàng)藝27 Pro PA279CRV顯示器,憑借其優(yōu)秀的性能配置和精準(zhǔn)的色彩呈現(xiàn)能力,為您的創(chuàng)作工作帶來實質(zhì)性的幫助,雙十一期間低至2799元,性價比很高,簡直是創(chuàng)作者們的首選。

    研究

    中國信通院羅松:深度解讀《工業(yè)互聯(lián)網(wǎng)標(biāo)識解析體系

    9月14日,2024全球工業(yè)互聯(lián)網(wǎng)大會——工業(yè)互聯(lián)網(wǎng)標(biāo)識解析專題論壇在沈陽成功舉辦。