MQTT的學習研究(四)moquette-mqtt 的使用之mqtt Blocking API客戶端訂閱並接收主題信息

在上面兩篇關於mqtt的broker的啓動和mqtt的服務端發佈主題信息以後,咱們客戶端須要訂閱相關的信息並接收相關的主題信息。java

Java代碼   收藏代碼
  1. package com.etrip.mqtt;  
  2.   
  3. import java.net.URISyntaxException;  
  4.   
  5. import org.fusesource.mqtt.client.BlockingConnection;  
  6. import org.fusesource.mqtt.client.MQTT;  
  7. import org.fusesource.mqtt.client.Message;  
  8. import org.fusesource.mqtt.client.QoS;  
  9. import org.fusesource.mqtt.client.Topic;  
  10. import org.slf4j.Logger;  
  11. import org.slf4j.LoggerFactory;  
  12. /** 
  13.  *  
  14.  * MQTT moquette 的Client 段用於訂閱主題,並接收主題信息 
  15.  *  
  16.  * 採用阻塞式 訂閱主題  
  17.  *  
  18.  * @author longgangbai 
  19.  */  
  20. public class MQTTClient {  
  21.       private static final Logger LOG = LoggerFactory.getLogger(MQTTClient.class);  
  22.         private final static String CONNECTION_STRING = "tcp://192.168.208.46:1883";  
  23.         private final static boolean CLEAN_START = true;  
  24.         private final static short KEEP_ALIVE = 30;// 低耗網絡,可是又須要及時獲取數據,心跳30s  
  25.         private final static String CLIENT_ID = "publishService";  
  26.         public  static Topic[] topics = {  
  27.                         new Topic("china/beijing", QoS.EXACTLY_ONCE),  
  28.                         new Topic("china/tianjin", QoS.AT_LEAST_ONCE),  
  29.                         new Topic("china/henan", QoS.AT_MOST_ONCE)};  
  30.         public final  static long RECONNECTION_ATTEMPT_MAX=6;  
  31.         public final  static long RECONNECTION_DELAY=2000;  
  32.           
  33.         public final static int SEND_BUFFER_SIZE=2*1024*1024;//發送最大緩衝爲2M  
  34.           
  35.           
  36.       public static void main(String[] args)   {  
  37.         //建立MQTT對象  
  38.         MQTT mqtt = new MQTT();  
  39.         BlockingConnection connection=null;  
  40.         try {  
  41.             //設置mqtt broker的ip和端口  
  42.             mqtt.setHost(CONNECTION_STRING);  
  43.             //鏈接前清空會話信息  
  44.             mqtt.setCleanSession(CLEAN_START);  
  45.             //設置從新鏈接的次數  
  46.             mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX);  
  47.             //設置重連的間隔時間  
  48.             mqtt.setReconnectDelay(RECONNECTION_DELAY);  
  49.             //設置心跳時間  
  50.             mqtt.setKeepAlive(KEEP_ALIVE);  
  51.             //設置緩衝的大小  
  52.             mqtt.setSendBufferSize(SEND_BUFFER_SIZE);  
  53.               
  54.               
  55.             //獲取mqtt的鏈接對象BlockingConnection  
  56.             connection = mqtt.blockingConnection();  
  57.             //MQTT鏈接的建立   
  58.             connection.connect();  
  59.             //建立相關的MQTT 的主題列表   
  60.             Topic[] topics = {new Topic("china/beijing", QoS.AT_LEAST_ONCE)};  
  61.             //訂閱相關的主題信息   
  62.             byte[] qoses = connection.subscribe(topics);  
  63.             //  
  64.             while(true){  
  65.                 //接收訂閱的消息內容  
  66.                 Message message = connection.receive();  
  67.                 //獲取訂閱的消息內容   
  68.                 byte[] payload = message.getPayload();  
  69.                 // process the message then:  
  70.                 LOG.info("MQTTClient Message  Topic="+message.getTopic()+" Content :"+new String(payload));  
  71.                 //簽收消息的回執  
  72.                 message.ack();  
  73.                   
  74.                 Thread.sleep(2000);  
  75.             }  
  76.         } catch (URISyntaxException e) {  
  77.             // TODO Auto-generated catch block  
  78.             e.printStackTrace();  
  79.         } catch (Exception e) {  
  80.             // TODO Auto-generated catch block  
  81.             e.printStackTrace();  
  82.         }finally{  
  83.             try {  
  84.                 connection.disconnect();  
  85.             } catch (Exception e) {  
  86.                 // TODO Auto-generated catch block  
  87.                 e.printStackTrace();  
  88.             }  
  89.         }  
  90.     }  
  91. }  
相關文章
相關標籤/搜索