做者:wildma
, 連接:https://www.jianshu.com/p/73436a5cf855前端
1 前言
年初作了一款Android TV 應用,用到了MQTT。主要實現的是相似一些景區利用大屏幕實時顯示景點人數,超過人數就不容許進入。即利用閘機設備監控到進景區的遊客,而後經過MQTT將消息發送給大屏幕,最後大屏幕實時顯示景區人數,並響應一個消息通知閘機設備已經收到了它發過來的消息(確保消息到達)。這篇文章會模擬真實的使用流程進行講解,即閘機發布消息——服務器(代理)收到消息轉發給大屏幕——大屏幕收到消息後響應回去(發佈消息)——服務器收到消息轉發給閘機設備。android
2 關於MQTT
2.1 簡介
MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸)是IBM開發的一個即時通信協議。它是一種發佈/訂閱,極其簡單和輕量級的消息傳遞協議,專爲受限設備和低帶寬,高延遲或不可靠的網絡而設計。它的設計思想是輕巧、開放、簡單、規範,易於實現。這些特色使得它對不少場景來講都是很好的選擇,特別是對於受限的環境如機器與機器的通訊(M2M)以及物聯網環境。相對於XMPP,MQTT更加輕量級,而且佔用的寬帶低。git
2.2 特色
MQTT協議有如下特色:github
-
使用發佈/訂閱消息模式,提供一對多的消息發佈,解除應用程序耦合。 -
對負載內容屏蔽的消息傳輸。 -
使用 TCP/IP 提供網絡鏈接。 -
有三種消息發佈服務質量:
-
qos爲0:「至多一次」,消息發佈徹底依賴底層 TCP/IP 網絡。會發生消息丟失或重複。這一級別可用於以下狀況,環境傳感器數據,丟失一次讀記錄無所謂,由於不久後還會有第二次發送。 -
qos爲1:「至少一次」,確保消息到達,但消息重複可能會發生。這一級別可用於以下狀況,你須要得到每一條消息,而且消息重複發送對你的使用場景無影響。 -
qos爲2:「只有一次」,確保消息到達一次。這一級別可用於以下狀況,在計費系統中,消息重複或丟失會致使不正確的結果。
-
小型傳輸,開銷很小(固定長度的頭部是 2 字節),協議交換最小化,以下降網絡流量。使用 Last Will 和 Testament 特性通知有關各方客戶端異常中斷的機制。
2.3 MQTT體系結構

該體系結構圖是結合文章開頭說的例子畫出來的,能很好的描述MQTT在實際運用中的三種身份。即進景區入口配置一臺閘機設備做爲發佈者(Publisher),當閘機設備監控到有遊客進入的時候會發佈一個帶主題(Topic)的消息(例如主題爲「tourist_enter」)給服務器(MQTT-Broker),當服務器接收到發佈過來的消息後,會進行基於主題的過濾,將消息轉發給訂閱了該主題的訂閱者。而景區大屏幕做爲訂閱者(Subscriber),訂閱的主題也是「tourist_enter」,這樣就能接收到服務器轉發過來的消息,收到消息後在大屏幕上實時顯示當前景區人數便可。web
該結構圖中的閘機設備和大屏幕都是客戶端,均可以進行發佈和訂閱。例如大屏幕收到消息後也能夠發佈一個消息通知閘機設備已經收到了它發過來的消息。apache
3 MQTT服務器搭建
想要使用MQTT,首先須要搭建一個MQTT的服務器(在公司通常是後臺人員負責搭建)。通常前端人員爲了方便測試都會先使用第三方提供的服務器,官方推薦了不少種服務器,我這裏選用的是Apollo(屬於Apache ActiveMQ)。編程
1. 下載、解壓
點擊下載地址,選擇最適合你的操做系統的版本進行下載,我這裏用的是Windows,進行以下選擇:api
下載後進行解壓,我這裏解壓到D盤根目錄下(D:\apache-apollo-1.7.1)。數組
2. 建立服務器實例
命令行進入解壓文件的bin目錄下(例如:cd D:\apache-apollo-1.7.1\bin)
,而後輸入apollo create mybroker
(其中mybroker
爲自定義的服務器名稱)建立服務器實例。具體以下圖:瀏覽器

以後會在bin目錄下生成mybroker文件夾,其中mybroker文件夾下的etc\apollo.xml
文件下是配置服務器信息的文件,etc\users.properties
文件包含鏈接MQTT服務器時用到的用戶名和密碼,注意這裏只能修改密碼(發現不少博客在沒有驗證的狀況下就說用戶名和密碼都在這裏修改),若是要修改用戶名須要到etc\groups.properties
文件下去修改。etc\groups.properties
文件下的用戶名與etc\users.properties
文件下的密碼是一一對應的,以下表示一個組中配置了兩個用戶分別是admin
與wildma
,而後這兩個用戶名對應的密碼分別是password
與123456

