MQTT目錄:html
在此強調一下mqtt的使用場景:java
一、不可靠、網絡帶寬小的網絡web
二、運行的設備CPU、內存很是有限服務器
在idea中簡單模擬測試代碼:網絡
第一步:添加mqtt-client的依賴
session
<!--驗證mqtt協議--> <!-- https://mvnrepository.com/artifact/org.eclipse.paho/mqtt-client --> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>mqtt-client</artifactId> <version>0.4.0</version> </dependency>
第二步:具體代碼實現:eclipse
分三部分:tcp
第一:服務端:ide
1 package com.huhy.web.common.mqtt; 2 3 import org.eclipse.paho.client.mqttv3.MqttClient; 4 import org.eclipse.paho.client.mqttv3.MqttConnectOptions; 5 import org.eclipse.paho.client.mqttv3.MqttDeliveryToken; 6 import org.eclipse.paho.client.mqttv3.MqttException; 7 import org.eclipse.paho.client.mqttv3.MqttMessage; 8 import org.eclipse.paho.client.mqttv3.MqttPersistenceException; 9 import org.eclipse.paho.client.mqttv3.MqttTopic; 10 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; 11 /** 12 * @Author:huhy 13 * @DATE:Created on 2017/12/1 14:29 14 * @Modified By: 15 * @Class Description: 16 */ 17 public class ServerMQTT { 18 19 //tcp://MQTT安裝的服務器地址:MQTT定義的端口號 20 public static final String HOST = "tcp://localhost:61613"; 21 //定義一個主題 22 public static final String TOPIC = "huhy"; 23 //定義MQTT的ID,能夠在MQTT服務配置中指定 24 private static final String clientid = "server"; 25 26 private MqttClient client; 27 private MqttTopic topic11; 28 private String userName = "admin"; //非必須 29 private String passWord = "password"; //非必須 30 31 private MqttMessage message; 32 33 /** 34 * 構造函數 35 * @throws MqttException 36 */ 37 public ServerMQTT() throws MqttException { 38 // MemoryPersistence設置clientid的保存形式,默認爲之內存保存 39 client = new MqttClient(HOST, clientid, new MemoryPersistence()); 40 connect(); 41 } 42 43 /** 44 * 用來鏈接服務器 45 */ 46 private void connect() { 47 MqttConnectOptions options = new MqttConnectOptions(); 48 options.setCleanSession(false); 49 options.setUserName(userName); 50 options.setPassword(passWord.toCharArray()); 51 // 設置超時時間 52 options.setConnectionTimeout(10); 53 // 設置會話心跳時間 54 options.setKeepAliveInterval(20); 55 try { 56 client.setCallback(new PushCallback()); 57 client.connect(options); 58 59 topic11 = client.getTopic(TOPIC); 60 } catch (Exception e) { 61 e.printStackTrace(); 62 } 63 } 64 65 /** 66 * 67 * @param topic 68 * @param message 69 * @throws MqttPersistenceException 70 * @throws MqttException 71 */ 72 public void publish(MqttTopic topic , MqttMessage message) throws MqttPersistenceException, 73 MqttException { 74 MqttDeliveryToken token = topic.publish(message); 75 token.waitForCompletion(); 76 System.out.println("message is published completely! " 77 + token.isComplete()); 78 } 79 80 /** 81 * 啓動入口 82 * @param args 83 * @throws MqttException 84 */ 85 public static void main(String[] args) throws MqttException, InterruptedException { 86 ServerMQTT server = new ServerMQTT(); 87 server.message = new MqttMessage(); 88 server.message.setQos(1); //保證消息能到達一次 89 server.message.setRetained(true); 90 server.message.setPayload("abcde1".getBytes()); 91 server.publish(server.topic11 , server.message); 92 Thread.sleep(2000); 93 server.message.setPayload("abcde2".getBytes()); 94 server.publish(server.topic11 , server.message); 95 Thread.sleep(2000); 96 server.message.setPayload("abcde3".getBytes()); 97 server.publish(server.topic11 , server.message); 98 System.out.println(server.message.isRetained() + "------ratained狀態"); 99 } 100 }
第二部分:客戶端代碼:函數
1 package com.huhy.web.common.mqtt; 2 3 import org.eclipse.paho.client.mqttv3.MqttClient; 4 import org.eclipse.paho.client.mqttv3.MqttConnectOptions; 5 import org.eclipse.paho.client.mqttv3.MqttException; 6 import org.eclipse.paho.client.mqttv3.MqttTopic; 7 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; 8 9 import java.util.concurrent.ScheduledExecutorService; 10 11 /** 12 * @Author:huhy 13 * @DATE:Created on 2017/12/1 14:34 14 * @Modified By: 15 * @Class Description: 16 */ 17 public class ClientMQTT { 18 19 public static final String HOST = "tcp://localhost:61613"; 20 public static final String TOPIC1 = "huhy"; 21 private static final String clientid = "client"; 22 private MqttClient client; 23 private MqttConnectOptions options; 24 private String userName = "admin"; //非必須 25 private String passWord = "password"; //非必須 26 @SuppressWarnings("unused") 27 private ScheduledExecutorService scheduler; 28 29 private void start() { 30 try { 31 // host爲主機名,clientid即鏈接MQTT的客戶端ID,通常以惟一標識符表示,MemoryPersistence設置clientid的保存形式,默認爲之內存保存 32 client = new MqttClient(HOST, clientid, new MemoryPersistence()); 33 // MQTT的鏈接設置 34 options = new MqttConnectOptions(); 35 // 設置是否清空session,這裏若是設置爲false表示服務器會保留客戶端的鏈接記錄,設置爲true表示每次鏈接到服務器都以新的身份鏈接 36 options.setCleanSession(false); 37 // 設置鏈接的用戶名 38 options.setUserName(userName); 39 // 設置鏈接的密碼 40 options.setPassword(passWord.toCharArray()); 41 // 設置超時時間 單位爲秒 42 options.setConnectionTimeout(10); 43 // 設置會話心跳時間 單位爲秒 服務器會每隔1.5*20秒的時間向客戶端發送個消息判斷客戶端是否在線,但這個方法並無重連的機制 44 options.setKeepAliveInterval(20); 45 // 設置回調 46 client.setCallback(new PushCallback()); 47 MqttTopic topic = client.getTopic(TOPIC1); 48 //setWill方法,若是項目中須要知道客戶端是否掉線能夠調用該方法。設置最終端口的通知消息 49 //遺囑 options.setWill(topic, "close".getBytes(), 2, true); 50 client.connect(options); 51 //訂閱消息 52 int[] Qos = {1}; 53 String[] topic1 = {TOPIC1}; 54 client.subscribe(topic1, Qos); 55 56 } catch (Exception e) { 57 e.printStackTrace(); 58 } 59 } 60 61 public static void main(String[] args) throws MqttException { 62 ClientMQTT client = new ClientMQTT(); 63 client.start(); 64 } 65 }
第三部分:關於客戶端和服務端的回調函數(注意,我這說客戶端和服務端是不許確的,準確的來講是叫發佈和訂閱,爲了好理解就這樣了
1 package com.huhy.web.common.mqtt; 2 3 /** 4 * @Author:huhy 5 * @DATE:Created on 2017/12/1 14:33 6 * @Modified By: 7 * @Class Description: 8 */ 9 10 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; 11 import org.eclipse.paho.client.mqttv3.MqttCallback; 12 import org.eclipse.paho.client.mqttv3.MqttMessage; 13 14 /** 15 * 發佈消息的回調類 16 * 17 * 必須實現MqttCallback的接口並實現對應的相關接口方法CallBack 類將實現 MqttCallBack。 18 * 每一個客戶機標識都須要一個回調實例。在此示例中,構造函數傳遞客戶機標識以另存爲實例數據。 19 * 在回調中,將它用來標識已經啓動了該回調的哪一個實例。 20 * 必須在回調類中實現三個方法: 21 * 22 * public void messageArrived(MqttTopic topic, MqttMessage message)接收已經預訂的發佈。 23 * 24 * public void connectionLost(Throwable cause)在斷開鏈接時調用。 25 * 26 * public void deliveryComplete(MqttDeliveryToken token)) 27 * 接收到已經發布的 QoS 1 或 QoS 2 消息的傳遞令牌時調用。 28 * 由 MqttClient.connect 激活此回調。 29 * 30 */ 31 public class PushCallback implements MqttCallback { 32 @Override 33 public void connectionLost(Throwable cause) { 34 // 鏈接丟失後,通常在這裏面進行重連 35 System.out.println("鏈接斷開,能夠作重連"); 36 } 37 @Override 38 public void deliveryComplete(IMqttDeliveryToken token) { 39 System.out.println("deliveryComplete---------" + token.isComplete()); 40 } 41 @Override 42 public void messageArrived(String topic, MqttMessage message) throws Exception { 43 // subscribe後獲得的消息會執行到這裏面 44 System.out.println("接收消息主題 : " + topic); 45 System.out.println("接收消息Qos : " + message.getQos()); 46 System.out.println("接收消息內容 : " + new String(message.getPayload())); 47 } 48 }
關於運行分別啓動就能夠了,要先代理服務器啓動起來,關於怎麼啓動看我上節的講解