TPL DataFlow初探(二)

上一篇簡單的介紹了TDF提供的一些Block,經過對這些Block配置和組合,能夠知足不少的數據處理的場景。這一篇將繼續介紹與這些Block配置的相關類,和挖掘一些高級功能。html

 

在一些Block的構造函數中,咱們經常能夠看見須要你輸入DataflowBlockOptions 類型或者它的兩個派生類型ExecutionDataflowBlockOptions 和 GroupingDataflowBlockOptions。緩存

DataflowBlockOptions負載均衡

DataflowBlockOptions有五個屬性:BoundedCapacity,CancellationToken,MaxMessagesPerTask,NameFormat和TaskScheduler。異步

用BoundedCapacity來限定容量函數

這個屬性用來限制一個Block中最多能夠緩存數據項的數量,大多數Block都支持這個屬性,這個值默認是DataflowBlockOptions.Unbounded = -1,也就是說沒有限制。開發人員能夠制定這個屬性設置數量的上限。那後面的新數據將會延遲。好比說用一個BufferBlock鏈接一個ActionBlock,若是在ActionBlock上面設置了上限,ActionBlock處理的操做速度比較慢,留在ActionBlock中的數據到達了上限,那麼餘下的數據將留在BufferBlock中,直到ActionBlock中的數據量低於上限。這種狀況經常會發生在生產者生產的速度大於消費者速度的時候,致使的問題是內存愈來愈大,數據操做愈來愈延遲。咱們能夠經過一個BufferBlock鏈接多個ActionBlock來解決這樣的問題,也就是負載均衡。一個ActionBlock滿了,就會放到另一個ActionBlock中去了。性能

 

用CancellationToken來取消操做測試

TPL中經常使用的類型。在Block的構造函數中放入CancellationToken,Block將在它的整個生命週期中全程監控這個對象,只要在這個Block結束運行(調用Complete方法)前,用CancellationToken發送取消請求,該Block將會中止運行,若是Block中還有沒有處理的數據,那麼將不會再被處理。線程

用MaxMessagesPerTask控制公平性設計

每個Block內部都是異步處理,都是使用TPL的Task。TDF的設計是在保證性能的狀況下,儘可能使用最少的任務對象來完成數據的操做,這樣效率會高一些,一個任務執行完成一個數據之後,任務對象並不會銷燬,而是會保留着去處理下一個數據,直到沒有數據處理的時候,Block纔會回收掉這個任務對象。可是若是數據來自於多個Source,公平性就很難保證。從其餘Source來的數據必需要等到早前的那些Source的數據都處理完了才能被處理。這時咱們就能夠經過MaxMessagesPerTask來控制。這個屬性的默認值仍是DataflowBlockOptions.Unbounded=-1,表示沒有上限。假如這個數值被設置爲1的話,那麼單個任務只會處理一個數據。這樣就會帶來極致的公平性,可是將帶來更多的任務對象消耗。代理

用NameFormat來定義Block名稱

MSDN上說屬性NameFormat用來獲取或設置查詢塊的名稱時要使用的格式字符串。

Block的名字Name=string.format(NameFormat, block.GetType ().Name, block.Completion.Id)。因此當咱們輸入」{0}」的時候,名字就是block.GetType ().Name,若是咱們數據的是」{1}」,那麼名字就是block.Completion.Id。若是是「{2}」,那麼就會拋出異常。

用TaskScheduler來調度Block行爲

TaskScheduler是很是重要的屬性。一樣這個類型來源於TPL。每一個Block裏面都使用TaskScheduler來調度行爲,不管是源Block和目標Block之間的數據傳遞,仍是用戶自定義的執行數據方法委託,都是使用的TaskScheduler。若是沒有特別設置的話,將使用TaskScheduler.Default(System.Threading.Tasks.ThreadPoolTaskScheduler)來調度。咱們可使用其餘的一些繼承於TaskScheduler的類型來設置這個調度器,一旦設置了之後,Block中的全部行爲都會使用這個調度器來執行。.Net Framework 4中內建了兩個Scheduler,一個是默認的ThreadPoolTaskScheduler,另外一個是用於UI線程切換的SynchronizationContextTaskScheduler。若是你使用的Block設計到UI的話,那可使用後者,這樣在UI線程切換上面將更加方便。

