基於WebSocket協議實現Broker

寫在前面:
前兩篇文字<<基於MQTT協議談談物聯網開發-華佗寫代碼>>,<<基於MQTT協議實現Broker-華佗寫代碼>>主要敘述了MQTT協議的編解碼以及基於MQTT協議的一些常見應用場景,並以一個簡單的消息推送系統做爲例子具體闡述了Mqtt Broker部分的實現,以前主要以原生android或者iOS或者服務端代理做爲例子,考慮到在移動端開發時,選擇的技術棧有所不一樣,有的選擇web前端開發.做爲例子,這裏以以前的消息推送系統爲例基於web前端開發,繼續敘述基於WebSocket協議實現Broker.html

 

1.WebSocket協議主要特色:前端

(1)基於http協議握手創建tcp長鏈接;android

(2)相比http,WebSocket協議交換最小化,下降網絡流量;nginx

(3)雙向通訊,服務器能夠主動推送數據給客戶端;web

 

2.Mqtt Broker具體實現(WebSocket部分):json

2.1Mqtt Broker架構草圖:服務器

 

2.2Mqtt Broker實現細節:websocket

(1)新增實現websocket server,監聽不一樣端口;網絡

(2)每一個websocket鏈接,實例化一個mqttclient負責其協議解析,消息發佈和訂閱等;架構

(3)複用以前Mqtt Broker與RabbitMQ通訊部分,具體參考上一篇文字;

(4)其餘...

 

2.3Mqtt Broker代碼實現(WebSocket部分):

type tcpKeepAliveListener struct {
    *net.TCPListener
}

var upgrader = websocket.Upgrader{
    ReadBufferSize: 1024,
    WriteBufferSize: 1024,
}
 
//監聽websocket server地址,註冊websocket handler
func (mb *MqttBroker) ListenAndServeWeb() {
 defer mb.wg.Done() http.HandleFunc("/", mb.webHandler) webserver := &http.Server{Addr: mb.webaddr, Handler: nil} var listener net.Listener var err error listener, err = net.Listen("tcp", mb.webaddr) if err != nil { return } U.GetLog().Printf("listen and serve web broker on %s", mb.webaddr) err = webserver.Serve(tcpKeepAliveListener{listener.(*net.TCPListener)})
}

//每個Websocket鏈接,實例化一個MqttClient負責其協議解析,以及與rabbitmq的通訊
func (mb *MqttBroker) webHandler(w http.ResponseWriter, r *http.Request) {
    upgrader.CheckOrigin = checkSameOrigin
    conn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        U.GetLog().Printf("upgrade error:%v", err)
        return
    }
    mqttclient, err := NewMqttClient(mb.wg, mb, nil, conn, "web")
    if err != nil {
        return
    }
    mb.clientMap[mqttclient.GetClientID()] = mqttclient
    mb.wg.Add(1)
    go mqttclient.ServeWeb()
}

 

2.4Mqtt Client代碼實現(WebSocket部分):

//定義WebSocket通訊消息格式
//Action選項有publish,subscribe,unsubscribe
type WebMessage struct { Action
string Topic string Payload string } type MqttClient struct { wg *sync.WaitGroup broker *MqttBroker tcpconn net.Conn wconn *websocket.Conn ... needDisConn bool } //經過WebMessage.Action區分消息指令類型 func (mc *MqttClient) ServeWeb() { defer mc.wg.Done() defer mc.commonDefer() if mc.wconn == nil { return } for { if mc.needDisConn { break } _, message, err := mc.wconn.ReadMessage() if err != nil { U.GetLog().Printf("handle message error:%v", err) mc.needDisConn = true continue } wm := WebMessage{} err = json.Unmarshal(message, &wm) if err != nil { U.GetLog().Printf("json.Unmarshal(message, &wm) error:%v", err) continue } switch wm.Action { case "subscribe": err = mc.handleWebSubscibe(wm.Topic) case "publish": err = mc.handleWebPublish(wm.Topic, wm.Payload) case "unsubscribe": err = mc.handleWebUnSubscribe(wm.Topic) case "ping": mc.lastheartbeat = 0 default: U.GetLog().Printf("unexpected WebMessage Action:%s", wm.Action) continue } if err != nil { U.GetLog().Printf("handle message error:%v", err) } mc.lastheartbeat = 0 } }

 

