上一篇簡單的介紹了TDF提供的一些Block,經過對這些Block配置和組合,能夠知足不少的數據處理的場景。這一篇將繼續介紹與這些Block配置的相關類,和挖掘一些高級功能。html
在一些Block的構造函數中,咱們經常能夠看見須要你輸入DataflowBlockOptions 類型或者它的兩個派生類型ExecutionDataflowBlockOptions 和 GroupingDataflowBlockOptions。緩存
DataflowBlockOptions負載均衡
DataflowBlockOptions有五個屬性:BoundedCapacity,CancellationToken,MaxMessagesPerTask,NameFormat和TaskScheduler。異步
用BoundedCapacity來限定容量函數
這個屬性用來限制一個Block中最多能夠緩存數據項的數量,大多數Block都支持這個屬性,這個值默認是DataflowBlockOptions.Unbounded = -1,也就是說沒有限制。開發人員能夠制定這個屬性設置數量的上限。那後面的新數據將會延遲。好比說用一個BufferBlock鏈接一個ActionBlock,若是在ActionBlock上面設置了上限,ActionBlock處理的操做速度比較慢,留在ActionBlock中的數據到達了上限,那麼餘下的數據將留在BufferBlock中,直到ActionBlock中的數據量低於上限。這種狀況經常會發生在生產者生產的速度大於消費者速度的時候,致使的問題是內存愈來愈大,數據操做愈來愈延遲。咱們能夠經過一個BufferBlock鏈接多個ActionBlock來解決這樣的問題,也就是負載均衡。一個ActionBlock滿了,就會放到另一個ActionBlock中去了。性能
用CancellationToken來取消操做測試
TPL中經常使用的類型。在Block的構造函數中放入CancellationToken,Block將在它的整個生命週期中全程監控這個對象,只要在這個Block結束運行(調用Complete方法)前,用CancellationToken發送取消請求,該Block將會中止運行,若是Block中還有沒有處理的數據,那麼將不會再被處理。線程
用MaxMessagesPerTask控制公平性設計
每個Block內部都是異步處理,都是使用TPL的Task。TDF的設計是在保證性能的狀況下,儘可能使用最少的任務對象來完成數據的操做,這樣效率會高一些,一個任務執行完成一個數據之後,任務對象並不會銷燬,而是會保留着去處理下一個數據,直到沒有數據處理的時候,Block纔會回收掉這個任務對象。可是若是數據來自於多個Source,公平性就很難保證。從其餘Source來的數據必需要等到早前的那些Source的數據都處理完了才能被處理。這時咱們就能夠經過MaxMessagesPerTask來控制。這個屬性的默認值仍是DataflowBlockOptions.Unbounded=-1,表示沒有上限。假如這個數值被設置爲1的話,那麼單個任務只會處理一個數據。這樣就會帶來極致的公平性,可是將帶來更多的任務對象消耗。代理
用NameFormat來定義Block名稱
MSDN上說屬性NameFormat用來獲取或設置查詢塊的名稱時要使用的格式字符串。
Block的名字Name=string.format(NameFormat, block.GetType ().Name, block.Completion.Id)。因此當咱們輸入」{0}」的時候,名字就是block.GetType ().Name,若是咱們數據的是」{1}」,那麼名字就是block.Completion.Id。若是是「{2}」,那麼就會拋出異常。
用TaskScheduler來調度Block行爲
TaskScheduler是很是重要的屬性。一樣這個類型來源於TPL。每一個Block裏面都使用TaskScheduler來調度行爲,不管是源Block和目標Block之間的數據傳遞,仍是用戶自定義的執行數據方法委託,都是使用的TaskScheduler。若是沒有特別設置的話,將使用TaskScheduler.Default(System.Threading.Tasks.ThreadPoolTaskScheduler)來調度。咱們可使用其餘的一些繼承於TaskScheduler的類型來設置這個調度器,一旦設置了之後,Block中的全部行爲都會使用這個調度器來執行。.Net Framework 4中內建了兩個Scheduler,一個是默認的ThreadPoolTaskScheduler,另外一個是用於UI線程切換的SynchronizationContextTaskScheduler。若是你使用的Block設計到UI的話,那可使用後者,這樣在UI線程切換上面將更加方便。
.Net Framework 4.5 中,還有一個類型被加入到System.Threading.Tasks名稱空間下:ConcurrentExclusiveSchedulerPair。這個類是兩個TaskScheduler的組合。它提供兩個TaskScheduler:ConcurrentScheduler和ExclusiveScheduler;咱們能夠把這兩個TaskScheduler構造進要使用的Block中。他們保證了在沒有排他任務的時候(使用ExclusiveScheduler的任務),其餘任務(使用ConcurrentScheduler)能夠同步進行,當有排他任務在運行的時候,其餘任務都不能運行。其實它裏面就是一個讀寫鎖。這在多個Block操做共享資源的問題上是一個很方便的解決方案。
public ActionBlock<int> readerAB1; public ActionBlock<int> readerAB2; public ActionBlock<int> readerAB3; public ActionBlock<int> writerAB1; public BroadcastBlock<int> bb = new BroadcastBlock<int>((i) => { return i; }); public void Test() { ConcurrentExclusiveSchedulerPair pair = new ConcurrentExclusiveSchedulerPair(); readerAB1 = new ActionBlock<int>((i) => { Console.WriteLine("ReaderAB1 begin handling." + " Execute Time:" + DateTime.Now); Thread.Sleep(500); } , new ExecutionDataflowBlockOptions() { TaskScheduler = pair.ConcurrentScheduler }); readerAB2 = new ActionBlock<int>((i) => { Console.WriteLine("ReaderAB2 begin handling." + " Execute Time:" + DateTime.Now); Thread.Sleep(500); } , new ExecutionDataflowBlockOptions() { TaskScheduler = pair.ConcurrentScheduler }); readerAB3 = new ActionBlock<int>((i) => { Console.WriteLine("ReaderAB3 begin handling." + " Execute Time:" + DateTime.Now); Thread.Sleep(500); } , new ExecutionDataflowBlockOptions() { TaskScheduler = pair.ConcurrentScheduler }); writerAB1 = new ActionBlock<int>((i) => { Console.ForegroundColor = ConsoleColor.Red; Console.WriteLine("WriterAB1 begin handling." + " Execute Time:" + DateTime.Now); Console.ResetColor(); Thread.Sleep(3000); } , new ExecutionDataflowBlockOptions() { TaskScheduler = pair.ExclusiveScheduler }); bb.LinkTo(readerAB1); bb.LinkTo(readerAB2); bb.LinkTo(readerAB3); Task.Run(() => { while (true) { bb.Post(1); Thread.Sleep(1000); } }); Task.Run(() => { while (true) { Thread.Sleep(6000); writerAB1.Post(1); } }); }
用MaxDegreeOfParallelism來並行處理
一般,Block中處理數據都是單線程的,一次只能處理一個數據,好比說ActionBlock中自定義的代理。使用MaxDegreeOfParallelism可讓你並行處理這些數據。屬性的定義是最大的並行處理個數。若是定義成-1的話,那就是沒有限制。用戶須要在實際狀況中選擇這個值的大小,並非越大越好。若是是平行處理的話,還應該考慮是否有共享資源。
TDF中的負載均衡
咱們可使用Block很方便的構成一個生產者消費者的模式來處理數據。當生產者產生數據的速度快於消費者的時候,消費者Block的Buffer中的數據會愈來愈多,消耗大量的內存,數據處理也會延時。這時,咱們能夠用一個生產者Block鏈接多個消費者Block來解決這個問題。因爲多個消費者Block必定是並行處理,因此對共享資源的處理必定要作同步處理。
使用BoundedCapacity屬性來實現
當鏈接多個ActionBlock的時候,能夠經過設置ActionBlock的BoundedCapacity屬性。當第一個滿了,就會放到第二個,第二個滿了就會放到第三個。
public BufferBlock<int> bb = new BufferBlock<int>(); public ActionBlock<int> ab1 = new ActionBlock<int>((i) => { Thread.Sleep(1000); Console.WriteLine("ab1 handle data" + i + " Execute Time:" + DateTime.Now); } , new ExecutionDataflowBlockOptions() { BoundedCapacity = 2 }); public ActionBlock<int> ab2 = new ActionBlock<int>((i) => { Thread.Sleep(1000); Console.WriteLine("ab2 handle data" + i + " Execute Time:" + DateTime.Now); } , new ExecutionDataflowBlockOptions() { BoundedCapacity = 2 }); public ActionBlock<int> ab3 = new ActionBlock<int>((i) => { Thread.Sleep(1000); Console.WriteLine("ab3 handle data:" + i + " Execute Time:" + DateTime.Now); } , new ExecutionDataflowBlockOptions() { BoundedCapacity = 2 }); public void Test() { bb.LinkTo(ab1); bb.LinkTo(ab2); bb.LinkTo(ab3); for (int i = 0; i < 9; i++) { bb.Post(i); } }
測試代碼能夠從這裏下載。