.Net中的並行編程-4.實現高性能異步隊列

上文《.Net中的並行編程-3.ConcurrentQueue實現與分析》分析了ConcurrentQueue的實現,本章就基於ConcurrentQueue實現一個高性能的異步隊列,該隊列主要用於實時數據流的處理並簡化多線程編程模型。設計該隊列時考慮如下幾點需求(需求來自公司的一個實際項目)html

1. 支持多線程入隊出隊,儘可能簡化多線程編程的複雜度。編程

2. 支持事件觸發機制,數據入隊時才進行處理而不是使用定時處理機制, 並且內部能阻塞消費者線程。數據結構

3. 出隊時數據處理的順序要保證和入隊時是一致的。多線程

4. 容錯性強,能夠不間斷運行。異步

以上需求點對應的解決方案:函數

1.ConcurrentQueue支持多線程並且多線程環境下的性能較高,對於多線程編程模型簡化可用適配器模式可將消費者線程封裝到隊列內部,內部採用處理事件方式處理用戶的任務。oop

2.對於事件觸發機制首先信號量不適合,由於信號量達到指定數目時會阻塞線程,因此該部分須要本身編程實現(具體參考源碼)。性能

3.隊列的特性以及保證入隊和出隊順序,這裏須要保證的是線程處理數據項的順序。this

4.可經過註冊異常處理函數的方式解決異常的問題。spa

因此開發出如下代碼:

    public class AsynQueue<T>
    {
        //隊列是否正在處理數據
        private int isProcessing;
        //有線程正在處理數據
        private const int Processing = 1;
        //沒有線程處理數據
        private const int UnProcessing = 0;
        //隊列是否可用
        private volatile bool enabled = true;
        private Task currentTask;
        public event Action<T> ProcessItemFunction;
        public event EventHandler<EventArgs<Exception>> ProcessException;
        private ConcurrentQueue<T> queue;
        
        public AsynQueue()
        {
            queue = new ConcurrentQueue<T>();
            Start();
        }

        public int Count
        {
            get
            {
                return queue.Count;
            }
        }

        private void Start()
        {
            Thread process_Thread = new Thread(PorcessItem);
            process_Thread.IsBackground = true;
            process_Thread.Start();
        }

        public void Enqueue(T items)
        {
            if (items == null)
            {
                throw new ArgumentException("items");
            }

            queue.Enqueue(items);
            DataAdded();
        }

        //數據添加完成後通知消費者線程處理
        private void DataAdded()
        {
            if (enabled)
            {
                if (!IsProcessingItem())
                {
                    currentTask = Task.Factory.StartNew(ProcessItemLoop);
                }
            }
        }

        //判斷是否隊列有線程正在處理 
         private bool IsProcessingItem()
         {
            return !(Interlocked.CompareExchange(ref isProcessing, Processing, UnProcessing) == 0);
         }

        private void ProcessItemLoop()
        {

            if (!enabled && queue.IsEmpty)
            {
                Interlocked.Exchange(ref isProcessing, 0);
                return;
            }
            //處理的線程數 是否小於當前最大任務數
            //if (Thread.VolatileRead(ref runingCore) <= this.MaxTaskCount)
            //{
            T publishFrame;

            if (queue.TryDequeue(out publishFrame))
            {
                
                try
                {
                    ProcessItemFunction(publishFrame);
                }
                catch (Exception ex)
                {
                    OnProcessException(ex);
                }
            }

            if (enabled && !queue.IsEmpty)
            {
                currentTask = Task.Factory.StartNew(ProcessItemLoop);
            }
            else
            {
                Interlocked.Exchange(ref isProcessing, UnProcessing);
            }
        }

       /// <summary>
       ///定時處理線程調用函數  
        ///主要是監視入隊的時候線程 沒有來的及處理的狀況
        /// </summary>
        private void PorcessItem(object state)
        {
            int sleepCount = 0;
            int sleepTime = 1000;
            while (enabled)
            {
                //若是隊列爲空則根據循環的次數肯定睡眠的時間
                if (queue.IsEmpty)
                {
                    if (sleepCount == 0)
                    {
                        sleepTime = 1000;
                    }
                    else if (sleepCount <= 3)
                    {
                        sleepTime = 1000 * 3;
                    }
                    else
                    {
                        sleepTime = 1000 * 50;
                    }
                    sleepCount++;
                    Thread.Sleep(sleepTime);
                }
                else
                {
                    //判斷是否隊列有線程正在處理 
                    if (enabled && Interlocked.CompareExchange(ref isProcessing, Processing, UnProcessing) == 0)
                    {
                        if (!queue.IsEmpty)
                        {
                            currentTask = Task.Factory.StartNew(ProcessItemLoop);
                        }
                        else
                        {
                            Interlocked.Exchange(ref isProcessing, 0);
                        }
                        sleepCount = 0;
                        sleepTime = 1000;
                    }
                }
            }
        }

        public void Flsuh()
        {
            Stop();

            if (currentTask != null)
            {
                currentTask.Wait();
            }

            while (!queue.IsEmpty)
            {
                try
                {
                    T publishFrame;
                    if (queue.TryDequeue(out publishFrame))
                    {
                        ProcessItemFunction(publishFrame);
                    }
                }
                catch (Exception ex)
                {
                    OnProcessException(ex);
                }
            }
            currentTask = null;
        }

        public void Stop()
        {
            this.enabled = false;
        }

        private void OnProcessException(System.Exception ex)
        {
            var tempException = ProcessException;
            Interlocked.CompareExchange(ref ProcessException, null, null);

            if (tempException != null)
            {
                ProcessException(ex, new EventArgs<Exception>(ex));
            }
        }  
    }
[Serializable]
public class EventArgs<T> : System.EventArgs
{
    public T Argument;

    public EventArgs() : this(default(T))
    {
    }

    public EventArgs(T argument)
    {
        Argument = argument;
    }
}

 

 該隊列的思想是:當每次數據入隊時,隊列內部會調用DataAdded()判斷是否數據項已經開始被處理,若是已經開始處理則數據入到內部隊列後直接返回不然開啓消費者線程處理。隊列內部的消費者線程(線程池)(Task內部使用線程池實現,這裏就當作線程池吧)會採用採用遞歸的方式處理數據,也就是當前數據處理完成後再將另一個數據放到線程池去處理,這樣就造成一個處理環並且保證了每條數據都有序的進行處理。因爲ConcurrentQueue的IsEmpty只是當前內存的一個快照狀態,可能當前時刻爲空下一個時候不爲空, 因此還須要一個守護線程process_Thread定時監視隊列內部的消費者線程(線程池)是否正在處理數據,不然會形成消費者線程已經判斷隊列爲空而數據已經到達只是還沒插入隊列此時數據可能永遠得不處處理。

適用的場景:

  1.適合多個生產者一個消費者的情景(當前若是須要多個消費者可使用多個單獨線程來實現)。

  2.適合處理數據速度較快的情景而對於文件寫入等IO操做不適合,由於線程池內部都是後臺線程,當進程關閉時線程會同時關閉線程這時文件可能還沒寫入到磁盤。

  3.適合做爲流水線處理模型的基礎數據結構,隊列之間經過各自的事件處理函數進行通訊(後續會專門撰寫文章介紹關於流水線模型的應用)。

  注:內部的ConcurrentQueue隊列還可使用阻塞隊列(BlockingCollection)來替代,雖然使用阻塞隊列更簡單可是內部的消費者線程比較適合使用單獨的線程不適合使用線程池,並且阻塞隊列爲空時會阻塞消費者線程,固然阻塞線程池內的線程也沒什麼影響只是不推薦這麼作,並且阻塞的隊列的性能也沒有ConcurrentQueue的性能高。

相關文章
相關標籤/搜索