咱們已經把相關的鏈接報文搞定了。筆者想來想去仍是決定先講解一下訂閱報文(SUBSCRIBE )。若是傳統的通訊方式是客戶端和服務端之間通常就直接傳輸信息。可是MQTT的通訊方式是經過發佈/訂閱的方式進行的。筆者不知道他是否跟設計模式中的發佈訂閱模式有沒有關係。但是他們思想卻有一點類似之處。redis
客戶端知道服務上有不少個主題。就比如如說有不少消息的分類同樣子。有社會新聞、體育講壇等。那麼客戶端只要找到本身感興趣的進行訂閱就能夠了。一個客戶端能夠向服務器訂閱多個主題。而所謂的發佈就是客戶端對不一樣的主題進行發佈信息。即比如如新聞的發佈者同樣子。這個時候只要訂閱這個主題的客戶端就能夠接收到來自服務端的新聞。咱們的手機經常會接收到一些推送的信息。事實上有不少App應用都是用MQTT協議來進行的。因此不難看出服務端主要是負責客戶端和客戶端的之間信息的傳輸和信息管理。大至如圖下sql
注意:發佈者也是客戶端。訂閱者也是客戶端數據庫
主題(Topic )設計模式
若是主題只是一個字符串值的話,那麼顯然會比較單調。這樣子功能也顯得比較無力。因此在主題上面就了所謂的分隔符和通配符的說法(我的想法)。分隔符的意思就是讓主題能夠分層次。就好如說主題「體育講壇/籃球/NBA」。看到這樣子的主題,請問一下你還有什麼不明白的話。是否是感受頗有層次感。剩下只有一個問題?若是咱們訂閱了主題「體育講壇/籃球/NBA」,並向主題「體育講壇/籃球」發佈一個信息。那麼已經訂閱主題「體育講壇/籃球/NBA」的客戶端們是否是能夠接受到信息呢?反過來說若是咱們訂閱了主題「體育講壇/籃球」,向主題「體育講壇/籃球/NBA」發信息,客戶端們是否又能接受信息呢?服務器
筆者就以HiveMQ做服務器來作一下上面的小實驗。以下學習
客驗結果顯然是失敗的——訂閱主題「體育講壇/籃球/NBA」的客戶端根本收不到來自主題「體育講壇/籃球「的發佈信息。說明分隔符就是用於主題名的分層次。沒有別的意思。this
經過上面的實驗咱們知道若是想要收到NBA就是必須訂閱主題「體育講壇/籃球/NBA」。但是老是有一些人只要是籃球的新聞有喜歡。怎麼辦。通配符的功能就出來了。通配符有倆種——"+"和「#」。+爲單層的通配符。表示當前這一層的全都合非。這樣子以上面的說到的例子來作實驗。咱們訂閱一個主題爲「體育講壇/籃球/+」。按照理解的意思就是隻要是在「體育講壇/籃球」的信息都是咱們想要的。結果以下spa
咱們能夠看到筆者在「體育講壇/籃球/NBA」和「體育講壇/籃球/ABC」各發布了信息。結果他都能收到。那麼若是咱們對主題「體育講壇/籃球」或是主題「體育講壇/籃球/NBA/福州專場」發佈信息呢?筆者試過了很惋惜都不行。設計
記得咱們上面說到有一些人只要跟籃球相關的都喜歡。但是若是使用通配符「+」是能夠接近咱們的要求。注意是接近。「+」通配符只是表示當前一層的。從當前的第二層就不行了。而自己的層也不算。就像上面的。只有籃球下的子一層纔是合非的。講到這裏你們必定會想到用「#「通配符試試。沒有錯。「#「通配符就是表示當前自己和下面子層全部。以下3d
實驗的結果很終知足了。
對於主題,在文檔中有一個要求——主題不能以 」#「 "+" "$" 爲開頭。對於」#「 」 +「的話,你們都好理解。那麼」$「又是什麼鬼。在文檔咱們能夠看到這樣子的字符"$SYS"。事實上他們是想說」$「開頭的主題通常用於系統內部的一些主題。大家能夠去找一些第三方的MQTT服務器。都會有不少以」$「開頭的主題。
SUBSCRIBE報文
經過上面的介紹。筆者想大家必定對MQTT通訊方式有了必定的概念。而本章的訂閱報文就是用於告訴服務器我想要什麼的主題了。經過前面幾章的瞭解。咱們知道報文的固定報頭是少了的。筆者就以MQTT 3.1.1來介紹吧。以下
SUBSCRIBE報文的INT值是8。因此對應的二進制爲1000。後面的DUP QOS RETAIN對應是0010。其中QOS是必須是01。對訂閱者來說,他必定但願本身的訂閱是成功的。因此訂閱報文的QOS是01就至關好理解了。若是不理解QOS是什麼的話,請看一下前面幾章。
訂閱報文也有可變報頭,可變報頭只有一個消息ID。消息ID是從客端端開始分配的。筆者爲何樣子認爲呢?主要是看到客戶端在發佈信息的時候就要求消息ID。因此筆者纔會以爲消息ID在客戶端進行分配的。固然也不是什麼報文都會消息ID的。可是有消息ID通常QOS大於0。
訂閱報文的有效載荷裏面存在了相關的訂閱訂題列表。前面說過能夠支持一個客戶端多個訂閱。列表裏面每有一主題項只有倆個值。一個表示主題名,一個表示服務質量要求(Requested QoS)。這裏的服務質量要求(Requested QoS)和 固定報頭的服務質量的值是同樣子。可是用意倒是不同子。這裏是指這個訂閱者接收這主題的服務質量最大等級。舉個列子吧。筆者訂閱了一個主題主題「體育講壇/籃球/NBA」,同時他的服務質量要求(Requested QoS)的值爲1。這個時候有一個發佈者在這個主題上發佈一個服務質QOS爲2。筆者仍是能夠收到這個發佈者發來的信息。只是信息的服務質量QOS卻變爲1了。要明白QOS(1)和QOS(2)的執行行爲是不樣子的。這個後面章節會講到。固然若是發佈者在這個主題上發佈一個服務質QOS爲0。這就沒有什麼區別了。以下
對於有效載荷筆者這裏就很少講解了。也沒有什麼可說的。看文檔的圖片就夠了以下。
宏觀上:
微觀上:
列表出咱們能夠看他訂閱了倆個主題。一個主題」a/b「,一個主題」c/b「。上面列出大概的圖片(宏觀上)和比較細的圖片(微觀上)。若是看不懂也沒有關係。筆者接下來會用代碼來抓一包看看。相信在對照一下就明白列表出畫的是什麼。
如今讓咱們好好想一想當服務器接收到來自客戶端的訂閱報文的時候要作些什麼樣子的反應呢?首先咱們要明白若是服務端接收到一個訂閱報文,第一步想到必定是查看訂閱報文的格式是否是正確的。相關的主題名是否是爲空的。主題名的寫法是否是非法。這些必定離不開。固然對應的一些共有的驗證筆者就不說了。一切沒有問題的狀況下,服務器會去看一下當前訂閱者前面有沒有訂閱過相同的主題。若是有就替換當前的。若是沒有就建立一下新的。而後服務器在根據當前主題查找一下符合保留的信息。若是有,就發送給當前的訂閱者。而後發送一個訂閱報文肯定(SUBACK )。固然這先後沒有規定。先發送一個訂閱報文肯定(SUBACK ),在處理保留的信息也是能夠的。
注意:在發送符合保留的信息就要對QOS進行處理。上面筆者也講過了。
SUBACK 報文
當服務端處理SUBSCRIBE報文的時候,都會生成一個SUBACK 報文來回應訂閱者。筆者這裏不想對他太過的講解。他的內容也很簡單。以下
對於SUBACK 報文的可變報頭裏面也只有一個消息ID。並且跟SUBSCRIBE報文的消息ID是同樣子的。有效載何的內容存放是訂閱主題的服務質量要求(Requested QoS)。筆者在MQTT 3.1 文檔時面能夠看到有多個主題的列子。但是在MQTT 3.1.1裏面卻沒有。那麼筆者就把MQTT 3.1.1的放在下里吧。讀者們能夠自行查看。
上面列表裏面顯示返回碼,事實上是主題相關的服務質量要求(Requested QoS)。因此就能夠知道他能夠會返回四個值。以下
QOS 0:0x00
QOS 1:0x01
QOS2 :0x02
Failure :0x80
代碼實現
有了上面的瞭解以後,筆者就想在經過一些代碼來加深理解。固然從新寫那是不可能的。筆者就用上一章的代碼。並加上訂閱報文相關的處理。以下
1 private void onSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage msg) { 2 3 if (!this.connected) { 4 ctx.close(); 5 return; 6 } 7 int messageId = msg.variableHeader().messageId(); 8 9 List<MqttTopicSubscription> requestSubscriptions = msg.payload().topicSubscriptions(); 10 11 for (MqttTopicSubscription subscription : requestSubscriptions) { 12 13 if (StringUtils.isEmpty(subscription.topicName())) { 14 ctx.close(); 15 return; 16 } 17 } 18 19 List<Integer> grantedQosLevels = new ArrayList<Integer>(); 20 21 requestSubscriptions.forEach(subscription -> { 22 if (subscription.topicName().startsWith("$")) grantedQosLevels.add(MqttQoS.FAILURE.value()); 23 else grantedQosLevels.add(subscription.qualityOfService().value()); 24 }); 25 26 27 BrokerSessionHelper.sendMessage( 28 ctx, 29 MqttMessageFactory.newMessage( 30 new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0), 31 MqttMessageIdVariableHeader.from(messageId), 32 new MqttSubAckPayload(grantedQosLevels)), 33 this.clientId, 34 messageId, 35 true); 36 37 for (int i = 0; i < requestSubscriptions.size(); i++) { 38 39 MqttQoS grantedQoS = MqttQoS.valueOf(grantedQosLevels.get(i)); 40 String topic = requestSubscriptions.get(i).topicName(); 41 42 //1。查看之前有沒有訂閱過相同的主題,若是有就替換。 43 //2。查看有沒有符合的保留信息,有發送 44 //讀者們自行去實現。是要用redis,仍是要用sqllite自去實現。 45 46 } 47 }
訂閱報文的實現並不難。難就在對於對保留信息的處理。還有就是服務端要對當前的客戶端的訂閱進行保留。那麼筆者這邊作的事情比較簡單。主要是爲了學習查看相關的報文格式。可是筆者仍是要列出來一下。以下
1.判斷是否發生過鏈接。便是鏈接報文的處理。若是沒有的話,斷開鏈接。
if (!this.connected) { ctx.close(); return; }
2.得到報文的消息ID和相關的訂閱主題。判斷主題不爲空。固然你也可自定義主題的驗證合法規則。筆者這裏就很少說了。
int messageId = msg.variableHeader().messageId(); List<MqttTopicSubscription> requestSubscriptions = msg.payload().topicSubscriptions(); for (MqttTopicSubscription subscription : requestSubscriptions) { if (StringUtils.isEmpty(subscription.topicName())) { ctx.close(); return; } }
3.得到相關主題的服務質量要求,用於返回碼和處理保留的消息。並返回SUBACK報文
1 List<Integer> grantedQosLevels = new ArrayList<Integer>(); 2 3 requestSubscriptions.forEach(subscription -> { 4 if (subscription.topicName().startsWith("$")) grantedQosLevels.add(MqttQoS.FAILURE.value()); 5 else grantedQosLevels.add(subscription.qualityOfService().value()); 6 }); 7 8 9 BrokerSessionHelper.sendMessage( 10 ctx, 11 MqttMessageFactory.newMessage( 12 new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0), 13 MqttMessageIdVariableHeader.from(messageId), 14 new MqttSubAckPayload(grantedQosLevels)), 15 this.clientId, 16 messageId, 17 true);
4.處理保留的信息。這裏筆者並沒實現。由於這裏要接合相關的數據庫或是NOSQL。因此這裏筆者沒有去作。因這裏太多的東西的。並且不一樣的人實現和想法也不同子。因此筆者就沒有列出來。
for (int i = 0; i < requestSubscriptions.size(); i++) { MqttQoS grantedQoS = MqttQoS.valueOf(grantedQosLevels.get(i)); String topic = requestSubscriptions.get(i).topicName(); //1。查看之前有沒有訂閱過相同的主題,若是有就替換。 //2。查看有沒有符合的保留信息,有發送 //讀者們自行去實現。是要用redis,仍是要用sqllite自去實現。 }
筆者把相關的抓到的報文格列出來。以下
SUBSCRIBE報文:
筆者已經把SUBSCRIBE報文的各個部分用不一樣的顏色標出耿了。其中的黃色線表示下同主題的長度。就是上面微觀圖片裏面的MSB和LSB。其餘的也沒有什麼。 只是要注意最後一個值也就是服務質量要求(Requested QoS)。筆者這邊是1。因此最後的二進制是00000001。
SUBACK 報文:
咱們能夠看到SUBACK 報文的消息ID和SUBSCRIBE報文的消息是同樣子的。還有就是記得最後的服務質量要求。