MQTT協議因低延遲、效率高,在工業物聯網領域的使用頻率特別高。前文介紹了如何用代碼發送MQTT消息,本文在前文的基礎上實現MQTT消息的訂閱接收。
操作步驟:
<!-- Spring Boot starter for Spring Integration. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>

<!-- Spring Integration MQTT module; supplies the org.springframework.integration.mqtt
     classes (client factory, inbound channel adapter) used by the configuration class. -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>

<!-- Lombok, used for the @Slf4j logger annotation; declared optional. -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
server:
  port: 8090

# Settings read by the MQTT configuration class through ${mqtt.*} placeholders.
mqtt:
  host: tcp://127.0.0.1:1883
  clientinid: mqttinId
  clientoutid: mqttoutid
  topic: virus
  qoslevel: 1
  # MQTT authentication
  username: xxx
  password: xxx
  # Passed to the inbound adapter as its completion timeout
  # (presumably milliseconds - confirm against the Spring Integration docs).
  timeout: 10000
  # 20s
  keepalive: 20
package com.favccxx.mqtt.config; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.IntegrationComponentScan; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; @Slf4j @Configuration @IntegrationComponentScan public class MQTTReceiveConfig { @Value("${mqtt.username}") private String username; @Value("${mqtt.password}") private String password; @Value("${mqtt.host}") private String hostUrl; @Value("${mqtt.clientinid}") private String clientId; @Value("${mqtt.topic}") private String defaultTopic; @Value("${mqtt.timeout}") private int completionTimeout ; //鏈接超時 @Bean public MqttConnectOptions getReceiverMqttConnectOptions(){ MqttConnectOptions mqttConnectOptions=new MqttConnectOptions(); mqttConnectOptions.setCleanSession(true); mqttConnectOptions.setConnectionTimeout(10); mqttConnectOptions.setKeepAliveInterval(90); mqttConnectOptions.setAutomaticReconnect(true); mqttConnectOptions.setUserName(username); mqttConnectOptions.setPassword(password.toCharArray()); mqttConnectOptions.setServerURIs(new String[]{hostUrl}); 
mqttConnectOptions.setKeepAliveInterval(2); return mqttConnectOptions; } @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(getReceiverMqttConnectOptions()); return factory; } //接收通道 @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } //配置client,監聽的topic @Bean public MessageProducer inbound() { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId+"_inbound", mqttClientFactory(), defaultTopic); adapter.setCompletionTimeout(completionTimeout); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(1); adapter.setOutputChannel(mqttInputChannel()); return adapter; } //經過通道獲取數據 @Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message<?> message) throws MessagingException { log.info("主題:{},消息接收到的數據:{}", message.getHeaders().get("mqtt_receivedTopic"), message.getPayload()); } }; } }
啟動服務,使用上一篇博文的消息接口發送消息。
主題: virus, 消息接收到的數據: 打敗武漢肺炎