寫在前面:
前一篇文字<<基於MQTT協議談談物聯網開發-華佗寫代碼>>主要敘述了MQTT協議的編解碼以及基於MQTT協議的一些常見應用場景,並以一個簡單的消息推送系統做爲例子闡述具體MQTT應用的開發,這篇文字繼續敘述上述應用中Mqtt Broker部分的實現.html
1.Mqtt Broker開源產品:node
Mqtt Broker簡單來講就是基於MQTT協議的Server實現,而且github上有不少開源實現,以下圖:git
各類Mqtt Broker開源實現的幾個關鍵技術指標對好比下圖:github
能夠根據業務的需求不一樣選用不一樣的broker開源實現,整體來講,對Mqtt Broker來講,要求高可用,無狀態,易擴容,可靠傳輸等,下面就以Mqtt Broker的實現做爲例子,講講基於MQTT協議的Broker具體開發,不考慮太複雜的業務邏輯,僅以最簡潔的方式,闡述整個流程,基於Golang和RabbitMQ開發.數據結構
2.Mqtt Broker具體實現:
2.1Mqtt Broker架構草圖:架構
2.2Mqtt Broker實現細節說明:
(1)Broker與RabbitMQ創建TCP鏈接,實現斷線重連邏輯以及斷線通知機制;
(2)Broker聲明與RabbitMQ相關的exchanger, amqp隊列,以及rpc隊列;
(3)Broker監聽APP端mqttclient的tcp鏈接,對應每一個鏈接,Broker實例化一個appClient;
(4)每一個appClient處理與之對應的APP業務請求,包括髮布,訂閱,取消訂閱,心跳等;
(5)每一個appClient獲取一個channel,經過channel與RabbitMQ進行通訊;
(6)若Broker與RabbitMQ發生斷連,則channel不可用,必須等待重連成功,而後從新獲取channel;
(7)若RabbitMQ發生重啓,以前對應APP須要訂閱的數據,每一個appClient必須從新訂閱;
(8)若APP斷開鏈接,則對應的appClient必須釋放全部資源並結束生命週期,再也不服務;
(9)其餘...app
2.3Mqtt Broker代碼實現:
根據上述的架構草圖以及細節說明,就整個架構比較關鍵的幾個環節,用具體代碼實現加以闡述,以求透徹.
(1)Broker與RabbitMQ鏈接:
定義相關數據結構以及方法以下:tcp
package rabbit var ( RabbitServerAddr = flag.String("rabbitserveraddr", "amqp://guest:guest@localhost:5672", "") ) const ( CONNECT_INTERVAL = 3 ) var _rabbit *Rabbit = nil var _once sync.Once //定義Rabbit結構 type Rabbit struct { wg *sync.WaitGroup rabbitconn *amqp.Connection rabbitconnListeners map[string]chan bool _mutex sync.Mutex } //獲取Rabbit單例 func GetRabbitInstance(_wg *sync.WaitGroup, listenerID string, rabbitconnChan chan bool) *Rabbit { _once.Do(func() { _rabbit = &Rabbit{ wg: _wg, } _rabbit.rabbitconnListeners = make(map[string]chan bool) _rabbit.initConn() }) if listenerID != "" && rabbitconnChan != nil { _rabbit._mutex.Lock() _rabbit.rabbitconnListeners[listenerID] = rabbitconnChan _rabbit._mutex.Unlock() } return _rabbit } //創建RabbitMQ鏈接 func (r *Rabbit) initConn() { if r.rabbitconn != nil { return } conn, err := amqp.Dial(*RabbitServerAddr) if err != nil { time.Sleep(time.Second * CONNECT_INTERVAL) U.FailOnError(err) } r.rabbitconn = conn r.wg.Add(1) go r.checkConn() } //RabbitMQ斷線重連 func (r *Rabbit) checkConn() { defer r.wg.Done() for { <-r.rabbitconn.NotifyClose(make(chan *amqp.Error)) U.GetLog().Printf("r.conn.NotifyClose") r.broadcastConnStatus(false) for { conn, err := amqp.Dial(*RabbitServerAddr) if err == nil { r.rabbitconn = conn r.broadcastConnStatus(true) break } U.GetLog().Printf("amqp.Dial failed") r.broadcastConnStatus(false) time.Sleep(time.Second * CONNECT_INTERVAL) } } }
(2)Broker聲明exchanger, amqp隊列,以及rpc隊列:
定義相關數據結構以及方法以下:函數
package mqtt type TOPIC_CHAN_TYPE map[string]chan ExchangeMessage //定義BrokerExchanger結構 type BrokerExchanger struct { wg *sync.WaitGroup channel *amqp.Channel exchangerChan chan *MqttClientCmd amqpqueueName string amqpDeliveryChan <-chan amqp.Delivery amqpTopicChan map[string]TOPIC_CHAN_TYPE rpcqueueName string rpcDeliveryChan <-chan amqp.Delivery rpcClientChan map[string]chan ExchangeMessage } //實例化BrokerExchanger func NewBrokerExchanger(_wg *sync.WaitGroup) *BrokerExchanger { _be := &BrokerExchanger{ wg: _wg, } _be.once() return _be } //聲明Exchange,區分amqp與rpc類型 func (be *BrokerExchanger) declareExchange(exctype string) error { if be.channel == nil { return fmt.Errorf("[BrokerExchanger::declareExchange] channel not ready") } var defaultExchange string var exchangeType string switch exctype { case defaultAMQPExchange: defaultExchange = defaultAMQPExchange exchangeType = "topic" case defaultRPCExchange: defaultExchange = defaultRPCExchange exchangeType = "direct" default: U.GetLog().Printf("unexpected exchange type:%s", exctype) return fmt.Errorf("[BrokerExchanger::declareExchange] unexpected exchange type:%s", exctype) } err := be.channel.ExchangeDeclare( defaultExchange, // name exchangeType, // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) return err } //聲明Queue,區分amqp與rpc類型 func (be *BrokerExchanger) declareQueue(exctype string) error { if be.channel == nil { return fmt.Errorf("[BrokerExchanger::declareQueue] channel not ready") } q, err := be.channel.QueueDeclare( "", // name true, // durable true, // delete when usused true, // exclusive false, // no-wait nil, // arguments ) if err != nil { return err } switch exctype { case defaultAMQPExchange: be.amqpqueueName = q.Name case defaultRPCExchange: be.rpcqueueName = q.Name default: U.GetLog().Printf("unexpected exchange type:%s", exctype) return fmt.Errorf("[BrokerExchanger::declareQueue] unexpected exchange type:%s", exctype) } return nil } //綁定Queue,區分amqp與rpc類型 func (be *BrokerExchanger) queueBind(exctype string) error { var queueName string switch exctype { case defaultAMQPExchange: return nil case defaultRPCExchange: queueName = be.rpcqueueName default: U.GetLog().Printf("unexpected exchange type:%s", exctype) return fmt.Errorf("[BrokerExchanger::queueBind] unexpected exchange type:%s", exctype) } err := be.channel.QueueBind( queueName, // queue name queueName, // routing key defaultRPCExchange, // exchange false, nil, ) return err } //消費Queue,區分amqp與rpc類型 func (be *BrokerExchanger) consume(exctype string) error { var queueName string switch exctype { case defaultAMQPExchange: queueName = be.amqpqueueName case defaultRPCExchange: queueName = be.rpcqueueName default: U.GetLog().Printf("unexpected exchange type:%s", exctype) return fmt.Errorf("[BrokerExchanger::consume] unexpected exchange type:%s", exctype) } deliveryChan, err := be.channel.Consume( queueName, // queue "", // consumer true, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) if err != nil { return err } switch exctype { case defaultAMQPExchange: be.amqpDeliveryChan = deliveryChan case defaultRPCExchange: be.rpcDeliveryChan = deliveryChan default: U.GetLog().Printf("unexpected exchange type:%s", exctype) return fmt.Errorf("[BrokerExchanger::consume] unexpected exchange type:%s", exctype) } return nil }
說明:amqp發佈訂閱類型與rpc類型,在聲明exchange時,exchange type字段不一樣,一個是topic,一個direct;測試
(3)MqttBroker定義及實現:
定義相關數據結構以及方法以下:
package mqtt //定義MqttBroker結構 type MqttBroker struct { wg *sync.WaitGroup addr string serverid string rabbit *rabbit.Rabbit channel *amqp.Channel brokerexchanger *BrokerExchanger mqttconnChan chan net.Conn rabbitconnChan chan bool connListeners map[string]chan bool mqttClientChan chan *MqttClientCmd clientMap map[string]*MqttClient Authenticate AuthenticateFunc AuthorizePublish AuthorizePublishFunc AuthorizeSubscribe AuthorizeSubscribeFunc } //監聽RabbitMQ鏈接狀態 func (mb *MqttBroker) handleRabbitConnChan() { defer mb.wg.Done() for connStatus := range mb.rabbitconnChan { U.GetLog().Printf("serverid:%s, rabbitmq connection status:%v", mb.serverid, connStatus) if !connStatus { mb.reset() } if connStatus { mb.rabbitConnSuccessCallback() } } } //初始化Broker Exchanger func (mb *MqttBroker) rabbitConnSuccessCallback() error { err := mb.initChannel() if err != nil { U.GetLog().Printf("initChannel error:%v", err) return err } err = mb.initBrokerExchanger() if err != nil { U.GetLog().Printf("mb.brokerexchanger.Init error:%v", err) return err } mb.broadcastRabbitConnStatus(true) return err } //監聽APP鏈接 func (mb *MqttBroker) ListenAndServe() { defer mb.wg.Done() var listener net.Listener var err error listener, err = net.Listen("tcp", mb.addr) U.FailOnError(err) U.GetLog().Printf("listen and serve mqtt broker on %s", mb.addr) for { conn, err := listener.Accept() if err != nil { U.GetLog().Printf("accepting new connection error:%v", err) continue } mb.wg.Add(1) mb.mqttconnChan <- conn } } //針對每一個APP鏈接,實例化MqttClient處理相關業務數據 func (mb *MqttBroker) handleMqttConnection() { defer mb.wg.Done() for conn := range mb.mqttconnChan { mqttclient, err := NewMqttClient(mb.wg, conn, mb) if err != nil { U.GetLog().Printf("NewMqttClient error:%v", err) continue } mb.clientMap[mqttclient.GetClientID()] = mqttclient mb.wg.Add(1) go mqttclient.Serve() } }
(4)MqttClient定義及實現:
定義相關數據結構以及方法以下:
package mqtt //定義MqttClient結構 type MqttClient struct { wg *sync.WaitGroup tcpconn net.Conn broker *MqttBroker keepalive int lastheartbeat int clientid string rabbit *rabbit.Rabbit channel *amqp.Channel exchangers map[string]Exchanger topicMap map[string]chan ExchangeMessage brokerChan chan *MqttClientCmd rabbitconnChan chan bool needDisConn bool } //監聽處理APP的業務請求 func (mc *MqttClient) Serve() { defer mc.wg.Done() defer mc.commonDefer() mc.wg.Add(1) go mc.timer() for { if mc.needDisConn { break } packet, err := MP.ReadPacket(mc.tcpconn) if err != nil { U.GetLog().Printf("ReadPacket error:%v", err) mc.needDisConn = true break } switch packet.GetMqttType() { case MP.Connect: err = mc.handleConn(packet) case MP.Disconnect: err = mc.handleDisConn(packet) case MP.Pingreq: err = mc.handlePing(packet) case MP.Publish: err = mc.handlePublish(packet) case MP.Subscribe: err = mc.handleSubscibe(packet) case MP.Unsubscribe: err = mc.handleUnSubscribe(packet) default: U.GetLog().Printf("unexpected packet type:%v", packet.GetMqttType()) } if err != nil { U.GetLog().Printf("handle packet error:%v", err) } if err == amqp.ErrClosed { mc.channel = nil U.GetLog().Printf("handle packet error amqp.ErrClosed:%v", amqp.ErrClosed) continue } mc.lastheartbeat = 0 } } //處理mqtt鏈接報文 func (mc *MqttClient) handleConn(packet MP.ControlPacket) error { U.GetLog().Printf("receive connect request...") p := packet.(*MP.ConnectPacket) pA := MP.NewControlPacket(MP.Connack).(*MP.ConnackPacket) pA.ReturnCode = p.Validate() mc.keepalive = int(p.KeepaliveTimer) //mc.keepalive == 10 if mc.broker != nil && mc.broker.Authenticate != nil { authRet := mc.broker.Authenticate(mc, string(p.Username), string(p.Password)) if !authRet { pA.ReturnCode = MP.ErrRefusedBadUsernameOrPassword } } if pA.ReturnCode != MP.Accepted { mc.needDisConn = true } err := mc.trySendPacket(pA) return err } //給APP發送mqtt鏈接報文的回包 func (mc *MqttClient) trySendPacket(packet MP.ControlPacket) error { // U.GetLog().Printf("try send packet:%v", packet.GetMqttType()) return packet.Write(mc.tcpconn) }
說明:做爲例子,以上僅以mqtt鏈接報文敘述了相關鏈接請求處理,包括用到了前一篇文字<<基於MQTT協議談談物聯網開發-華佗寫代碼>>中講到的mqtt協議編解碼方法;
3.運行測試
針對上述Mqtt Broker的具體實現,簡單用nodejs編寫了兩個測試程序,一個是subscribe.js模擬訂閱者,一個是publisher.js模擬生產者,以下:
生產者publisher.js:
var mqtt = require('mqtt'); var client = mqtt.createClient(6666, '127.0.0.1', {clientId:'xixi', username:'hello', password: 'world', clean:false}); var num = 0; setInterval(function () { for(var i = 0; i < 1; i++) { client.publish('someonelikeyou', 'hello mqtt ' + num,{qos:1, retain: true}); console.log("publish topic someonelikeyou, num:", num++); } }, 1000);
訂閱者subscriber.js:
var mqtt = require('mqtt'); var client = mqtt.createClient(6666, '127.0.0.1', {clientId:'haha', username:'hello', password: 'world', clean:false}); client.subscribe('someonelikeyou',{qos:1}); console.log("subscribe topic someonelikeyou"); var num = 0; client.on('message', function (topic, message) { console.log(message.toString()); num++ });
運行結果以下圖所示:
生產者運行結果:
訂閱者運行結果:
MqttBroker運行結果:
出於篇幅考慮,上述使用到的具體一些函數,如broadcastConnStatus,handlePing等,其具體實現就不一一列舉出來了,也有一些成員變量,用到了也不一一具體註釋了,主要經過代碼關鍵路徑敘述實現的一些細節,若有錯誤,懇請指出,轉載也請註明出處!!!
未完待續...
參考文字:mqtt