這幾天弄了下mqtt ,發現有不少問題,網上搜不到什麼解決辦法,因此本身記錄下來,也讓初識mqtt的人少走一些坑,關於我寫的不對的也但願看到的人能指出來互相學習下html
說到mqtt,首先確定要安裝了,安裝什麼的地址:http://activemq.apache.org/ap...
我本地是Windows的環境,因此裝的是Windows版本,這裏是第一個注意的地方,由於後面使用的時候windows和linux的有一些不一樣linux
下載完成以後就是解壓安裝了,這裏解壓完成以後進入bin目錄下,本身用cmd或者直接進去在此處打開命令窗口也行,而後運行apollo.cmd 建立一個服務實例個人實例名稱是mybroker因此命令是 apollo.cmd create mybroker,這個名稱本身能夠隨便指定spring
建立完實例後發現bin 目錄下多了一個文件夾,這個文件夾就是你實例名稱,進入文件夾運行
.apollo-broker.cmd run 命令
這樣就啓動成功了apache
啓動成功能夠去http://localhost:61680/console/index.html看看,登陸帳號和密碼在mybrokeretcusers.properties文件中找到輸入就能夠進去了windows
頁面上有鏈接信息和訂閱主題的一些對應信息,有興趣的本身看下,後面也會講到的服務器
安裝成功接下來就是使用了,首先建立一個maven工程,引入配置maven
<!--mqtt--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency>
因爲咱們後面處理訂閱消息的消費者打印的日誌是用了slf4j爲了方便也引入了lombok的配置 :tcp
<dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency>
引入完成之後就能夠開始準備開始使用mqtt了
這裏爲了方便維護和配置我把一些配置參數放在了properties文件裏面:spring-boot
#MQTT配置信息 spring.mqtt.username=admin spring.mqtt.password=password spring.mqtt.url=tcp://localhost:61613 spring.mqtt.client.id=clientId spring.mqtt.server.id=serverId spring.mqtt.default.topic=topic
這裏我遇到了一個坑,專門註釋了,就是訂閱端訂閱消息的id 和 發佈端發佈消息的id 必定不能同樣,這樣會致使mqtt識別到兩個同樣的id,消息一發就斷開鏈接了,訂閱端老是收不到消息,這個問題我找了好長時間都不知道問題出在哪,剛接觸的很容易搞錯,第二個問題就是mqtt的服務器鏈接地址,在Windows和linux下tcp的端口是不同的,在啓動的apollo的日誌中能夠看出來性能
監聽的tcp端口是61613,看別人不少的demo上都是1883,若是一直連不上,緣由多是由於這個
接下來就是spring.mqtt.default.topic
配置了,這個是mqtt訂閱和推送的消息主題,既然你想發消息那麼訂閱消息的主題和發佈消息的主題一致才能收到消息,和rabbitmq同樣
而後就是客戶端
@Configuration @IntegrationComponentScan @Slf4j public class MqttSenderConfig { @Value("${spring.mqtt.username}") private String username; @Value("${spring.mqtt.password}") private String password; @Value("${spring.mqtt.url}") private String hostUrl; @Value("${spring.mqtt.client.id}") private String clientId; @Value("${spring.mqtt.default.topic}") private String defaultTopic; @Bean public MqttConnectOptions getMqttConnectOptions(){ MqttConnectOptions mqttConnectOptions=new MqttConnectOptions(); mqttConnectOptions.setUserName(username); mqttConnectOptions.setPassword(password.toCharArray()); mqttConnectOptions.setServerURIs(new String[]{hostUrl}); mqttConnectOptions.setKeepAliveInterval(2); return mqttConnectOptions; } @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(getMqttConnectOptions()); return factory; } @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory()); messageHandler.setAsync(true); messageHandler.setDefaultTopic(defaultTopic); return messageHandler; } @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } }
這裏有點問題,若是你是複製個人代碼的話MessageHandler 這個類是沒有的須要本身手動導包,看了源碼發現這裏須要的是一個消息處理的handler須要是org.springframework.messaging.MessageHandler的實現,直接導入這個包就好了
@Component @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") public interface MsgWriter { void sendToMqtt(String data); void sendToMqtt(String payload,@Header(MqttHeaders.TOPIC) String topic); void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload); }
這個是消息發送接口,須要發送消息的時候直接調用就好了,提供了幾個重載方法payload或者data是發送消息的內容
topic是消息發送的主題,這裏能夠本身靈活定義,也可使用默認的主題,就是配置文件的主題,qos是mqtt 對消息處理的幾種機制分爲0,1,2 其中0表示的是訂閱者沒收到消息不會再次發送,消息會丟失,1表示的是會嘗試重試,一直到接收到消息,但這種狀況可能致使訂閱者收到屢次重複消息,2相比多了一次去重的動做,確保訂閱者收到的消息有一次
固然,這三種模式下的性能確定也不同,qos=0是最好的,2是最差的 ,有興趣的能夠去詳細瞭解我在這很少贅述
上面就完成了消息的發送,能夠去http://localhost:61680/console/index.html看看消息的記錄,這裏我寫了一個接口調用sendToMqtt方法發送一條消息
會看到收到有兩個主題,個人是由於我訂閱了兩個主題因此上面顯示的是兩個,個人剛纔發佈消息的主題是too因此打開會看到too有消息送達過來
若是你還沒寫訂閱方的話consumers是沒有的,如今顯示我發了7條消息,證實發送成功了
接下來就是訂閱方,爲了方便我就直接寫在啓動類上了,沒有用到全部的配置
@SpringBootApplication @EnableAutoConfiguration public class MytestApplication { public static void main(String[] args) { SpringApplication.run(MytestApplication.class, args); } @Value("${spring.mqtt.server.id}") private String serverId; @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setServerURIs("tcp://localhost:61613"); factory.setUserName("admin"); factory.setPassword("password"); return factory; } // consumer 訂閱者監聽消息 @Bean public IntegrationFlow mqttInFlow() { return IntegrationFlows.from(mqttInbound()) .transform(p -> p + ", received from MQTT") .handle(logger()) .get(); } private LoggingHandler logger() { LoggingHandler loggingHandler = new LoggingHandler("INFO"); loggingHandler.setLoggerName("siSample"); return loggingHandler; } @Bean public MessageProducerSupport mqttInbound() { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(serverId, mqttClientFactory(), "too"); adapter.setCompletionTimeout(5000); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(1); return adapter; } }
這裏訂閱的主題能夠指定,我訂閱的是剛纔發的too主題,還有訂閱方的id 別和發送方的id 同樣
從新啓動項目,發送消息,會發現控制檯已經打印出消息
表明訂閱方已經成功收到消息,同時
也顯示消息訂閱方和記錄,至此一個完整的消息發送和訂閱完成,比較簡單,可是一不留神很容易出現問題,但願能幫助到新入門的人