.Net Framework 4.5 中,還有一個類型被加入到System.Threading.Tasks名稱空間下:ConcurrentExclusiveSchedulerPair。這個類是兩個TaskScheduler的組合。它提供兩個TaskScheduler:ConcurrentScheduler和ExclusiveScheduler;咱們能夠把這兩個TaskScheduler構造進要使用的Block中。他們保證了在沒有排他任務的時候(使用ExclusiveScheduler的任務),其餘任務(使用ConcurrentScheduler)能夠同步進行,當有排他任務在運行的時候,其餘任務都不能運行。其實它裏面就是一個讀寫鎖。這在多個Block操做共享資源的問題上是一個很方便的解決方案。

 
     public ActionBlock<int> readerAB1;

        public ActionBlock<int> readerAB2;

        public ActionBlock<int> readerAB3;

        public ActionBlock<int> writerAB1;

        public BroadcastBlock<int> bb = new BroadcastBlock<int>((i) => { return i; });

        public void Test()
        {
            ConcurrentExclusiveSchedulerPair pair = new ConcurrentExclusiveSchedulerPair();

            readerAB1 = new ActionBlock<int>((i) =>
                {
                    Console.WriteLine("ReaderAB1 begin handling." + " Execute Time:" + DateTime.Now);
                    Thread.Sleep(500);
                }
            , new ExecutionDataflowBlockOptions() { TaskScheduler = pair.ConcurrentScheduler });

            readerAB2 = new ActionBlock<int>((i) =>
                {
                    Console.WriteLine("ReaderAB2 begin handling." + " Execute Time:" + DateTime.Now);
                    Thread.Sleep(500);
                }
            , new ExecutionDataflowBlockOptions() { TaskScheduler = pair.ConcurrentScheduler });

            readerAB3 = new ActionBlock<int>((i) =>
                {
                    Console.WriteLine("ReaderAB3 begin handling." + " Execute Time:" + DateTime.Now);
                    Thread.Sleep(500);
                }
            , new ExecutionDataflowBlockOptions() { TaskScheduler = pair.ConcurrentScheduler });

            writerAB1 = new ActionBlock<int>((i) =>
            {
                Console.ForegroundColor = ConsoleColor.Red;
                Console.WriteLine("WriterAB1 begin handling." + " Execute Time:" + DateTime.Now);
                Console.ResetColor();
                Thread.Sleep(3000);
            }
            , new ExecutionDataflowBlockOptions() { TaskScheduler = pair.ExclusiveScheduler });

            bb.LinkTo(readerAB1);
            bb.LinkTo(readerAB2);
            bb.LinkTo(readerAB3);


            Task.Run(() =>
                {
                    while (true)
                    {
                        bb.Post(1);
                        Thread.Sleep(1000);
                    }
                });

            Task.Run(() =>
            {
                while (true)
                {
                    Thread.Sleep(6000);
                    writerAB1.Post(1);
                }
            });

        }
 

 

FXE@)NN3Q($(J$AXHGWOP~N

 

用MaxDegreeOfParallelism來並行處理

一般,Block中處理數據都是單線程的,一次只能處理一個數據,好比說ActionBlock中自定義的代理。使用MaxDegreeOfParallelism可讓你並行處理這些數據。屬性的定義是最大的並行處理個數。若是定義成-1的話,那就是沒有限制。用戶須要在實際狀況中選擇這個值的大小,並非越大越好。若是是平行處理的話,還應該考慮是否有共享資源。

 

 

TDF中的負載均衡

咱們可使用Block很方便的構成一個生產者消費者的模式來處理數據。當生產者產生數據的速度快於消費者的時候,消費者Block的Buffer中的數據會愈來愈多,消耗大量的內存,數據處理也會延時。這時,咱們能夠用一個生產者Block鏈接多個消費者Block來解決這個問題。因爲多個消費者Block必定是並行處理,因此對共享資源的處理必定要作同步處理。

使用BoundedCapacity屬性來實現

當鏈接多個ActionBlock的時候,能夠經過設置ActionBlock的BoundedCapacity屬性。當第一個滿了,就會放到第二個,第二個滿了就會放到第三個。

 
public BufferBlock<int> bb = new BufferBlock<int>();

        public ActionBlock<int> ab1 = new ActionBlock<int>((i) =>
            {
                Thread.Sleep(1000);
                Console.WriteLine("ab1 handle data" + i + " Execute Time:" + DateTime.Now);
            }
        , new ExecutionDataflowBlockOptions() { BoundedCapacity = 2 });

        public ActionBlock<int> ab2 = new ActionBlock<int>((i) =>
        {
            Thread.Sleep(1000);
            Console.WriteLine("ab2 handle data" + i + " Execute Time:" + DateTime.Now);
        }
        , new ExecutionDataflowBlockOptions() { BoundedCapacity = 2 });

        public ActionBlock<int> ab3 = new ActionBlock<int>((i) =>
        {
            Thread.Sleep(1000);
            Console.WriteLine("ab3 handle data:" + i + " Execute Time:" + DateTime.Now);
        }
        , new ExecutionDataflowBlockOptions() { BoundedCapacity = 2 });

        public void Test()
        {
            bb.LinkTo(ab1);
            bb.LinkTo(ab2);
            bb.LinkTo(ab3);

            for (int i = 0; i < 9; i++)
            {
                bb.Post(i);
            }
        }
 

PNQFKIK2OK)}SOWA]RF(~$M

測試代碼能夠從這裏下載。

相關文章
相關標籤/搜索