3.WebSocket Client端實現:

3.1實現細節:

(1)創建與WebSocket Server的鏈接;

(2)初始化WebSocket,註冊相關回調函數;

(3)實現WebSocket斷線重連機制;

(4)封裝相似mqtt基於topic的發佈訂閱等接口;

(5)Nodejs端須要browserify相關js文件,Javascript端能夠直接調用WebSocket;

(6)其餘...

 

3.2具體代碼實現:

var WebSocket = require('ws'); var WEBSOCKET_MQTT_BROKER = 'ws://your_server_ip/ws/'; var ping = { Action: "ping" }; var _listeners = {}; var _websocket = null; var _connected = false; _access = function () { console.log('try mqtt.connect'); _connect_websocket(); setInterval(function () { _reconnect_websocket(); if (_websocket != null && _connected) { _websocket.send(JSON.stringify(ping)); } }, 3000); }; //websocket初始化,並實現相關回調函數 _init_websocket = function () { if (_websocket == null) { return; } _websocket.onopen = function () { _connected = true; console.log("Connected to WebSocket server."); for (var topic in _listeners) { var sub = { Action: "subscribe", Topic: topic, Payload: "" }; _websocket.send(JSON.stringify(sub)); } }; _websocket.onclose = function () { _connected = false; _websocket = null; console.log("Disconnected"); }; _websocket.onmessage = function (evt) { console.log('recv data from server: ' + evt.data); var dataObj = JSON.parse(evt.data); _listeners[dataObj.Topic] && _listeners[dataObj.Topic](dataObj.Payload); }; _websocket.onerror = function (evt) { _connected = false; _websocket = null; console.log('Error occured: ' + evt); }; }; _connect_websocket = function () { if (_connected) { return; } _websocket = new WebSocket(WEBSOCKET_MQTT_BROKER); _init_websocket(); }; //斷線重連,經過定時器實現每三秒斷線重連 _reconnect_websocket = function () { if (_connected) { return; } _websocket = new WebSocket(WEBSOCKET_MQTT_BROKER); _init_websocket(); }; //模擬mqtt發佈消息 sendMessage = function (topic, data) { if (!_websocket || !_connected) { var err = new Error('iot client not ready.'); console.warn(err); return; } var send_data = JSON.stringify(data); var pub = { Action: "publish", Topic: topic, Payload: send_data }; _websocket.send(JSON.stringify(pub)); }; //模擬mqtt訂閱消息,並根據topic註冊回調函數 onMessage = function (topic, callback) { _listeners[topic] = callback; if (!_websocket || !_connected) { console.warn('onMessage, but iot client not ready.'); return; } var sub = { Action: "subscribe", Topic: topic, Payload: "" }; _websocket.send(JSON.stringify(sub)); }; //模擬mqtt取消訂閱,並根據topic刪除對應回調函數 stopReceiveMessage = function (topic) { delete _listeners[topic]; if (!_websocket || !_connected) { console.warn('stopReceiveMessage, but iot client not ready.'); return; } var unsub = { Action: "unsubscribe", Topic: topic, Payload: "" }; _websocket.send(JSON.stringify(unsub)); }; _access();

 

4.WebSocket相關nginx配置:

server { listen 80; server_name your_server_name;  ... location /ws/ { proxy_redirect off; add_header Access-Control-Allow-Origin *; add_header Access-Control-Allow-Methods 'GET, POST, OPTIONS'; add_header Access-Control-Allow-Headers 'DNT,X-Mx-ReqToken,Keep-Alive,User-Agent,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type,Authorization'; proxy_pass http://127.0.0.1:2884/;
            proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade"; }
}

 

出於篇幅考慮,以前兩篇文字敘述過的內容,好比Mqtt Broker其餘實現部分以及與RabbitMQ通訊部分,都是複用以前的代碼邏輯,這裏再也不贅述,Mqtt Broker中WebSocket部分至關於使用WebSocket協議作了MQTT協議的翻譯轉換,也有一些成員變量,用到了也不一一具體註釋了,主要經過代碼關鍵路徑敘述實現的一些細節,若有錯誤,懇請指出,轉載也請註明出處!!!

 

未完待續...

相關文章
相關標籤/搜索