哎呀,我老大寫Bug啦——記一次MessageQueue的優化

  MessageQueue,顧名思義消息隊列,在系統開發中也是用的比較多的一箇中間件吧。咱們這裏主要用它來作日誌管理和訂單管理的,記得老老大(恩,是的,就是老老大,由於他已經跳槽了)還在的時候,當時也是爲了趕項目進度,他也參與開發了,那時候我纔剛剛入職,他負責寫後端這塊,我來了就把他手上的任務接過來了,(接着接着……就辭職了)。git

以後咱們的開發仍然有條不紊的開發着,直到今年的一月份吧,才上線開始運行,而後就出現了常規狀態,上線以後就開始爆炸,github

                                                                                     

這個頁面打不開呀,那個內容沒東西呀,第三方登陸問題呀,支付問題呀,臨時再改需求呀……(該來的都來了),加班、debug、測試、再debug……,而後通過幾天的修復,終於完成了跟本身電腦同樣穩定的運行,組員們都美滋滋的,今晚加個雞腿才行。後端

                                                                                    

都說禍不單行,古人是不會騙咱們的,Bug怎麼會修得完呢?天真,要是Bug能修得完還要咱們來幹啥,好景不長,果真,過了一週以後,組員忽然羣裏叫喳喳,服務器

what is it ? 

 

 

來了,今天的主角登場了,我也要開始加班了。app

RabbitMQ

  這個是今天要說的東西,基礎概念什麼的不是今天要說的重點,重點是:異步

 

RabbitMQ內存使得整個服務器瀕臨癱瘓,遠程登陸服務器都差點擠不進去的狀態,別看截圖目前才1.3G,吃個午餐回來,就2.3G了,可怕不可怕?咋回事?ide

老闆喊你回來加班啦

  先無論了,線上優先解決,手動先Reset回收資源以釋放空間,這個只是臨時的辦法,而後檢查一下rabbitMQ的配置有沒有問題,路徑在測試

 C:\Users\Administrator\AppData\Roaming\RabbitMQ 

徹底是默認的配置,徹底ojbk啊,那到底咋回事?繼續檢查,想一想不如從項目開始吧,而後查看項目中的代碼,都是歷來自【MessageLib】的組件調用ui

好了,叫我老老大要這個組件的代碼,他把git的地址就發給我,我把項目down下來,this

這個封裝的組件內容很少,主要的文件一目瞭然,其實就是用到這個兩個組件來進行的二次封裝來調用

主要的代碼是在【MessageQueue.cs】文件裏,展現一下當時的代碼狀況:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using MessageLib.ClassBean;
using EasyNetQ;
using System.Threading;

namespace MessageLib
{
    public static class MessageQueue
    {
        public static IBus bus = MQBusBuilder.CreateMessageBus();
        //消息隊列
        private static Queue<Item> NoticQueue = new Queue<Item>(5000);
        //日誌隊列
        private static Queue<Item> LogQueue = new Queue<Item>(5000);
        //隊列數目發佈數量
        private static int max_count_to_pulish = 1000;

        /// <summary>
        /// 可供外部使用的消息入列操做
        /// </summary>
        public static void push(Item item)
        {
            if (item.type == ItemType.notic)
            {
                NoticQueue.Enqueue(item);
            }

            if (item.type == ItemType.log)
            {
                LogQueue.Enqueue(item);
            }
        }

        /// <summary>
        /// 監聽後須要調用的發佈接口
        /// </summary>
        private static void Pulish(object source, System.Timers.ElapsedEventArgs e)
        {
            if (NoticQueue.Count > 0 || LogQueue.Count > 0)
            {
                if (bus == null || !bus.IsConnected)
                {
                    bus = MQBusBuilder.CreateMessageBus();
                }

                if (bus.IsConnected)
                {
                    Send(ItemType.notic);
                    Send(ItemType.log);
                }
            }
        }

        /// <summary>
        /// 程序自運行並開始監聽
        /// </summary>
        public static void Run()
        {
            System.Timers.Timer timer = new System.Timers.Timer();
            timer.Interval = 1000;
            timer.Elapsed += new System.Timers.ElapsedEventHandler(Pulish);//到達時間的時候執行事件;    
            timer.AutoReset = true;//設置是執行一次(false)仍是一直執行(true);    
            timer.Enabled = true;//是否執行System.Timers.Timer.Elapsed事件;    
        }

        /// <summary>
        /// 啓動線程異步調用
        /// </summary>
        /// <param name="channelType"></param>
        private static void Send(string channelType)
        {
            Thread thread = new Thread(new ParameterizedThreadStart(PublishAction));
            thread.IsBackground = true;
            thread.Start(channelType);
        }

        /// <summary>
        /// 調用發佈日誌及提醒兩個接口
        /// </summary>
        /// <param name="channel"></param>
        private static void PublishAction(object channel)
        {
            PublisLog();
            PublisNotic();
        }

        /// <summary>
        /// 日誌消息發送至RabbitMQ指定exchange、Queue
        /// </summary>
        private static void PublisLog()
        {
            string channelName = ItemType.log;
            try
            {
                var routingKey = channelName;
                var mqqueue = bus.Advanced.QueueDeclare(string.Format("Queue.{0}", channelName));
                var exchange = bus.Advanced.ExchangeDeclare(string.Format("Exchange.{0}",channelName), "direct");
                var binding = bus.Advanced.Bind(exchange, mqqueue, routingKey);
                while (LogQueue.Count > 0)
                {
                    Item item = LogQueue.Dequeue();
                    if (item != null)
                    {
                        var properties = new MessageProperties();
                        var Message = new Message<string>(Newtonsoft.Json.JsonConvert.SerializeObject(item));
                        Message.Properties.AppId = item.appid;
                        bus.Advanced.Publish(exchange, routingKey, false, Message);
                    }

                }
            }
            catch (Exception ex)
            {
                throw ex;
            }
        }

        /// <summary>
        /// 提醒消息發送至RabbitMQ指定exchange、Queue
        /// </summary>
        private static void PublisNotic()
        {
            string channelName = ItemType.notic;
            var routingKey = channelName;
            var mqqueue = bus.Advanced.QueueDeclare(string.Format("Queue.{0}", channelName));
            var exchange = bus.Advanced.ExchangeDeclare(string.Format("Exchange.{0}", channelName), "direct");
            var binding = bus.Advanced.Bind(exchange, mqqueue, routingKey);
            while(NoticQueue.Count > 0)
            {
                Item item = NoticQueue.Dequeue();
                if (item != null)
                {
                    var properties = new MessageProperties();
                    var Message = new Message<string>(Newtonsoft.Json.JsonConvert.SerializeObject(item));
                    Message.Properties.AppId = item.appid;
                    bus.Advanced.Publish(exchange, routingKey, false, Message);
                }
            }
        }
    }
}
View Code

而後我就發現了這一段代碼!

        /// <summary>
        /// 程序自運行並開始監聽
        /// </summary>
        public static void Run()
        {
            System.Timers.Timer timer = new System.Timers.Timer();
            timer.Interval = 1000;
            timer.Elapsed += new System.Timers.ElapsedEventHandler(Pulish);//到達時間的時候執行事件;    
            timer.AutoReset = true;//設置是執行一次(false)仍是一直執行(true);    
            timer.Enabled = true;//是否執行System.Timers.Timer.Elapsed事件;    
        }
        /// <summary>
        /// 啓動線程異步調用
        /// </summary>
        /// <param name="channelType"></param>
        private static void Send(string channelType)
        {
            Thread thread = new Thread(new ParameterizedThreadStart(PublishAction));
            thread.IsBackground = true;
            thread.Start(channelType);
        }

  老老大寫Bug了,當Run()起來以後,隊列中【NoticQueue】有內容,就開始推送消息,發送消息Send(),每來一次推送new一個線程並設置爲後臺線程,而後發送消息。好了,明白了,這裏的線程很混亂,由於線程操做不當,new了N多個頻道,而且沒有主動回收,這也難怪內存暴漲呢。而且要是Run()調用屢次,後果更加不堪設想。

加班改起來

  開始動手吧,業務主要推送有普通消息、錯誤消息和通知消息,那麼將隊列與線程組裝一塊兒,新增一個類QueueTask.cs:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using MessageLib.Core;
using MessageLib.Core.ClassBean;
using EasyNetQ;
using EasyNetQ.Topology;
using System.Linq.Expressions;

namespace MessageLib.Core
{
    public class QueueTask
    {
        private Queue<Item> QueueData = new Queue<Item>(5000);
        //隊列數目發佈數量
        private int max_count_to_pulish = 1000;
        public  bool isRunning = false;
        private string itemType = ItemType.info;
        private string MessageRouter = ItemType.info;

        public QueueTask(string itemType,string MessageRouter)
        {
            this.itemType = itemType;
            this.MessageRouter = MessageRouter;
        }

        /// <summary>
        /// 可供外部使用的消息入列操做
        /// </summary>
        public void Push(Item item, IBus IBus)
        {
            QueueData.Enqueue(item);
            if (!isRunning)
                Run(IBus);
        }

        public void Run(IBus IBus)
        {
            if (!isRunning)
            {
                Timer timerNotic = new Timer(PulishMsg, IBus, 1000, 1000);
                isRunning = true;
            }
        }

        private void PulishMsg(object state)
        {
            IBus IBus = state as IBus;
            if (QueueData.Count > 0)
            {
                PublisMsg(itemType, IBus);
            }
        }

