服務端代碼:java
1 package bsit.mqtt.demo.one_way; 2 3 import org.eclipse.paho.client.mqttv3.MqttClient; 4 import org.eclipse.paho.client.mqttv3.MqttConnectOptions; 5 import org.eclipse.paho.client.mqttv3.MqttDeliveryToken; 6 import org.eclipse.paho.client.mqttv3.MqttException; 7 import org.eclipse.paho.client.mqttv3.MqttMessage; 8 import org.eclipse.paho.client.mqttv3.MqttPersistenceException; 9 import org.eclipse.paho.client.mqttv3.MqttTopic; 10 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; 11 /** 12 * 13 * Title:Server 14 * Description: 服務器向多個客戶端推送主題,即不一樣客戶端可向服務器訂閱相同主題 15 * @author chenrl 16 * 2016年1月6日下午3:29:28 17 */ 18 public class Server { 19 20 public static final String HOST = "tcp://192.168.1.3:61613"; 21 public static final String TOPIC = "toclient/124"; 22 public static final String TOPIC125 = "toclient/125"; 23 private static final String clientid = "server"; 24 25 private MqttClient client; 26 private MqttTopic topic; 27 private MqttTopic topic125; 28 private String userName = "admin"; 29 private String passWord = "password"; 30 31 private MqttMessage message; 32 33 public Server() throws MqttException { 34 // MemoryPersistence設置clientid的保存形式,默認爲之內存保存 35 client = new MqttClient(HOST, clientid, new MemoryPersistence()); 36 connect(); 37 } 38 39 private void connect() { 40 MqttConnectOptions options = new MqttConnectOptions(); 41 options.setCleanSession(false); 42 options.setUserName(userName); 43 options.setPassword(passWord.toCharArray()); 44 // 設置超時時間 45 options.setConnectionTimeout(10); 46 // 設置會話心跳時間 47 options.setKeepAliveInterval(20); 48 try { 49 client.setCallback(new PushCallback()); 50 client.connect(options); 51 topic = client.getTopic(TOPIC); 52 topic125 = client.getTopic(TOPIC125); 53 } catch (Exception e) { 54 e.printStackTrace(); 55 } 56 } 57 58 public void publish(MqttTopic topic , MqttMessage message) throws MqttPersistenceException, 59 MqttException { 60 MqttDeliveryToken token = topic.publish(message); 61 token.waitForCompletion(); 62 System.out.println("message is published completely! " 63 + token.isComplete()); 64 } 65 66 public static void main(String[] args) throws MqttException { 67 Server server = new Server(); 68 69 server.message = new MqttMessage(); 70 server.message.setQos(2); 71 server.message.setRetained(true); 72 server.message.setPayload("給客戶端124推送的信息".getBytes()); 73 server.publish(server.topic , server.message); 74 75 server.message = new MqttMessage(); 76 server.message.setQos(2); 77 server.message.setRetained(true); 78 server.message.setPayload("給客戶端125推送的信息".getBytes()); 79 server.publish(server.topic125 , server.message); 80 81 System.out.println(server.message.isRetained() + "------ratained狀態"); 82 } 83 }
客戶端代碼:服務器
1 package bsit.mqtt.demo.one_way; 2 3 import java.util.concurrent.Executors; 4 import java.util.concurrent.ScheduledExecutorService; 5 import java.util.concurrent.TimeUnit; 6 7 import org.eclipse.paho.client.mqttv3.MqttClient; 8 import org.eclipse.paho.client.mqttv3.MqttConnectOptions; 9 import org.eclipse.paho.client.mqttv3.MqttException; 10 import org.eclipse.paho.client.mqttv3.MqttSecurityException; 11 import org.eclipse.paho.client.mqttv3.MqttTopic; 12 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; 13 14 public class Client { 15 16 public static final String HOST = "tcp://192.168.1.3:61613"; 17 public static final String TOPIC = "toclient/124"; 18 private static final String clientid = "client124"; 19 private MqttClient client; 20 private MqttConnectOptions options; 21 private String userName = "admin"; 22 private String passWord = "password"; 23 24 private ScheduledExecutorService scheduler; 25 26 private void start() { 27 try { 28 // host爲主機名,clientid即鏈接MQTT的客戶端ID,通常以惟一標識符表示,MemoryPersistence設置clientid的保存形式,默認爲之內存保存 29 client = new MqttClient(HOST, clientid, new MemoryPersistence()); 30 // MQTT的鏈接設置 31 options = new MqttConnectOptions(); 32 // 設置是否清空session,這裏若是設置爲false表示服務器會保留客戶端的鏈接記錄,這裏設置爲true表示每次鏈接到服務器都以新的身份鏈接 33 options.setCleanSession(true); 34 // 設置鏈接的用戶名 35 options.setUserName(userName); 36 // 設置鏈接的密碼 37 options.setPassword(passWord.toCharArray()); 38 // 設置超時時間 單位爲秒 39 options.setConnectionTimeout(10); 40 // 設置會話心跳時間 單位爲秒 服務器會每隔1.5*20秒的時間向客戶端發送個消息判斷客戶端是否在線,但這個方法並無重連的機制 41 options.setKeepAliveInterval(20); 42 // 設置回調 43 client.setCallback(new PushCallback()); 44 MqttTopic topic = client.getTopic(TOPIC); 45 //setWill方法,若是項目中須要知道客戶端是否掉線能夠調用該方法。設置最終端口的通知消息 46 options.setWill(topic, "close".getBytes(), 2, true); 47 48 client.connect(options); 49 //訂閱消息 50 int[] Qos = {1}; 51 String[] topic1 = {TOPIC}; 52 client.subscribe(topic1, Qos); 53 54 } catch (Exception e) { 55 e.printStackTrace(); 56 } 57 } 58 59 public static void main(String[] args) throws MqttException { 60 Client client = new Client(); 61 client.start(); 62 } 63 }
MQTT訂閱回調類:session
1 package bsit.mqtt.demo.one_way; 2 3 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; 4 import org.eclipse.paho.client.mqttv3.MqttCallback; 5 import org.eclipse.paho.client.mqttv3.MqttMessage; 6 7 /** 8 * 發佈消息的回調類 9 * 10 * 必須實現MqttCallback的接口並實現對應的相關接口方法CallBack 類將實現 MqttCallBack。 11 * 每一個客戶機標識都須要一個回調實例。在此示例中,構造函數傳遞客戶機標識以另存爲實例數據。 12 * 在回調中,將它用來標識已經啓動了該回調的哪一個實例。 13 * 必須在回調類中實現三個方法: 14 * 15 * public void messageArrived(MqttTopic topic, MqttMessage message)接收已經預訂的發佈。 16 * 17 * public void connectionLost(Throwable cause)在斷開鏈接時調用。 18 * 19 * public void deliveryComplete(MqttDeliveryToken token)) 20 * 接收到已經發布的 QoS 1 或 QoS 2 消息的傳遞令牌時調用。 21 * 由 MqttClient.connect 激活此回調。 22 * 23 */ 24 public class PushCallback implements MqttCallback { 25 26 public void connectionLost(Throwable cause) { 27 // 鏈接丟失後,通常在這裏面進行重連 28 System.out.println("鏈接斷開,能夠作重連"); 29 } 30 31 public void deliveryComplete(IMqttDeliveryToken token) { 32 System.out.println("deliveryComplete---------" + token.isComplete()); 33 } 34 35 public void messageArrived(String topic, MqttMessage message) throws Exception { 36 // subscribe後獲得的消息會執行到這裏面 37 System.out.println("接收消息主題 : " + topic); 38 System.out.println("接收消息Qos : " + message.getQos()); 39 System.out.println("接收消息內容 : " + new String(message.getPayload())); 40 } 41 }
運行服務端代碼,可看到服務器會給客戶端124/125各推送一條消息,eclipse
在運行124客戶端代碼,可看到124客戶端接收的信息:tcp
而後把客戶端代碼的Topic改成TOPIC = "toclient/125";clientid = "client125";再運行該段代碼,可看到125客戶端接收的信息:函數
多個客戶端訂閱同一主題,其clientid必不相同。客戶端124/125訂閱各自主題的內容,可是不一樣時間啓動,都在啓動後接收到各自信息,這體現出了服務器的推送功能。一樣的,發送的主題信息,能夠在服務器的topic能夠看到,訪問路徑是:http://127.0.0.1:61680/學習
其實,如若服務端和客戶端相互通訊,即客戶端能夠訂閱能夠發佈,服務端能夠訂閱也能夠發佈,則可不區分服務端客戶端,兩邊代碼幾乎同樣。相似,兩個客戶端都在訂閱同一主題,這時由第三個客戶端發佈這一主題的請求,前兩個客戶端一樣能夠接受該主題的內容,這時三個客戶端的代碼幾乎同樣,只是前兩個是訂閱,後一個是發佈。spa
以上是本身純碎學習所得,可能有不少bug,望後來人多多批評指正。code