開源項(xiàng)目:用環(huán)信MQTT實(shí)現(xiàn)"世界頻道"只需5分鐘【附源碼】

  說到“世界頻道”想必大家都不陌生,常見的如王者榮耀的世界廣播搖人組隊(duì)以及最近興起的Discord社區(qū)交友等等。究其目的就是在應(yīng)用內(nèi)讓海量用戶可以實(shí)時(shí)互動。有些開發(fā)者為了實(shí)現(xiàn)這種場景會選擇聊天室方案來實(shí)現(xiàn),但是這種方式存在一定的局限性,比如聊天室人數(shù)上限、海量消息處理等各種情況。

  當(dāng)然如果有錢有顏,可以直接選擇云廠商產(chǎn)品(比如環(huán)信的聊天室方案和超級社區(qū)),如果有才有time,也可以選擇平替版MQTT實(shí)現(xiàn)方案。今天小猿將介紹用環(huán)信MQTT消息云實(shí)現(xiàn)應(yīng)用內(nèi)的世界頻道,滿滿干貨,不要錯過~~

  使用MQTT實(shí)現(xiàn)世界頻道-Demo效果演示

  協(xié)議優(yōu)勢:

  在介紹具體方案之前,我們先嘮一嘮為啥選擇MQTT協(xié)議。

  輕量級:MQTT本身是物聯(lián)網(wǎng)的連接協(xié)議,專為受限設(shè)備和低帶寬場景使用。所以其代碼占用空間較小,同樣適用于注重SDK大小的移動應(yīng)用領(lǐng)域(比如:游戲領(lǐng)域)。

  易集成:MQTT作為標(biāo)準(zhǔn)開放的消息協(xié)議,經(jīng)過多年演進(jìn),已支持30多種開發(fā)語言,10余種SDK,無論何種開發(fā)環(huán)境,都可以快速找到開源SDK。

  高并發(fā):MQTT是輕量級的消息傳輸協(xié)議,2字節(jié)心跳報(bào)文,最小化傳輸和連接成本,云廠商broker產(chǎn)品都可支持千萬級并發(fā)接入,適用于高并發(fā)連接場景。

  低成本:MQTT是基于客戶端-服務(wù)器的訂閱/發(fā)布模型,通過服務(wù)器中間件實(shí)現(xiàn)消息分發(fā),減少消息復(fù)制成本,快速實(shí)現(xiàn)一對多在線推送。

  靈活性:MQTT協(xié)議支持多種消息特性,包括:topic主題層級、消息分級(QoS0,1,2)、遺囑消息、保留消息等,可以靈活實(shí)現(xiàn)多種業(yè)務(wù)場景。

  衍生功能:隨著MQTT云服務(wù)的發(fā)展,部分服務(wù)器廠商已支持消息存儲、獲取在線設(shè)備列表、查看歷史消息等衍生功能,降低開發(fā)工作量與消息存儲成本。

  實(shí)現(xiàn)方案:

  言歸正傳,上干貨。本次技術(shù)實(shí)現(xiàn)方案包含:移動客戶端(Android)、后端服務(wù)(Java)以及MQTT服務(wù)器。這里提一下,MQTT服務(wù)器使用環(huán)信MQTT消息云,使用三方云服務(wù)比較省心,既節(jié)省開發(fā)時(shí)間,產(chǎn)品性能也不需要擔(dān)心,現(xiàn)在注冊可以直接使用環(huán)信MQTT消息云超高額度的免費(fèi)版:每月100并發(fā)連接、300萬消息,完全滿足功能開發(fā)使用。

  客戶端實(shí)現(xiàn):

  客戶端實(shí)現(xiàn)主要包含以下兩部分:

  底層MQTT業(yè)務(wù)集成:包含引入SDK、MQTT方法封裝、業(yè)務(wù)交互(消息收發(fā))。

  APP上層交互:在APP首頁提供世界頻道入口,實(shí)現(xiàn)心情彈幕飄窗(接收)和發(fā)送。

  接下來上底層MQTT業(yè)務(wù)集成代碼。

  引入SDK:

  這一步環(huán)信官方文檔比較明確,就是根據(jù)自己的平臺引入相應(yīng)的mqtt客戶端sdk,這里簡單貼一下AndroidStudio的引入配置

  1// 在根目錄 build.gradle repositories 下加入配置

  2maven { url "https://repo.eclipse.org/content/repositories/paho-snapshots/" }

  3...

  4// 然后加入 MQTT 依賴

  5// MQTT sdk https://docs-im.easemob.com/mqtt/qsandroidsdk

  6implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'

  7implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'

  方法封裝

  這里貼一下對mqtt相關(guān)方法的簡單封裝,代碼在vmmqtt模塊兒的MQTTHelper類下:

  1 /**

  2 * Create by lzan13 on 2022/3/22

  3 * 描述:MQTT 幫助類

  4 */

  5 object MQTTHelper {

  6

  7 private var mqttClient: MqttAndroidClient? = null

  8

  9 // 緩存主題集合

  10 private val topicList = mutableListOf()

  11

  12 /**

  13 * 鏈接MQTT

  14 * @param id 用戶 Id

  15 * @param token 用戶鏈接 MQTT 的 Token

  16 * @param topic 需要訂閱的主題,不為空就會在連接成功后進(jìn)行訂閱

  17 */

  18 fun connect(id: String, token: String, topic: String = "") {

  19 // 處理訂閱主題

  20 if (topic.isNotEmpty()) topicList.add(topic)

  21

  22 // 拼接鏈接地址

  23 val url = "tcp://${MQTTConstants.mqttHost()}:${MQTTConstants.mqttPort()}"

  24 // 拼接 clientId

  25 val clientId = "${id}@${MQTTConstants.mqttAppId()}"

  26 mqttClient = MqttAndroidClient(VMTools.context, url, clientId)

  27

  28 //連接參數(shù)

  29 val options = MqttConnectOptions()

  30 options.isAutomaticReconnect = true //設(shè)置自動重連

  31 options.isCleanSession = true // 緩存

  32 options.connectionTimeout = CConstants.timeMinute.toInt() // 設(shè)置超時(shí)時(shí)間,單位:秒

  33 options.keepAliveInterval = CConstants.timeMinute.toInt() // 心跳包發(fā)送間隔,單位:秒

  34 options.userName = id // 用戶名

  35 options.password = token.toCharArray() // 密碼

  36 options.mqttVersion = MqttConnectOptions.MQTT_VERSION_3_1_1;

  37 // 設(shè)置MQTT監(jiān)聽

  38 mqttClient?.setCallback(object : MqttCallback {

  39 override fun connectionLost(t: Throwable) {

  40 // 通知鏈接斷開

  41 VMLog.d("MQTT 鏈接斷開 $t")

  42 }

  43

  44 @Throws(Exception::class)

  45 override fun messageArrived(topic: String, message: MqttMessage) {

  46 // 通知收到消息

  47 VMLog.d("MQTT 收到消息:$message")

  48 // 如果未訂閱則直接丟棄

  49 if (!topicList.contains(topic)) return

  50 notifyEvent(topic, String(message.payload))

  51 }

  52

  53 override fun deliveryComplete(token: IMqttDeliveryToken) {}

  54 })

  55 //進(jìn)行連接

  56 mqttClient?.connect(options, null, object : IMqttActionListener {

  57 override fun onSuccess(token: IMqttToken) {

  58 VMLog.d("MQTT 鏈接成功")

  59 // 鏈接成功,循環(huán)訂閱緩存的主題

  60 topicList.forEach { subscribe(it) }

  61 }

  62

  63 override fun onFailure(token: IMqttToken, t: Throwable) {

  64 VMLog.d("MQTT 鏈接失敗 $t")

  65 }

  66 })

  67 }

  68

  69 /**

  70 * 訂閱主題

  71 * @param topic 主題

  72 */

  73 fun subscribe(topic: String) {

  74 if (!topicList.contains(topic)) {

  75 topicList.add(topic)

  76 }

  77 try {

  78 //連接成功后訂閱主題

  79 mqttClient?.subscribe(topic, 0, null, object : IMqttActionListener {

  80 override fun onSuccess(token: IMqttToken) {

  81 VMLog.d("MQTT 訂閱成功 $topic")

  82 }

  83

  84 override fun onFailure(token: IMqttToken, t: Throwable) {

  85 VMLog.d("MQTT 訂閱失敗 $topic $t")

  86 }

  87 })

  88 } catch (e: MqttException) {

  89 e.printStackTrace()

  90 }

  91 }

  92

  93 /**

  94 * 取消訂閱

  95 * @param topic 主題

  96 */

  97 fun unsubscribe(topic: String) {

  98 if (topicList.contains(topic)) {

  99 topicList.remove(topic)

  100 }

  101 try {

  102 mqttClient?.unsubscribe(topic)

  103 } catch (e: MqttException) {

  104 e.printStackTrace()

  105 }

  106 }

  107

  108 /**

  109 * 發(fā)送 MQTT 消息

  110 * @param topic 主題

  111 * @param content 內(nèi)容

  112 */

  113 fun sendMsg(topic: String, content: String) {

  114 val msg = MqttMessage()

  115 msg.payload = content.encodeToByteArray() // 設(shè)置消息內(nèi)容

  116 msg.qos = 0 //設(shè)置消息發(fā)送質(zhì)量,可為0,1,2.

  117 // 設(shè)置消息的topic,并發(fā)送。

  118 mqttClient?.publish(topic, msg, null, object : IMqttActionListener {

  119 override fun onSuccess(asyncActionToken: IMqttToken) {

  120 VMLog.d("MQTT 消息發(fā)送成功")

  121 }

  122

  123 override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {

  124 VMLog.d("MQTT 消息發(fā)送失敗 ${exception.message}")

  125 }

  126 })

  127 }

  128

  129 /**

  130 * 通知 MQTT 事件

  131 */

  132 private fun notifyEvent(topic: String, data: String) {

  133 LDEventBus.post(topic, data)

  134 }

  135 }

  業(yè)務(wù)交互

  和業(yè)務(wù)相關(guān)的就是在啟動APP后,使用后端服務(wù)器返回的鑒權(quán)token信息及連接封裝接口登錄環(huán)信通MQTT服務(wù)器,登錄成功后訂閱主題并監(jiān)聽消息。

  1// 請求 token 成功后,調(diào)用MQTTHelper.connect()鏈接 MQTT 服務(wù)器,這里會同時(shí)傳遞監(jiān)聽的主題

  2MQTTHelper.connect(mUser.id, token, MQTTConstants.Topic.newMatchInfo)

  3

  4/**

  5 * 發(fā)送匹配信息

  6 */

  7private fun sendMatchInfo() {

  8 if (selfMatch.user.nickname.isEmpty()) return

  9 // 提交自己的匹配信息到服務(wù)器

  10 mViewModel.submitMatch(selfMatch)

  11 val json = JSONObject()

  12 json.put("content", selfMatch.content)

  13 json.put("emotion", selfMatch.emotion)

  14 json.put("gender", selfMatch.gender)

  15 json.put("type", selfMatch.type)

  16 val jsonUser = JSONObject()

  17 jsonUser.put("avatar", mUser.avatar)

  18 jsonUser.put("id", mUser.id)

  19 jsonUser.put("nickname", mUser.nickname)

  20 jsonUser.put("username", mUser.username)

  21 json.put("user", jsonUser)

  22 MQTTHelper.sendMsg(MQTTConstants.Topic.newMatchInfo, json.toString())

  23}

  24

  25// 監(jiān)聽消息這里使用了一個(gè)事件總線進(jìn)行通知,在上邊封裝 MQTTHelper 發(fā)送消息也使用了這個(gè),

  26// 訂閱 MQTT 事件

  27LDEventBus.observe(this, MQTTConstants.Topic.newMatchInfo, String::class.java) {

  28 val match = JsonUtils.fromJson(it, Match::class.java)

  29 // 這里收到匹配信息之后就增加一條彈幕

  30 addBarrage(match)

  31}

  后端服務(wù)實(shí)現(xiàn)

  接下來介紹后端服務(wù)實(shí)現(xiàn),主要包含以下兩部分:

  配置連接信息:配置環(huán)信MQTT消息云連接信息。

  獲取鑒權(quán)信息:獲取客戶端連接需要的鑒權(quán)信息。

  配置連接信息

  配置部分只需要按照環(huán)信后臺配置信息進(jìn)行替換就好,配置在config目錄下的config.xxx.json文件內(nèi)

  1/**

  2 * Easemob MQTT 配置 https://console.easemob.com/app/generalizeMsg/overviewService

  3 */

  4config.mqtt = {

  5 host: 'mqtt host', // MQTT 鏈接地址

  6 appId: 'appId', // MQTT AppId

  7 port: [ 1883, 1884, 80, 443 ], // MQTT 端口 1883(mqtt),1884(mqtts),80(ws),443(wss)

  8 restHost: 'https://api.cn1.mqtt.chat/app/8igtc0', // MQTT 服務(wù) API 地址

  9 clientId: 'client id', // 替換環(huán)信后臺 clientId

  10 clientSecret: 'client secret', // 替換環(huán)信后臺 clientSecret

  11};

  獲取鑒權(quán)信息

  這里主要是獲取客戶端連接所需要的鑒權(quán)信息token,為了安全token肯定是要放在服務(wù)器端生成的,廢話不多說,上代碼:

  1/**

  2 * Create by lzan13 on 2022/3/22

  3 * 描述:MQTT 幫助類

  4 */

  5object MQTTHelper {

  6

  7 private var mqttClient: MqttAndroidClient? = null

  8

  9 // 緩存主題集合

  10 private val topicList = mutableListOf()

  11

  12 /**

  13 * 鏈接MQTT

  14 * @param id 用戶 Id

  15 * @param token 用戶鏈接 MQTT 的 Token

  16 * @param topic 需要訂閱的主題,不為空就會在連接成功后進(jìn)行訂閱

  17 */

  18 fun connect(id: String, token: String, topic: String = "") {

  19 // 處理訂閱主題

  20 if (topic.isNotEmpty()) topicList.add(topic)

  21

  22 // 拼接鏈接地址

  23 val url = "tcp://${MQTTConstants.mqttHost()}:${MQTTConstants.mqttPort()}"

  24 // 拼接 clientId

  25 val clientId = "${id}@${MQTTConstants.mqttAppId()}"

  26 mqttClient = MqttAndroidClient(VMTools.context, url, clientId)

  27

  28 //連接參數(shù)

  29 val options = MqttConnectOptions()

  30 options.isAutomaticReconnect = true //設(shè)置自動重連

  31 options.isCleanSession = true // 緩存

  32 options.connectionTimeout = CConstants.timeMinute.toInt() // 設(shè)置超時(shí)時(shí)間,單位:秒

  33 options.keepAliveInterval = CConstants.timeMinute.toInt() // 心跳包發(fā)送間隔,單位:秒

  34 options.userName = id // 用戶名

  35 options.password = token.toCharArray() // 密碼

  36 options.mqttVersion = MqttConnectOptions.MQTT_VERSION_3_1_1;

  37 // 設(shè)置MQTT監(jiān)聽

  38 mqttClient?.setCallback(object : MqttCallback {

  39 override fun connectionLost(t: Throwable) {

  40 // 通知鏈接斷開

  41 VMLog.d("MQTT 鏈接斷開 $t")

  42 }

  43

  44 @Throws(Exception::class)

  45 override fun messageArrived(topic: String, message: MqttMessage) {

  46 // 通知收到消息

  47 VMLog.d("MQTT 收到消息:$message")

  48 // 如果未訂閱則直接丟棄

  49 if (!topicList.contains(topic)) return

  50 notifyEvent(topic, String(message.payload))

  51 }

  52

  53 override fun deliveryComplete(token: IMqttDeliveryToken) {}

  54 })

  55 //進(jìn)行連接

  56 mqttClient?.connect(options, null, object : IMqttActionListener {

  57 override fun onSuccess(token: IMqttToken) {

  58 VMLog.d("MQTT 鏈接成功")

  59 // 鏈接成功,循環(huán)訂閱緩存的主題

  60 topicList.forEach { subscribe(it) }

  61 }

  62

  63 override fun onFailure(token: IMqttToken, t: Throwable) {

  64 VMLog.d("MQTT 鏈接失敗 $t")

  65 }

  66 })

  67 }

  68

  69 /**

  70 * 訂閱主題

  71 * @param topic 主題

  72 */

  73 fun subscribe(topic: String) {

  74 if (!topicList.contains(topic)) {

  75 topicList.add(topic)

  76 }

  77 try {

  78 //連接成功后訂閱主題

  79 mqttClient?.subscribe(topic, 0, null, object : IMqttActionListener {

  80 override fun onSuccess(token: IMqttToken) {

  81 VMLog.d("MQTT 訂閱成功 $topic")

  82 }

  83

  84 override fun onFailure(token: IMqttToken, t: Throwable) {

  85 VMLog.d("MQTT 訂閱失敗 $topic $t")

  86 }

  87 })

  88 } catch (e: MqttException) {

  89 e.printStackTrace()

  90 }

  91 }

  92

  93 /**

  94 * 取消訂閱

  95 * @param topic 主題

  96 */

  97 fun unsubscribe(topic: String) {

  98 if (topicList.contains(topic)) {

  99 topicList.remove(topic)

  100 }

  101 try {

  102 mqttClient?.unsubscribe(topic)

  103 } catch (e: MqttException) {

  104 e.printStackTrace()

  105 }

  106 }

  107

  108 /**

  109 * 發(fā)送 MQTT 消息

  110 * @param topic 主題

  111 * @param content 內(nèi)容

  112 */

  113 fun sendMsg(topic: String, content: String) {

  114 val msg = MqttMessage()

  115 msg.payload = content.encodeToByteArray() // 設(shè)置消息內(nèi)容

  116 msg.qos = 0 //設(shè)置消息發(fā)送質(zhì)量,可為0,1,2.

  117 // 設(shè)置消息的topic,并發(fā)送。

  118 mqttClient?.publish(topic, msg, null, object : IMqttActionListener {

  119 override fun onSuccess(asyncActionToken: IMqttToken) {

  120 VMLog.d("MQTT 消息發(fā)送成功")

  121 }

  122

  123 override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {

  124 VMLog.d("MQTT 消息發(fā)送失敗 ${exception.message}")

  125 }

  126 })

  127 }

  128

  129 /**

  130 * 通知 MQTT 事件

  131 */

  132 private fun notifyEvent(topic: String, data: String) {

  133 LDEventBus.post(topic, data)

  134 }

  135}

  源碼地址

  核心代碼就這么多,不超過500行,這里沒有直接調(diào)用環(huán)信歷史消息接口獲取消息存儲記錄,后續(xù)可以在進(jìn)行改良,簡化實(shí)現(xiàn)流程。源碼鏈接附上,配合使用效果更佳。

  服務(wù)端github源碼:

  https://github.com/lzan13/vmtemplateserver

  客戶端github源碼:

  https://gitee.com/lzan13/VMTemplateAndroid

  寫在最后

  MQTT協(xié)議資源占用小,并發(fā)連接高,集成簡單,特別適用于高頻數(shù)據(jù)交互場景,比如:游戲的世界廣場、視頻平臺彈幕等等等等,歡迎各位小伙伴集思廣益,基于MQTT服務(wù)實(shí)現(xiàn)更多的業(yè)務(wù)場景,享受技術(shù)帶來的便利與快樂。

(免責(zé)聲明:本網(wǎng)站內(nèi)容主要來自原創(chuàng)、合作伙伴供稿和第三方自媒體作者投稿,凡在本網(wǎng)站出現(xiàn)的信息,均僅供參考。本網(wǎng)站將盡力確保所提供信息的準(zhǔn)確性及可靠性,但不保證有關(guān)資料的準(zhǔn)確性及可靠性,讀者在使用前請進(jìn)一步核實(shí),并對任何自主決定的行為負(fù)責(zé)。本網(wǎng)站對有關(guān)資料所引致的錯誤、不確或遺漏,概不負(fù)任何法律責(zé)任。
任何單位或個(gè)人認(rèn)為本網(wǎng)站中的網(wǎng)頁或鏈接內(nèi)容可能涉嫌侵犯其知識產(chǎn)權(quán)或存在不實(shí)內(nèi)容時(shí),應(yīng)及時(shí)向本網(wǎng)站提出書面權(quán)利通知或不實(shí)情況說明,并提供身份證明、權(quán)屬證明及詳細(xì)侵權(quán)或不實(shí)情況證明。本網(wǎng)站在收到上述法律文件后,將會依法盡快聯(lián)系相關(guān)文章源頭核實(shí),溝通刪除相關(guān)內(nèi)容或斷開相關(guān)鏈接。 )