        private void PublisMsg(object channel, IBus BusInstance)
        {
            try
            {
                string channelName = channel as string;
                if (QueueData.Count > 0)
                {
                    var mqqueue = BusInstance.Advanced.QueueDeclare(string.Format("Queue.{0}", channelName));
                    var exchange = BusInstance.Advanced.ExchangeDeclare(string.Format("Exchange.{0}", channelName), ExchangeType.Direct);
                    var binding = BusInstance.Advanced.Bind(exchange, mqqueue, mqqueue.Name);

                    while (QueueData.Count > 0)
                    {
                        Item item = QueueData.Dequeue();
                        if (item != null)
                        {
                            var properties = new MessageProperties();
                            var Message = new Message<string>(Newtonsoft.Json.JsonConvert.SerializeObject(item));
                            Message.Properties.AppId = item.appid;
                            BusInstance.Advanced.Publish(exchange, mqqueue.Name, false, Message);
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine("PublisMsg error:" + ex.Message);
            }
        } 

        public void Read<T>(IBus BusInstance,Action<Item> dealAction) where T : Item
        {
            try
            {
                string channelName = itemType;
                var mqqueue = BusInstance.Advanced.QueueDeclare(string.Format("Queue.{0}", channelName));
                var exchange = BusInstance.Advanced.ExchangeDeclare(string.Format("Exchange.{0}", channelName), ExchangeType.Direct);
                var binding = BusInstance.Advanced.Bind(exchange, mqqueue, mqqueue.Name);

                var Consume = BusInstance.Advanced.Consume(mqqueue, registration => Task.Run(() =>
                {
                    registration.Add<string>((message, info) => 
                    {
                        Item data = Newtonsoft.Json.JsonConvert.DeserializeObject<T>(message.Body);
                        dealAction(data);
                    });
                }));
            }
            catch (Exception ex)
            {
                Console.WriteLine("Read error:" + ex.Message);
            }
        }
    }
}

 

而後,在MessageQueue.cs修改成單例模式:

    public static class MessageQueue
    {
        /*Install-Package EasyNetQ-dotnet-core -Version 2.0.2-radicalgeek-netc0001 -Pre*/

        private static IBus bus = null;
        public static bool isRunning = false;

        //消息隊列
        private static QueueTask NoticQueue = null;
        //日誌隊列
        private static QueueTask LogQueue = null;
        //自定義
        private static QueueTask InfoQueue = null;

        #region 同步鎖
        private static readonly object obj = new object();
        #endregion

        public static void Init(string Connection, string routeKey)
        {
            if (NoticQueue == null)
                NoticQueue = new QueueTask(ItemType.notic, ItemType.notic);
            if (LogQueue == null)
                LogQueue = new QueueTask(ItemType.error, ItemType.error);
            if (InfoQueue == null)
                InfoQueue = new QueueTask(ItemType.info, routeKey);
            if (string.IsNullOrEmpty(MQBusBuilder.Connnection))
                MQBusBuilder.Connnection = Connection;
        }

        public static IBus BusInstance
        {
            get
            {
                if (bus == null)
                {
                    lock (obj)
                    {
                        if (bus == null|| !bus.IsConnected)
                        {
                            bus = MQBusBuilder.CreateMessageBus();
                        }
                    }
                }
                return bus;
            }
        }


        /// <summary>
        /// 可供外部使用的消息入列操做
        /// </summary>
        public static void PushAndRun(Item item)
        {
            if (string.IsNullOrWhiteSpace(MQBusBuilder.Connnection) || BusInstance == null)
                return;
            if (item.type == ItemType.notic)
            {
                NoticQueue.Push(item, BusInstance);
            }
            if (item.type == ItemType.error)
            {
                LogQueue.Push(item, BusInstance);
            }
            if (item.type == ItemType.info)
            {
                InfoQueue.Push(item, BusInstance);
            }
        }

        public static void Read(string itemType, Action<Item> dealAction)
        {
            if (itemType == ItemType.notic)
            {
                NoticQueue.Read<NoticItem>(BusInstance, dealAction);
            }
            if (itemType == ItemType.error)
            {
                LogQueue.Read<ErrorItem>(BusInstance, dealAction);
            }
            if (itemType == ItemType.info)
            {
                InfoQueue.Read<Message>(BusInstance, dealAction);
            }
        }
    }
View Code

每次推送消息的時候,每一個QueueTask就本身維護本身的線程和隊列了,當調用推送以後,就開始運做起來。恩,應該沒問題了。而後就發佈nuget,再更新項目,而後發佈。觀察一段時間,恩,完美。

 

事件二

  事情事後,B端開始搞起來了,而後涉及到訂單系統,跟老大(不是老老大,老老大那時候已經跑了)商量以後肯定使用消息隊列來作訂單的事件的拓展,而後就直接美滋滋的調用好以前寫的了,沒想到啊,此次是線程漲!由於訂單是從B端推送過來的,B端確定沒事,訂單後臺訂閱消息以後,讀取過程當中出現的線程增多,而後看看以前寫的Read()方法,感受沒啥問題啊,每運行完一次,就多了一個線程,這個神奇了啊,那麼源代碼擼起來。

翻來覆去,看到這個Consume方法,繼承的是IDisposable接口,得勒,知道咋回事了。

Consume.Dispose(); 多個消費者的狀況下,用完請記得主動釋放啊。

這回真的能夠浪了。

 

總結

  遇到問題,冷靜下來,耐得了寂寞才行。線上的問題優先解決,而後再慢慢Debug,解決不了,看源碼,再解決不了,降級處理,歡迎共同探討。同時也感謝一下技術羣裏的兄弟給的一些建議,並幫忙查找資料,還好EasyNetQ是開源了,否則也打算說先不用了,畢竟一開始沒什麼用戶量,因此不必整那麼麻煩,加班加點的弄這個問題。不過最終都完美的解決了,內心仍是挺美滋滋的,程序猿隨之而來的成就感。

  別看咱們在工位上默不做聲,咱們可能在拯救世界呢!老闆,該加工資啦!

                                                                                             

 補充

2018-12-25  鑑於大夥私信我想看看原來的bug修復後的狀況,畢竟是公司代碼不適合徹底開源,我單獨把例子源碼作過修改的發佈出來,思路都差很少的,對比一下文章中原來的有問題的代碼就能夠了吧。由於都已經修復掉了,修改後的在這裏。👇

MessageLib.Core git 

本文已獨家受權給腳本之家(ID:jb51net)公衆號發佈

相關文章
相關標籤/搜索