moquette-mqtt 提供了回調(callback)模式的發佈和訂閱,可是在訂閱以後沒有找到接收消息的方法。moquette-mqtt 中 Blocking、Future 式的發佈訂閱都是以 callback 式的發佈訂閱為基礎實現的,可是本人在研究源代碼並進行測試時,發現 callback 方式的消息接收沒有成功。
因此本文只介紹 callback 式的發佈和訂閱,不包含消息接收的過程,其緣由目前尚未查明。
採用 Callback 式發佈主題:
- package com.etrip.mqtt.callback;
-
- import java.net.URISyntaxException;
-
- import org.fusesource.hawtbuf.Buffer;
- import org.fusesource.hawtbuf.UTF8Buffer;
- import org.fusesource.mqtt.client.Callback;
- import org.fusesource.mqtt.client.CallbackConnection;
- import org.fusesource.mqtt.client.Listener;
- import org.fusesource.mqtt.client.MQTT;
- import org.fusesource.mqtt.client.QoS;
- import org.fusesource.mqtt.client.Topic;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- public class MQTTCallbackServer {
- private static final Logger LOG = LoggerFactory.getLogger(MQTTCallbackServer.class);
- private final static String CONNECTION_STRING = "tcp://localhost:1883";
- private final static boolean CLEAN_START = true;
- private final static short KEEP_ALIVE = 30;
- public static Topic[] topics = {
- new Topic("china/beijing", QoS.EXACTLY_ONCE),
- new Topic("china/tianjin", QoS.AT_LEAST_ONCE),
- new Topic("china/henan", QoS.AT_MOST_ONCE)};
- public final static long RECONNECTION_ATTEMPT_MAX=6;
- public final static long RECONNECTION_DELAY=2000;
-
- public final static int SEND_BUFFER_SIZE=2*1024*1024;
-
-
- public static void main(String[] args) {
-
- MQTT mqtt = new MQTT();
- try {
-
- mqtt.setHost(CONNECTION_STRING);
-
- mqtt.setCleanSession(CLEAN_START);
-
- mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX);
-
- mqtt.setReconnectDelay(RECONNECTION_DELAY);
-
- mqtt.setKeepAlive(KEEP_ALIVE);
-
- mqtt.setSendBufferSize(SEND_BUFFER_SIZE);
-
-
-
-
-
- final CallbackConnection connection = mqtt.callbackConnection();
-
-
-
- connection.listener(new Listener() {
-
- public void onDisconnected() {
- }
- public void onConnected() {
- }
-
- public void onPublish(UTF8Buffer topic, Buffer payload, Runnable ack) {
-
-
- ack.run();
- System.out.println("topic"+topic.toString()+"="+new String(payload.getData()));
- }
- public void onFailure(Throwable value) {
- }
- });
-
- connection.connect(new Callback<Void>() {
-
- public void onFailure(Throwable value) {
-
- System.out.println("MQTTCallbackServer.CallbackConnection.connect.onFailure"+"鏈接失敗......"+value.getMessage());
- value.printStackTrace();
- }
-
-
- public void onSuccess(Void v) {
-
- int count=1;
- while(true){
- count++;
-
-
- final String message="hello "+count+"chinese people !";
- final String topic = "china/beijing";
- System.out.println("MQTTCallbackServer publish topic="+topic+" message :"+message);
- connection.publish(topic, message.getBytes(), QoS.AT_LEAST_ONCE, false, new Callback<Void>() {
- public void onSuccess(Void v) {
-
- }
- public void onFailure(Throwable value) {
- value.printStackTrace();
- }
- });
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
-
- e.printStackTrace();
- }
- }
-
-
-
- }
- });
- Thread.sleep(10000000000L);
- } catch (URISyntaxException e) {
-
- e.printStackTrace();
- } catch (Exception e) {
-
- e.printStackTrace();
- }finally{
-
- }
- }
- }
採用 Callback 式訂閱主題: