MQTT 消息 發佈 訂閱


 

當鏈接向一個mqtt服務器時,clientId必須是惟一的。設置同樣,致使client.setCallback老是走到 connectionLost回調。報connection reset。調查一天才發現是clientid重複致使。php

client = new MqttAsyncClient(serverURIString, "client-id");html

 


 

clientId是用來保存會話信息。java

MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false);node

當服務器將message發佈向全部訂閱過的客戶端,就會清除這個message。若是當前客戶端不在線,等它鏈接時發送。git

 

session與client之間的關係是怎樣的?

這樣的,好比你一個板子,做爲客戶端,發起mqtt的鏈接請求connect到mqtt服務器,好比說就是emqtt服務吧,emqtt服務端收到這個板子的鏈接請求以後,在tcp層上會和板子創建一個tcp的鏈接,在emqtt內部,會產生一個進程,和這個板子作數據通信,同時還會產生一個進程,叫session,這個sessoin是專門管理這個板子訂閱的主題,其它板子若是發佈了這個板子感興趣的主題的時候,也會發到這個板子對應的這個session裏面,若是這個session收到訂閱的主題以後,發現對用的client還活着,就經過這個client把數據通過tcp發到這個板子上,若是發現client已經沒有了,就是說板子和服務端斷掉了,那麼session就會把收到的訂閱的主題,先保存在session裏面,下次板子鏈接上了,並且cleansession=false,那麼這個session就不會清除,在此次鏈接時,就會把之前收到的訂閱消息,發給板子,大概就是這個意思。github


 

參考:web

http://www.blogjava.net/yongboy/archive/2014/02/15/409893.htmlapache

http://www.cnblogs.com/znlgis/p/4930990.htmlapi

http://blog.csdn.net/ljf10010/article/details/51424506安全

paho客戶端示例

 

https://github.com/eclipse/paho.mqtt.java/tree/master/org.eclipse.paho.client.mqttv3.test/src/test/java/org/eclipse/paho/client/mqttv3/test

http://www.eclipse.org/paho/files/javadoc/index.html api文檔

ibm客戶端paho示例:

http://www.programcreek.com/java-api-examples/index.php?source_dir=streamsx.messaging-master/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/mqtt/MqttAsyncClientWrapper.java

hivemq的mqtt詳解:http://www.hivemq.com/blog/mqtt-essentials-part-8-retained-messages

 

 http://blog.csdn.net/wuyinxian/article/details/38826259

MQTT Protocol Manual(Apollo中MQTT協議解析)

 
 
 

 
 MQTT協議
Apollo容許客戶端經過開放的MQTT協議鏈接。該協議主要是用在資源有限的驅動上,以及網絡不穩定的狀況下使用,是一個訂閱、發佈模型。這種驅動一般不適用相似http,stomp這類基於文本,或者相似openfire,AMQP等傳統二進制協議。MQTT是一個簡介的二進制協議,適用這類驅動資源受限,並且是不穩定的網絡條件下。
以前的穩定發佈版本中,MQTT是做爲一個Apollo的一個插件提供的。可是如今,這個插件已經變爲開發項目的一部分。MQTT在Apollo中已經不須要其餘配置文件或者是第三方插件支持了。
MQTT是一個線路層的協議,任何實現該協議的客戶端均可以鏈接到Apollo。固然也能夠整合其餘MQTT兼容的消息代理中。
更多有關MQTT協議內容,參考 the MQTT Specification
 MQTT協議配置
爲了開始使用MQTT協議,首先使用MQTT3.1協議的客戶端,鏈接到Apollo正在監聽端口。Apollo會作協議檢測,並且自動識別MQTT鏈接,並且將鏈接做爲MQTT協議處理。你沒必要要爲MQTT協議打開一個端口(STomp,Openfire,AMQP等都是自動識別)。若是你必定指定鏈接的協議,有下面兩種方式:你能夠選擇不用協議識別,而是爲MQTT指定鏈接:
<connector id="tcp" bind="tcp://0.0.0.0:61613" protocol="mqtt"/>
或者你能夠限制哪一種協議能夠被自動識別。經過下面的<detece>配置方式:
<connector id="tcp" bind="tcp://0.0.0.0:61613">
  <detect protocols="mqtt openwire" />
