MQTT協議實現Eclipse Paho學習總結

轉載自:http://xiaoxinzhou.blog.163.com/blog/static/20704538620145411306821/

1、概述

遙測傳輸 (MQTT) 是輕量級基於代理的發佈/訂閱的消息傳輸協議,設計思想是開放、簡單、輕量、易於實現。這些特色使它適用於受限環境。例如,但不只限於此:php

  • 網絡代價昂貴,帶寬低、不可靠。
  • 在嵌入設備中運行,處理器和內存資源有限。

該協議的特色有:java

  • 使用發佈/訂閱消息模式,提供一對多的消息發佈,解除應用程序耦合。
  • 對負載內容屏蔽的消息傳輸。
  • 使用 TCP/IP 提供網絡鏈接。
  • 有三種消息發佈服務質量:
    • 「至多一次」,消息發佈徹底依賴底層 TCP/IP 網絡。會發生消息丟失或重複。這一級別可用於以下狀況,環境傳感器數據,丟失一次讀記錄無所謂,由於不久後還會有第二次發送。
    • 「至少一次」,確保消息到達,但消息重複可能會發生。
    • 「只有一次」,確保消息到達一次。這一級別可用於以下狀況,在計費系統中,消息重複或丟失會致使不正確的結果。
  • 小型傳輸,開銷很小(固定長度的頭部是 2 字節),協議交換最小化,以下降網絡流量。
  • 使用 Last Will 和 Testament 特性通知有關各方客戶端異常中斷的機制。
由於MQTT是輕量級的發佈/訂閱的消息傳輸協議, 所以不少應用均可以借用MQTT的思想, 好比Facebook的的Messager聽說就是按照MQTT的協議編寫的。若是須要了解這個協議,簡單的讀一下其協議的主要內容實際上是不能深入理解其 中的意思的,就像你看了XMPP的協議以後,不讀smack很快就會遺忘掉這個協議的樣子同樣,程序員對代碼的熱愛程度會遠遠大多文檔(初級碼農),因而 乎讀了一下MQTT的實現Eclipse Paho,一下是一些簡單的總結。

2、MQTT協議實現Eclipse Paho

MQTT有不一樣語言,不一樣版本的諸多的實現,詳細信息見 http://mqtt.org/software,其中Eclipse Paho只是諸多Java實現中的一個,關於Eclipse Paho的介紹以下 http://www.eclipse.org/proposals/technology.paho/,具體下載地址 http://git.eclipse.org/c/paho/org.eclipse.paho.mqtt.java.git/
由於MQTT是輕量級的發佈/訂閱的消息傳輸協議,其實現Eclipse Paho,也是很是的輕量級,相比smack代碼真是小巫見大巫了,看過smack以後,再看Eclipse Paho你內心會豁然開朗,原來代碼這麼少啊!其主要實現包以下:
其中主要的代碼集中在畫框的三個包內。
 
org.eclipse.paho.client.mqttv3: 主要用於對外提供服務,即整個Eclipse Paho對外的窗口,當你的程序須要調用Eclipse Paho時,直接調用org.eclipse.paho.client.mqttv3包內的類就能實現Eclipse Paho所提供的整個功能。固然你也能夠調用其餘包內的類,這要看你對整個代碼的瞭解程度了。

org.eclipse.paho.client.mqttv3.internal:看看單詞internal你可能就猜到了,沒錯,這就是第一個包的主要功能實現,這個包有承上啓下的功能,首先對第一包提供功能的實現,其次調用剩下包中的類以實現MQTT協議的規定。
 
org.eclipse.paho.client.mqttv3.internal.nls:主要是國際化相關的文件,打開這個包以後,你會欣喜的看到messages_zh_CN.properties,有中文實現!
 
org.eclipse.paho.client.mqttv3.internal.security:固然是跟安全相關,其中包含了MQTT協議所規定的實現的TLS協議實現,固然在Java中tls的實現固然是SSLSocket。
 
org.eclipse.paho.client.mqttv3.internal.wire:主要是信息的載體,也就是socket之上傳輸的心跳包,訂閱,發佈信息等報文信息。
 
org.eclipse.paho.client.mqttv3.logging:日誌。
 
