MQTT
(消息隊列遙測傳輸)是ISO標準(ISO/IEC PRF 20922
)下基於發佈/訂閱範式的消息協議。它工做在 TCP/IP
協議族上,是爲硬件性能低下的遠程設備以及網絡情況糟糕的狀況下而設計的發佈/訂閱型消息協議。國內不少企業都普遍使用MQTT
做爲Android
手機客戶端與服務器端推送消息的協議。html
MQTT
協議是爲大量計算能力有限,且工做在低帶寬、不可靠的網絡的遠程傳感器和控制設備通信而設計的協議,它具備如下主要的幾項特性:java
TCP/IP
提供網絡鏈接;有三種消息發佈服務質量;web
Last Will
和Testament
特性通知有關各方客戶端異常中斷的機制。Apache Apollo
是一個代理服務器,其是在ActiveMQ
基礎上發展而來的,能夠支持STOMP
, AMQP
, MQTT
, Openwire
, SSL
, WebSockets
等多種協議。
原理:服務器端建立一個惟一訂閱號,發送者能夠向這個訂閱號中發東西,而後接受者(即訂閱了這個訂閱號的人)都會收到這個訂閱號發出來的消息。以此來完成消息的推送。服務器實際上是一個消息中轉站。spring
下載地址:http://activemq.apache.org/ap...apache
bin
,執行apollo create mybroker d:\apache-apollo\broker
,建立一個名爲mybroker
虛擬主機(Virtual Host
)。須要特別注意的是,生成的目錄就是之後真正啓動程序的位置。d:\apache-apollo\broker\bin
,執行apollo-broker run
,也能夠用apollo-broker-service.exe
配置服務。http://127.0.0.1:61680
打開web管理界面。(密碼查看broker/etc/users.properties
)cmd
輸出。<!-- spring-boot版本 2.1.0.RELEASE --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency>
# src/main/resources/config/mqtt.properties ################## # MQTT 配置 ################## # 用戶名 mqtt.username=admin # 密碼 mqtt.password=password # 推送信息的鏈接地址,若是有多個,用逗號隔開,如:tcp://127.0.0.1:61613,tcp://192.168.1.61:61613 mqtt.url=tcp://127.0.0.1:61613 ################## # MQTT 生產者 ################## # 鏈接服務器默認客戶端ID mqtt.producer.clientId=mqttProducer # 默認的推送主題,實際可在調用接口時指定 mqtt.producer.defaultTopic=topic1 ################## # MQTT 消費者 ################## # 鏈接服務器默認客戶端ID mqtt.consumer.clientId=mqttConsumer # 默認的接收主題,能夠訂閱多個Topic,逗號分隔 mqtt.consumer.defaultTopic=topic1
import org.apache.commons.lang3.StringUtils; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; /** * MQTT配置,生產者 * * @author BBF */ @Configuration public class MqttConfig { private static final Logger LOGGER = LoggerFactory.getLogger(MqttConfig.class); private static final byte[] WILL_DATA; static { WILL_DATA = "offline".getBytes(); } /** * 訂閱的bean名稱 */ public static final String CHANNEL_NAME_IN = "mqttInboundChannel"; /** * 發佈的bean名稱 */ public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel"; @Value("${mqtt.username}") private String username; @Value("${mqtt.password}") private String password; @Value("${mqtt.url}") private String url; @Value("${mqtt.producer.clientId}") private String producerClientId; @Value("${mqtt.producer.defaultTopic}") private String producerDefaultTopic; @Value("${mqtt.consumer.clientId}") private String consumerClientId; @Value("${mqtt.consumer.defaultTopic}") private String consumerDefaultTopic; /** * MQTT鏈接器選項 * * @return {@link org.eclipse.paho.client.mqttv3.MqttConnectOptions} */ @Bean public MqttConnectOptions getMqttConnectOptions() { MqttConnectOptions options = new MqttConnectOptions(); // 設置是否清空session,這裏若是設置爲false表示服務器會保留客戶端的鏈接記錄, // 這裏設置爲true表示每次鏈接到服務器都以新的身份鏈接 options.setCleanSession(true); // 設置鏈接的用戶名 options.setUserName(username); // 設置鏈接的密碼 options.setPassword(password.toCharArray()); options.setServerURIs(StringUtils.split(url, ",")); // 設置超時時間 單位爲秒 options.setConnectionTimeout(10); // 設置會話心跳時間 單位爲秒 服務器會每隔1.5*20秒的時間向客戶端發送心跳判斷客戶端是否在線,但這個方法並無重連的機制 options.setKeepAliveInterval(20); // 設置「遺囑」消息的話題,若客戶端與服務器之間的鏈接意外中斷,服務器將發佈客戶端的「遺囑」消息。 options.setWill("willTopic", WILL_DATA, 2, false); return options; } /** * MQTT客戶端 * * @return {@link org.springframework.integration.mqtt.core.MqttPahoClientFactory} */ @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(getMqttConnectOptions()); return factory; } /** * MQTT信息通道(生產者) * * @return {@link org.springframework.messaging.MessageChannel} */ @Bean(name = CHANNEL_NAME_OUT) public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } /** * MQTT消息處理器(生產者) * * @return {@link org.springframework.messaging.MessageHandler} */ @Bean @ServiceActivator(inputChannel = CHANNEL_NAME_OUT) public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler( producerClientId, mqttClientFactory()); messageHandler.setAsync(true); messageHandler.setDefaultTopic(producerDefaultTopic); return messageHandler; } /** * MQTT消息訂閱綁定(消費者) * * @return {@link org.springframework.integration.core.MessageProducer} */ @Bean public MessageProducer inbound() { // 能夠同時消費(訂閱)多個Topic MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter( consumerClientId, mqttClientFactory(), StringUtils.split(consumerDefaultTopic, ",")); adapter.setCompletionTimeout(5000); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(1); // 設置訂閱通道 adapter.setOutputChannel(mqttInboundChannel()); return adapter; } /** * MQTT信息通道(消費者) * * @return {@link org.springframework.messaging.MessageChannel} */ @Bean(name = CHANNEL_NAME_IN) public MessageChannel mqttInboundChannel() { return new DirectChannel(); } /** * MQTT消息處理器(消費者) * * @return {@link org.springframework.messaging.MessageHandler} */ @Bean @ServiceActivator(inputChannel = CHANNEL_NAME_IN) public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message<?> message) throws MessagingException { LOGGER.error("===================={}============", message.getPayload()); } }; } }
import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; /** * MQTT生產者消息發送接口 * <p>MessagingGateway要指定生產者的通道名稱</p> * @author BBF */ @Component @MessagingGateway(defaultRequestChannel = MqttConfig.CHANNEL_NAME_OUT) public interface IMqttSender { /** * 發送信息到MQTT服務器 * * @param data 發送的文本 */ void sendToMqtt(String data); /** * 發送信息到MQTT服務器 * * @param topic 主題 * @param payload 消息主體 */ void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload); /** * 發送信息到MQTT服務器 * * @param topic 主題 * @param qos 對消息處理的幾種機制。<br> 0 表示的是訂閱者沒收到消息不會再次發送,消息會丟失。<br> * 1 表示的是會嘗試重試,一直到接收到消息,但這種狀況可能致使訂閱者收到屢次重複消息。<br> * 2 多了一次去重的動做,確保訂閱者收到的消息有一次。 * @param payload 消息主體 */ void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload); }
/** * MQTT消息發送 * * @author BBF */ @Controller @RequestMapping(value = "/") public class MqttController { /** * 注入發送MQTT的Bean */ @Resource private IMqttSender iMqttSender; /** * 發送MQTT消息 * @param message 消息內容 * @return 返回 */ @ResponseBody @GetMapping(value = "/mqtt", produces ="text/html") public ResponseEntity<String> sendMqtt(@RequestParam(value = "msg") String message) { iMqttSender.sendToMqtt(message); return new ResponseEntity<>("OK", HttpStatus.OK); } }
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration; import org.springframework.context.annotation.PropertySource; /** * SpringBoot 入口類 * * @author BBF */ @SpringBootApplication(exclude = {DataSourceAutoConfiguration.class, HibernateJpaAutoConfiguration.class}) @PropertySource(encoding = "UTF-8", value = {"classpath:config/mqtt.properties"}) public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } }