基於MQTT協議實現Broker

寫在前面:
前一篇文字<<基於MQTT協議談談物聯網開發-華佗寫代碼>>主要敘述了MQTT協議的編解碼以及基於MQTT協議的一些常見應用場景,並以一個簡單的消息推送系統做爲例子闡述具體MQTT應用的開發,這篇文字繼續敘述上述應用中Mqtt Broker部分的實現.html

1.Mqtt Broker開源產品:node

Mqtt Broker簡單來講就是基於MQTT協議的Server實現,而且github上有不少開源實現,以下圖:
git

 

各類Mqtt Broker開源實現的幾個關鍵技術指標對好比下圖:
github

 

能夠根據業務的需求不一樣選用不一樣的broker開源實現,整體來講,對Mqtt Broker來講,要求高可用,無狀態,易擴容,可靠傳輸等,下面就以Mqtt Broker的實現做爲例子,講講基於MQTT協議的Broker具體開發,不考慮太複雜的業務邏輯,僅以最簡潔的方式,闡述整個流程,基於Golang和RabbitMQ開發.數據結構

2.Mqtt Broker具體實現:
2.1Mqtt Broker架構草圖:架構

2.2Mqtt Broker實現細節說明:
(1)Broker與RabbitMQ創建TCP鏈接,實現斷線重連邏輯以及斷線通知機制;
(2)Broker聲明與RabbitMQ相關的exchanger, amqp隊列,以及rpc隊列;
(3)Broker監聽APP端mqttclient的tcp鏈接,對應每一個鏈接,Broker實例化一個appClient;
(4)每一個appClient處理與之對應的APP業務請求,包括髮布,訂閱,取消訂閱,心跳等;
(5)每一個appClient獲取一個channel,經過channel與RabbitMQ進行通訊;
(6)若Broker與RabbitMQ發生斷連,則channel不可用,必須等待重連成功,而後從新獲取channel;
(7)若RabbitMQ發生重啓,以前對應APP須要訂閱的數據,每一個appClient必須從新訂閱;
(8)若APP斷開鏈接,則對應的appClient必須釋放全部資源並結束生命週期,再也不服務;
(9)其餘...app

 

2.3Mqtt Broker代碼實現:
根據上述的架構草圖以及細節說明,就整個架構比較關鍵的幾個環節,用具體代碼實現加以闡述,以求透徹.
(1)Broker與RabbitMQ鏈接:
定義相關數據結構以及方法以下:tcp

package rabbit

var (
    RabbitServerAddr = flag.String("rabbitserveraddr", "amqp://guest:guest@localhost:5672", "")
)
const (
    CONNECT_INTERVAL = 3
)
var _rabbit *Rabbit = nil
var _once sync.Once
//定義Rabbit結構
type Rabbit struct {
    wg                  *sync.WaitGroup
    rabbitconn          *amqp.Connection
    rabbitconnListeners map[string]chan bool
    _mutex              sync.Mutex
}
//獲取Rabbit單例
func GetRabbitInstance(_wg *sync.WaitGroup, listenerID string, rabbitconnChan chan bool) *Rabbit {
    _once.Do(func() {
        _rabbit = &Rabbit{
            wg: _wg,
        }
        _rabbit.rabbitconnListeners = make(map[string]chan bool)
        _rabbit.initConn()
    })
    if listenerID != "" && rabbitconnChan != nil {
        _rabbit._mutex.Lock()
        _rabbit.rabbitconnListeners[listenerID] = rabbitconnChan
        _rabbit._mutex.Unlock()
    }

    return _rabbit
}
//創建RabbitMQ鏈接
func (r *Rabbit) initConn() {
    if r.rabbitconn != nil {
        return
    }
    conn, err := amqp.Dial(*RabbitServerAddr)
    if err != nil {
        time.Sleep(time.Second * CONNECT_INTERVAL)
        U.FailOnError(err)
    }
    r.rabbitconn = conn
    r.wg.Add(1)
    go r.checkConn()
}
//RabbitMQ斷線重連
func (r *Rabbit) checkConn() {
    defer r.wg.Done()
    for {
        <-r.rabbitconn.NotifyClose(make(chan *amqp.Error))
        U.GetLog().Printf("r.conn.NotifyClose")
        r.broadcastConnStatus(false)
        for {
            conn, err := amqp.Dial(*RabbitServerAddr)
            if err == nil {
                r.rabbitconn = conn
                r.broadcastConnStatus(true)
                break
            }
            U.GetLog().Printf("amqp.Dial failed")
            r.broadcastConnStatus(false)
            time.Sleep(time.Second * CONNECT_INTERVAL)
        }
    }
}

 

