網頁端HTML使用MQTTJs訂閱RabbitMQ數據

  最近在作一個公司的日誌組件時有一個問題難住了我。今天問題終於解決了。因爲在解決問題中,在網上也查了不少資料都沒有一個完整的實例能夠參考。因此本着無私分享的目的記錄一下完整的解決過程和實例。
  需求:作一個統一日誌系統能夠查看日誌列表和一個能夠訂閱最新日誌的頁面。經過提供一個封裝好日誌記錄方法的sdk文件將日誌統一收集。
  經過上面的需求進行咱們使用RabbitMQ+Mongodb來實現系統。
  使用C#封裝一個SDK你們都會這裏就不說了。C#鏈接RabbitMQ示例代碼也是一堆堆的也沒什麼好說的。下面重點說一下網頁端如何使用JS去訂閱RabbitMQ收到的最新日誌信息。
  後端都是使用RabbitMQ的AMQP協議,而前端要求在網頁HTML上顯示數據。咱們選擇了使用MQTT協議從RabbitMQ中訂閱數據。
  具體步驟:
1、先準備好相關JS庫。MQTT有一個叫browserMqtt.js看名字就知道是爲瀏覽器提供的JS庫。還有一個封裝了操做MQ的JS庫 mqfactory.js。最後還要一個jquery.js文件。這樣工具就準備好了。 JS文件下載
2、HTML端代碼。
<script type="text/javascript" src="~/js/MqJs/jquery.js"></script>
<script type="text/javascript" src="~/js/MqJs/browserMqtt.min.js"></script>
<script type="text/javascript" src="~/js/MqJs/mqfactory.js"></script>
<body>
    <div>
        <lable>Host: </lable><input id="txtHost" placeholder="192.168.1.88" value="10.1.0.7" /><br />
        <lable>Port: </lable><input id="txtPort" placeholder="15675" value="15675" /><br />
        <label>UserName: </label><input id="txtUserName" placeholder="username" value="admin" /><br />
        <label>Password: </label><input id="txtPassword" placeholder="password" value="admin" /><br />
        <label>Protocol: </label><input id="txtProtocol" placeholder="ws" value="ws" /><br />
        <input id="btnConnect" type="button" value="Connect RabbitMQ" />
    </div>
    <div>
        <input id="btnSubscribe" type="button" value="Subscribe" />
        <input id="btnPublish" type="button" value="Publish" /><br />
        <input id="btnSSHuanjing" type="button" value="Subscribe Huanjing" />
        <input id="hdnIsSubscribed" type="hidden" value="" />
        <input id="btnPubHuanjing" type="button" value="Publish Huanjing"><br />
        路由:<input id="btnRoutingKey" type="text" value="Dcon/Logs/Client"><br />
        <input id="txtMessage" type="text" placeholder="Please enter message" />
    </div>
    <div>
        <label>log:</label><br />
        <ul id="lstLog"></ul>
        <input id="btnClearLog" type="button" value="Clear Log" />
    </div>
