MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸協議),是一種基於發佈/訂閱(publish/subscribe)模式的「輕量級」通信協議,該協議構建於TCP/IP協議上,由IBM在1999年發佈。MQTT最大優勢在於,能夠以極少的代碼和有限的帶寬,爲鏈接遠程設備提供實時可靠的消息服務。做爲一種低開銷、低帶寬佔用的即時通信協議,使其在物聯網、小型設備、移動應用等方面有較普遍的應用。 MQTT是一個基於客戶端-服務器的消息發佈/訂閱傳輸協議。MQTT協議是輕量、簡單、開放和易於實現的,這些特色使它適用範圍很是普遍。在不少狀況下,包括受限的環境中,如:機器與機器(M2M)通訊和物聯網(IoT)。其在,經過衛星鏈路通訊傳感器、偶爾撥號的醫療設備、智能家居、及一些小型化設備中已普遍使用。java
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency>
inBound配置屬性web
package com.chen.config; import lombok.Data; /** * @author: ChenJie * @date 2018/8/21 */ @Data public class MqttInboundProperties { private String url; private String username; private String password; private String clientId; private String topics; }
package com.chen.config; import lombok.Getter; import lombok.Setter; /** * @author: ChenJie * @date 2018/8/21 */ @Setter @Getter public class MqttOutboundProperties { private String urls; private String username; private String password; private String clientId; private String topic; }
package com.chen.config; import org.springframework.boot.context.properties.ConfigurationProperties; /** * @author: ChenJie * @date 2018/8/21 */ @ConfigurationProperties(prefix = "com.mqtt") public class MqttProperties { private MqttInboundProperties inbound; private MqttOutboundProperties outbound; public MqttInboundProperties getInbound() { return inbound; } public void setInbound(MqttInboundProperties inbound) { this.inbound = inbound; } public MqttOutboundProperties getOutbound() { return outbound; } public void setOutbound(MqttOutboundProperties outbound) { this.outbound = outbound; } }
package com.chen.mqtt; import com.chen.config.MqttProperties; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; /** * @author: ChenJie * @date 2018/8/21 */ @Configuration @Slf4j public class MqttInboundConfiguration { @Autowired private MqttProperties mqttProperties; @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } @Bean public MessageProducer inbound(MqttPahoClientFactory mqttPahoClientFactory) { String[] inboundTopics = mqttProperties.getInbound().getTopics().split(","); MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getInbound().getUrl(), mqttProperties.getInbound().getClientId(), mqttPahoClientFactory,inboundTopics); adapter.setCompletionTimeout(5000); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(1); adapter.setOutputChannel(mqttInputChannel()); return adapter; } @Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message<?> message) throws MessagingException { log.info("收到消息:"+(String) message.getPayload()); } }; } }
package com.chen.mqtt; import com.chen.config.MqttProperties; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; /** * @author: ChenJie * @date 2018/8/21 */ @Configuration public class MqttOutboundConfiguration { @Autowired private MqttProperties mqttProperties; @Bean public MqttPahoClientFactory mqttClientFactory() { String[] serverURIs = mqttProperties.getOutbound().getUrls().split(","); DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); // factory.setServerURIs("tcp://192.168.10.100:1883", "tcp://host2:1883"); factory.setServerURIs(serverURIs); factory.setCleanSession(false); factory.setUserName(mqttProperties.getOutbound().getUsername()); factory.setPassword(mqttProperties.getOutbound().getPassword()); return factory; } @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getOutbound().getClientId(), mqttClientFactory()); messageHandler.setAsync(true); messageHandler.setDefaultTopic(mqttProperties.getOutbound().getTopic()); return messageHandler; } @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } }
package com.chen.service; import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; /** * @author: ChenJie * @date 2018/8/21 */ @Component @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") public interface MqttGateway { void sendToMqtt(String data); void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload); void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload); }
package com.chen.controller; import com.chen.service.MqttGateway; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; /** * @author: ChenJie * @date 2018/8/21 */ @RestController public class MessageController { @Autowired MqttGateway mqttGateway; @RequestMapping(value="/sendMsg") public String sendMsg(@RequestParam String message){ mqttGateway.sendToMqtt("defualt-topic:"+message); return "success"; } }
@SpringBootApplication @Configuration @EnableConfigurationProperties(MqttProperties.class) public class MqttSpringbootApplication { public static void main(String[] args) { SpringApplication.run(MqttSpringbootApplication.class, args); } }