在上面兩篇關於mqtt的broker的啓動和mqtt的服務端發佈主題信息以後,咱們客戶端須要訂閱相關的信息並接收相關的主題信息。java
- package com.etrip.mqtt;
-
- import java.net.URISyntaxException;
-
- import org.fusesource.mqtt.client.BlockingConnection;
- import org.fusesource.mqtt.client.MQTT;
- import org.fusesource.mqtt.client.Message;
- import org.fusesource.mqtt.client.QoS;
- import org.fusesource.mqtt.client.Topic;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- public class MQTTClient {
- private static final Logger LOG = LoggerFactory.getLogger(MQTTClient.class);
- private final static String CONNECTION_STRING = "tcp://192.168.208.46:1883";
- private final static boolean CLEAN_START = true;
- private final static short KEEP_ALIVE = 30;
- private final static String CLIENT_ID = "publishService";
- public static Topic[] topics = {
- new Topic("china/beijing", QoS.EXACTLY_ONCE),
- new Topic("china/tianjin", QoS.AT_LEAST_ONCE),
- new Topic("china/henan", QoS.AT_MOST_ONCE)};
- public final static long RECONNECTION_ATTEMPT_MAX=6;
- public final static long RECONNECTION_DELAY=2000;
-
- public final static int SEND_BUFFER_SIZE=2*1024*1024;
-
-
- public static void main(String[] args) {
-
- MQTT mqtt = new MQTT();
- BlockingConnection connection=null;
- try {
-
- mqtt.setHost(CONNECTION_STRING);
-
- mqtt.setCleanSession(CLEAN_START);
-
- mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX);
-
- mqtt.setReconnectDelay(RECONNECTION_DELAY);
-
- mqtt.setKeepAlive(KEEP_ALIVE);
-
- mqtt.setSendBufferSize(SEND_BUFFER_SIZE);
-
-
-
- connection = mqtt.blockingConnection();
-
- connection.connect();
-
- Topic[] topics = {new Topic("china/beijing", QoS.AT_LEAST_ONCE)};
-
- byte[] qoses = connection.subscribe(topics);
-
- while(true){
-
- Message message = connection.receive();
-
- byte[] payload = message.getPayload();
-
- LOG.info("MQTTClient Message Topic="+message.getTopic()+" Content :"+new String(payload));
-
- message.ack();
-
- Thread.sleep(2000);
- }
- } catch (URISyntaxException e) {
-
- e.printStackTrace();
- } catch (Exception e) {
-
- e.printStackTrace();
- }finally{
- try {
- connection.disconnect();
- } catch (Exception e) {
-
- e.printStackTrace();
- }
- }
- }
- }