本文屬於翻譯,轉載註明出處,歡迎關注微信小程序小白AI博客
微信公衆號小白AI
或者網站 https://xiaobaiai.nethtml
[TOC]java
Spring Integration
提供入站(inbound)和出站(outbound)通道適配器,以支持MQTT消息協議。使用這兩適配器,須要加入依賴:web
<!-- Maven --> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> <version>5.2.1.RELEASE</version> </dependency>
// Gradle compile "org.springframework.integration:spring-integration-mqtt:5.2.1.RELEASE"
當前的MQTT Integration實現使用的是Eclipse Paho MQTT客戶端庫。兩個適配器的配置都是使用DefaultMqttPahoClientFactory
實現的。有關配置選項的更多信息,請參閱Eclipse Paho MQTT
文檔定義。spring
建議配置
MqttConnectOptions
對象並將其注入工廠(factory),而不是在工廠自己裏設置(不推薦使用)MQTT鏈接選項。
入站通道適配器由MqttPahoMessageDrivenChannelAdapter
實現。經常使用的配置項有:express
MqttMessageConverter
(可選)。默認狀況下,默認的DefaultPaHomeMessageConverter
生成一條帶有字符串有效負載的消息,其頭部內容以下:編程
mqtt_topic
: 接收消息的主題mqtt_duplicate
: 若是消息是重複的,則爲true
mqtt_qos
: 服務質量,你能夠將DefaultPahoMessageConverter
聲明爲<bean />
並將payloadAsBytes
屬性設置爲true
,從而將DefaultPahoMessageConverter
返回有效負載中的原始byte[]
MessagingException
。從Spring 4.1版開始,能夠省略URL
。相反,你能夠在DefaultMqttPahoClientFactory
的server URIs
屬性中提供服務器uri。例如,這樣作容許鏈接到高可用(HA)集羣。
從Spring 4.2.2
開始,當適配器成功訂閱到主題了,MqttSubscribedEvent
事件就會被觸發。當鏈接失敗或者訂閱失敗,MqttConnectionFailedEvent
事件會被觸發。這兩個事件都可以被一個Bean經過實現ApplicationListener
而接收到。另外,名爲recoveryInterval
的新屬性控制適配器在失敗後嘗試從新鏈接的時間間隔。默認爲10000毫秒(10秒)。小程序
@Component public class MQTTSubscribedListener implements ApplicationListener<MqttSubscribedEvent> { private static final Logger LOGGER = LogManager.getLogger(MQTTSubscribedListener.class); @Override public void onApplicationEvent(MqttSubscribedEvent event) { LOGGER.debug("Subscribed Success: " + event.getMessage()); } }
在版本Spring 4.2.3以前,當適配器中止時,客戶端老是取消訂閱。這是不正確的,由於若是客戶端QOS大於0,咱們須要保持訂閱處於活動狀態,以便在下次啓動時傳遞適配器中止時到達的消息。這還須要將客戶機工廠上的cleanSession
屬性設置爲false。默認爲true。從4.2.3版開始,若是cleanSession屬性爲false,則適配器不會取消訂閱(默認狀況下),這個默認行爲能夠經過在工廠上設置consumerCloseAction
屬性來重寫此行爲。它能夠有如下值:UNSUBSCRIBE_ALWAYS
、UNSUBSCRIBE_NEVER
和UNSUBSCRIBE_CLEAN
,最後一項(默認設置)僅在cleanSession屬性爲true時取消訂閱。若要還原到4.2.3以前的行爲,請始終使用「取消訂閱」設置項。注意:從Spring 5.0開始,topic、qos和retained屬性映射到
.RECEIVED_…headers
(MqttHeaders.RECEIVED_topic、MqttHeaders.RECEIVED_qos和MqttHeaders.RECEIVED_retained),以免意外傳播到(默認狀況下)使用MqttHeaders.topic、MqttHeaders.qos和MqttHeaders.retained headers的出站消息。segmentfault
public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message<?> message) throws MessagingException { LOGGER.debug("===Received Msg(topic {}): {}", message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC), message.getPayload()); } }; }
從Spring4.1
開始,你能夠經過編程更改適配器訂閱的主題。Spring Integration提供了addTopic()
和removeTopic()
方法。添加主題時,能夠選擇指定QoS
值(默認是1)。你還能夠經過向具備適當有效負載的<control bus/>
發送適當的消息來修改主題。示例:微信小程序
myMqttAdapter.addTopic('foo', 1)
中止和啓動適配器對主題列表(topics設置項)沒有影響(它不會還原到配置中的原始設置)。這些更改不會保留到應用程序上下文的生命週期以外。新的應用程序上下文將還原爲配置的設置。bash
在適配器中止(或與代理斷開鏈接)時更改主題列表(topics)將在下次創建鏈接時生效。
如下Spring Boot
應用程序顯示瞭如何使用Java配置配置入站(inbound)適配器的示例:
@SpringBootApplication public class MqttJavaApplication { public static void main(String[] args) { new SpringApplicationBuilder(MqttJavaApplication.class) .web(false) .run(args); } @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } @Bean public MessageProducer inbound() { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "testClient", "topic1", "topic2"); adapter.setCompletionTimeout(5000); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(1); adapter.setOutputChannel(mqttInputChannel()); return adapter; } @Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message<?> message) throws MessagingException { System.out.println(message.getPayload()); } }; } }
下面的Spring Boot
應用程序提供了使用Java DSL配置入站適配器的示例:
@SpringBootApplication public class MqttJavaApplication { public static void main(String[] args) { new SpringApplicationBuilder(MqttJavaApplication.class) .web(false) .run(args); } @Bean public IntegrationFlow mqttInbound() { return IntegrationFlows.from( new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "testClient", "topic1", "topic2");) .handle(m -> System.out.println(m.getPayload())) .get(); } }
出站通道適配器由MqttPahoMessageHandler
實現,MqttPahoMessageHandler
包裝在ConsumerEndpoint
中。爲了方便起見,可使用名稱空間配置它。
從Spring 4.1開始,適配器支持異步發送操做,在確認交付以前避免阻塞。若是須要,能夠發出應用程序事件以使應用程序確認傳遞。
如下列表顯示出站通道適配器可用的屬性:
<int-mqtt:outbound-channel-adapter id="withConverter" client-id="foo" url="tcp://localhost:1883" converter="myConverter" client-factory="clientFactory" default-qos="1" qos-expression="" default-retained="true" retained-expression="" default-topic="bar" topic-expression="" async="false" async-events="false" channel="target" />
Converter(MqttMessageConver,可選的),默認的DefaultPaHomeMessageConverter
可識別如下標題:
mqtt_topic
: 消息將發送到的主題mqtt_retained
: 若是要保留消息,則爲truemqtt_qos
:消息服務質量headers[mqtt_qos]
。mqtt_retained
頭,則使用它。若是提供了自定義轉換器,則不使用它。headers[mqtt_retained]
mqtt_topic
頭,則使用)headers['mqtt_topic']
MqttMessageSentEvent
。它包含消息、主題、客戶端庫生成的消息id、clientId和clientInstance(每次鏈接客戶端時遞增)。當客戶端庫確認傳遞時,將發出MqttMessageDeliveredEvent
。它包含messageId、clientId和clientInstance,使傳遞與發送相關。任何ApplicationListener
或事件入站通道適配器均可以接收這些事件。請注意,MqttMessageDeliveredEvent
可能在MqttMessageSentEvent
以前收到。默認值爲false注意,一樣地,從Spring 4.1開始,能夠省略URL。相反,能夠在DefaultMqttPahoClientFactor
的server URIs
屬性中提供服務器uri。例如,這容許鏈接到高可用(HA)集羣。
下面的Spring Boot應用程序展現瞭如何使用Java配置配置出站適配器的示例:
@SpringBootApplication @IntegrationComponentScan public class MqttJavaApplication { public static void main(String[] args) { ConfigurableApplicationContext context = new SpringApplicationBuilder(MqttJavaApplication.class) .web(false) .run(args); MyGateway gateway = context.getBean(MyGateway.class); gateway.sendToMqtt("foo"); } @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions options = new MqttConnectOptions(); options.setServerURIs(new String[] { "tcp://host1:1883", "tcp://host2:1883" }); options.setUserName("username"); options.setPassword("password".toCharArray()); factory.setConnectionOptions(options); return factory; } @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("testClient", mqttClientFactory()); messageHandler.setAsync(true); messageHandler.setDefaultTopic("testTopic"); return messageHandler; } @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") public interface MyGateway { void sendToMqtt(String data); } }
下面的Spring Boot應用程序提供了使用Java DSL配置出站適配器的示例:
@SpringBootApplication public class MqttJavaApplication { public static void main(String[] args) { new SpringApplicationBuilder(MqttJavaApplication.class) .web(false) .run(args); } @Bean public IntegrationFlow mqttOutboundFlow() { return f -> f.handle(new MqttPahoMessageHandler("tcp://host1:1883", "someMqttClient")); } }
本文屬於原創,轉載註明出處,歡迎關注CSDNfreeape或微信小程序小白AI博客
微信公衆號小白AI
或者網站 https://xiaobaiai.net