(2)Broker聲明exchanger, amqp隊列,以及rpc隊列:
定義相關數據結構以及方法以下:函數

package mqtt

type TOPIC_CHAN_TYPE map[string]chan ExchangeMessage
//定義BrokerExchanger結構
type BrokerExchanger struct {
    wg               *sync.WaitGroup
    channel          *amqp.Channel
    exchangerChan    chan *MqttClientCmd
    amqpqueueName    string
    amqpDeliveryChan <-chan amqp.Delivery
    amqpTopicChan    map[string]TOPIC_CHAN_TYPE
    rpcqueueName     string
    rpcDeliveryChan  <-chan amqp.Delivery
    rpcClientChan    map[string]chan ExchangeMessage
}
//實例化BrokerExchanger
func NewBrokerExchanger(_wg *sync.WaitGroup) *BrokerExchanger {
    _be := &BrokerExchanger{
        wg: _wg,
    }
    _be.once()
    return _be
}
//聲明Exchange,區分amqp與rpc類型
func (be *BrokerExchanger) declareExchange(exctype string) error {
    if be.channel == nil {
        return fmt.Errorf("[BrokerExchanger::declareExchange] channel not ready")
    }
    var defaultExchange string
    var exchangeType string
    switch exctype {
    case defaultAMQPExchange:
        defaultExchange = defaultAMQPExchange
        exchangeType = "topic"
    case defaultRPCExchange:
        defaultExchange = defaultRPCExchange
        exchangeType = "direct"
    default:
        U.GetLog().Printf("unexpected exchange type:%s", exctype)
        return fmt.Errorf("[BrokerExchanger::declareExchange] unexpected exchange type:%s", exctype)
    }
    err := be.channel.ExchangeDeclare(
        defaultExchange, // name
        exchangeType,    // type
        true,            // durable
        false,           // auto-deleted
        false,           // internal
        false,           // no-wait
        nil,             // arguments
    )
    return err
}
//聲明Queue,區分amqp與rpc類型
func (be *BrokerExchanger) declareQueue(exctype string) error {
    if be.channel == nil {
        return fmt.Errorf("[BrokerExchanger::declareQueue] channel not ready")
    }
    q, err := be.channel.QueueDeclare(
        "",    // name
        true,  // durable
        true,  // delete when usused
        true,  // exclusive
        false, // no-wait
        nil,   // arguments
    )
    if err != nil {
        return err
    }

    switch exctype {
    case defaultAMQPExchange:
        be.amqpqueueName = q.Name
    case defaultRPCExchange:
        be.rpcqueueName = q.Name
    default:
        U.GetLog().Printf("unexpected exchange type:%s", exctype)
        return fmt.Errorf("[BrokerExchanger::declareQueue] unexpected exchange type:%s", exctype)
    }
    return nil
}
//綁定Queue,區分amqp與rpc類型
func (be *BrokerExchanger) queueBind(exctype string) error {
    var queueName string
    switch exctype {
    case defaultAMQPExchange:
        return nil
    case defaultRPCExchange:
        queueName = be.rpcqueueName
    default:
        U.GetLog().Printf("unexpected exchange type:%s", exctype)
        return fmt.Errorf("[BrokerExchanger::queueBind] unexpected exchange type:%s", exctype)
    }

    err := be.channel.QueueBind(
        queueName,          // queue name
        queueName,          // routing key
        defaultRPCExchange, // exchange
        false,
        nil,
    )

    return err
}
//消費Queue,區分amqp與rpc類型
func (be *BrokerExchanger) consume(exctype string) error {
    var queueName string
    switch exctype {
    case defaultAMQPExchange:
        queueName = be.amqpqueueName
    case defaultRPCExchange:
        queueName = be.rpcqueueName
    default:
        U.GetLog().Printf("unexpected exchange type:%s", exctype)
        return fmt.Errorf("[BrokerExchanger::consume] unexpected exchange type:%s", exctype)
    }
    deliveryChan, err := be.channel.Consume(
        queueName, // queue
        "",        // consumer
        true,      // auto-ack
        false,     // exclusive
        false,     // no-local
        false,     // no-wait
        nil,       // args
    )
    if err != nil {
        return err
    }
    switch exctype {
    case defaultAMQPExchange:
        be.amqpDeliveryChan = deliveryChan
    case defaultRPCExchange:
        be.rpcDeliveryChan = deliveryChan
    default:
        U.GetLog().Printf("unexpected exchange type:%s", exctype)
        return fmt.Errorf("[BrokerExchanger::consume] unexpected exchange type:%s", exctype)
    }
    return nil
}

 

說明:amqp發佈訂閱類型與rpc類型,在聲明exchange時,exchange type字段不一樣,一個是topic,一個direct;測試


(3)MqttBroker定義及實現:
定義相關數據結構以及方法以下:

package mqtt
//定義MqttBroker結構
type MqttBroker struct {
    wg                 *sync.WaitGroup
    addr               string
    serverid           string
    rabbit             *rabbit.Rabbit
    channel            *amqp.Channel
    brokerexchanger    *BrokerExchanger
    mqttconnChan       chan net.Conn
    rabbitconnChan     chan bool
    connListeners      map[string]chan bool
    mqttClientChan     chan *MqttClientCmd
    clientMap          map[string]*MqttClient
    Authenticate       AuthenticateFunc
    AuthorizePublish   AuthorizePublishFunc
    AuthorizeSubscribe AuthorizeSubscribeFunc
}

//監聽RabbitMQ鏈接狀態
func (mb *MqttBroker) handleRabbitConnChan() {
    defer mb.wg.Done()
    for connStatus := range mb.rabbitconnChan {
        U.GetLog().Printf("serverid:%s, rabbitmq connection status:%v", mb.serverid, connStatus)
        if !connStatus {
            mb.reset()
        }
        if connStatus {
            mb.rabbitConnSuccessCallback()
        }
    }
}

//初始化Broker Exchanger
func (mb *MqttBroker) rabbitConnSuccessCallback() error {
    err := mb.initChannel()
    if err != nil {
        U.GetLog().Printf("initChannel error:%v", err)
        return err
    }
    err = mb.initBrokerExchanger()
    if err != nil {
        U.GetLog().Printf("mb.brokerexchanger.Init error:%v", err)
        return err
    }

    mb.broadcastRabbitConnStatus(true)
    return err
}
//監聽APP鏈接
func (mb *MqttBroker) ListenAndServe() {
    defer mb.wg.Done()

    var listener net.Listener
    var err error
    listener, err = net.Listen("tcp", mb.addr)
    U.FailOnError(err)

    U.GetLog().Printf("listen and serve mqtt broker on %s", mb.addr)
    for {
        conn, err := listener.Accept()
        if err != nil {
            U.GetLog().Printf("accepting new connection error:%v", err)
            continue
        }
        mb.wg.Add(1)
        mb.mqttconnChan <- conn
    }
}
//針對每一個APP鏈接,實例化MqttClient處理相關業務數據
func (mb *MqttBroker) handleMqttConnection() {
    defer mb.wg.Done()
    for conn := range mb.mqttconnChan {
        mqttclient, err := NewMqttClient(mb.wg, conn, mb)
        if err != nil {
            U.GetLog().Printf("NewMqttClient error:%v", err)
            continue
        }
        mb.clientMap[mqttclient.GetClientID()] = mqttclient
        mb.wg.Add(1)
        go mqttclient.Serve()
    }
}

 

(4)MqttClient定義及實現:

定義相關數據結構以及方法以下:

package mqtt
//定義MqttClient結構
type MqttClient struct {
    wg             *sync.WaitGroup
    tcpconn        net.Conn
    broker         *MqttBroker
    keepalive      int
    lastheartbeat  int
    clientid       string
    rabbit         *rabbit.Rabbit
    channel        *amqp.Channel
    exchangers     map[string]Exchanger
    topicMap       map[string]chan ExchangeMessage
    brokerChan     chan *MqttClientCmd
    rabbitconnChan chan bool
    needDisConn    bool
}
//監聽處理APP的業務請求
func (mc *MqttClient) Serve() {
    defer mc.wg.Done()
    defer mc.commonDefer()
    mc.wg.Add(1)
    go mc.timer()
    for {
        if mc.needDisConn {
            break
        }
        packet, err := MP.ReadPacket(mc.tcpconn)
        if err != nil {
            U.GetLog().Printf("ReadPacket error:%v", err)
            mc.needDisConn = true
            break
        }
        switch packet.GetMqttType() {
        case MP.Connect:
            err = mc.handleConn(packet)
        case MP.Disconnect:
            err = mc.handleDisConn(packet)
        case MP.Pingreq:
            err = mc.handlePing(packet)
        case MP.Publish:
            err = mc.handlePublish(packet)
        case MP.Subscribe:
            err = mc.handleSubscibe(packet)
        case MP.Unsubscribe:
            err = mc.handleUnSubscribe(packet)
        default:
            U.GetLog().Printf("unexpected packet type:%v", packet.GetMqttType())
        }
        if err != nil {
            U.GetLog().Printf("handle packet error:%v", err)
        }
        if err == amqp.ErrClosed {
            mc.channel = nil
            U.GetLog().Printf("handle packet error amqp.ErrClosed:%v", amqp.ErrClosed)
            continue
        }
        mc.lastheartbeat = 0
    }
}
//處理mqtt鏈接報文
func (mc *MqttClient) handleConn(packet MP.ControlPacket) error {
    U.GetLog().Printf("receive connect request...")
    p := packet.(*MP.ConnectPacket)
    pA := MP.NewControlPacket(MP.Connack).(*MP.ConnackPacket)
    pA.ReturnCode = p.Validate()
    mc.keepalive = int(p.KeepaliveTimer) //mc.keepalive == 10
    if mc.broker != nil && mc.broker.Authenticate != nil {
        authRet := mc.broker.Authenticate(mc, string(p.Username), string(p.Password))
        if !authRet {
            pA.ReturnCode = MP.ErrRefusedBadUsernameOrPassword
        }
    }
    if pA.ReturnCode != MP.Accepted {
        mc.needDisConn = true
    }
    err := mc.trySendPacket(pA)
    return err
}
//給APP發送mqtt鏈接報文的回包
func (mc *MqttClient) trySendPacket(packet MP.ControlPacket) error {
    // U.GetLog().Printf("try send packet:%v", packet.GetMqttType())
    return packet.Write(mc.tcpconn)
}

 

說明:做爲例子,以上僅以mqtt鏈接報文敘述了相關鏈接請求處理,包括用到了前一篇文字<<基於MQTT協議談談物聯網開發-華佗寫代碼>>中講到的mqtt協議編解碼方法;


3.運行測試
針對上述Mqtt Broker的具體實現,簡單用nodejs編寫了兩個測試程序,一個是subscribe.js模擬訂閱者,一個是publisher.js模擬生產者,以下:
生產者publisher.js: 

var mqtt = require('mqtt');
var client = mqtt.createClient(6666, '127.0.0.1', {clientId:'xixi', username:'hello', password: 'world', clean:false});
var num = 0;
setInterval(function () {
  for(var i = 0; i < 1; i++) {
    client.publish('someonelikeyou', 'hello mqtt ' + num,{qos:1, retain: true});
    console.log("publish topic someonelikeyou, num:", num++);
  }
}, 1000);

 

訂閱者subscriber.js:

var mqtt = require('mqtt');
var client = mqtt.createClient(6666, '127.0.0.1', {clientId:'haha', username:'hello', password: 'world', clean:false});
client.subscribe('someonelikeyou',{qos:1});
console.log("subscribe topic someonelikeyou");
var num = 0;
client.on('message', function (topic, message) {
      console.log(message.toString());
      num++ 
});

 

運行結果以下圖所示:
生產者運行結果:

 

訂閱者運行結果:

 

MqttBroker運行結果:

 

出於篇幅考慮,上述使用到的具體一些函數,如broadcastConnStatus,handlePing等,其具體實現就不一一列舉出來了,也有一些成員變量,用到了也不一一具體註釋了,主要經過代碼關鍵路徑敘述實現的一些細節,若有錯誤,懇請指出,轉載也請註明出處!!!

 

未完待續...
參考文字:mqtt

相關文章
相關標籤/搜索