SpringBoot 集成MQTT配置

1. 前言

公司的IOT平臺主要採用MQTT(消息隊列遙測傳輸)對底層的驅動作命令下發和數據採集。也用到了redis、zeroMQ、nats等消息中間件。今天先整理SpringBoot集成MQTT筆記和工做中遇到的問題。html

2. MQTT介紹

MQTT is a machine-to-machine (M2M)/"Internet of Things" connectivity protocol. It was designed as an extremely lightweight publish/subscribe messaging transport. It is useful for connections with remote locations where a small code footprint is required and/or network bandwidth is at a premium.java

官網地址:http://mqtt.org/https://www.mqtt.com/redis

MQTT除了具有大部分消息中間件擁有的功能外,其最大的特色就是小型傳輸。以減小開銷,減低網絡流量的方式去知足低帶寬、不穩定的網絡遠程傳輸。spring

MQTT服務器有不少,好比Apache-Apollo和EMQX,ITDragon龍 目前使用的時EMQX做爲MQTT的服務器。使用也很簡單,下載解壓後,進入bin目錄執行emqx console 啓動服務。數據庫

MQTT調試工具能夠用MQTTBox編程

3. SpringBoot 集成MQTT

3.1 導入mqtt庫

第一步:導入面向企業應用集成庫和對應mqtt集成庫springboot

compile('org.springframework.boot:spring-boot-starter-integration')
compile('org.springframework.integration:spring-integration-mqtt')

這裏要注意spring-integration-mqtt的版本。由於會存在org.eclipse.paho.client.mqttv3修復了一些bug,並迭代了新版本。但spring-integration-mqtt並無及時更新的狀況。修改方法以下服務器

compile("org.springframework.integration:spring-integration-mqtt") {
    exclude group: "org.eclipse.paho" , module: "org.eclipse.paho.client.mqttv3"
}
compile("org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.2")

第二步:MQTT鏈接配置文件網絡

# MQTT Config
mqtt.server=tcp://x.x.x.x:1883
mqtt.username=xxx
mqtt.password=xxx
mqtt.client-id=clientID
mqtt.cache-number=100
mqtt.message.topic=itDragon/tags/cov

3.2 配置MQTT訂閱者

Inbound 入站消息適配器eclipse

第一步:配置MQTT客戶端工廠類DefaultMqttPahoClientFactory

第二步:配置MQTT入站消息適配器MqttPahoMessageDrivenChannelAdapter

第三步:定義MQTT入站消息通道MessageChannel

第四步:聲明MQTT入站消息處理器MessageHandler

如下有些配置是衝突或者重複的,主要是體現一些重要配置。

package com.itdragon.server.config

import com.itdragon.server.message.ITDragonMQTTMessageHandler
import org.eclipse.paho.client.mqttv3.MqttConnectOptions
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.integration.annotation.ServiceActivator
import org.springframework.integration.channel.DirectChannel
import org.springframework.integration.core.MessageProducer
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory
import org.springframework.integration.mqtt.core.MqttPahoClientFactory
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter
import org.springframework.messaging.MessageChannel
import org.springframework.messaging.MessageHandler
import java.time.Instant

@Configuration
class MQTTConfig {

    @Value("\${mqtt.server}")
    lateinit var mqttServer: String
    @Value("\${mqtt.user-name}")
    lateinit var mqttUserName: String
    @Value("\${mqtt.password}")
    lateinit var mqttUserPassword: String
    @Value("\${mqtt.client-id}")
    lateinit var clientID: String
    @Value("\${mqtt.cache-number}")
    lateinit var maxMessageInFlight: String
    @Value("\${mqtt.message.topic}")
    lateinit var messageTopic: String

    /**
     * 配置DefaultMqttPahoClientFactory
     * 1. 配置基本的連接信息
     * 2. 配置maxInflight,在mqtt消息量比較大的狀況下將值設大
     */
    fun mqttClientFactory(): MqttPahoClientFactory {
        val mqttConnectOptions = MqttConnectOptions()
        // 配置mqtt服務端地址,登陸帳號和密碼
        mqttConnectOptions.serverURIs = arrayOf(mqttServer)
        mqttConnectOptions.userName = mqttUserName
        mqttConnectOptions.password = mqttUserPassword.toCharArray()
        // 配置最大不肯定接收消息數量,默認值10,qos!=0 時生效
        mqttConnectOptions.maxInflight = maxMessageInFlight.toInt()
        val factory = DefaultMqttPahoClientFactory()
        factory.connectionOptions = mqttConnectOptions
        return factory
    }

    /**
     * 配置Inbound入站,消費者基本鏈接配置
     * 1. 經過DefaultMqttPahoClientFactory 初始化入站通道適配器
     * 2. 配置超時時長,默認30000毫秒
     * 3. 配置Paho消息轉換器
     * 4. 配置發送數據的服務質量 0~2
     * 5. 配置訂閱通道
     */
    @Bean
    fun itDragonMqttInbound(): MessageProducer {
        // 初始化入站通道適配器,使用的是Eclipse Paho MQTT客戶端庫
        val adapter = MqttPahoMessageDrivenChannelAdapter(clientID + Instant.now().toEpochMilli(), mqttClientFactory(), messageTopic)
        // 設置鏈接超時時長(默認30000毫秒)
        adapter.setCompletionTimeout(30000)
        // 配置默認Paho消息轉換器(qos=0, retain=false, charset=UTF-8)
        adapter.setConverter(DefaultPahoMessageConverter())
        // 設置服務質量
        // 0 最多一次,數據可能丟失;
        // 1 至少一次,數據可能重複;
        // 2 只有一次,有且只有一次;最耗性能
        adapter.setQos(0)
        // 設置訂閱通道
        adapter.outputChannel = itDragonMqttInputChannel()
        return adapter
    }

    /**
     * 配置Inbound入站,消費者訂閱的消息通道
     */
    @Bean
    fun itDragonMqttInputChannel(): MessageChannel {
        return DirectChannel()
    }

    /**
     * 配置Inbound入站,消費者的消息處理器
     * 1. 使用@ServiceActivator註解,代表所修飾的方法用於消息處理
     * 2. 使用inputChannel值,代表從指定通道中取值
     * 3. 利用函數式編程的思路,解耦MessageHandler的業務邏輯
     */
    @Bean
    @ServiceActivator(inputChannel = "itDragonMqttInputChannel")
    fun commandDataHandler(): MessageHandler {
        /*return MessageHandler { message ->
            println(message.payload)
        }*/
        return ITDragonMQTTMessageHandler()
    }

}

注意:

  • 1)MQTT的客戶端ID要惟一。
  • 2)MQTT在消息量大的狀況下會出現消息丟失的狀況。
  • 3)MessageHandler注意解耦問題。

3.3 配置MQTT發佈者

Outbound 出站消息適配器

第一步:配置Outbound出站,出站通道適配器

第二步:配置Outbound出站,發佈者發送的消息通道

第三步:對外提供推送消息的接口

在原有的MQTTConfig配置類的集成上補充如下內容

/**
     * 配置Outbound出站,出站通道適配器
     * 1. 經過MqttPahoMessageHandler 初始化出站通道適配器
     * 2. 配置異步發送
     * 3. 配置默認的服務質量
     */
    @Bean
    @ServiceActivator(inputChannel = "itDragonMqttOutputChannel")
    fun itDragonMqttOutbound(): MqttPahoMessageHandler {
        // 初始化出站通道適配器,使用的是Eclipse Paho MQTT客戶端庫
        val messageHandler = MqttPahoMessageHandler(clientID + Instant.now().toEpochMilli() + "_set", mqttClientFactory())
        // 設置異步發送,默認是false(發送時阻塞)
        messageHandler.setAsync(true)
        // 設置默認的服務質量
        messageHandler.setDefaultQos(0)
        return messageHandler
    }

    /**
     * 配置Outbound出站,發佈者發送的消息通道
     */
    @Bean
    fun itDragonMqttOutputChannel(): MessageChannel {
        return DirectChannel()
    }

    /**
     * 對外提供推送消息的接口
     * 1. 使用@MessagingGateway註解,配置MQTTMessageGateway消息推送接口
     * 2. 使用defaultRequestChannel值,調用時將向其發送消息的默認通道
     * 3. 配置靈活的topic主題
     */
    @MessagingGateway(defaultRequestChannel = "itDragonMqttOutputChannel")
    interface MQTTMessageGateway {
        fun sendToMqtt(data: String, @Header(MqttHeaders.TOPIC) topic: String)
        fun sendToMqtt(data: String, @Header(MqttHeaders.QOS) qos: Int, @Header(MqttHeaders.TOPIC) topic: String)
    }

