Android 使用 Kotlin 鏈接 MQTT

MQTT 是一種輕量級的、靈活的物聯網消息交換和數據傳遞協議,致力於爲 IoT 開發人員實現靈活性與硬件/網絡資源的平衡。java

Kotlin 是一門由 JetBrains 公司開發的編程語言,Kotlin 是基於 JVM 的,因此開發者能夠很方便地用它來進行 Android 開發,而且支持 Kotlin 和 Java 的混合編寫。而早在 2017 年,Google 就宣佈 Kotlin 成爲官方開發語言。android

本文主要介紹使用 Kotlin 語言在 Android 平臺上使用 MQTT。編程

新建 Kotlin 項目

打開 Android Studio 新建一個項目,選擇語言爲 Kotlin,Android Studio 會自動建立 Kotlin 相關配置。若要配置現有項目,則能夠參考 將 Kotlin 添加到現有應用安全

添加依賴

打開項目的 build.gradle,添加 Eclipse Paho Java ClientEclipse Paho Android Service 依賴到 dependencies 部分。服務器

dependencies {
    implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.4'
    implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1' 
}

配置 AndroidManifest.xml

Android Service 是 Eclipse 開發的基於 Android 平臺的一個後臺服務,咱們須要將它註冊到AndroidManifest.xml 文件,同時,咱們須要註冊權限。網絡

<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.WAKE_LOCK" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />

<application
   ...
   <service android:name="org.eclipse.paho.android.service.MqttService" />
</application>

建立 MQTT 客戶端

private lateinit var mqttClient: MqttAndroidClient
// TAG
companion object {
    const val TAG = "AndroidMqttClient"
}

鏈接 MQTT 服務器

本文將使用 EMQ X MQTT Cloud 運營和維護的免費公共 MQTT 服務器, EMQ X Cloud 是由 EMQ 推出的安全的 MQTT 物聯網雲服務平臺,它提供一站式運維代管、獨有隔離環境的 MQTT 5.0 接入服務。app

  • Broker: broker.emqx.io
  • TCP Port: 1883
  • Websocket Port: 8083
fun connect(context: Context) {
        val serverURI = "tcp://broker.emqx.io:1883"
        mqttClient = MqttAndroidClient(context, serverURI, "kotlin_client")
        mqttClient.setCallback(object : MqttCallback {
            override fun messageArrived(topic: String?, message: MqttMessage?) {
                Log.d(TAG, "Receive message: ${message.toString()} from topic: $topic")
            }

            override fun connectionLost(cause: Throwable?) {
                Log.d(TAG, "Connection lost ${cause.toString()}")
            }

            override fun deliveryComplete(token: IMqttDeliveryToken?) {

            }
        })
        val options = MqttConnectOptions()
        try {
            mqttClient.connect(options, null, object : IMqttActionListener {
                override fun onSuccess(asyncActionToken: IMqttToken?) {
                    Log.d(TAG, "Connection success")
                }

                override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
                    Log.d(TAG, "Connection failure")
                }
            })
        } catch (e: MqttException) {
            e.printStackTrace()
        }

    }

其中,MqttCallback 接口包含 3 個方法:運維

  1. messageArrived:收到 broker 新消息
  2. connectionLost:與 broker 鏈接丟失
  3. deliveryComplete:消息到 broker 傳遞完成

MqttConnectOptions 用於配置鏈接設置,包含用戶名密碼,超時配置等,具體能夠查看其方法。eclipse

建立 MQTT 訂閱

訂閱 topicsocket

fun subscribe(topic: String, qos: Int = 1) {
        try {
            mqttClient.subscribe(topic, qos, null, object : IMqttActionListener {
                override fun onSuccess(asyncActionToken: IMqttToken?) {
                    Log.d(TAG, "Subscribed to $topic")
                }

                override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
                    Log.d(TAG, "Failed to subscribe $topic")
                }
            })
        } catch (e: MqttException) {
            e.printStackTrace()
        }
    }

取消訂閱

取消訂閱 topic

fun unsubscribe(topic: String) {
        try {
            mqttClient.unsubscribe(topic, null, object : IMqttActionListener {
                override fun onSuccess(asyncActionToken: IMqttToken?) {
                    Log.d(TAG, "Unsubscribed to $topic")
                }

                override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
                    Log.d(TAG, "Failed to unsubscribe $topic")
                }
            })
        } catch (e: MqttException) {
            e.printStackTrace()
        }
    }

發佈消息

fun publish(topic: String, msg: String, qos: Int = 1, retained: Boolean = false) {
        try {
            val message = MqttMessage()
            message.payload = msg.toByteArray()
            message.qos = qos
            message.isRetained = retained
            mqttClient.publish(topic, message, null, object : IMqttActionListener {
                override fun onSuccess(asyncActionToken: IMqttToken?) {
                    Log.d(TAG, "$msg published to $topic")
                }

                override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
                    Log.d(TAG, "Failed to publish $msg to $topic")
                }
            })
        } catch (e: MqttException) {
            e.printStackTrace()
        }
    }

斷開 MQTT 鏈接

fun disconnect() {
        try {
            mqttClient.disconnect(null, object : IMqttActionListener {
                override fun onSuccess(asyncActionToken: IMqttToken?) {
                    Log.d(TAG, "Disconnected")
                }

                override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
                    Log.d(TAG, "Failed to disconnect")
                }
            })
        } catch (e: MqttException) {
            e.printStackTrace()
        }
    }

測試

首先將 Android 客戶端鏈接到 MQTT 服務器,而後訂閱 topic: a/b,能夠看到鏈接成功和成功訂閱的日誌

MQTT connect and subscribe.png

而後咱們使用 MQTT 5.0 客戶端工具 - MQTT X 進行測試,發佈消息到 topic: a/b,客戶端能夠看到收到消息的日誌

MQTT 5.0 Client Tool - MQTT X.png

receive MQTT messages.png

咱們在客戶端發佈消息到 topic: a/b ,由於咱們訂閱了該 topic,同時也會收到消息,最後咱們斷開客戶端與 MQTT 服務器的鏈接,日誌以下:

publish mqtt message and disconnect.png

至此,咱們已經完成了Android 上 MQTT 客戶端的構建,實現了客戶端與 MQTT 服務器的鏈接、主題訂閱、收發消息等功能。

MQTT 能夠以極少的代碼和有限的帶寬,爲鏈接遠程設備提供實時可靠的消息服務。做爲一種低開銷、低帶寬佔用的即時通信協議,使其在物聯網、小型設備、移動應用等方面有較普遍的應用。

而 Kotlin 也是 Google 官方主推的一門語言,結合 MQTT 協議及 MQTT 雲服務,咱們能夠開發更多有趣的應用。

版權聲明: 本文爲 EMQ 原創,轉載請註明出處。

原文連接:https://www.emqx.io/cn/blog/a...

相關文章
相關標籤/搜索