MQTT的學習研究(五) MQTT moquette 的 Blocking API 發佈消息服務端使用

  參看官方文檔:java

http://publib.boulder.ibm.com/infocenter/wmqv7/v7r0/index.jsp?topic=/com.ibm.mq.amqtat.doc/tt00000_.htm服務器

 *  Java 爲 MQ Telemetry Transport 建立異步發佈程序
 *在此任務中,您將遵循教程來修改第一個發佈程序。經過修改,
 *使應用程序可以發送發佈而不等待傳遞確認信息。傳遞確認
 *信息由您建立的回調類來接收。
 *
 *
 *
 *4.使客戶機斷開鏈接
 *  a.除去其中包含 token.waitForCompletion 表達式的語句。 主線程將繼續執行,而不等待傳遞發佈。
 *  b.測試客戶機是否已斷開鏈接。 將錯誤返回到 MqttCallback 中的 lostConnection 方法以後,MQTT 客戶機將斷開鏈接,客戶機應用程序也可能斷開鏈接。測試是否有打開的鏈接。
 *  c.使用常量 Example.quiesceTimeout 來設置使客戶機停頓的最長時間。
 *  if (client.isConnected())
 *      client.disconnect(Example.quiesceTimeout);
 *當知足下面三種狀況的組合形式時,客戶機就完成了: 
 *  a.已經對在此會話中(若是從新啓動了會話,則是在先前會話中)已發佈的全部消息調用了回調。
 *  b.消息未完成,然而停頓時間間隔已到期。缺省狀況下,停頓時間間隔爲 30 秒。經過將要等待的毫秒數做爲 client.disconnect 的一個參數來傳遞,便可更改停頓超時。
 *  c.在發佈了某些消息並由客戶機進行排隊以後,可是在發送這些消息以前調用了 client.disconnect。已排隊的消息還沒有處於「未完成」狀態。若是會話可從新啓動,那麼從新啓動會話時就會從新發送消息。
 *  缺省狀況下,停頓時間間隔爲 30 秒。dom

 

MQTT的消息發佈代碼:異步

 

Java代碼   收藏代碼
  1. package com.etrip.wsmqtt.server;  
  2.   
  3. import com.ibm.micro.client.mqttv3.MqttClient;  
  4. import com.ibm.micro.client.mqttv3.MqttDeliveryToken;  
  5. import com.ibm.micro.client.mqttv3.MqttMessage;  
  6. import com.ibm.micro.client.mqttv3.MqttTopic;  
  7. /** 
  8.  * 使用 Java 爲 MQ Telemetry Transport 建立異步發佈程序 
  9.  *  
  10.  *  
  11.  *  
  12.  * 
  13.  * 消息發佈的類的具體的實現 
  14.  *  
  15.  * @author longgangbai 
  16.  *  
  17.  */  
  18. public class WSMQTTServerPubAsync {  
  19.       public static void main(String[] args) {  
  20.             try {  
  21.                   //建立MqttClient對象  
  22.                   MqttClient client = new MqttClient(WSMQTTServerCommon.TCPAddress, WSMQTTServerCommon.clientId);  
  23.                    
  24.                   //建立MQTT相關的主題  
  25.                   MqttTopic topic = client.getTopic(WSMQTTServerCommon.topicString);  
  26.                     
  27.                   //建立MQTT的消息體  
  28.                   MqttMessage message = new MqttMessage();  
  29.                   //設置消息傳輸的類型  
  30.                   message.setQos(2);  
  31.                     
  32.                   //設置是否在服務器中保存消息體  
  33.                   message.setRetained(false);  
  34.                     
  35.                   //設置消息的內容  
  36.                   message.setPayload(WSMQTTServerCommon.publication.getBytes());  
  37.                     
  38.                   //建立一個MQTT的回調類  
  39.                   WSMQTTServerCallBack callback = new WSMQTTServerCallBack(WSMQTTServerCommon.clientId);  
  40.                     
  41.                   //MqttClient綁定  
  42.                   client.setCallback(callback);  
  43.                     
  44.                   //MqttClient鏈接  
  45.                   client.connect();  
  46.                     
  47.                   System.out.println("Publishing \"" + message.toString()  
  48.                       + "\" on topic \"" + topic.getName() + "\" with QoS = "  
  49.                       + message.getQos());  
  50.                   System.out.println("For client instance \"" + client.getClientId()  
  51.                       + "\" on address " + client.getServerURI() + "\"");  
  52.                     
  53.                   //發送消息並獲取回執  
  54.                   MqttDeliveryToken token = topic.publish(message);  
  55.                     
  56.                   System.out.println("With delivery token \"" + token.hashCode()  
  57.                       + " delivered: " + token.isComplete());  
  58.                   Thread.sleep(100000000000000l);  
  59.                     
  60.                   //關閉鏈接  
  61.                   if (client.isConnected())  
  62.                       client.disconnect(WSMQTTServerCommon.quiesceTimeout);  
  63.                   System.out.println("Disconnected: delivery token \"" + token.hashCode()  
  64.                       + "\" received: " + token.isComplete());  
  65.             } catch (Exception e) {  
  66.               e.printStackTrace();  
  67.             }  
  68.       }  
  69. }  

 

 

 

