java mqtt

代碼:java

package cc.gongchang.mqtt;

import java.net.URISyntaxException;

import org.fusesource.hawtdispatch.Dispatch;
import org.fusesource.mqtt.client.Future;
import org.fusesource.mqtt.client.FutureConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;

import com.alibaba.fastjson.JSONObject;

/**
 * Hello world!
 *
 */
public class App {
	public static void main(String[] args) {
		MQTT mqtt = new MQTT();
		// MQTT設置說明
		// 設置主機號
		try {
			mqtt.setHost("tcp://sgdzpic.3322.org:1883");
		} catch (URISyntaxException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		// 用於設置客戶端會話的ID。在setCleanSession(false);被調用時,MQTT服務器利用該ID得到相應的會話。此ID應少於23個字符,默認根據本機地址、端口和時間自動生成
		mqtt.setClientId("876543210");
		// 若設爲false,MQTT服務器將持久化客戶端會話的主體訂閱和ACK位置,默認爲true
		mqtt.setCleanSession(false);
		// 定義客戶端傳來消息的最大時間間隔秒數,服務器能夠據此判斷與客戶端的鏈接是否已經斷開,從而避免TCP/IP超時的長時間等待
		mqtt.setKeepAlive((short) 60);
		// 服務器認證用戶名
		mqtt.setUserName("admin");
		// 服務器認證密碼
		mqtt.setPassword("123456");
		// 設置「遺囑」消息的話題,若客戶端與服務器之間的鏈接意外中斷,服務器將發佈客戶端的「遺囑」消息
		mqtt.setWillTopic("willTopic");
		// 設置「遺囑」消息的內容,默認是長度爲零的消息
		mqtt.setWillMessage("willMessage");
		// 設置「遺囑」消息的QoS,默認爲QoS.ATMOSTONCE
		mqtt.setWillQos(QoS.AT_LEAST_ONCE);
		// 若想要在發佈「遺囑」消息時擁有retain選項,則爲true
		mqtt.setWillRetain(true);
		// 設置版本
		mqtt.setVersion("3.1.1");
		// 失敗重鏈接設置說明
		// 客戶端首次鏈接到服務器時,鏈接的最大重試次數,超出該次數客戶端將返回錯誤。-1意爲無重試上限,默認爲-1
		mqtt.setConnectAttemptsMax(10L);
		// 客戶端已經鏈接到服務器,但因某種緣由鏈接斷開時的最大重試次數,超出該次數客戶端將返回錯誤。-1意爲無重試上限,默認爲-1
		mqtt.setReconnectAttemptsMax(3L);
		// 首次重鏈接間隔毫秒數,默認爲10ms
		mqtt.setReconnectDelay(10L);
		// 重鏈接間隔毫秒數,默認爲30000ms
		mqtt.setReconnectDelayMax(30000L);
		// 設置重鏈接指數迴歸。設置爲1則停用指數迴歸,默認爲2
		mqtt.setReconnectBackOffMultiplier(2);

		// Socket設置說明
		// 設置socket接收緩衝區大小,默認爲65536(64k)
		mqtt.setReceiveBufferSize(65536);
		// 設置socket發送緩衝區大小,默認爲65536(64k)
		mqtt.setSendBufferSize(65536);
		// 設置發送數據包頭的流量類型或服務類型字段,默認爲8,意爲吞吐量最大化傳輸
		mqtt.setTrafficClass(8);

		// 帶寬限制設置說明
		// 設置鏈接的最大接收速率,單位爲bytes/s。默認爲0,即無限制
		mqtt.setMaxReadRate(0);
		// 設置鏈接的最大發送速率,單位爲bytes/s。默認爲0,即無限制
		mqtt.setMaxWriteRate(0);

		Boolean contition = true;
		while (contition) {
			// 選擇消息分發隊列
			// 若沒有調用方法setDispatchQueue,客戶端將爲鏈接新建一個隊列。若是想實現多個鏈接使用公用的隊列,顯式地指定隊列是一個很是方便的實現方法
			mqtt.setDispatchQueue(Dispatch.createQueue("foo"));
			FutureConnection connection = mqtt.futureConnection();
			Future<Void> f1 = connection.connect();
			try {
				f1.await();
			} catch (Exception e1) {
				// TODO Auto-generated catch block
				e1.printStackTrace();
			}
			// 訂閱消息
			Future<byte[]> f2 = connection.subscribe(new Topic[] {
					new Topic("person/blacklist/#", QoS.AT_LEAST_ONCE) });
			//
			try {
				byte[] qoses = f2.await();
			} catch (Exception e1) {
				// TODO Auto-generated catch block
				e1.printStackTrace();
			}

			// 發送身份驗證消息.
			// Future<Void> f3 = connection.publish("foo", "Hello".getBytes(),
			// QoS.AT_LEAST_ONCE, false);
			// 接收訂閱消息..
			Future<Message> receive = connection.receive();
			// 打印消息.
			Message message = null;
			try {
				message = receive.await();
			} catch (Exception e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
			String subscribeInfo = String.valueOf(message.getPayloadBuffer());
			if(subscribeInfo.startsWith("ascii")) {
				JSONObject subscribeObject = JSONObject.parseObject(subscribeInfo.substring(7));
				System.out.println(subscribeInfo);
				System.out.println(subscribeObject.get("name"));//姓名,對應name
				System.out.println(subscribeObject.get("certifiedNo"));//身份證,對應identityId
				System.out.println(subscribeObject.get("url"));//圖片地址,對應imageUrl
				//還須要傳送tarLibSerial
				//入庫黑名單
			}
			// 迴應
			message.ack();
			//
			Future<Void> f4 = connection.disconnect();
			try {
				f4.await();
			} catch (Exception e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}

	}
}
相關文章
相關標籤/搜索