</connector>
<detect> 下protocols 對應的參數經過空格來隔開支持的通訊協議。若是隻支持一種協議,就不要空格,默認狀況下對任何協議生效。
若是你想調整MQTT默認設置,在apollo.xml文件中有一個<connector> 元素,經過MQTT參數配置:
<connector id="tcp" bind="tcp://0.0.0.0:61613">
  <mqtt max_message_length="1000" />
</connector>
MQTT元素支持下面幾個參數:
  • max_message_length : The size (in bytes) of the largest message that can be sent to the broker. Defaults to 100MB(broker能接受的最大消息量:默認是100M)
  • protocol_filters : A filter which can filter frames being sent/received to and from a client. It can modify the frame or even drop it.(一個控制發送和接收,Client的過濾器框架。能夠修改,刪除這個框架)
  • die_delay : How long after a connection is deemed to be 「dead」 before the connection actually closes; default: 5000ms(在實際斷開鏈接以前,會有默認5000ms的時間被認爲鏈接已經dead)
mqtt 配置元素也能夠用來控制目的消息頭的解析。下面是支持的參數:
  • queue_prefix : a tag used to identify destination types; default: null(用來確認目的地類型)
  • path_separator : used to separate segments in a destination name; default: /(用來分割目的地名稱)
  • any_child_wildcard : indicate all child-level destinations that match the wildcard; default: +(識別子目錄)
  • any_descendant_wildcard : indicate destinations that match the wildcard recursively; default:#(目標地址通配符)
  • regex_wildcard_start : pattern used to identify the start of a regex(表示正則表達開始)
  • regex_wildcard_end : pattern used to identify the end of a regex(表示正則表達結束)
  • part_pattern : allows you to specify a regex that constrains the naming of topics. (你能夠指定正則表達規則)default: [ a-zA-Z0-9\_\-\%\~\:\(\)]+
 Client 可用函數庫
Apollo 支持MQTT3.1 協議,下面是可用的Clients:
若是要找到新支持的Clients ,能夠檢索: the MQTT website for its software
在目錄example 目錄下,你能夠找到一些例子,實現了與broker之間收發。
 connecting
爲了確保broker配置文件的安全,因此只容許一個admin 用戶鏈接,默認的用戶名和密碼是:admin ,password.
Mqtt 客戶端不能specify 虛擬主機(更多請看:see  the section on Virtual Hosts in the user guide),以致於默認狀況下虛擬主機已經被使用了。一般第一虛擬主機定義在apollo.xml文件中。
 Destination 類型
MQTT協議是訂閱,發佈協議,是不容許真正的利用隊列點對點的消息收發。所以Apollo僅容許利用主題,還進行MQTT消息發送。訂閱的概念和持久的主題訂閱 和其餘協議提到的有些相似,同時也被MQTT CONNECT 框架的clean session屬性控制。
 Clean Sessions
但一個Client 發送一個鏈接,這個鏈接中clean session 被設置爲false,那麼以前鏈接中有相同Client_id 的session 將會被重複使用。這就意味着Client斷開了,訂閱依然能收到消息。這就等同與同Apollo創建一個長訂閱。
若是 clean session 設置爲true ,那麼新session就開始了,其餘的session會慢慢消失,刪除。這就是Apollo中定義的普通的主題訂閱。
 Topic Retained Messages
若是消息被髮布的同時retain 標記被設置,消息將被主題記住,以致於新的訂閱到達,最近的retain 消息會被髮送到訂閱者。好比說:你想發佈一個參數,並且你想讓最新的這個參數發佈到老是可用的訂閱了這個主題的客戶端上,你就設置在PUBLISH 框架上設置retain 標籤。
注意:retained 消息 不會被設置成retained 在 QoS設置爲零的broker 重啓過程當中。

Last Will and Testament Message

當Client第一次鏈接的時候,有一個will 消息和一個更QoS相關的消息會跟你有關。will消息是一個基礎消息,這個基礎消息只有在鏈接異常或者是掉線的時候纔會被髮送。通常用在你有一個設備,當他們掉了的時候,你須要知道。因此若是一個醫療Client從broker掉線,will消息將會做爲一個鬧鐘主題發送,並且會被系統做爲高優先級提醒。

Reliable Messaging

MQTT協議容許Client 發佈消息的時候指定Qos參數:
  • At Most Once (QoS=0)
  • At Least Once (QoS=1)
  • Exactly Once (QoS=2)
 最多一次
這個設置時推送消息給Client,可靠性最低的一種。若是設置Qos=0,那broker就不會返回結果碼,告訴你他收到消息了,也不會在失敗後嘗試重發。這有點像不可靠消息,如JMS。
 至少一次
該設置會確保消息會被至少一次推送到Client。若是推送設置爲至少推送一次,Apollo會返回一個回調函數,確保代理已經收到消息,並且確保會確保推送該消息。若是Client 將發佈了一個Qos=1的消息,若是在指定的時間內沒有收到回覆,Client會但願從新發布這個消息。因此可能存在這種狀況:代理收到一個須要推送的消息,而後又收到一個消息推送到同一個Client。因此若是傳輸過程當中PUBACK丟失,Client會從新發送,並且不會去檢測是不是重發,broker就將消息發送到訂閱主題中。
 剛好一次
該設置是可靠等級最高的。他會確保發佈者不只僅會推送,並且不會像Qos=1 那樣,會被接收兩次。固然這個設置會增長網絡的負載。當一個消息被髮布出去的時候,broker會保存該消息的id,並且會利用任何長鏈接,堅持要把該消息推送給目標地址。若是Client收到PUBREC 標誌,那就代表broker已經收到消息了。 這個時候broker會期待Client發送一個PUBREL 來清除session 中消息id,broker若是發送成功就會發送一個PUBCOMP通知Client。

Wildcard Subscriptions

通配用在主題的目標地址中。這能實現一個主題發送到多個用戶,或者多層用戶中。
  • / is used to separate names in a path(分割路徑)
  • + is used to match any name in a path(通配地址任何字符)
  • # is used to recursively match path names(遞歸通配)
好比通配可能這樣來用:
  • PRICE/# : Any price for any product on any exchange(任何交易中任何產品的價格)
  • PRICE/STOCK/# : Any price for a stock on any exchange(任何交易中的股票價格)
  • PRICE/STOCK/NASDAQ/+ : Any stock price on NASDAQ(納斯達克的任何股票價格)
  • PRICE/STOCK/+/IBM : Any IBM stock price on any exchange(任何交易中IBM股票價格)

Keep Alive

Apollo只有在Client指定了CONNECT的KeepAlive 值的時候,纔會設置保持鏈接、心跳檢測。若是one Client指定了keepalive,apollo 將會使用1.5*keepalive值。這個在MQTT中有說明。

Destination Name Restrictions

