MQTT的學習研究(十二) MQTT moquette 的 Future API 消息發佈訂閱的實現

 MQTT moquette 的 Server 發佈主題(Java)

Java代碼   收藏代碼
  1. package com.etrip.mqtt.future;  
  2.   
  3. import java.net.URISyntaxException;  
  4.   
  5. import org.fusesource.mqtt.client.FutureConnection;  
  6. import org.fusesource.mqtt.client.MQTT;  
  7. import org.fusesource.mqtt.client.QoS;  
  8. import org.fusesource.mqtt.client.Topic;  
  9. import org.slf4j.Logger;  
  10. import org.slf4j.LoggerFactory;  
  11.   
  12. /** 
  13.  *  
  14.  *  
  15.  *  
  16.  * 採用Future式 發佈主題  
  17.  *  
  18.  * @author longgangbai 
  19.  */  
  20. public class MQTTFutureServer {  
  21.         private static final Logger LOG = LoggerFactory.getLogger(MQTTFutureServer.class);  
  22.         private final static String CONNECTION_STRING = "tcp://192.168.208.46:1883";  
  23.         private final static boolean CLEAN_START = true;  
  24.         private final static short KEEP_ALIVE = 30;// 低耗網絡,可是又須要及時獲取數據,心跳30s  
  25.         public  static Topic[] topics = {  
  26.                         new Topic("china/beijing", QoS.EXACTLY_ONCE),  
  27.                         new Topic("china/tianjin", QoS.AT_LEAST_ONCE),  
  28.                         new Topic("china/henan", QoS.AT_MOST_ONCE)};  
  29.         public final  static long RECONNECTION_ATTEMPT_MAX=6;  
  30.         public final  static long RECONNECTION_DELAY=2000;  
  31.           
  32.         public final static int SEND_BUFFER_SIZE=2*1024*1024;//發送最大緩衝爲2M  
  33.         public static void main(String[] args)   {  
  34.             MQTT mqtt = new MQTT();  
  35.             try {  
  36.                 //設置服務端的ip  
  37.                 mqtt.setHost(CONNECTION_STRING);  
  38.                 //鏈接前清空會話信息  
  39.                 mqtt.setCleanSession(CLEAN_START);  
  40.                 //設置從新鏈接的次數  
  41.                 mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX);  
  42.                 //設置重連的間隔時間  
  43.                 mqtt.setReconnectDelay(RECONNECTION_DELAY);  
  44.                 //設置心跳時間  
  45.                 mqtt.setKeepAlive(KEEP_ALIVE);  
  46.                 //設置緩衝的大小  
  47.                 mqtt.setSendBufferSize(SEND_BUFFER_SIZE);  
  48.       
  49.                 //建立鏈接   
  50.                 final FutureConnection connection= mqtt.futureConnection();  
  51.                 connection.connect();  
  52.                 int count=1;  
  53.                 while(true){  
  54.                     count++;  
  55.                     // 用於發佈消息,目前手機段不須要向服務端發送消息  
  56.                     //主題的內容  
  57.                     String message="hello "+count+"chinese people !";  
  58.                     String topic = "china/beijing";  
  59.                     connection.publish(topic, message.getBytes(), QoS.AT_LEAST_ONCE,  
  60.                             false);  
  61.                     System.out.println("MQTTFutureServer.publish Message "+"Topic Title :"+topic+" context :"+message);  
  62.                       
  63.                 }  
  64.             } catch (URISyntaxException e) {  
  65.                 // TODO Auto-generated catch block  
  66.                 e.printStackTrace();  
  67.             } catch (Exception e) {  
  68.                 // TODO Auto-generated catch block  
  69.                 e.printStackTrace();  
  70.             }  
  71.         }  
  72. }  

 

 

 MQTT moquette 的 Client 接收主題

 

Java代碼   收藏代碼
  1. package com.etrip.mqtt.future;  
  2.   
  3. import java.net.URISyntaxException;  
  4.   
  5. import org.fusesource.mqtt.client.Future;  
  6. import org.fusesource.mqtt.client.FutureConnection;  
  7. import org.fusesource.mqtt.client.MQTT;  
  8. import org.fusesource.mqtt.client.Message;  
  9. import org.fusesource.mqtt.client.QoS;  
  10. import org.fusesource.mqtt.client.Topic;  
  11. import org.slf4j.Logger;  
  12. import org.slf4j.LoggerFactory;  
  13. /** 
  14.  *  
  15.  * MQTT moquette 的Client 段用於訂閱主題,並接收主題信息 
  16.  *  
  17.  * 採用Future 式 訂閱主題  
  18.  *  
  19.  * @author longgangbai 
  20.  */  
  21. public class MQTTFutureClient {  
  22.         private static final Logger LOG = LoggerFactory.getLogger(MQTTFutureClient.class);  
  23.         private final static String CONNECTION_STRING = "tcp://192.168.208.46:1883";  
  24.         private final static boolean CLEAN_START = true;  
  25.         private final static short KEEP_ALIVE = 30;// 低耗網絡,可是又須要及時獲取數據,心跳30s  
  26.         private final static String CLIENT_ID = "publishService";  
  27.         public  static Topic[] topics = {  
  28.                         new Topic("china/beijing", QoS.EXACTLY_ONCE),  
  29.                         new Topic("china/tianjin", QoS.AT_LEAST_ONCE),  
  30.                         new Topic("china/henan", QoS.AT_MOST_ONCE)};  
  31.         public final  static long RECONNECTION_ATTEMPT_MAX=6;  
  32.         public final  static long RECONNECTION_DELAY=2000;  
  33.           
  34.         public final static int SEND_BUFFER_SIZE=2*1024*1024;//發送最大緩衝爲2M  
  35.           
  36.           
  37.           public static void main(String[] args)   {  
  38.                 //建立MQTT對象  
  39.                 MQTT mqtt = new MQTT();  
  40.                 try {  
  41.                     //設置mqtt broker的ip和端口  
  42.                     mqtt.setHost(CONNECTION_STRING);  
  43.                     //鏈接前清空會話信息  
  44.                     mqtt.setCleanSession(CLEAN_START);  
  45.                     //設置從新鏈接的次數  
  46.                     mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX);  
  47.                     //設置重連的間隔時間  
  48.                     mqtt.setReconnectDelay(RECONNECTION_DELAY);  
  49.                     //設置心跳時間  
  50.                     mqtt.setKeepAlive(KEEP_ALIVE);  
  51.                     //設置緩衝的大小  
  52.                     mqtt.setSendBufferSize(SEND_BUFFER_SIZE);  
  53.                       
  54.                     //獲取mqtt的鏈接對象BlockingConnection  
  55.                     final FutureConnection connection= mqtt.futureConnection();  
  56.                     connection.connect();  
  57.                     connection.subscribe(topics);  
  58.                     while(true){  
  59.                         Future<Message> futrueMessage=connection.receive();  
  60.                         Message message =futrueMessage.await();  
  61.                           
  62.                           
  63.                         System.out.println("MQTTFutureClient.Receive Message "+ "Topic Title :"+message.getTopic()+" context :"+String.valueOf(message.getPayloadBuffer()));  
  64.                     }  
  65.                 } catch (URISyntaxException e) {  
  66.                     // TODO Auto-generated catch block  
  67.                     e.printStackTrace();  
  68.                 } catch (Exception e) {  
  69.                     // TODO Auto-generated catch block  
  70.                     e.printStackTrace();  
  71.                 }finally{  
  72.                       
  73.                 }  
  74.             }  
  75. }  
相關文章
相關標籤/搜索