公司的IOT平臺主要採用MQTT(消息隊列遙測傳輸)對底層的驅動作命令下發和數據採集。也用到了redis、zeroMQ、nats等消息中間件。今天先整理SpringBoot集成MQTT筆記和工做中遇到的問題。html
MQTT is a machine-to-machine (M2M)/"Internet of Things" connectivity protocol. It was designed as an extremely lightweight publish/subscribe messaging transport. It is useful for connections with remote locations where a small code footprint is required and/or network bandwidth is at a premium.java
官網地址:http://mqtt.org/ 、 https://www.mqtt.com/redis
MQTT除了具有大部分消息中間件擁有的功能外,其最大的特色就是小型傳輸。以減小開銷,減低網絡流量的方式去知足低帶寬、不穩定的網絡遠程傳輸。spring
MQTT服務器有不少,好比Apache-Apollo和EMQX,ITDragon龍 目前使用的時EMQX做爲MQTT的服務器。使用也很簡單,下載解壓後,進入bin目錄執行emqx console 啓動服務。數據庫
MQTT調試工具能夠用MQTTBox編程
第一步:導入面向企業應用集成庫和對應mqtt集成庫springboot
compile('org.springframework.boot:spring-boot-starter-integration') compile('org.springframework.integration:spring-integration-mqtt')
這裏要注意spring-integration-mqtt的版本。由於會存在org.eclipse.paho.client.mqttv3修復了一些bug,並迭代了新版本。但spring-integration-mqtt並無及時更新的狀況。修改方法以下服務器
compile("org.springframework.integration:spring-integration-mqtt") { exclude group: "org.eclipse.paho" , module: "org.eclipse.paho.client.mqttv3" } compile("org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.2")
第二步:MQTT鏈接配置文件網絡
# MQTT Config mqtt.server=tcp://x.x.x.x:1883 mqtt.username=xxx mqtt.password=xxx mqtt.client-id=clientID mqtt.cache-number=100 mqtt.message.topic=itDragon/tags/cov
Inbound 入站消息適配器eclipse
第一步:配置MQTT客戶端工廠類DefaultMqttPahoClientFactory
第二步:配置MQTT入站消息適配器MqttPahoMessageDrivenChannelAdapter
第三步:定義MQTT入站消息通道MessageChannel
第四步:聲明MQTT入站消息處理器MessageHandler
如下有些配置是衝突或者重複的,主要是體現一些重要配置。
package com.itdragon.server.config import com.itdragon.server.message.ITDragonMQTTMessageHandler import org.eclipse.paho.client.mqttv3.MqttConnectOptions import org.springframework.beans.factory.annotation.Value import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.integration.annotation.ServiceActivator import org.springframework.integration.channel.DirectChannel import org.springframework.integration.core.MessageProducer import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory import org.springframework.integration.mqtt.core.MqttPahoClientFactory import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter import org.springframework.messaging.MessageChannel import org.springframework.messaging.MessageHandler import java.time.Instant @Configuration class MQTTConfig { @Value("\${mqtt.server}") lateinit var mqttServer: String @Value("\${mqtt.user-name}") lateinit var mqttUserName: String @Value("\${mqtt.password}") lateinit var mqttUserPassword: String @Value("\${mqtt.client-id}") lateinit var clientID: String @Value("\${mqtt.cache-number}") lateinit var maxMessageInFlight: String @Value("\${mqtt.message.topic}") lateinit var messageTopic: String /** * 配置DefaultMqttPahoClientFactory * 1. 配置基本的連接信息 * 2. 配置maxInflight,在mqtt消息量比較大的狀況下將值設大 */ fun mqttClientFactory(): MqttPahoClientFactory { val mqttConnectOptions = MqttConnectOptions() // 配置mqtt服務端地址,登陸帳號和密碼 mqttConnectOptions.serverURIs = arrayOf(mqttServer) mqttConnectOptions.userName = mqttUserName mqttConnectOptions.password = mqttUserPassword.toCharArray() // 配置最大不肯定接收消息數量,默認值10,qos!=0 時生效 mqttConnectOptions.maxInflight = maxMessageInFlight.toInt() val factory = DefaultMqttPahoClientFactory() factory.connectionOptions = mqttConnectOptions return factory } /** * 配置Inbound入站,消費者基本鏈接配置 * 1. 經過DefaultMqttPahoClientFactory 初始化入站通道適配器 * 2. 配置超時時長,默認30000毫秒 * 3. 配置Paho消息轉換器 * 4. 配置發送數據的服務質量 0~2 * 5. 配置訂閱通道 */ @Bean fun itDragonMqttInbound(): MessageProducer { // 初始化入站通道適配器,使用的是Eclipse Paho MQTT客戶端庫 val adapter = MqttPahoMessageDrivenChannelAdapter(clientID + Instant.now().toEpochMilli(), mqttClientFactory(), messageTopic) // 設置鏈接超時時長(默認30000毫秒) adapter.setCompletionTimeout(30000) // 配置默認Paho消息轉換器(qos=0, retain=false, charset=UTF-8) adapter.setConverter(DefaultPahoMessageConverter()) // 設置服務質量 // 0 最多一次,數據可能丟失; // 1 至少一次,數據可能重複; // 2 只有一次,有且只有一次;最耗性能 adapter.setQos(0) // 設置訂閱通道 adapter.outputChannel = itDragonMqttInputChannel() return adapter } /** * 配置Inbound入站,消費者訂閱的消息通道 */ @Bean fun itDragonMqttInputChannel(): MessageChannel { return DirectChannel() } /** * 配置Inbound入站,消費者的消息處理器 * 1. 使用@ServiceActivator註解,代表所修飾的方法用於消息處理 * 2. 使用inputChannel值,代表從指定通道中取值 * 3. 利用函數式編程的思路,解耦MessageHandler的業務邏輯 */ @Bean @ServiceActivator(inputChannel = "itDragonMqttInputChannel") fun commandDataHandler(): MessageHandler { /*return MessageHandler { message -> println(message.payload) }*/ return ITDragonMQTTMessageHandler() } }
注意:
第一步:配置Outbound出站,出站通道適配器
第二步:配置Outbound出站,發佈者發送的消息通道
第三步:對外提供推送消息的接口
在原有的MQTTConfig配置類的集成上補充如下內容
/** * 配置Outbound出站,出站通道適配器 * 1. 經過MqttPahoMessageHandler 初始化出站通道適配器 * 2. 配置異步發送 * 3. 配置默認的服務質量 */ @Bean @ServiceActivator(inputChannel = "itDragonMqttOutputChannel") fun itDragonMqttOutbound(): MqttPahoMessageHandler { // 初始化出站通道適配器,使用的是Eclipse Paho MQTT客戶端庫 val messageHandler = MqttPahoMessageHandler(clientID + Instant.now().toEpochMilli() + "_set", mqttClientFactory()) // 設置異步發送,默認是false(發送時阻塞) messageHandler.setAsync(true) // 設置默認的服務質量 messageHandler.setDefaultQos(0) return messageHandler } /** * 配置Outbound出站,發佈者發送的消息通道 */ @Bean fun itDragonMqttOutputChannel(): MessageChannel { return DirectChannel() } /** * 對外提供推送消息的接口 * 1. 使用@MessagingGateway註解,配置MQTTMessageGateway消息推送接口 * 2. 使用defaultRequestChannel值,調用時將向其發送消息的默認通道 * 3. 配置靈活的topic主題 */ @MessagingGateway(defaultRequestChannel = "itDragonMqttOutputChannel") interface MQTTMessageGateway { fun sendToMqtt(data: String, @Header(MqttHeaders.TOPIC) topic: String) fun sendToMqtt(data: String, @Header(MqttHeaders.QOS) qos: Int, @Header(MqttHeaders.TOPIC) topic: String) }
注意:
爲了讓消息處理函數和MQTT配置解耦,這裏提供MessageHandler 註冊類,將消息處理的業務邏輯以函數式編程的思惟註冊到Handler中。
package com.itdragon.server.message import org.springframework.messaging.Message import org.springframework.messaging.MessageHandler class ITDragonMQTTMessageHandler : MessageHandler { private var handler: ((String) -> Unit)? = null fun registerHandler(handler: (String) -> Unit) { this.handler = handler } override fun handleMessage(message: Message<*>) { handler?.run { this.invoke(message.payload.toString()) } } }
註冊MessageHandler
package com.itdragon.server.message import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service import javax.annotation.PostConstruct @Service class ITDragonMessageDispatcher { private val logger = LoggerFactory.getLogger(ITDragonMessageDispatcher::class.java) @Autowired lateinit var itDragonMQTTMessageHandler: ITDragonMQTTMessageHandler @PostConstruct fun init() { itDragonMQTTMessageHandler.registerHandler { itDragonMsgHandler(it) } } fun itDragonMsgHandler(message: String) { logger.info("itdragon mqtt receive message: $message") try { // todo }catch (ex: Exception) { ex.printStackTrace() } } }
注入MQTT的MessageGateway,而後推送消息。
@Autowired lateinit var mqttGateway: MQTTConfig.MQTTMessageGateway @Scheduled(fixedDelay = 10*1000) fun sendMessage() { mqttGateway.sendToMqtt("Hello ITDragon ${Instant.now()}", "itDragon/tags/cov/set") }
項目上線一段時間後,客戶的服務器嚴重卡頓。緣由是客戶服務斷網後,MQTT在每次嘗試重連的過程當中一直在建立新的線程,致使一個Java服務建立了上萬個線程。解決方案是更新了org.eclipse.paho.client.mqttv3的版本,也是 "3.1 導入mqtt庫" 中提到的。後續就沒有出現這個問題了。
MQTT的消息量大的狀況下,既要保障數據的完整,又要保障性能的穩定。光從MQTT自己上來講,很難作到魚和熊掌不可兼得。ITDragon龍 先要理清需求:
在消息量大的狀況下,ITDragon龍 能夠將服務質量設置成0(最多一次)以減小消息確認的開銷,用來保證系統的穩定性。
將消息的服務質量設置成0後,會讓消息的丟失可能性變得更大,如何保證數據的完整性?其實ITDragon龍 能夠在往MQTT通道推送消息以前,先將底層驅動採集的數據先異步保存到Inflxudb數據庫中。
其實採集的數據消息,在這一批推送過程當中可能會丟失。可是會在下一批推送過程當中補上。命令下發也是同樣,若是下發失敗,再重寫下發一次。畢竟消息的丟失並非必現的狀況。是小几率事件,系統的穩定性纔是最重要的。