路徑名稱限制了使用(a-z, A-Z, 0-9, _, - %, ~, :, ' ', '(', ')' ,. )字符,通配符(*)在複雜的分隔符中。並且確保使用utf-8來編譯你的URL。
  1 package com.xxx.mqtt;
  2 
  3 import java.net.URI;
  4 
  5 import org.eclipse.paho.client.mqttv3.DisconnectedBufferOptions;
  6 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
  7 import org.eclipse.paho.client.mqttv3.IMqttToken;
  8 import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
  9 import org.eclipse.paho.client.mqttv3.MqttCallback;
 10 import org.eclipse.paho.client.mqttv3.MqttClient;
 11 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 12 import org.eclipse.paho.client.mqttv3.MqttException;
 13 import org.eclipse.paho.client.mqttv3.MqttMessage;
 14 import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
 15 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 16 
 17 
 18 public class MyMqttClient implements MqttCallback {
 19 
 20     private static final MemoryPersistence DATA_STORE = new MemoryPersistence();
 21     private static final String topic = "mytopic";
 22 
 23     private String HOST = "127.0.0.1";
 24     private int PORT = 1883;
 25     private String USERNAME = "user";
 26     private String PASSWORD = "password";
 27     private String serverURIString = "tcp://" + HOST + ":" + PORT;
 28     
 29     String clientId = "client-1";
 30 
 31     MqttAsyncClient client;
 32     // Tokens
 33     IMqttToken connectToken;
 34     IMqttDeliveryToken pubToken;
 35 
 36 
 37     public static void main(String[] args) {
 38         MyMqttClient app = new MyMqttClient();
 39         app.asyncClient();
 40         try {
 41             Thread.sleep(20000);
 42             app.disconnect();
 43         } catch (Exception e) {
 44             e.printStackTrace();
 45         }
 46         System.out.println("end");
 47     }
 48 
 49     public void blockingClient() {
 50         
 51         try {
 52             MqttClient sampleClient = new MqttClient(serverURIString, clientId);
 53             MqttConnectOptions connOpts = new MqttConnectOptions();
 54             connOpts.setCleanSession(true);
 55             connOpts.setUserName(USERNAME);
 56             connOpts.setPassword(PASSWORD.toCharArray());
 57             System.out.println("Connecting to broker: " + serverURIString);
 58             sampleClient.connect(connOpts);
 59             sampleClient.subscribe("#", 1);
 60             System.out.println("Connected");
 61 //                System.out.println("Publish message: " + content);
 62 //                MqttMessage message = new MqttMessage(content.getBytes());
 63 //                message.setQos(qos);
 64             sampleClient.setCallback(this);
 65 //                sampleClient.publish(topic, message);
 66 //                System.out.println("Message published");
 67             try {
 68                 Thread.sleep(10000000);
 69                 System.out.println("Disconnected");
 70                 sampleClient.disconnect();
 71             } catch (Exception e) {
 72                 e.printStackTrace();
 73             }
 74 
 75         } catch (MqttException me) {
 76             System.out.println("reason " + me.getReasonCode());
 77             System.out.println("msg " + me.getMessage());
 78             System.out.println("loc " + me.getLocalizedMessage());
 79             System.out.println("cause " + me.getCause());
 80             System.out.println("except " + me);
 81             me.printStackTrace();
 82         }
 83     }
 84 
 85     public void asyncClient() {
 86         info(" MQTT init start.");
 87 
 88         // Tokens
 89         IMqttToken connectToken;
 90         IMqttDeliveryToken pubToken;
 91 
 92         // Client Options
 93         MqttConnectOptions options = new MqttConnectOptions();
 94         options.setCleanSession(false);
 95         options.setAutomaticReconnect(true);
 96 
 97         options.setUserName(USERNAME);
 98         options.setPassword(PASSWORD.toCharArray());
 99 
100         try {
101             client = new MqttAsyncClient(serverURIString, clientId);
102 
103             DisconnectedBufferOptions disconnectedOpts = new DisconnectedBufferOptions();
104             disconnectedOpts.setBufferEnabled(true);
105             client.setBufferOpts(disconnectedOpts);
106 
107             connectToken = client.connect(options);
108             connectToken.waitForCompletion();//異步變成了同步。能夠用IMqttCallbackListen..在connect時候設置回調。
109             boolean isConnected = client.isConnected();
110             info("Connection isConnected: " + isConnected);
111 
112             if (connectToken.isComplete() && connectToken.getException() == null && client.isConnected()) {
113                 info("[Connect:] Success: "); //$NON-NLS-1$ //$NON-NLS-2$ 
114                 client.setCallback(this);
115 
116             } else {
117                 info("[Connect:] faild: "); //$NON-NLS-1$ //$NON-NLS-2$ 
118             }
119 
120 //             MqttTopic topic = client.getTopic(topic); 
121 //             topic.
122             //setWill方法,若是項目中須要知道客戶端是否掉線能夠調用該方法。設置最終端口的通知消息    
123 //                options.setWill(topic, "close".getBytes(), 2, true);  
124 
125             IMqttToken subToken = client.subscribe("#", 1);
126 
127             subToken.waitForCompletion(1000);
128 
129             if (subToken.isComplete()) {
130                 info("subToken  complete.");
131                 if (subToken.getException() != null) {
132                     info("Topics Subscription failed." + subToken.getException()); //$NON-NLS-1$ 
133                 }
134             } else {
135                 info("subToken not complete.");
136                 if (subToken.getException() != null) {
137                     info("Topics Subscription failed." + subToken.getException()); //$NON-NLS-1$ 
138                 }
139             }
140 
141             info("init end");
142 
143         } catch (MqttException e) {
144             // TODO Auto-generated catch block
145             e.printStackTrace();
146         }
147 
148     }
149 
150 //String clientId, String topic, String message
151     public void send() {
152         String topic;
153         String message;
154         info("===Send Message start.===");
155         message = "Hello, boy.";
156         
157 
158         boolean isConnected = client.isConnected();
159         if (!isConnected) {
160             //no need. it will auto reconnect and send.
161         }
162 
163         // Publish Message
164         try {
165             pubToken = client.publish(topic, new MqttMessage(message.getBytes()));
166 
167             info("Publish attempted: isComplete:" + pubToken.isComplete());
168 
169             pubToken.waitForCompletion();
170         } catch (MqttPersistenceException e) {
171             // TODO Auto-generated catch block
172             e.printStackTrace();
173         } catch (MqttException e) {
174             // TODO Auto-generated catch block
175             e.printStackTrace();
176         }
177 
178         // Check that Message has been delivered
179         info("Message Delivered: " + pubToken.isComplete());
180         info("=== send end.====");
181     }
182 
183     void disconnect() {
184         IMqttToken disconnectToken;
185         try {
186             disconnectToken = client.disconnect();
187             disconnectToken.waitForCompletion();
188             client.close();
189         } catch (MqttException e) {
190             // TODO Auto-generated catch block
191             e.printStackTrace();
192         }
193         client = null;
194     }
195 
196     void info(String s) {
197         System.out.println(s);
198     }
199 
200     public void connectionLost(Throwable thrwbl) {
201         // TODO Auto-generated method stub
202         info("connectionLost");
203 
204         info("MQTT is disconnected from topic: {}. Message: {}. Cause: {}" + topic + thrwbl.getMessage() + thrwbl.getCause().getMessage());
205         thrwbl.printStackTrace();
206 
207     }
208 
209     public void deliveryComplete(IMqttDeliveryToken arg0) {
210         // TODO Auto-generated method stub
211         info("deliveryComplete");
212 
213     }
214 
215     public void messageArrived(String arg0, MqttMessage arg1) throws Exception {
216         // TODO Auto-generated method stub
217         String message = new String(arg1.getPayload());
218         String topic = arg0;
219 
220         info("xxx Receive : topic=" + topic + "; message=" + message);
221 
222     }
223 }
View Code
 1 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 2     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 3     <modelVersion>4.0.0</modelVersion>
 4 
 5     <groupId>com.italktv.mqtt.client</groupId>
 6     <artifactId>mqttclient</artifactId>
 7     <version>0.0.1-SNAPSHOT</version>
 8     <packaging>jar</packaging>
 9 
10     <name>mqttclient</name>
11     <url>http://maven.apache.org</url>
12 
13     <properties>
14         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
15     </properties>
16 
17     <dependencies>
18         <dependency>
19             <groupId>junit</groupId>
20             <artifactId>junit</artifactId>
21             <version>3.8.1</version>
22             <scope>test</scope>
23         </dependency>
24 
25 
26         <dependency>
27             <groupId>org.eclipse.paho</groupId>
28             <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
29             <version>1.1.0</version>
30         </dependency>
31 
32     </dependencies>
33 </project>
View Code

上面是maven管理項目的pom.xml

相關文章
相關標籤/搜索