</body>
<script type="text/javascript">
    $(function () {
        var mqclient;
        //var routingKey = 'Dcon.Logs.ServerWebShow';
        var message;

        $('#btnSubscribe').attr('disabled', 'disabled');
        $('#btnPublish').attr('disabled', 'disabled');
        $('#btnSSHuanjing').attr('disabled', 'disabled');
        $('#btnPubHuanjing').attr('disabled', 'disabled');

        $('#btnConnect').click(function () {
            var mqttOpts = {
                host: (() => $('#txtHost').val())(),
                port: (() => $('#txtPort').val())(),
                username: (() => $('#txtUserName').val())(),
                password: (() => $('#txtPassword').val())(),
                //transformWsUrl方法用於在瀏覽器中使用MQTT的場景,默認狀況下,MQTT自動生成的url爲ws://ip:port形式,
                //然而服務器要求的格式是ws://ip:port/ws,因此MQTT提供了此接口用於在生成url時自定義url格式
                transformWsUrl: (url, opts, client) => { return opts.protocol && opts.protocol == 'ws' ? url + 'ws' : url; },
                clientId: (() => { return 'mqttjs_' + Math.random().toString(16).substr(2, 8); })()
            };
            var biz = {
                huanjing: function (handler, isOn) {
                    if (isOn !== false) {
                        this.ss(this.topics.huanjing, handler);
                    } else {
                        this.sus(this.topics.huanjing, handler);
                    }
                },
                topics: {
                    huanjing: '/hyj/huanjing/monitor'
                }
            };
            //系統初始化時注入鏈接選項
            mqfactory.inject(mqttOpts, biz);
            //建立mqclient單例 
            mqclient = mqfactory.create();
            //註冊mqclient的鏈接成功事件
            mqclient.on('connect', mqconnected);
        });

        $('#btnSubscribe').click(function () {
            if ($(this).val() == 'Subscribe') {
                //訂閱成功後,僅註冊一次事件(要考慮每次註冊事件時,事件處理器調用的次數,若是僅用一次,就用once方法)
                //routingKey = $("#btnRoutingKey").val();
                mqclient.once('onss', mqSubscribeSuccess);
                //簡單訂閱
                mqclient.ss($("#btnRoutingKey").val());
            } else {
                mqclient.once('onsus', mqUnsubscribeSuccess)
                mqclient.sus($("#btnRoutingKey").val());
            }
        });

        $('#btnPublish').click(function () {
            var msg = $('#txtMessage').val().length > 0 ? $('#txtMessage').val() : guid();
            if (message === msg) {
                msg = guid();
            }
            message = msg;
            $('#txtMessage').val(message);
            //發送消息
            mqclient.pub($("#btnRoutingKey").val(), message);
            $('#lstLog').append('<li>Send Message: ' + message + '</li>');
        });

        $('#btnSSHuanjing').click(function () {
            if ($(this).val() == 'Subscribe Huanjing') {
                mqclient.once('onss', mqHJSubscribeSuccess);
                mqclient.huanjing(onHuanjingMessageArrived);
            } else {
                mqclient.once('onsus', mqHJUnsubscribeSuccess);
                mqclient.huanjing(onHuanjingMessageArrived, false);
            }
        });

        $('#btnPubHuanjing').click(function () {
            var msg = $('#txtMessage').val().length > 0 ? $('#txtMessage').val() : guid();
            if (message === msg) {
                msg = guid();
            }
            message = msg;
            $('#txtMessage').val(message);
            //發送消息
            mqclient.pub(mqclient.topics.huanjing, message);
            $('#lstLog').append('<li>Send Huanjing Message: ' + message + '</li>');
        });

        $('#btnClearLog').click(function () {
            $('#lstLog').empty();
        });

        function mqconnected() {
            //alert("mqconnected");
            $('#btnSubscribe').removeAttr('disabled');
            $('#btnPublish').removeAttr('disabled');
            $('#btnSSHuanjing').removeAttr('disabled');
            $('#btnPubHuanjing').removeAttr('disabled');
            $('#lstLog').append('<li>mqclient connected</li>');
        }

        function mqSubscribeSuccess() {
            //訂閱成功,就註冊接受消息的方法,此處要接收屢次,所以使用了on
            mqclient.on($("#btnRoutingKey").val(), onMessageArrived);
            $('#btnSubscribe').val('Unsubscribe');
            $('#lstLog').append('<li>Subscribe successful.' + $("#btnRoutingKey").val()+'</li>');
        }

        function mqUnsubscribeSuccess() {
            //註銷訂閱,因此將事件處理器解除綁定
            mqclient.off($("#btnRoutingKey").val(), onMessageArrived);
            $('#btnSubscribe').val('Subscribe');
            $('#lstLog').append('<li>Unsubscribe successful</li>');
        }

        function mqHJSubscribeSuccess() {
            $('#btnSSHuanjing').val('Unsubscribe Huanjing');
            $('#lstLog').append('<li>Hanjing Subscribe successful</li>');
        }

        function mqHJUnsubscribeSuccess() {
            $('#btnSSHuanjing').val('Subscribe Huanjing');
            $('#lstLog').append('<li>Huanjing Unsubscribe successful</li>');
        }

        function onMessageArrived(message) {
            $('#lstLog').append('<li>Receive message: ' + new Date().toString() + '    ' + message.toString() + '</li>');
        }

        function onHuanjingMessageArrived(message) {
            $('#lstLog').append('<li>Receive Huanjing message: ' + new Date().toString() + '    ' + message.toString() + '</li>');
        }

        function guid() {
            function s4() {
                return Math.floor((1 + Math.random()) * 0x10000)
                    .toString(16)
                    .substring(1);
            }
            return s4() + s4() + '-' + s4() + '-' + s4() + '-' +
                s4() + '-' + s4() + s4() + s4();
        }
    });