注意:

  • 1)發佈者和訂閱者的客戶端ID不能相同。
  • 2)消息的推送建議採用異步的方式。
  • 3)消息的推送方法能夠只傳payload消息體,但須要配置setDefaultTopic。

3.4 MQTT消息處理和發送

3.4.1 消息處理

爲了讓消息處理函數和MQTT配置解耦,這裏提供MessageHandler 註冊類,將消息處理的業務邏輯以函數式編程的思惟註冊到Handler中。

package com.itdragon.server.message

import org.springframework.messaging.Message
import org.springframework.messaging.MessageHandler

class ITDragonMQTTMessageHandler : MessageHandler {

    private var handler: ((String) -> Unit)? = null

    fun registerHandler(handler: (String) -> Unit) {
        this.handler = handler
    }

    override fun handleMessage(message: Message<*>) {
        handler?.run { this.invoke(message.payload.toString()) }
    }
}

註冊MessageHandler

package com.itdragon.server.message

import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import javax.annotation.PostConstruct

@Service
class ITDragonMessageDispatcher {

    private val logger = LoggerFactory.getLogger(ITDragonMessageDispatcher::class.java)

    @Autowired
    lateinit var itDragonMQTTMessageHandler: ITDragonMQTTMessageHandler

    @PostConstruct
    fun init() {
        itDragonMQTTMessageHandler.registerHandler { itDragonMsgHandler(it) }
    }

    fun itDragonMsgHandler(message: String) {
        logger.info("itdragon mqtt receive message: $message")
        try {
            // todo
        }catch (ex: Exception) {
            ex.printStackTrace()
        }
    }

}

3.4.1 消息發送

注入MQTT的MessageGateway,而後推送消息。

@Autowired
lateinit var mqttGateway: MQTTConfig.MQTTMessageGateway

@Scheduled(fixedDelay = 10*1000)
fun sendMessage() {
    mqttGateway.sendToMqtt("Hello ITDragon ${Instant.now()}", "itDragon/tags/cov/set")
}

4. 開發常見問題

4.1 MQTT每次重連失敗都會增加線程數

項目上線一段時間後,客戶的服務器嚴重卡頓。緣由是客戶服務斷網後,MQTT在每次嘗試重連的過程當中一直在建立新的線程,致使一個Java服務建立了上萬個線程。解決方案是更新了org.eclipse.paho.client.mqttv3的版本,也是 "3.1 導入mqtt庫" 中提到的。後續就沒有出現這個問題了。

4.2 MQTT消息量大存在消息丟失的狀況

MQTT的消息量大的狀況下,既要保障數據的完整,又要保障性能的穩定。光從MQTT自己上來講,很難作到魚和熊掌不可兼得。ITDragon龍 先要理清需求:

  • 1)數據的完整性,主要用於能耗的統計、報警的分析
  • 2)性能的穩定性,服務器不掛🤣🤣🤣🤣

在消息量大的狀況下,ITDragon龍 能夠將服務質量設置成0(最多一次)以減小消息確認的開銷,用來保證系統的穩定性。

將消息的服務質量設置成0後,會讓消息的丟失可能性變得更大,如何保證數據的完整性?其實ITDragon龍 能夠在往MQTT通道推送消息以前,先將底層驅動採集的數據先異步保存到Inflxudb數據庫中。

其實採集的數據消息,在這一批推送過程當中可能會丟失。可是會在下一批推送過程當中補上。命令下發也是同樣,若是下發失敗,再重寫下發一次。畢竟消息的丟失並非必現的狀況。是小几率事件,系統的穩定性纔是最重要的。

相關文章
相關標籤/搜索