Spring Boot 集成 MQTT

1、添加配置

spring:
  mqtt:
    client:
      username: 用戶名
      password: 密碼
      serverURIs: tcp://ip:port # 客戶端地址,多個使用逗號隔開
      clientId: client0001 # ${random.value}
      keepAliveInterval: 30
      connectionTimeout: 30
    producer:
      defaultQos: 1
      defaultRetained: true
      defaultTopic: defaultTopicName
    consumer:
      defaultQos: 1
      completionTimeout: 30000
      consumerTopics: topic1,topic2 # 監聽的 topic,多個使用逗號隔開

2、客戶端配置

/* 客戶端 */
    @Bean
    public MqttConnectOptions getMqttConnectOptions() {
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setUserName(username);
        mqttConnectOptions.setPassword(password.toCharArray());
        mqttConnectOptions.setServerURIs(serverURIs);
        mqttConnectOptions.setKeepAliveInterval(keepAliveInterval);
        mqttConnectOptions.setConnectionTimeout(connectionTimeout);

        return mqttConnectOptions;
    }

    @Bean
    public MqttPahoClientFactory getMqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions());

        return factory;
    }

3、發佈消息

3.1 配置

@Bean
    public MessageChannel outboundChannel() {
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = OUTBOUND_CHANNEL)
    public MessageHandler getMqttProducer() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, getMqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(defaultTopic);
        messageHandler.setDefaultRetained(defaultRetained);
        messageHandler.setDefaultQos(defaultProducerQos);

        return messageHandler;
    }

3.2 消息推送接口類

@MessagingGateway(defaultRequestChannel = MqttConfig.OUTBOUND_CHANNEL)
public interface MqttSender {

    void sendToMqtt(String data);

    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);

    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}

3.3 測試

@RestController
public class TestController {

    @Autowired
    private MqttSender mqttSender;

    @RequestMapping("/send")
    private void send(String data){
        mqttSender.sendToMqtt(data);
    }
}

4、訂閱消息

4.1 配置

@Bean
    public MessageChannel inboundChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer getMqttConsumer() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(clientId, getMqttClientFactory(), consumerTopics);
        adapter.setCompletionTimeout(completionTimeout);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(defaultConsumerQos);
        adapter.setOutputChannel(inboundChannel());

        return adapter;
    }

4.2 測試

@Component
public class MqttConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(MqttConsumer.class);

    @Bean
    @ServiceActivator(inputChannel = MqttConfig.INBOUND_CHANNEL)
    public MessageHandler handler() {
        return message -> {
            String topic = message.getHeaders().get(MqttConfig.RECEIVED_TOPIC_KEY).toString();
            LOGGER.info("[{}]主題接收到消息:{}", topic, message.getPayload().toString());
        };
    }
}

注意事項html

  1. @ServiceActivator 和 @MessagingGateway 中綁定的 Channel 名,需與返回 MessageChannel 的 Bean 的方法名同樣:vue

    如發佈者綁定的 Channel 名爲 outboundChannel,則須要有對應的方法,以下:java

    @Bean
     public MessageChannel outboundChannel() {
         return new DirectChannel();
     }
  2. 發佈者與訂閱者的 Channel 名不能相同
  3. 鏈接服務器的超時時間和訂閱的超時時間單位不同

參考git

  1. MQTT系列教程1(基本概念介紹)
  2. SpringBoot - 集成MQTT教程1(發佈消息)
  3. SpringBoot - 集成MQTT教程2(訂閱消息)

完整代碼:GitHubgithub

相關文章
相關標籤/搜索