SpringBoot集成MQTT

SpringBoot集成MQTT

MQTT

MQTT(消息隊列遙測傳輸)是ISO標準(ISO/IEC PRF 20922)下基於發佈/訂閱範式的消息協議。它工做在 TCP/IP協議族上,是爲硬件性能低下的遠程設備以及網絡情況糟糕的狀況下而設計的發佈/訂閱型消息協議。國內不少企業都普遍使用MQTT做爲Android手機客戶端與服務器端推送消息的協議。html

特色

MQTT協議是爲大量計算能力有限,且工做在低帶寬、不可靠的網絡的遠程傳感器和控制設備通信而設計的協議,它具備如下主要的幾項特性:java

  1. 使用發佈/訂閱消息模式,提供一對多的消息發佈,解除應用程序耦合;
  2. 對負載內容屏蔽的消息傳輸;
  3. 使用TCP/IP提供網絡鏈接;
  4. 有三種消息發佈服務質量;web

    1. 至多一次:消息發佈徹底依賴底層 TCP/IP 網絡。會發生消息丟失或重複。這一級別可用於以下狀況,環境傳感器數據,丟失一次讀記錄無所謂,由於不久後還會有第二次發送。
    2. 至少一次:確保消息到達,但消息重複可能會發生。
    3. 只有一次:確保消息到達一次。這一級別可用於以下狀況,在計費系統中,消息重複或丟失會致使不正確的結果。
  5. 小型傳輸,開銷很小(固定長度的頭部是 2 字節),協議交換最小化,以下降網絡流量;
  6. 使用Last WillTestament特性通知有關各方客戶端異常中斷的機制。

Apache-Apollo

Apache Apollo是一個代理服務器,其是在ActiveMQ基礎上發展而來的,能夠支持STOMP, AMQP, MQTT, Openwire, SSL, WebSockets 等多種協議。
原理:服務器端建立一個惟一訂閱號,發送者能夠向這個訂閱號中發東西,而後接受者(即訂閱了這個訂閱號的人)都會收到這個訂閱號發出來的消息。以此來完成消息的推送。服務器實際上是一個消息中轉站。spring

下載

下載地址:http://activemq.apache.org/ap...apache

配置與啓動

  1. 須要安裝JDK環境
  2. 在命令行模式下進入bin,執行apollo create mybroker d:\apache-apollo\broker,建立一個名爲mybroker虛擬主機(Virtual Host)。須要特別注意的是,生成的目錄就是之後真正啓動程序的位置。
  3. 在命令行模式下進入d:\apache-apollo\broker\bin,執行apollo-broker run,也能夠用apollo-broker-service.exe配置服務。
  4. 訪問http://127.0.0.1:61680打開web管理界面。(密碼查看broker/etc/users.properties
  5. 啓動端口,看cmd輸出。

SpringBoot2的開發

添加依賴

<!-- 
  spring-boot版本 2.1.0.RELEASE
-->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-mqtt</artifactId>
</dependency>

自定義配置

# src/main/resources/config/mqtt.properties
##################
#  MQTT 配置
##################
# 用戶名
mqtt.username=admin
# 密碼
mqtt.password=password
# 推送信息的鏈接地址,若是有多個,用逗號隔開,如:tcp://127.0.0.1:61613,tcp://192.168.1.61:61613
mqtt.url=tcp://127.0.0.1:61613
##################
#  MQTT 生產者
##################
# 鏈接服務器默認客戶端ID
mqtt.producer.clientId=mqttProducer
# 默認的推送主題,實際可在調用接口時指定
mqtt.producer.defaultTopic=topic1
##################
#  MQTT 消費者
##################
# 鏈接服務器默認客戶端ID
mqtt.consumer.clientId=mqttConsumer
# 默認的接收主題,能夠訂閱多個Topic,逗號分隔
mqtt.consumer.defaultTopic=topic1

配置MQTT發佈和訂閱

import org.apache.commons.lang3.StringUtils;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;

/**
 * MQTT配置,生產者
 *
 * @author BBF
 */
@Configuration
public class MqttConfig {

  private static final Logger LOGGER = LoggerFactory.getLogger(MqttConfig.class);

  private static final byte[] WILL_DATA;

  static {
    WILL_DATA = "offline".getBytes();
  }

  /**
   * 訂閱的bean名稱
   */
  public static final String CHANNEL_NAME_IN = "mqttInboundChannel";
  /**
   * 發佈的bean名稱
   */
  public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";

  @Value("${mqtt.username}")
  private String username;

  @Value("${mqtt.password}")
  private String password;

  @Value("${mqtt.url}")
  private String url;

  @Value("${mqtt.producer.clientId}")
  private String producerClientId;

  @Value("${mqtt.producer.defaultTopic}")
  private String producerDefaultTopic;

  @Value("${mqtt.consumer.clientId}")
  private String consumerClientId;

  @Value("${mqtt.consumer.defaultTopic}")
  private String consumerDefaultTopic;


  /**
   * MQTT鏈接器選項
   *
   * @return {@link org.eclipse.paho.client.mqttv3.MqttConnectOptions}
   */
  @Bean
  public MqttConnectOptions getMqttConnectOptions() {
    MqttConnectOptions options = new MqttConnectOptions();
    // 設置是否清空session,這裏若是設置爲false表示服務器會保留客戶端的鏈接記錄,
    // 這裏設置爲true表示每次鏈接到服務器都以新的身份鏈接
    options.setCleanSession(true);
    // 設置鏈接的用戶名
    options.setUserName(username);
    // 設置鏈接的密碼
    options.setPassword(password.toCharArray());
    options.setServerURIs(StringUtils.split(url, ","));
    // 設置超時時間 單位爲秒
    options.setConnectionTimeout(10);
    // 設置會話心跳時間 單位爲秒 服務器會每隔1.5*20秒的時間向客戶端發送心跳判斷客戶端是否在線,但這個方法並無重連的機制
    options.setKeepAliveInterval(20);
    // 設置「遺囑」消息的話題,若客戶端與服務器之間的鏈接意外中斷,服務器將發佈客戶端的「遺囑」消息。
    options.setWill("willTopic", WILL_DATA, 2, false);
    return options;
  }


  /**
   * MQTT客戶端
   *
   * @return {@link org.springframework.integration.mqtt.core.MqttPahoClientFactory}
   */
  @Bean
  public MqttPahoClientFactory mqttClientFactory() {
    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    factory.setConnectionOptions(getMqttConnectOptions());
    return factory;
  }

  /**
   * MQTT信息通道(生產者)
   *
   * @return {@link org.springframework.messaging.MessageChannel}
   */
  @Bean(name = CHANNEL_NAME_OUT)
  public MessageChannel mqttOutboundChannel() {
    return new DirectChannel();
  }

  /**
   * MQTT消息處理器(生產者)
   *
   * @return {@link org.springframework.messaging.MessageHandler}
   */
  @Bean
  @ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
  public MessageHandler mqttOutbound() {
    MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
        producerClientId,
        mqttClientFactory());
    messageHandler.setAsync(true);
    messageHandler.setDefaultTopic(producerDefaultTopic);
    return messageHandler;
  }

  /**
   * MQTT消息訂閱綁定(消費者)
   *
   * @return {@link org.springframework.integration.core.MessageProducer}
   */
  @Bean
  public MessageProducer inbound() {
    // 能夠同時消費(訂閱)多個Topic
    MqttPahoMessageDrivenChannelAdapter adapter =
        new MqttPahoMessageDrivenChannelAdapter(
            consumerClientId, mqttClientFactory(),
            StringUtils.split(consumerDefaultTopic, ","));
    adapter.setCompletionTimeout(5000);
    adapter.setConverter(new DefaultPahoMessageConverter());
    adapter.setQos(1);
    // 設置訂閱通道
    adapter.setOutputChannel(mqttInboundChannel());
    return adapter;
  }

  /**
   * MQTT信息通道(消費者)
   *
   * @return {@link org.springframework.messaging.MessageChannel}
   */
  @Bean(name = CHANNEL_NAME_IN)
  public MessageChannel mqttInboundChannel() {
    return new DirectChannel();
  }

  /**
   * MQTT消息處理器(消費者)
   *
   * @return {@link org.springframework.messaging.MessageHandler}
   */
  @Bean
  @ServiceActivator(inputChannel = CHANNEL_NAME_IN)
  public MessageHandler handler() {
    return new MessageHandler() {
      @Override
      public void handleMessage(Message<?> message) throws MessagingException {
        LOGGER.error("===================={}============", message.getPayload());
      }
    };
  }
}

消息發佈器

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

/**
 * MQTT生產者消息發送接口
 * <p>MessagingGateway要指定生產者的通道名稱</p>
 * @author BBF
 */
@Component
@MessagingGateway(defaultRequestChannel = MqttConfig.CHANNEL_NAME_OUT)
public interface IMqttSender {

  /**
   * 發送信息到MQTT服務器
   *
   * @param data 發送的文本
   */
  void sendToMqtt(String data);

  /**
   * 發送信息到MQTT服務器
   *
   * @param topic 主題
   * @param payload 消息主體
   */
  void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
      String payload);

  /**
   * 發送信息到MQTT服務器
   *
   * @param topic 主題
   * @param qos 對消息處理的幾種機制。<br> 0 表示的是訂閱者沒收到消息不會再次發送,消息會丟失。<br>
   * 1 表示的是會嘗試重試,一直到接收到消息,但這種狀況可能致使訂閱者收到屢次重複消息。<br>
   * 2 多了一次去重的動做,確保訂閱者收到的消息有一次。
   * @param payload 消息主體
   */
  void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
      @Header(MqttHeaders.QOS) int qos,
      String payload);
}

發送消息

/**
 * MQTT消息發送
 *
 * @author BBF
 */
@Controller
@RequestMapping(value = "/")
public class MqttController {

  /**
   * 注入發送MQTT的Bean
   */
  @Resource
  private IMqttSender iMqttSender;

  /**
   * 發送MQTT消息
   * @param message 消息內容
   * @return 返回
   */
  @ResponseBody
  @GetMapping(value = "/mqtt", produces ="text/html")
  public ResponseEntity<String> sendMqtt(@RequestParam(value = "msg") String message) {
    iMqttSender.sendToMqtt(message);
    return new ResponseEntity<>("OK", HttpStatus.OK);
  }
}

入口類

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
import org.springframework.context.annotation.PropertySource;

/**
 * SpringBoot 入口類
 *
 * @author BBF
 */
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class,
    HibernateJpaAutoConfiguration.class})
@PropertySource(encoding = "UTF-8", value = {"classpath:config/mqtt.properties"})
public class Application {

  public static void main(String[] args) {
    SpringApplication.run(Application.class, args);
  }
}
相關文章
相關標籤/搜索