本文收錄在我的博客:www.chengxy-nds.top,共享技術資源,共同進步javascript
前一段有幸參與到一個智能家居項目的開發,因爲以前都沒有過這方面的開發經驗,因此對智能硬件的開發模式和技術棧都頗爲好奇。html
產品是一款可燃氣體報警器,若是家中燃氣泄露濃度到達必定閾值,報警器檢測到並上傳氣體濃度值給後臺,後臺以電話、短信、微信等方式,提醒用戶家中可能有氣體泄漏。java
用戶還可能向報警器發一些關閉報警、調整音量的指令等。總體功能仍是比較簡單的,大體的邏輯以下圖所示:
git
但當我真正的參與其中開發時,其實有一點小小的失望,由於在整個研發過程當中,並沒用到什麼新的技術,仍是常規的幾種中間件,只不過換個用法而已。github
技術選型用rabbitmq
來作核心的組件,主要考慮到運維成本低,組內成員使用的熟練度比較高。web
下面和小夥伴分享一下如何用 springboot
+ rabbitmq
搭建物聯網(IOT
)平臺,其實智能硬件也沒想象的那麼遙不可及!spring
不少小夥伴可能有點懵?rabbitmq
不是消息隊列嗎?怎麼又能作智能硬件了?緩存
其實rabbitmq
有兩種協議,咱們平時接觸的消息隊列是用的AMQP
協議,而用在智能硬件中的是MQTT
協議。springboot
MQTT
全稱(Message Queue Telemetry Transport):一種基於發佈/訂閱(publish
/subscribe
)模式的輕量級
通信協議,經過訂閱相應的主題來獲取消息,是物聯網(Internet of Thing
)中的一個標準傳輸協議。服務器
該協議將消息的發佈者(publisher
)與訂閱者(subscriber
)進行分離,所以能夠在不可靠的網絡環境中,爲遠程鏈接的設備提供可靠的消息服務,使用方式與傳統的MQ有點相似。
TCP
協議位於傳輸層,MQTT
協議位於應用層,MQTT
協議構建於TCP/IP
協議上,也就是說只要支持TCP/IP
協議棧的地方,均可以使用MQTT
協議。
MQTT
協議爲何在物聯網(IOT)中如此受偏心?而不是其它協議,好比咱們更爲熟悉的 HTTP
協議呢?
首先HTTP
協議它是一種同步協議,客戶端請求後須要等待服務器的響應。而在物聯網(IOT)環境中,設備會很受制於環境的影響,好比帶寬低、網絡延遲高、網絡通訊不穩定等,顯然異步消息協議更爲適合IOT
應用程序。
HTTP
是單向的,若是要獲取消息客戶端必須發起鏈接,而在物聯網(IOT)應用程序中,設備或傳感器每每都是客戶端,這意味着它們沒法被動地接收來自網絡的命令。
HTTP
要實現這樣的功能不但很困難,並且成本極高。 前邊說過MQTT
是一種輕量級的協議,它只專一於發消息, 因此此協議的結構也很是簡單。
在MQTT
協議中,一個MQTT
數據包由:固定頭
(Fixed header)、 可變頭
(Variable header)、 消息體
(payload)三部分構成。
一、固定頭
固定頭部,使用兩個字節,共16位:
(4-7)位表示消息類型,使用4位二進制表示,可表明以下的16種消息類型,不過 0 和 15位置屬於保留待用,因此共14種消息事件類型。
DUP Flag(重試標識)
DUP Flag:保證消息可靠傳輸,消息是否已送達的標識。默認爲0,只佔用一個字節,表示第一次發送,當值爲1時,表示當前消息先前已經被傳送過。
QoS Level(消息質量等級)
QoS Level:消息的質量等級,後邊會詳細介紹
RETAIN(持久化)
值爲1
:表示發送的消息須要一直持久保存,並且不受服務器重啓影響,不但要發送給當前的訂閱者,且之後新加入的客戶端訂閱了此Topic
,訂閱者也會立刻獲得推送。
注意:新加入的訂閱者,只會取出最新的一個RETAIN flag = 1
的消息推送。
0
:僅爲當前訂閱者推送此消息。Remaining Length(剩餘長度)
在當前消息中剩餘的byte
(字節)數,包含可變頭部和消息體payload。
二、可變頭
固定頭部僅定義了消息類型和一些標誌位,一些消息的元數據須要放入可變頭部中。可變頭部內容字節長度 + 消息體payload = 剩餘長度。
可變頭部居於固定頭部和payload中間,包含了協議名稱,版本號,鏈接標誌,用戶受權,心跳時間等內容。
可變頭存在於這些類型的消息:PUBLISH (QoS > 0)、PUBACK、PUBREC、PUBREL、PUBCOMP、SUBSCRIBE、SUBACK、UNSUBSCRIBE、UNSUBACK。
三、消息體payload
消息體payload只存在於CONNECT
、PUBLISH
、SUBSCRIBE
、SUBACK
、UNSUBSCRIBE
這幾種類型的消息:
CONNECT
:包含客戶端的ClientId
、訂閱的Topic
、Message
以及用戶名
和密碼
。PUBLISH
:向對應主題發送消息。SUBSCRIBE
:要訂閱的主題以及QoS
。SUBACK
:服務器對於SUBSCRIBE
所申請的主題及QoS
進行確認和回覆。UNSUBSCRIBE
:取消要訂閱的主題。消息質量
(Quality of Service),即消息的發送質量,發佈者(publisher
)和訂閱者(subscriber
)均可以指定qos
等級,有QoS 0
、QoS 1
、QoS 2
三個等級。
下邊分別說明一下這三個等級的區別。
一、Qos 0:At most once
(至多一次),只發送一次消息,不保證消息是否成功送達,沒有確認機制,消息可能會丟失或重複。
二、Qos 1:At least once
(至少一次),相對於QoS 0
而言Qos 1
增長了ack
確認機制,發送者(publisher
)推送消息到MQTT代理(broker
)時,二者自身都會先持久化消息,只有當publisher
或者 Broker
分別收到 PUBACK
確認時,纔會刪除自身持久化的消息,不然就會重發。
但有個問題,儘管咱們能夠經過確認來保證必定收到客戶端 或 服務器的message
,可咱們卻不能保證僅收到一次message
,也就是當客戶端publisher
沒收到Broker
的puback
或者 Broker
沒有收到subscriber
的puback
,那麼就會一直重發。
publisher -> broker 大體流程:
三、Qos 2:Exactly once
(只有一次),相對於QoS 1
,QoS 2
升級實現了僅接受一次message
,publisher
和 broker
一樣對消息進行持久化,其中 publisher
緩存了message
和 對應的msgID
,而 broker
緩存了 msgID
,能夠保證消息不重複,因爲又增長了一個confirm
機制,整個流程變得複雜不少。
publisher -> broker 大體流程:
LWT
全稱爲 Last Will and Testament
,其實遺囑是一個由客戶端預先定義好的主題和對應消息,附加在CONNECT
的數據包中,包括遺願主題
、遺願 QoS
、遺願消息
等。
當MQTT代理 Broker
檢測到有客戶端client
非正常斷開鏈接時,再由服務器主動發佈此消息,而後相關的訂閱者會收到消息。
舉個栗子:聊天室中全部人都訂閱一個叫talk
的主題 ,但小富因爲網絡抖動忽然斷開了連接,這時聊天室中全部訂閱主題 talk
的客戶端都會收到一個 「小富離開聊天室
」 的遺願消息。
遺囑的相關參數:
Will Flag
:是否使用 LWT,1 開啓Will Topic
:遺願主題名,不可以使用通配符Will Qos
:發佈遺願消息時使用的 QoSWill Retain
:遺願消息的 Retain 標識Will Message
:遺願消息內容那客戶端Client
有哪些場景是非正常斷開鏈接呢?
Broker
檢測到底層的 I/O 異常;Keep Alive
的間隔內和 Broker
進行消息交互;TCP
鏈接前沒有發送 DISCONNECT
數據包;Broker
,致使關閉和客戶端的鏈接等。注意:當客戶端經過發佈 DISCONNECT
數據包斷開鏈接時,屬於正常斷開鏈接,並不會觸發 LWT
的機制,與此同時Broker
還會丟棄掉當前客戶端在鏈接時指定的相關 LWT
參數。
MQTT
協議普遍應用於物聯網、移動互聯網、智能硬件、車聯網、電力能源等領域。使用的場景也是很是很是多,下邊列舉一些:
具體 rabbitmq
的環境搭建就不贅述了,網上教程比較多,有條件的用服務器,沒條件的像我搞個Windows
版的也很快樂嘛。
咱們先開啓 rabbitmq
的 mqtt
協議,由於默認安裝下是關閉的,命令以下:
rabbitmq-plugins enable rabbitmq_mqtt
上一步中安裝rabbitmq
環境並開啓 mqtt
協議後,實際上mqtt
消息代理服務就搭建好了,接下來要作的就是實現客戶端消息的推送和訂閱。
這裏使用spring-integration-mqtt
、org.eclipse.paho.client.mqttv3
兩個工具包實現。
<!--mqtt依賴包--> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.0</version> </dependency>
消息的發送比較簡單,主要是應用到@ServiceActivator
註解,須要注意messageHandler.setAsync
屬性,若是設置成false
,關閉異步模式發送消息時可能會阻塞。
@Configuration public class IotMqttProducerConfig { @Autowired private MqttConfig mqttConfig; @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setServerURIs(mqttConfig.getServers()); return factory; } @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } @Bean @ServiceActivator(inputChannel = "iotMqttInputChannel") public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttConfig.getServerClientId(), mqttClientFactory()); messageHandler.setAsync(false); messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic()); return messageHandler; } }
MQTT
對外提供發送消息的API
時,須要使用@MessagingGateway
註解,去提供一個消息網關代理,參數defaultRequestChannel
指定發送消息綁定的channel
。
能夠實現三種API
接口,payload
爲發送的消息,topic
發送消息的主題,qos
消息質量。
@MessagingGateway(defaultRequestChannel = "iotMqttInputChannel") public interface IotMqttGateway { // 向默認的 topic 發送消息 void sendMessage2Mqtt(String payload); // 向指定的 topic 發送消息 void sendMessage2Mqtt(String payload,@Header(MqttHeaders.TOPIC) String topic); // 向指定的 topic 發送消息,並指定服務質量參數 void sendMessage2Mqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload); }
消息訂閱和咱們平時用的MQ消息監聽實現思路基本類似,@ServiceActivator
註解代表當前方法用於處理MQTT
消息,inputChannel
參數指定了用於接收消息的channel
。
/** * @Author: xiaofu * @Description: 消息訂閱配置 * @date 2020/6/8 18:24 */ @Configuration public class IotMqttSubscriberConfig { @Autowired private MqttConfig mqttConfig; @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setServerURIs(mqttConfig.getServers()); return factory; } @Bean public MessageChannel iotMqttInputChannel() { return new DirectChannel(); } @Bean public MessageProducer inbound() { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId(), mqttClientFactory(), mqttConfig.getDefaultTopic()); adapter.setCompletionTimeout(5000); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(1); adapter.setOutputChannel(iotMqttInputChannel()); return adapter; } /** * @author xiaofu * @description 消息訂閱 * @date 2020/6/8 18:20 */ @Bean @ServiceActivator(inputChannel = "iotMqttInputChannel") public MessageHandler handlerTest() { return message -> { try { String string = message.getPayload().toString(); System.out.println("接收到消息:" + string); } catch (MessagingException ex) { //logger.info(ex.getMessage()); } }; } }
額~ 因爲本渣渣對硬件一竅不通,爲了模擬硬件的發送消息,只能藉助一下工具,其實硬件端實現MQTT
協議,跟咱們前邊的基本沒什麼區別,只不過換種語言嵌入到硬件中而已。
這裏選的測試工具爲mqttbox
,下載地址:http://workswithweb.com/mqttbox.html
咱們用先用mqttbox
模擬向主題mqtt_test_topic
發送消息,看後臺是否能成功接收到。
看到後臺成功拿到了向主題mqtt_test_topic
發送的消息。
用mqttbox
模擬訂閱主題mqtt_test_topic
,在後臺向主題mqtt_test_topic
發送一條消息,這裏我簡單的寫了個controller
調用API發送消息。
http://127.0.0.1:8080/fun/testMqtt?topic=mqtt_test_topic&message=我是後臺向主題 mqtt_test_topic 發送的消息
咱們看mqttbox
的訂閱消息,已經成功的接收到了後臺的消息,到此咱們的MQTT
通訊環境就算搭建成功了。若是把mqttbox
工具換成具體硬件設備,整個流程就是咱們常說的智能家居了,其實真的沒那麼難。
在咱們實際的生產環境中遇到過的問題,這裏分享一下讓你們少踩坑。
在客戶端connect
鏈接的時,會有一個clientId
參數,須要每一個客戶端都保持惟一的。但咱們在開發測試階段clientId
直接在代碼中寫死了,並且服務都是單實例部署,並無暴露出什麼問題。
MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId(), mqttClientFactory(), mqttConfig.getDefaultTopic());
然而在生產環境內側的時候,因爲服務是多實例集羣部署,結果出現了下邊的奇怪問題。同一時間內只能有一個客戶端能拿到消息,其餘客戶端不但不能消費消息,並且還在不斷的掉線重連:Lost connection: 已斷開鏈接; retrying...
。
這就是因爲clientId
相同致使客戶端間相互競爭消費,最後將clientId
獲取方式換成從發號器中拿,問題就行了,因此這個地方是須要特別注意的。
平時程序在開發環境沒問題,可恰恰到了生產環境就一大堆問題,不少都是由於服務部署方式不一樣致使的。因此多學習分佈式仍是頗有必要的。
MQTT
它只是一種協議,支持MQTT
協議的消息中間件產品很是多,下邊的也只是其中的一部分
我也是第一次作和硬件相關的項目,以前聽到智能家居都會以爲好高大上,但實際上手開發後發現,技術嘛萬變不離其宗,也只是換種用法而已。
雙手奉上項目 demo 的github
地址 :https://github.com/chengxy-nds/springboot-rabbitmq-mqtt.git
感興趣的小夥伴能夠下載跑一跑,實現起來很是的簡單。
原創不易,燃燒秀髮輸出內容,但願你能有一丟丟收穫!