org.eclipse.paho.client.mqttv3.persist: 主要用於保存已經發送的數據包。從這裏能夠看出,MQTT協議最初的面向目標即傳感器之間信息的傳輸,其實現採用了,將數據包保存的文件當中的方式 (MqttDefaultFilePersistence)保證了數據確定可以發送到服務器,無論程序崩潰不崩潰,網絡好很差,只要發送的數據包沒有收到 確認,這個數據包就一直保存在文件當中,直到其發送出去爲止。
 
org.eclipse.paho.client.mqttv3.util:工具類。
 
這些包中,最主要的包就是上圖中包含在框中的包,這三個包中,最主要的就是org.eclipse.paho.client.mqttv3.internal這個包,所以只要你看懂了這個包中的主要的類,那麼你就拿下了MQTT協議的實現Eclipse Paho!!

3、MQTT協議的報文類別

3.1 MQTT協議規定報文

1.鏈接請求(CONNECT)
 當一個從客戶端到服務器的TCP/IP套接字鏈接被創建時,必須用一個鏈接流來建立一個協議級別的會話。

2.鏈接請求確認(CONNECTACK)
 鏈接請求確認報文(CONNECTACK)是服務器發給客戶端,用以確認客戶端的鏈接請求

3.發佈報文(PUBLISH)
客戶端發佈報文到服務器端,用來提供給有着不一樣需求的訂閱者們。每一個發佈的報文都有一個主題,這是一個分層的命名空間,他定義了報文來源分類,方便訂閱者訂閱他們須要的主題。訂閱者們能夠註冊本身的須要的報文類別。

4.發佈確認報文(PUBACK)
發佈確認報文(PUBACK)是對服務質量級別爲1的發佈報文的應答。他能夠是服務器對發佈報文的客戶端的報文確認,也能夠是報文訂閱者對發佈報文的服務器的應答。

5.發佈確認報文(PUBREC)
PUBREC報文是對服務質量級別爲2的發佈報文的應答。這是服務質量級別爲2的協議流的第二個報文。PUBREC是由服務器端對發佈報文的客戶端的應答,或者是報文訂閱者對發佈報文的服務器的應答。

6.發佈確認報文(PUBREL)
PUBREL是報文發佈者對來自服務器的PUBREC報文的確認,或者是服務器對來自報文訂閱者的PUBREC報文的確認。它是服務質量級別爲2的協議流的第三個報文。

7.肯定發佈完成(PUBCOMP)
PUBCOMP報文是服務器對報文發佈者的PUBREL報文的應答,或者是報文訂閱者對服務器的PUBREL報文的應答。它是服務質量級別爲2的協議流的第四個也是最後一個報文。

8.訂閱命名的主題(SUBSCRIBE)
訂閱報文(SUBSCRIBE)容許一個客戶端在服務器上註冊一個或多個感興趣的主題名字。發佈給這些主題的報文做爲發佈報文從服務器端交付給客戶端。訂閱報文也描述了訂閱者想要收到的發佈報文的服務質量等級。

9. 訂閱報文確認(SUBACK)
當服務器收到客戶端發來的訂閱報文時,將發送訂閱報文的確認報文給客戶端。一個這樣的確認報文包含一列被授予的服務質量等級。被授予的服務質量等級次序和對應的訂閱報文中的主題名稱的次序相符。

10. 退訂命名的主題(UNSUBSCRIBE)
退訂主題的報文是從客戶端發往服務器端,用以退訂命名的主題。

11. 退訂確認(UNSUBACK)
退訂確認報文是從服務器發往客戶端,用以確認客戶端發來的退訂請求報文。

12. Ping請求(PINGREQ)
Ping請求報文是從鏈接的客戶端發往服務器端,用來詢問服務器端是否還存在。

13. Ping應答(PINGRESP)
Ping應答報文是從服務器端發往Ping請求的客戶端,對客戶端的Ping請求進行確認。

14. 斷開通知(DISCONNECT)
斷開通知報文是從客戶端發往服務器端用來指明將要關閉它的TCP/IP鏈接,他容許完全地斷開,而非只是下線。若是客戶端已經和乾淨會話標誌集聯繫,那麼全部先前關於客戶端維護的信息將被丟棄。一個服務器在收到斷開報文以後,不能依賴客戶端關閉TCP/IP鏈接。
 

3.2 Eclipse Paho的對報文的實現

Eclipse Paho對MQTT協議報文的實現,主要在org.eclipse.paho.client.mqttv3.internal.wire包下,
其下包含了對MQTT協議14中報文的主要實現以下:
 
從以上看,其發送一個數據包後,服務器端必須回覆一個確認包,這爲傳輸數據包的魯棒性,下降丟包率,提升準確性提供了很好實現。不一樣於IM協議MXPP,沒有對數據的確認。

3.3 心跳包

還有一個重要一點就是對其對心跳包的設定,看心跳包,主要是要看public class MqttPingReq extends MqttWireMessage 這個類!
 
[java]  view plain copy
  1. public class MqttPingReq extends MqttWireMessage {  
  2.     public MqttPingReq() {  
  3.         super(MqttWireMessage.MESSAGE_TYPE_PINGREQ);  
  4.     }  
  5.       
  6.     /** 
  7.      * Returns <code>false</code> as message IDs are not required for MQTT 
  8.      * PINGREQ messages. 
  9.      */  
  10.     public boolean isMessageIdRequired() {  
  11.         return false;  
  12.     }  
  13.   
  14.     protected byte[] getVariableHeader() throws MqttException {  
  15.         return new byte[0];  
  16.     }  
  17.       
  18.     protected byte getMessageInfo() {  
  19.         return 0;  
  20.     }  
  21.       
  22.     public String getKey() {  
  23.         return new String("Ping");  
  24.     }  
  25. }  
固然只看這個類,也沒法知道其心跳包的內容,這時候,咱們須要從其發送的內容當中逆向推出其心跳包的內容。
 
我 們先看其發送的的模塊:找到public class CommsSender implements Runnable 類,看到其有一個private MqttOutputStream out;私有字段,一看這個方法,咱們就能判斷,這個字段就是輸出流,而後,咱們順藤摸瓜,看public class MqttOutputStream extends OutputStream這個類,你會看到這樣一個方法:
[java]  view plain copy
  1. /** 
  2.      * Writes an <code>MqttWireMessage</code> to the stream. 
  3.      */  
  4.     public void write(MqttWireMessage message) throws IOException, MqttException {  
  5.         byte[] bytes = message.getHeader();  
  6.         byte[] pl = message.getPayload();  
  7. //      out.write(message.getHeader());  
  8. //      out.write(message.getPayload());  
  9.         out.write(bytes,0,bytes.length);  
  10.         out.write(pl,0,pl.length);  
  11.     }  
哦,這下好了,原來,其發送的是header和payload,而後,咱們就能夠看心跳包的header和payload是什麼。
 
public class MqttPingReq extends MqttWireMessage心跳包下有
 
[java]  view plain copy
  1. protected byte[] getVariableHeader() throws MqttException {  
  2.     return new byte[0];  
  3. }  
這個方法,咱們就知道了,這個確定是父類MqttWireMessage中getHeader調用的方法,而後再回到MqttWireMessage,果然getHeader方法以下:
[java]  view plain copy
  1. public byte[] getHeader() throws MqttException {  
  2.         if (encodedHeader == null) {  
  3.             try {  
  4.                 int first = ((getType() & 0x0f) << 4) ^ (getMessageInfo() & 0x0f);  
  5.                 byte[] varHeader = getVariableHeader();  
  6.                 int remLen = varHeader.length + getPayload().length;  
  7.   
  8.                 ByteArrayOutputStream baos = new ByteArrayOutputStream();  
  9.                 DataOutputStream dos = new DataOutputStream(baos);  
  10.                 dos.writeByte(first);//1個字節  
  11.                 dos.write(encodeMBI(remLen));//1個字節  
  12.                 dos.write(varHeader);//0個字節  
  13.                 dos.flush();  
  14.                 encodedHeader = baos.toByteArray();  
  15.             } catch(IOException ioe) {  
  16.                 throw new MqttException(ioe);  
  17.             }  
  18.         }  
  19.         return encodedHeader;  
  20.     }  
而MqttWireMessage中還有一個getPayload方法,這個方法MqttPingReq 沒有重寫,也就是說,默認調用這個方法。
[java]  view plain copy
  1. /** 
  2.      * Sub-classes should override this method to supply the payload bytes. 
  3.      */  
  4.     public byte[] getPayload() throws MqttException {  
  5.         return new byte[0];//0個字節  
  6.     }  

也就是說MQTT的心跳包只有2個字節!git

相關文章
相關標籤/搜索