參看官方文檔:java
http://publib.boulder.ibm.com/infocenter/wmqv7/v7r0/index.jsp?topic=/com.ibm.mq.amqtat.doc/tt00000_.htm服務器
* Java 爲 MQ Telemetry Transport 建立異步發佈程序
*在此任務中,您將遵循教程來修改第一個發佈程序。經過修改,
*使應用程序可以發送發佈而不等待傳遞確認信息。傳遞確認
*信息由您建立的回調類來接收。
*
*
*
*4.使客戶機斷開鏈接
* a.除去其中包含 token.waitForCompletion 表達式的語句。 主線程將繼續執行,而不等待傳遞發佈。
* b.測試客戶機是否已斷開鏈接。 將錯誤返回到 MqttCallback 中的 lostConnection 方法以後,MQTT 客戶機將斷開鏈接,客戶機應用程序也可能斷開鏈接。測試是否有打開的鏈接。
* c.使用常量 Example.quiesceTimeout 來設置使客戶機停頓的最長時間。
* if (client.isConnected())
* client.disconnect(Example.quiesceTimeout);
*當知足下面三種狀況的組合形式時,客戶機就完成了:
* a.已經對在此會話中(若是從新啓動了會話,則是在先前會話中)已發佈的全部消息調用了回調。
* b.消息未完成,然而停頓時間間隔已到期。缺省狀況下,停頓時間間隔爲 30 秒。經過將要等待的毫秒數做爲 client.disconnect 的一個參數來傳遞,便可更改停頓超時。
* c.在發佈了某些消息並由客戶機進行排隊以後,可是在發送這些消息以前調用了 client.disconnect。已排隊的消息還沒有處於「未完成」狀態。若是會話可從新啓動,那麼從新啓動會話時就會從新發送消息。
* 缺省狀況下,停頓時間間隔爲 30 秒。dom
MQTT的消息發佈代碼:異步
- package com.etrip.wsmqtt.server;
-
- import com.ibm.micro.client.mqttv3.MqttClient;
- import com.ibm.micro.client.mqttv3.MqttDeliveryToken;
- import com.ibm.micro.client.mqttv3.MqttMessage;
- import com.ibm.micro.client.mqttv3.MqttTopic;
- public class WSMQTTServerPubAsync {
- public static void main(String[] args) {
- try {
-
- MqttClient client = new MqttClient(WSMQTTServerCommon.TCPAddress, WSMQTTServerCommon.clientId);
-
-
- MqttTopic topic = client.getTopic(WSMQTTServerCommon.topicString);
-
-
- MqttMessage message = new MqttMessage();
-
- message.setQos(2);
-
-
- message.setRetained(false);
-
-
- message.setPayload(WSMQTTServerCommon.publication.getBytes());
-
-
- WSMQTTServerCallBack callback = new WSMQTTServerCallBack(WSMQTTServerCommon.clientId);
-
-
- client.setCallback(callback);
-
-
- client.connect();
-
- System.out.println("Publishing \"" + message.toString()
- + "\" on topic \"" + topic.getName() + "\" with QoS = "
- + message.getQos());
- System.out.println("For client instance \"" + client.getClientId()
- + "\" on address " + client.getServerURI() + "\"");
-
-
- MqttDeliveryToken token = topic.publish(message);
-
- System.out.println("With delivery token \"" + token.hashCode()
- + " delivered: " + token.isComplete());
- Thread.sleep(100000000000000l);
-
-
- if (client.isConnected())
- client.disconnect(WSMQTTServerCommon.quiesceTimeout);
- System.out.println("Disconnected: delivery token \"" + token.hashCode()
- + "\" received: " + token.isComplete());
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
MQTT消息發佈回調代碼:jsp
- package com.etrip.wsmqtt.server;
-
- import com.ibm.micro.client.mqttv3.*;
- public class WSMQTTServerCallBack implements MqttCallback {
- private String instanceData = "";
- public WSMQTTServerCallBack(String instance) {
- instanceData = instance;
- }
-
- public void messageArrived(MqttTopic topic, MqttMessage message) {
- try {
- System.out.println("Message arrived: \"" + message.toString()
- + "\" on topic \"" + topic.toString() + "\" for instance \""
- + instanceData + "\"");
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- public void connectionLost(Throwable cause) {
- System.out.println("Connection lost on instance \"" + instanceData
- + "\" with cause \"" + cause.getMessage() + "\" Reason code "
- + ((MqttException)cause).getReasonCode() + "\" Cause \""
- + ((MqttException)cause).getCause() + "\"");
- cause.printStackTrace();
- }
-
- public void deliveryComplete(MqttDeliveryToken token) {
- try {
- System.out.println("Delivery token \"" + token.hashCode()
- + "\" received by instance \"" + instanceData + "\"");
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
常量類:tcp
- package com.etrip.wsmqtt.server;
-
- import java.util.UUID;
- public final class WSMQTTServerCommon {
-
- public static final String TCPAddress =System.getProperty("TCPAddress", "tcp://192.168.208.46:1883");
-
- public static String clientId =String.format("%-23.23s", System.getProperty("clientId", (UUID.randomUUID().toString())).trim()).replace('-', '_');
-
- public static final String topicString = System.getProperty("topicString", "china/beijing");
-
- public static final String publication =System.getProperty("publication", "Hello World " + String.format("%tc", System.currentTimeMillis()));
-
- public static final int quiesceTimeout = Integer.parseInt(System.getProperty("timeout", "10000"));
-
- public static final int sleepTimeout = Integer.parseInt(System.getProperty("timeout", "10000"));
-
- public static final boolean cleanSession =Boolean.parseBoolean(System.getProperty("cleanSession", "false"));
-
- public static final int QoS =Integer.parseInt(System.getProperty("QoS", "1"));
-
- public static final boolean retained =Boolean.parseBoolean(System.getProperty("retained", "false"));
- }