Spring Boot 基於Spring Integration 實現MQTT客戶端簡單訂閱發佈功能

本文屬於翻譯,轉載註明出處,歡迎關注微信小程序小白AI博客 微信公衆號小白AI或者網站 https://xiaobaiai.nethtml

[TOC]java

1 簡介

Spring Integration 提供入站(inbound)和出站(outbound)通道適配器,以支持MQTT消息協議。使用這兩適配器,須要加入依賴:web

<!-- Maven -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>5.2.1.RELEASE</version>
</dependency>
// Gradle
compile "org.springframework.integration:spring-integration-mqtt:5.2.1.RELEASE"

當前的MQTT Integration實現使用的是Eclipse Paho MQTT客戶端庫。兩個適配器的配置都是使用DefaultMqttPahoClientFactory實現的。有關配置選項的更多信息,請參閱Eclipse Paho MQTT文檔定義。spring

建議配置 MqttConnectOptions對象並將其注入工廠(factory),而不是在工廠自己裏設置(不推薦使用)MQTT鏈接選項。

2 Inbound(消息驅動)通道適配器

入站通道適配器由MqttPahoMessageDrivenChannelAdapter實現。經常使用的配置項有:express

  • 客戶端ID
  • MQTT Broker URL
  • 待訂閱的主題列表
  • 帶訂閱的主題QoS值列表
  • MqttMessageConverter(可選)。默認狀況下,默認的DefaultPaHomeMessageConverter生成一條帶有字符串有效負載的消息,其頭部內容以下:編程

    • mqtt_topic: 接收消息的主題
    • mqtt_duplicate: 若是消息是重複的,則爲true
    • mqtt_qos: 服務質量,你能夠將DefaultPahoMessageConverter聲明爲<bean />並將payloadAsBytes屬性設置爲true,從而將DefaultPahoMessageConverter返回有效負載中的原始byte[]
  • 客戶端工廠
  • 發送超時。僅當通道可能阻塞(例如當前已滿的有界隊列通道)時才適用。
  • 錯誤通道。下游異常將以錯誤消息的形式發送到此通道(若是提供)。有效負載是包含失敗消息和緣由的MessagingException
  • 恢復間隔。它控制適配器在發生故障後嘗試從新鏈接的時間間隔。默認爲10000毫秒(10秒)。
從Spring 4.1版開始,能夠省略 URL。相反,你能夠在 DefaultMqttPahoClientFactoryserver URIs屬性中提供服務器uri。例如,這樣作容許鏈接到高可用(HA)集羣。

Spring 4.2.2開始,當適配器成功訂閱到主題了,MqttSubscribedEvent事件就會被觸發。當鏈接失敗或者訂閱失敗,MqttConnectionFailedEvent事件會被觸發。這兩個事件都可以被一個Bean經過實現ApplicationListener而接收到。另外,名爲recoveryInterval的新屬性控制適配器在失敗後嘗試從新鏈接的時間間隔。默認爲10000毫秒(10秒)。小程序

@Component
public class MQTTSubscribedListener implements ApplicationListener<MqttSubscribedEvent> {
    private static final Logger LOGGER = LogManager.getLogger(MQTTSubscribedListener.class);

    @Override
    public void onApplicationEvent(MqttSubscribedEvent event) {
        LOGGER.debug("Subscribed Success: " + event.getMessage());
    }
}
在版本Spring 4.2.3以前,當適配器中止時,客戶端老是取消訂閱。這是不正確的,由於若是客戶端QOS大於0,咱們須要保持訂閱處於活動狀態,以便在下次啓動時傳遞適配器中止時到達的消息。這還須要將客戶機工廠上的 cleanSession屬性設置爲false。默認爲true。從4.2.3版開始,若是cleanSession屬性爲false,則適配器不會取消訂閱(默認狀況下),這個默認行爲能夠經過在工廠上設置 consumerCloseAction屬性來重寫此行爲。它能夠有如下值: UNSUBSCRIBE_ALWAYSUNSUBSCRIBE_NEVERUNSUBSCRIBE_CLEAN,最後一項(默認設置)僅在cleanSession屬性爲true時取消訂閱。若要還原到4.2.3以前的行爲,請始終使用「取消訂閱」設置項。

注意:從Spring 5.0開始,topic、qos和retained屬性映射到.RECEIVED_…headers(MqttHeaders.RECEIVED_topic、MqttHeaders.RECEIVED_qos和MqttHeaders.RECEIVED_retained),以免意外傳播到(默認狀況下)使用MqttHeaders.topic、MqttHeaders.qos和MqttHeaders.retained headers的出站消息。segmentfault

public MessageHandler handler() {
    return new MessageHandler() {
        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            LOGGER.debug("===Received Msg(topic {}): {}", message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC), message.getPayload());
        }
    };
}

2.1 在運行時添加和刪除主題

Spring4.1開始,你能夠經過編程更改適配器訂閱的主題。Spring Integration提供了addTopic()removeTopic()方法。添加主題時,能夠選擇指定QoS值(默認是1)。你還能夠經過向具備適當有效負載的<control bus/>發送適當的消息來修改主題。示例:微信小程序