</script>
View Code
3.後端代碼:
3.1客戶端sdk代碼
/// <summary>
        /// 寫日誌
        /// </summary>
        /// <param name="model"></param>
        public static void Write(LogModel model)
        {
            //判斷寫入的日誌級別
            if (model != null && model.LogLevel >= LogLevel)
            {
                try
                {
                    var mqMsg = new MqMessage()
                    {
                        MessageBody = JSON.Serialize(model),
                        MessageRouter = SystemConst.RoutingKeyTopic.LogTopic_Producer
                    };
                    //MQHelper.Instance.ProducerMessage_Fanout(mqMsg);
                    MQHelper.Instance.ProducerMessage_Topic(mqMsg);
                }
                catch (Exception ex)
                {
                    var errorLog = string.Format("Ip:{0},LogHelper.Write方法異常,{1}", IpHelper.LocalHostIp, ex.Message);
                    //MQHelper.Instance.ProducerMessage_Fanout(new MqMessage() { MessageBody = errorLog });
                    MQHelper.Instance.ProducerMessage_Topic(new MqMessage() { MessageBody = errorLog });
                }
            }
        }
View Code

3.2後端MQ代碼:javascript

#region 主題 交換機
        /// <summary>
        /// 生產者 客戶端調用
        /// </summary>
        /// <param name="msg"></param>
        public void ProducerMessage_Topic(MqMessage msg)
        {
            try
            {
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        var body = Encoding.UTF8.GetBytes(msg.MessageBody);
                        channel.BasicPublish(exchange: SystemConst.MqName_LogMq_TopicDefault,
                                             routingKey: msg.MessageRouter,
                                             basicProperties: null,
                                             body: body);
                        Console.WriteLine(" [x] Sent {0}", msg.MessageBody);
                    }
                }
            }
            catch (Exception ex)
            {
                var exMsg = ex.Message;
            }
        }

        /// <summary>
        /// 消費者 服務器接收並寫入數據庫
        /// 消費方法沒法經過參數傳入
        /// EventHandler<BasicDeliverEventArgs> received
        /// </summary>
       public void ConsumeMessage_Topic(params string[] routingKeys)
        {
            if (routingKeys == null || routingKeys.Length == 0)
            {
                throw new Exception("請指定接收路由");
            }
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    var queueName = channel.QueueDeclare().QueueName;//得到已經生成的隨機隊列名
                    //對列與交換機綁定
                    foreach (var rKey in routingKeys)
                    {
                        channel.QueueBind(queue: queueName,
                                      exchange: SystemConst.MqName_LogMq_TopicDefault,
                                      routingKey: rKey);
                    }

                    var consumer = new EventingBasicConsumer(channel);
                    //綁定消費方法
                    consumer.Received += consomer_Received_Topic;
                    //綁定消費者
                    channel.BasicConsume(queue: queueName,
                                         autoAck: true,
                                         consumer: consumer);
                    Console.WriteLine("日誌訂閱服務啓動成功.");
                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();
                }
            }
        }

        /// <summary>
        /// 接收通知服務異步的推送
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        void consomer_Received_Topic(object sender, BasicDeliverEventArgs e)
        {
            var body = e.Body;
            var message = Encoding.UTF8.GetString(body);
            Console.WriteLine(" [x] {0}", message);
//這裏能夠增長寫入數據庫的代碼
        }
        #endregion
View Code

3.3路由前端

/// <summary>
        /// 主題路由
        /// </summary>
        public class RoutingKeyTopic
        {
            /// <summary>
            /// 生產者
            /// </summary>
            public const string LogTopic_Producer = "Dcon.Logs.Client";

            /// <summary>
            /// 消息者_日誌服務_保存日誌
            /// </summary>
            public const string LogTopic_Consume_Server_SaveDB = "Dcon.Logs.*";

            /// <summary>
            /// 消息者_日誌服務_Web顯示日誌
            /// </summary>
            public const string LogTopic_Consume_Server_WebShow = "Dcon.Logs#";//".Logs.Client";

            /// <summary>
            /// 消息者_日誌服務_Web顯示日誌
            /// </summary>
            public const string LogTopic_Consume_Server_WebShow_T = "*.Logs.Client";//".Logs.Client";

            /// <summary>
            /// 消息者_日誌服務_ # 接收全部
            /// </summary>
            public const string LogTopic_Consume_Server_All = "#";//".Logs.Client";
        }
    }
View Code
注意點:
1、MQTT的路由是以 / 來分割的。在RabbitMQ中會被轉義成 . 如示例中的路由Dcon/Logs/Client會被轉換成 Dcon.Logs.Client
2、網頁端接收時的路由要和發送端的路由一至。也就是說 後端用 Dcon.Logs.Client 來推數據前端就要使用 Dcon/Logs/Client來接收數據。
3、MQTT路由不支持通配符.
4、因爲MQTT的JS庫沒有提供Topic交換機與路由綁定功能。因此前端接收時 不能設置訂閱主題交換機名稱。若是要和amqp交互只能使用amqp的默認主題交換機名稱 amq.topic
 
運行效果圖:
 
相關文章
相關標籤/搜索