3. 開啓服務器
進入mybroker文件夾下的bin目錄下,輸入apollo-broker.cmd
run開啓服務器。看到以下界面表示開啓成功。

4. 驗證是否安裝成功
最後在瀏覽器輸入http://127.0.0.1:61680/
,能成功打開界面就表示安裝成功了。能夠用上面配置的兩個用戶名進行登陸。
4 調試MQTT的客戶端——mqttfx 的使用
爲了方便調試MQTT,我這裏選用mqttfx做爲閘機設備客戶端。具體使用以下:
1. 下載
點擊下載地址,選擇最適合你的操做系統的版本進行下載。以下圖:

2. 安裝
下載後一路點擊下一步便可安裝成功,安裝成功後打開軟件界面。以下圖:

3. 配置
點擊上圖中的設置,添加一個新的配置文件。分別填寫配置文件名稱、服務器地址(因爲服務器就是本機,因此這裏用本機的IP地址便可,ipconfig/all
可獲取IP地址)、端口號(開啓服務器後會顯示接受鏈接的地址:Accepting connections at: tcp://0.0.0.0:61613
,用這裏的端口號61613便可,見上文中「開啓服務器」後的圖片)、用戶名、密碼,點擊OK便可。以下圖:

4. 訂閱消息
選擇剛剛添加的配置文件「閘機設備」,點擊"Connect"鏈接服務器。點擊「Subscribe」,設置一個Topic(例如tourist_enter),點擊Topic右側的「Subscribe」進行消息訂閱。以下圖:

5. 發佈消息
點擊「Publish」,輸入剛剛訂閱的Topic (tourist_enter),輸入須要發佈的消息內容(tourist enter),點擊Topic右側的「Publish」進行消息發佈。以下圖:

再返回訂閱界面就能看到剛剛發佈的消息,以下圖:

5 Android中MQTT的使用
Android中使用MQTT須要使用到Paho Android Service庫,Paho Android Service
是一個用Java編寫的MQTT客戶端庫。GitHub地址:https://github.com/eclipse/paho.mqtt.android
5.1 集成
在module的build.gradle
文件中添加依賴
repositories {
maven {
url "https://repo.eclipse.org/content/repositories/paho-snapshots/"
}
}
dependencies {
compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
compile 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
}
}
在 AndroidManifest.xml 添加限權
<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
<uses-permission android:name="android.permission.WAKE_LOCK" />
在 AndroidManifest.xml
註冊Service (MyMqttService爲本身寫的服務,下文會講到)
<service android:name="org.eclipse.paho.android.service.MqttService" /> <!--MqttService-->
<service android:name="com.dongyk.service.MyMqttService"/> <!--MyMqttService-->
5.2 具體代碼
Android中使用MQTT最主要的就是如下幾個方法:
-
connect
:鏈接MQTT服務器,這裏主要講3個參數的方法,以下:
@Override
public IMqttToken connect(MqttConnectOptions options, Object userContext,
IMqttActionListener callback) throws MqttException {
//...
}
-
參數 options
:用來攜帶鏈接服務器的一系列參數,例如用戶名、密碼等。 -
參數 userContext
:可選對象,用於向回調傳遞上下文。通常傳null便可。 -
參數 callback
:用來監聽MQTT是否鏈接成功的回調
-
publish
:發佈消息,這裏使用四個參數的方法,以下:
@Override
public IMqttDeliveryToken publish(String topic, byte[] payload, int qos,
boolean retained) throws MqttException, MqttPersistenceException {
//...
}
-
參數 topic
:發佈消息的主題 -
參數 payload
:消息的字節數組 -
參數 qos
:提供消息的服務質量,可傳0、1或2 -
參數 retained
:是否在服務器保留斷開鏈接後的最後一條消息
-
subscribe
:訂閱消息,這裏主要講2個參數的方法,以下:
@Override
public IMqttToken subscribe(String topic, int qos) throws MqttException,
MqttSecurityException {
//...
}
-
參數 topic
:訂閱消息的主題 -
參數 qos
:訂閱消息的服務質量,可傳0、1或2
5.2.2 MQTT服務——MyMqttService
下面寫一個 Service 來實現MQTT在Android運用中的connect、publish、subscribe
package com.wildma.mqttandroidclient;
import android.app.Service;
import android.content.Context;
import android.content.Intent;
import android.net.ConnectivityManager;
import android.net.NetworkInfo;
import android.os.Build;
import android.os.Handler;
import android.os.IBinder;
import android.support.annotation.Nullable;
import android.support.annotation.RequiresApi;
import android.util.Log;
import android.widget.Toast;
import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
* Author wildma
* Github https://github.com/wildma
* CreateDate 2018/11/08
* Desc ${MQTT服務}
*/
public class MyMqttService extends Service {
public final String TAG = MyMqttService.class.getSimpleName();
private static MqttAndroidClient mqttAndroidClient;
private MqttConnectOptions mMqttConnectOptions;
public String HOST = "tcp://192.168.0.102:61613";//服務器地址(協議+地址+端口號)
public String USERNAME = "admin";//用戶名
public String PASSWORD = "password";//密碼
public static String PUBLISH_TOPIC = "tourist_enter";//發佈主題
public static String RESPONSE_TOPIC = "message_arrived";//響應主題
@RequiresApi(api = 26)
public String CLIENTID = Build.VERSION.SDK_INT >= Build.VERSION_CODES.O
? Build.getSerial() : Build.SERIAL;//客戶端ID,通常以客戶端惟一標識符表示,這裏用設備序列號表示
@Override
public int onStartCommand(Intent intent, int flags, int startId) {
init();
return super.onStartCommand(intent, flags, startId);
}
@Nullable
@Override
public IBinder onBind(Intent intent) {
return null;
}
/**
* 開啓服務
*/
public static void startService(Context mContext) {
mContext.startService(new Intent(mContext, MyMqttService.class));
}
/**
* 發佈 (模擬其餘客戶端發佈消息)
*
* @param message 消息
*/
public static void publish(String message) {
String topic = PUBLISH_TOPIC;
Integer qos = 2;
Boolean retained = false;
try {
//參數分別爲:主題、消息的字節數組、服務質量、是否在服務器保留斷開鏈接後的最後一條消息
mqttAndroidClient.publish(topic, message.getBytes(), qos.intValue(), retained.booleanValue());
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 響應 (收到其餘客戶端的消息後,響應給對方告知消息已到達或者消息有問題等)
*
* @param message 消息
*/
public void response(String message) {
String topic = RESPONSE_TOPIC;
Integer qos = 2;
Boolean retained = false;
try {
//參數分別爲:主題、消息的字節數組、服務質量、是否在服務器保留斷開鏈接後的最後一條消息
mqttAndroidClient.publish(topic, message.getBytes(), qos.intValue(), retained.booleanValue());
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 初始化
*/
private void init() {
String serverURI = HOST; //服務器地址(協議+地址+端口號)
mqttAndroidClient = new MqttAndroidClient(this, serverURI, CLIENTID);
mqttAndroidClient.setCallback(mqttCallback); //設置監聽訂閱消息的回調
mMqttConnectOptions = new MqttConnectOptions();
mMqttConnectOptions.setCleanSession(true); //設置是否清除緩存
mMqttConnectOptions.setConnectionTimeout(10); //設置超時時間,單位:秒
mMqttConnectOptions.setKeepAliveInterval(20); //設置心跳包發送間隔,單位:秒
mMqttConnectOptions.setUserName(USERNAME); //設置用戶名
mMqttConnectOptions.setPassword(PASSWORD.toCharArray()); //設置密碼
// last will message
boolean doConnect = true;
String message = "{\"terminal_uid\":\"" + CLIENTID + "\"}";
String topic = PUBLISH_TOPIC;
Integer qos = 2;
Boolean retained = false;
if ((!message.equals("")) || (!topic.equals(""))) {
// 最後的遺囑
try {
mMqttConnectOptions.setWill(topic, message.getBytes(), qos.intValue(), retained.booleanValue());
} catch (Exception e) {
Log.i(TAG, "Exception Occured", e);
doConnect = false;
iMqttActionListener.onFailure(null, e);
}
}
if (doConnect) {
doClientConnection();
}
}
/**
* 鏈接MQTT服務器
*/
private void doClientConnection() {
if (!mqttAndroidClient.isConnected() && isConnectIsNomarl()) {
try {
mqttAndroidClient.connect(mMqttConnectOptions, null, iMqttActionListener);
} catch (MqttException e) {
e.printStackTrace();
}
}
}
/**
* 判斷網絡是否鏈接
*/
private boolean isConnectIsNomarl() {
ConnectivityManager connectivityManager = (ConnectivityManager) this.getApplicationContext().getSystemService(Context.CONNECTIVITY_SERVICE);
NetworkInfo info = connectivityManager.getActiveNetworkInfo();
if (info != null && info.isAvailable()) {
String name = info.getTypeName();
Log.i(TAG, "當前網絡名稱:" + name);
return true;
} else {
Log.i(TAG, "沒有可用網絡");
/*沒有可用網絡的時候,延遲3秒再嘗試重連*/
new Handler().postDelayed(new Runnable() {
@Override
public void run() {
doClientConnection();
}
}, 3000);
return false;
}
}
//MQTT是否鏈接成功的監聽
private IMqttActionListener iMqttActionListener = new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken arg0) {
Log.i(TAG, "鏈接成功 ");
try {
mqttAndroidClient.subscribe(PUBLISH_TOPIC, 2);//訂閱主題,參數:主題、服務質量
} catch (MqttException e) {
e.printStackTrace();
}
}
@Override
public void onFailure(IMqttToken arg0, Throwable arg1) {
arg1.printStackTrace();
Log.i(TAG, "鏈接失敗 ");
doClientConnection();//鏈接失敗,重連(可關閉服務器進行模擬)
}
};
//訂閱主題的回調
private MqttCallback mqttCallback = new MqttCallback() {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
Log.i(TAG, "收到消息: " + new String(message.getPayload()));
//收到消息,這裏彈出Toast表示。若是須要更新UI,可使用廣播或者EventBus進行發送
Toast.makeText(getApplicationContext(), "messageArrived: " + new String(message.getPayload()), Toast.LENGTH_LONG).show();
//收到其餘客戶端的消息後,響應給對方告知消息已到達或者消息有問題等
response("message arrived");
}
@Override
public void deliveryComplete(IMqttDeliveryToken arg0) {
}
@Override
public void connectionLost(Throwable arg0) {
Log.i(TAG, "鏈接斷開 ");
doClientConnection();//鏈接斷開,重連
}
};
@Override
public void onDestroy() {
try {
mqttAndroidClient.disconnect(); //斷開鏈接
} catch (MqttException e) {
e.printStackTrace();
}
super.onDestroy();
}
}
該 MyMqttService 類的大概邏輯就是開啓服務後,調用init()方法初始化各個參數,包括服務器地址、用戶名、密碼等等,而後調用doClientConnection()
方法鏈接MQTT服務器,iMqttActionListener用來監聽MQTT是否鏈接成功,鏈接成功則訂閱主題。mqttCallback爲訂閱主題的回調,收到消息後會執行該回調中的messageArrived()方法,拿到消息後進行UI更新,並調用response()
方法響應給對方告知消息已到達或者消息有問題等。
5.2.3 開啓服務
在MainActivity中開啓服務,這裏爲了方便不作UI更新,因此就一行開啓服務的代碼,以下:
import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.view.View;
public class MainActivity extends AppCompatActivity {
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
MyMqttService.startService(this); //開啓服務
}
}
6 模擬真實場景
仍是以文章開頭說的例子來說,如今拿mqttfx
客戶端做爲閘機設備,上面的Android代碼運行後做爲大屏幕。
-
-
將大屏幕與服務器鏈接 即將大屏幕APK運行到Android TV上,沒有TV能夠用Android手機代替。記得代碼中的發佈主題設置爲「tourist_enter」,響應主題設置爲「message_arrived」。 -
-
將閘機設備與服務器鏈接 選擇閘機設備——點擊鏈接——發佈主題設置爲「tourist_enter」,以下圖:

切換到Subscribe界面——響應主題設置爲「message_arrived」
——點擊Subscribe按鈕進行訂閱,以下圖:

-
發佈 點擊步驟2圖中的Publish按鈕進行發佈
-
大屏幕收到消息 這時候大屏幕收到服務器轉發過來的消息,就會在大屏幕上顯示進場人數,並響應給對方告知消息已到達。代碼中爲了簡單就彈個Toast表示,具體顯示就不貼圖了。
-
閘機設備收到消息 這時候mqttfx切換到Subscribe界面就能夠看到大屏幕響應回來的消息,以下:

如上流程就是大概模擬我在開發中用到的MQTT使用流程,固然個人真實項目並無那麼簡單,還包括各類數據和UI交互顯示。但願模擬這種真實的使用流程進行講解能讓各位更好的理解MQTT的使用,有不足的請指出。
項目地址:MqttAndroidClient:https://github.com/wildma/MqttAndroidClient
參考資料:
-
MQTT 101 – How to Get Started with the lightweight IoT Protocol -
MQTT Client Library Encyclopedia – Paho Android Service -
Android APP必備高級功能,消息推送之MQTT
---END---

本文分享自微信公衆號 - 技術最TOP(Tech-Android)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。