代碼:java
package cc.gongchang.mqtt; import java.net.URISyntaxException; import org.fusesource.hawtdispatch.Dispatch; import org.fusesource.mqtt.client.Future; import org.fusesource.mqtt.client.FutureConnection; import org.fusesource.mqtt.client.MQTT; import org.fusesource.mqtt.client.Message; import org.fusesource.mqtt.client.QoS; import org.fusesource.mqtt.client.Topic; import com.alibaba.fastjson.JSONObject; /** * Hello world! * */ public class App { public static void main(String[] args) { MQTT mqtt = new MQTT(); // MQTT設置說明 // 設置主機號 try { mqtt.setHost("tcp://sgdzpic.3322.org:1883"); } catch (URISyntaxException e) { // TODO Auto-generated catch block e.printStackTrace(); } // 用於設置客戶端會話的ID。在setCleanSession(false);被調用時,MQTT服務器利用該ID得到相應的會話。此ID應少於23個字符,默認根據本機地址、端口和時間自動生成 mqtt.setClientId("876543210"); // 若設爲false,MQTT服務器將持久化客戶端會話的主體訂閱和ACK位置,默認爲true mqtt.setCleanSession(false); // 定義客戶端傳來消息的最大時間間隔秒數,服務器能夠據此判斷與客戶端的鏈接是否已經斷開,從而避免TCP/IP超時的長時間等待 mqtt.setKeepAlive((short) 60); // 服務器認證用戶名 mqtt.setUserName("admin"); // 服務器認證密碼 mqtt.setPassword("123456"); // 設置「遺囑」消息的話題,若客戶端與服務器之間的鏈接意外中斷,服務器將發佈客戶端的「遺囑」消息 mqtt.setWillTopic("willTopic"); // 設置「遺囑」消息的內容,默認是長度爲零的消息 mqtt.setWillMessage("willMessage"); // 設置「遺囑」消息的QoS,默認爲QoS.ATMOSTONCE mqtt.setWillQos(QoS.AT_LEAST_ONCE); // 若想要在發佈「遺囑」消息時擁有retain選項,則爲true mqtt.setWillRetain(true); // 設置版本 mqtt.setVersion("3.1.1"); // 失敗重鏈接設置說明 // 客戶端首次鏈接到服務器時,鏈接的最大重試次數,超出該次數客戶端將返回錯誤。-1意爲無重試上限,默認爲-1 mqtt.setConnectAttemptsMax(10L); // 客戶端已經鏈接到服務器,但因某種緣由鏈接斷開時的最大重試次數,超出該次數客戶端將返回錯誤。-1意爲無重試上限,默認爲-1 mqtt.setReconnectAttemptsMax(3L); // 首次重鏈接間隔毫秒數,默認爲10ms mqtt.setReconnectDelay(10L); // 重鏈接間隔毫秒數,默認爲30000ms mqtt.setReconnectDelayMax(30000L); // 設置重鏈接指數迴歸。設置爲1則停用指數迴歸,默認爲2 mqtt.setReconnectBackOffMultiplier(2); // Socket設置說明 // 設置socket接收緩衝區大小,默認爲65536(64k) mqtt.setReceiveBufferSize(65536); // 設置socket發送緩衝區大小,默認爲65536(64k) mqtt.setSendBufferSize(65536); // 設置發送數據包頭的流量類型或服務類型字段,默認爲8,意爲吞吐量最大化傳輸 mqtt.setTrafficClass(8); // 帶寬限制設置說明 // 設置鏈接的最大接收速率,單位爲bytes/s。默認爲0,即無限制 mqtt.setMaxReadRate(0); // 設置鏈接的最大發送速率,單位爲bytes/s。默認爲0,即無限制 mqtt.setMaxWriteRate(0); Boolean contition = true; while (contition) { // 選擇消息分發隊列 // 若沒有調用方法setDispatchQueue,客戶端將爲鏈接新建一個隊列。若是想實現多個鏈接使用公用的隊列,顯式地指定隊列是一個很是方便的實現方法 mqtt.setDispatchQueue(Dispatch.createQueue("foo")); FutureConnection connection = mqtt.futureConnection(); Future<Void> f1 = connection.connect(); try { f1.await(); } catch (Exception e1) { // TODO Auto-generated catch block e1.printStackTrace(); } // 訂閱消息 Future<byte[]> f2 = connection.subscribe(new Topic[] { new Topic("person/blacklist/#", QoS.AT_LEAST_ONCE) }); // try { byte[] qoses = f2.await(); } catch (Exception e1) { // TODO Auto-generated catch block e1.printStackTrace(); } // 發送身份驗證消息. // Future<Void> f3 = connection.publish("foo", "Hello".getBytes(), // QoS.AT_LEAST_ONCE, false); // 接收訂閱消息.. Future<Message> receive = connection.receive(); // 打印消息. Message message = null; try { message = receive.await(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } String subscribeInfo = String.valueOf(message.getPayloadBuffer()); if(subscribeInfo.startsWith("ascii")) { JSONObject subscribeObject = JSONObject.parseObject(subscribeInfo.substring(7)); System.out.println(subscribeInfo); System.out.println(subscribeObject.get("name"));//姓名,對應name System.out.println(subscribeObject.get("certifiedNo"));//身份證,對應identityId System.out.println(subscribeObject.get("url"));//圖片地址,對應imageUrl //還須要傳送tarLibSerial //入庫黑名單 } // 迴應 message.ack(); // Future<Void> f4 = connection.disconnect(); try { f4.await(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }