MQTT客戶端與服務代理的案列

服務端,採用 Mosquitto 來轉發分發消息。java

客戶端本身寫。服務器

 

服務端 啓動 mosquitto

(底下的命令是我本身放到環境變量裏面的,經過alias 運行mosquitto)
IshallbeThatIshallbe:~ iamthat$ mosquitto_start
1427629906: mosquitto version 1.3.5 (build date 2014-10-27 15:12:32+0000) starting
1427629906: Config loaded from /usr/local/Cellar/mosquitto/1.3.5/etc/mosquitto/mosquitto.conf.
IshallbeThatIshallbe:~ iamthat$ 1427629906: Opening ipv4 listen socket on port 1883.
1427629906: Opening ipv6 listen socket on port 1883.

 

客戶端代碼以下: 記住導入三個ibm 的jar 包。網絡

package mqtthelloworld;



import com.ibm.mqtt.MqttClient;
import com.ibm.mqtt.MqttException;
import com.ibm.mqtt.MqttNotConnectedException;
import com.ibm.mqtt.MqttPersistenceException;

public class Client1 {

	 private final static String CONNECTION_STRING = "tcp://127.0.0.1:1883";
	 private final static boolean CLEAN_START = true;
	 private final static short KEEP_ALIVE = 30;//低耗網絡,可是又須要及時獲取數據,心跳30s
	 private final static String CLIENT_ID = "client1";
	 private final static String[] SUBSCRIBE_TOPICS = {
	  "gabriel"
	 };
	private final static int[] QOS_VALUES = {01};
	private static final String[] PUBLISH_TOPICS = {"gabriel"};
	private static final String publish_topic="gabriel"; 
	 //////////////////
	 private MqttClient mqttClient;
	 
	 public Client1(){
	  try {
	   mqttClient = new MqttClient(CONNECTION_STRING);
	   SimpleCallbackHandler simpleCallbackHandler = new SimpleCallbackHandler();
	   mqttClient.registerSimpleHandler(simpleCallbackHandler);//註冊接收消息方法
	   mqttClient.connect(CLIENT_ID, CLEAN_START, KEEP_ALIVE);
	   mqttClient.subscribe(SUBSCRIBE_TOPICS, QOS_VALUES);//訂閱接主題
	   /**
	    * 完成訂閱後,能夠增長心跳,保持網絡通暢,也能夠發佈本身的消息
	    */
	  } catch (MqttException e) {
	   // TODO Auto-generated catch block
	   e.printStackTrace();
	  }
	 }
	 
	 private void publishMessage(String message) throws MqttNotConnectedException, MqttPersistenceException, IllegalArgumentException, MqttException{
		 mqttClient.publish(publish_topic, message.getBytes(), QOS_VALUES[0], true);
	 }
	 
	 public static void main(String[] args) throws MqttNotConnectedException, MqttPersistenceException, IllegalArgumentException, MqttException {
		Client1 client1=new Client1();
		for(int i=0;i<10;i++){
			client1.publishMessage("testing Client1 and this is "+i+"th message published");
		}
	}
}

 

package mqtthelloworld;

import com.ibm.mqtt.MqttSimpleCallback;

public class SimpleCallbackHandler implements MqttSimpleCallback {

	  /**
	   * 當客戶機和broker意外斷開時觸發
	   * 能夠再此處理從新訂閱
	   */
	  @Override
	  public void connectionLost() throws Exception {
	   // TODO Auto-generated method stub
	   System.out.println("客戶機和broker已經斷開");
	  }

	  /**
	   * 客戶端訂閱消息後,該方法負責回調接收處理消息
	   */
	  @Override
	  public void publishArrived(String topicName, byte[] payload, int Qos, boolean retained) throws Exception {
	   // TODO Auto-generated method stub
	   System.out.println("訂閱主題: " + topicName);
	   System.out.println("消息數據: " + new String(payload));
	   System.out.println("消息級別(0,1,2): " + Qos);
	   System.out.println("是不是實時發送的消息(false=實時,true=服務器上保留的最後消息): " + retained);
	  }

}

 

再終端開一個客戶端:socket

IshallbeThatIshallbe:bin iamthat$ mosquitto_sub -v -t 'gabriel'

 

 

運行結果:tcp

運行Client1 結果以下:ide

終端顯示ui

IshallbeThatIshallbe:bin iamthat$ mosquitto_sub -v -t 'gabriel'
1427630077: New connection from ::1 on port 1883.
1427630077: New client connected from ::1 as mosqsub/418-IshallbeTha (c1, k60).
1427630094: New connection from 127.0.0.1 on port 1883.
1427630094: New client connected from 127.0.0.1 as client1 (c1, k30).
gabriel testing Client1 and this is 0th message published
gabriel testing Client1 and this is 1th message published
gabriel testing Client1 and this is 2th message published
gabriel testing Client1 and this is 3th message published
gabriel testing Client1 and this is 4th message published
gabriel testing Client1 and this is 5th message published
gabriel testing Client1 and this is 6th message published
gabriel testing Client1 and this is 7th message published
gabriel testing Client1 and this is 8th message published
gabriel testing Client1 and this is 9th message published

 控制檯顯示:this

訂閱主題: gabriel
消息數據: testing Client1 and this is 0th message published
消息級別(0,1,2): 1
是不是實時發送的消息(false=實時,true=服務器上保留的最後消息): false
訂閱主題: gabriel
消息數據: testing Client1 and this is 1th message published
消息級別(0,1,2): 1
是不是實時發送的消息(false=實時,true=服務器上保留的最後消息): false
訂閱主題: gabriel
消息數據: testing Client1 and this is 2th message published
消息級別(0,1,2): 1
是不是實時發送的消息(false=實時,true=服務器上保留的最後消息): false
訂閱主題: gabriel

 

 

再開一個終端,看看訂閱者如何與發佈者溝通:spa

 

IshallbeThatIshallbe:bin iamthat$ mosquitto_pub -t 'gabriel' -m 'connecting to publisher'
IshallbeThatIshallbe:bin iamthat$ 

 

以前的訂閱者終端:代理

IshallbeThatIshallbe:bin iamthat$ mosquitto_sub -v -t 'gabriel'
1427630077: New connection from ::1 on port 1883.
1427630077: New client connected from ::1 as mosqsub/418-IshallbeTha (c1, k60).
1427630094: New connection from 127.0.0.1 on port 1883.
1427630094: New client connected from 127.0.0.1 as client1 (c1, k30).
gabriel testing Client1 and this is 0th message published
gabriel testing Client1 and this is 1th message published
gabriel testing Client1 and this is 2th message published
gabriel testing Client1 and this is 3th message published
gabriel testing Client1 and this is 4th message published
gabriel testing Client1 and this is 5th message published
gabriel testing Client1 and this is 6th message published
gabriel testing Client1 and this is 7th message published
gabriel testing Client1 and this is 8th message published
gabriel testing Client1 and this is 9th message published
1427631029: New connection from ::1 on port 1883.
1427631029: New client connected from ::1 as mosqpub/468-IshallbeTha (c1, k60).
gabriel connecting to publisher

 

控制檯顯示:

訂閱主題: gabriel
消息數據: testing Client1 and this is 0th message published
消息級別(0,1,2): 1
是不是實時發送的消息(false=實時,true=服務器上保留的最後消息): false
訂閱主題: gabriel
消息數據: testing Client1 and this is 1th message published
消息級別(0,1,2): 1
是不是實時發送的消息(false=實時,true=服務器上保留的最後消息): false
訂閱主題: gabriel
消息數據: testing Client1 and this is 2th message published
消息級別(0,1,2): 1
是不是實時發送的消息(false=實時,true=服務器上保留的最後消息): false
訂閱主題: gabriel
消息數據: testing Client1 and this is 3th message published
消息級別(0,1,2): 1
是不是實時發送的消息(false=實時,true=服務器上保留的最後消息): false
訂閱主題: gabriel
消息數據: testing Client1 and this is 4th message published
消息級別(0,1,2): 1
是不是實時發送的消息(false=實時,true=服務器上保留的最後消息): false
訂閱主題: gabriel
消息數據: testing Client1 and this is 5th message published
消息級別(0,1,2): 1
是不是實時發送的消息(false=實時,true=服務器上保留的最後消息): false
訂閱主題: gabriel
消息數據: testing Client1 and this is 6th message published
消息級別(0,1,2): 1
是不是實時發送的消息(false=實時,true=服務器上保留的最後消息): false
訂閱主題: gabriel
消息數據: testing Client1 and this is 7th message published
消息級別(0,1,2): 1
是不是實時發送的消息(false=實時,true=服務器上保留的最後消息): false
訂閱主題: gabriel
消息數據: testing Client1 and this is 8th message published
消息級別(0,1,2): 1
是不是實時發送的消息(false=實時,true=服務器上保留的最後消息): false
訂閱主題: gabriel
消息數據: testing Client1 and this is 9th message published
消息級別(0,1,2): 1
是不是實時發送的消息(false=實時,true=服務器上保留的最後消息): false
訂閱主題: gabriel 消息數據: connecting to publisher 消息級別(0,1,2): 0 是不是實時發送的消息(false=實時,true=服務器上保留的最後消息): false 

 能夠看出訂閱者成功與發佈者聯繫交互。

 

我的對MQTT的理解是:

就三種角色: 訂閱者,服務代理,發佈者。

客戶端訂閱者訂閱主題---》  服務代理broker-----> 反饋發佈者消息給訂閱者(消息的傳遞經過主題)

發佈者建立主題發佈消息---》服務端broker ---》反饋給訂閱者(消息傳遞經過主題)

訂閱者發送消息---》服務端broker---> 發佈者(消息傳遞經過主題)

相關文章
相關標籤/搜索