MQTT消息發佈回調代碼:jsp

Java代碼   收藏代碼
  1. package com.etrip.wsmqtt.server;  
  2.   
  3. import com.ibm.micro.client.mqttv3.*;  
  4. /** 
  5.  * 發佈消息的回調類 
  6.  *  
  7.  * 必須實現MqttCallback的接口並實現對應的相關接口方法 
  8.  *      ◦CallBack 類將實現 MqttCallBack。每一個客戶機標識都須要一個回調實例。在此示例中,構造函數傳遞客戶機標識以另存爲實例數據。在回調中,將它用來標識已經啓動了該回調的哪一個實例。 
  9.  *  ◦必須在回調類中實現三個方法: 
  10.  *  
  11.  *  public void messageArrived(MqttTopic topic, MqttMessage message) 
  12.  *  接收已經預訂的發佈。 
  13.  *  
  14.  *  public void connectionLost(Throwable cause) 
  15.  *  在斷開鏈接時調用。 
  16.  *  
  17.  *  public void deliveryComplete(MqttDeliveryToken token)) 
  18.  *      接收到已經發布的 QoS 1 或 QoS 2 消息的傳遞令牌時調用。 
  19.  *  
  20.  *  
  21.  *  ◦由 MqttClient.connect 激活此回調。 
  22.  *  
  23.  * @author longgangbai 
  24.  */  
  25. public class WSMQTTServerCallBack implements MqttCallback {  
  26.       private String instanceData = "";  
  27.       public WSMQTTServerCallBack(String instance) {  
  28.         instanceData = instance;  
  29.       }  
  30.       /** 
  31.        * 接收到消息的回調的方法 
  32.        */  
  33.       public void messageArrived(MqttTopic topic, MqttMessage message) {  
  34.         try {  
  35.           System.out.println("Message arrived: \"" + message.toString()  
  36.               + "\" on topic \"" + topic.toString() + "\" for instance \""  
  37.               + instanceData + "\"");  
  38.         } catch (Exception e) {  
  39.           e.printStackTrace();  
  40.         }  
  41.       }  
  42.       /** 
  43.        * 消息鏈接丟失 
  44.        */  
  45.       public void connectionLost(Throwable cause) {  
  46.         System.out.println("Connection lost on instance \"" + instanceData  
  47.             + "\" with cause \"" + cause.getMessage() + "\" Reason code "   
  48.             + ((MqttException)cause).getReasonCode() + "\" Cause \""   
  49.             + ((MqttException)cause).getCause() +  "\"");      
  50.         cause.printStackTrace();  
  51.       }  
  52.       /** 
  53.        *  
  54.        */  
  55.       public void deliveryComplete(MqttDeliveryToken token) {  
  56.         try {  
  57.           System.out.println("Delivery token \"" + token.hashCode()  
  58.               + "\" received by instance \"" + instanceData + "\"");  
  59.         } catch (Exception e) {  
  60.           e.printStackTrace();  
  61.         }  
  62.       }  
  63. }  

 

常量類:tcp

Java代碼   收藏代碼
  1. package com.etrip.wsmqtt.server;  
  2.   
  3. import java.util.UUID;  
  4. /** 
  5.  *  
  6.  * 消息發佈消息的常量字段 
  7.  *  
  8.  * @author longgangbai 
  9.  */  
  10. public final class WSMQTTServerCommon {  
  11.   //發佈broker的ip和端口  
  12.   public static final String  TCPAddress =System.getProperty("TCPAddress", "tcp://192.168.208.46:1883");  
  13.   //客戶端的Id  
  14.   public static String clientId =String.format("%-23.23s",  System.getProperty("clientId", (UUID.randomUUID().toString())).trim()).replace('-', '_');  
  15.   //發佈消息的主題  
  16.   public static final String topicString = System.getProperty("topicString", "china/beijing");  
  17.   //發佈的消息  
  18.   public static final String publication =System.getProperty("publication", "Hello World " + String.format("%tc", System.currentTimeMillis()));  
  19.   //超時時間  
  20.   public static final int quiesceTimeout = Integer.parseInt(System.getProperty("timeout", "10000"));  
  21.     
  22.   public static final int  sleepTimeout = Integer.parseInt(System.getProperty("timeout", "10000"));  
  23.     
  24.   public static final boolean cleanSession =Boolean.parseBoolean(System.getProperty("cleanSession", "false"));  
  25.     
  26.   public static final int QoS =Integer.parseInt(System.getProperty("QoS", "1"));  
  27.     
  28.   public static final boolean retained =Boolean.parseBoolean(System.getProperty("retained", "false"));  
  29. }  
相關文章
相關標籤/搜索