Mqtt(paho)重連機制

本文是使用Java語言,eclipse paho的實現方式,去調用MQTT服務器端,編寫的MqttClient代碼中針對MQTT服務器重啓定製重連機制所遇到的問題進行彙總。服務器

 

1.1編寫MqttConnection類,建立MQTT鏈接

 1 public synchronized boolean connect() {
 2         try {
 3             if(null == client) {
 4                 //host爲主機名,clientid即鏈接MQTT的客戶端ID,通常以惟一標識符表示,
 5                 // MemoryPersistence設置clientid的保存形式,默認爲之內存保存
 6                 client = new MqttClient(host, client_id, new MemoryPersistence());
 7                 //設置回調
 8                 client.setCallback(new PushCallBack(MqttConnection.this, devDao));
 9             }
10             //獲取鏈接配置
11             getOption();
12             client.connect(option);
13             log.info("[MQTT] connect to Mqtt Server success...");
14             return isConnected();
15         } catch (Exception e) {
16             e.printStackTrace();
17             return false;
18         }
19     }
 1 private void getOption() {
 2         //MQTT鏈接設置
 3         option = new MqttConnectOptions();
 4         //設置是否清空session,false表示服務器會保留客戶端的鏈接記錄,true表示每次鏈接到服務器都以新的身份鏈接
 5         option.setCleanSession(true);
 6         //設置鏈接的用戶名
 7         option.setUserName(userName);
 8         //設置鏈接的密碼
 9         option.setPassword(password.toCharArray());
10         //設置超時時間 單位爲秒
11         option.setConnectionTimeout(outTime);
12         //設置會話心跳時間 單位爲秒 服務器會每隔(1.5*keepTime)秒的時間向客戶端發送個消息判斷客戶端是否在線,但這個方法並無重連的機制
13         option.setKeepAliveInterval(keepTime);
14         option.setAutomaticReconnect(true);
15         //setWill方法,若是項目中須要知道客戶端是否掉線能夠調用該方法。設置最終端口的通知消息
16 //            option.setWill(topic, "close".getBytes(), 2, true);
17     }

1.2編寫PushCallBack回調類,實現重連

 1 public void connectionLost(Throwable throwable) {
 2         // 鏈接丟失後,通常在這裏面進行重連
 3         log.info("[MQTT] 鏈接斷開,30S以後嘗試重連...");
 4         while(true) {
 5             try {
 6                 Thread.sleep(30000);
 7                 mqttConn.reConnect();
 8                 break;
 9             } catch (Exception e) {
10                 e.printStackTrace();
11                 continue;
12             }
13         }
14     }

異常出現:

若在connectionLost()方法中直接循環調用MqttConnection類中connect()方法,session

去實現重連機制的話,會出如今第一次重連成功後,一直斷開鏈接再重連再斷開鏈接再重連的死循環中。eclipse

異常定位:

緣由在connect()方法中的這句:this

 

從新new上一個client_id相同的MqttClient,client_id是MQTT client的惟一標識,client_id不能重複。spa

這樣就會出現重連時建立的MqttClient,使程序中初始化時建立的MqttClient斷開鏈接,斷開鏈接後就會回滾到connectionLost方法中,3d

而後此方法中又會繼續重連,就出現上述的斷開鏈接再重連再斷開鏈接再重連的死循環。code

異常解決:

在MqttConnection鏈接類中定義一個重連的方法:blog

1 //斷線重連
2     public void reConnect() throws Exception {
3         if(null != client) {
4             client.connect(option);
5         }
6     }

不須要從新new一個MqttClient,只須要調用connect方法就OK了,eclipse paho就會把以前的鏈接從新建立起來。token

 

另外分享setRetained()方法:

 1 /**
 2      * 發佈消息
 3      * @param topic     消息主題
 4      * @param qos       消息傳輸質量
 5      * @param message   消息內容
 6      */
 7     public void publish(String topic, int qos, String message) throws Exception {
 8         MqttTopic mqttTopic = client.getTopic(topic);
 9         MqttMessage mqttMessage = new MqttMessage();
10         mqttMessage.setQos(qos);
11         //是否設置保留消息,若爲true,後來的訂閱者訂閱該主題時仍可接收到該消息
12         mqttMessage.setRetained(false);
13         mqttMessage.setPayload(message.getBytes());
14         MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
15         token.waitForCompletion();
16         log.info("[MQTT] publish message : " + token.isComplete() +
17                 ",{topic : " + topic + ", message : " + message + "}");
18     }

setRetained():消息保留機制,若設置爲true,mqtt服務器會保留每次發佈的消息,ip

若訂閱某主題的客戶端重啓,則會把此主題以前發佈的消息從新推送到客戶端。

相關文章
相關標籤/搜索