myMqttAdapter.addTopic('foo', 1)

中止和啓動適配器對主題列表(topics設置項)沒有影響(它不會還原到配置中的原始設置)。這些更改不會保留到應用程序上下文的生命週期以外。新的應用程序上下文將還原爲配置的設置。bash

在適配器中止(或與代理斷開鏈接)時更改主題列表(topics)將在下次創建鏈接時生效。

2.2 使用Java配置配置

如下Spring Boot應用程序顯示瞭如何使用Java配置配置入站(inbound)適配器的示例:

@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
                .web(false)
                .run(args);
    }

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "testClient",
                                                 "topic1", "topic2");
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println(message.getPayload());
            }

        };
    }
}

2.3 使用Java DSL配置

下面的Spring Boot應用程序提供了使用Java DSL配置入站適配器的示例:

@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow mqttInbound() {
        return IntegrationFlows.from(
                         new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883",
                                        "testClient", "topic1", "topic2");)
                .handle(m -> System.out.println(m.getPayload()))
                .get();
    }

}

3 出站通道適配器

出站通道適配器由MqttPahoMessageHandler實現,MqttPahoMessageHandler包裝在ConsumerEndpoint中。爲了方便起見,可使用名稱空間配置它。

從Spring 4.1開始,適配器支持異步發送操做,在確認交付以前避免阻塞。若是須要,能夠發出應用程序事件以使應用程序確認傳遞。

如下列表顯示出站通道適配器可用的屬性:

<int-mqtt:outbound-channel-adapter id="withConverter"
    client-id="foo"  
    url="tcp://localhost:1883"  
    converter="myConverter"  
    client-factory="clientFactory"  
    default-qos="1"  
    qos-expression="" 
    default-retained="true"  
    retained-expression="" 
    default-topic="bar"  
    topic-expression="" 
    async="false"  
    async-events="false"  
    channel="target" />
  • MQTT Client ID
  • MQTT Broker URL
  • Converter(MqttMessageConver,可選的),默認的DefaultPaHomeMessageConverter可識別如下標題:

    • mqtt_topic: 消息將發送到的主題
    • mqtt_retained: 若是要保留消息,則爲true
    • mqtt_qos:消息服務質量
  • 客戶端工廠
  • default-qos,默認的服務質量。若是找不到mqtt_qos頭或qos表達式返回空值,則使用它。若是提供自定義轉換器,則不使用它。
  • 用於計算以肯定qos的表達式。缺省值是headers[mqtt_qos]
  • 保留標誌的默認值。若是找不到mqtt_retained頭,則使用它。若是提供了自定義轉換器,則不使用它。
  • 要計算以肯定保留布爾值的表達式。默認爲headers[mqtt_retained]
  • 消息發送到的默認主題(若是找不到mqtt_topic頭,則使用)
  • 要計算以肯定目標主題的表達式。默認爲headers['mqtt_topic']
  • async若是爲true,則調用方不會阻塞。而是在發送消息時等待傳遞確認。默認值爲false(發送將阻塞,直到確認發送)
  • async-events,當async和async事件(async-events)都爲true時,將發出MqttMessageSentEvent。它包含消息、主題、客戶端庫生成的消息id、clientId和clientInstance(每次鏈接客戶端時遞增)。當客戶端庫確認傳遞時,將發出MqttMessageDeliveredEvent。它包含messageId、clientId和clientInstance,使傳遞與發送相關。任何ApplicationListener或事件入站通道適配器均可以接收這些事件。請注意,MqttMessageDeliveredEvent可能在MqttMessageSentEvent以前收到。默認值爲false
注意,一樣地,從Spring 4.1開始,能夠省略URL。相反,能夠在 DefaultMqttPahoClientFactorserver URIs屬性中提供服務器uri。例如,這容許鏈接到高可用(HA)集羣。

3.1 使用Java配置配置

下面的Spring Boot應用程序展現瞭如何使用Java配置配置出站適配器的示例:

@SpringBootApplication
@IntegrationComponentScan
public class MqttJavaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
                new SpringApplicationBuilder(MqttJavaApplication.class)
                        .web(false)
                        .run(args);
        MyGateway gateway = context.getBean(MyGateway.class);
        gateway.sendToMqtt("foo");
    }

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] { "tcp://host1:1883", "tcp://host2:1883" });
        options.setUserName("username");
        options.setPassword("password".toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                       new MqttPahoMessageHandler("testClient", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic("testTopic");
        return messageHandler;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MyGateway {

        void sendToMqtt(String data);

    }

}

3.2 使用Java DSL配置

下面的Spring Boot應用程序提供了使用Java DSL配置出站適配器的示例:

@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
            .web(false)
            .run(args);
    }

       @Bean
       public IntegrationFlow mqttOutboundFlow() {
           return f -> f.handle(new MqttPahoMessageHandler("tcp://host1:1883", "someMqttClient"));
    }

}

4 參考資料

本文屬於原創,轉載註明出處,歡迎關注CSDNfreeape或微信小程序小白AI博客 微信公衆號小白AI或者網站 https://xiaobaiai.net

相關文章
相關標籤/搜索