TPL 數據流庫向具備高吞吐量和低滯後時間的佔用大量 CPU 和 I/O 操做的應用程序的並行化和消息傳遞提供了基礎。 它還能顯式控制緩存數據的方式以及在系統中移動的方式。 爲了更好地瞭解數據流編程模型,請考慮一個以異步方式從磁盤加載圖像並建立複合圖像的應用程序。 傳統編程模型一般須要使用回調和同步對象(例如鎖)來協調任務和訪問共享數據。 經過使用數據流編程模型,您能夠從磁盤讀取時建立處理圖像的數據流對象。 在數據流模型下,您能夠聲明當數據可用時的處理方式,以及數據之間的全部依賴項。 因爲運行時管理數據之間的依賴項,所以一般能夠避免這種要求來同步訪問共享數據。 此外,由於運行時計劃基於數據的異步到達,因此數據流能夠經過有效管理基礎線程提升響應能力和吞吐量。web
System.Threading.Tasks.Dataflow 命名空間提供基於角色的編程模型,用以支持粗粒度數據流和流水線操做任務的進程內消息傳遞。TDP的主要做用就是Buffering Data和Processing Data,在TDF中,有兩個很是重要的接口,ISourceBlock<T> 和ITargetBlock<T>接口。繼承於ISourceBlock<T>的對象時做爲提供數據的數據源對象-生產者,而繼承於ITargetBlock<T>接口類主要是扮演目標對象-消費者。在這個類庫中,System.Threading.Tasks.Dataflow名稱空間下,提供了不少以Block名字結尾的類,ActionBlock,BufferBlock,TransformBlock,BroadcastBlock等9個Block,咱們在開發中一般使用單個或多個Block組合的方式來實現一些功能,如下逐個來簡單介紹一下。編程
BufferBlock緩存
BufferBlock是TDF中最基礎的Block。BufferBlock提供了一個有界限或沒有界限的Buffer,該Buffer中存儲T。該Block很像BlockingCollection<T>。能夠用過Post往裏面添加數據,也能夠經過Receive方法阻塞或異步的的獲取數據,數據處理的順序是FIFO的。它也能夠經過Link向其餘Block輸出數據。負載均衡
簡單的同步的生產者消費者代碼示例:異步
private static BufferBlock<int> m_buffer = new BufferBlock<int>(); // Producer private static void Producer() { while(true) { int item = Produce(); m_buffer.Post(item); } } // Consumer private static void Consumer() { while(true) { int item = m_buffer.Receive(); Process(item); } } // Main public static void Main() { var p = Task.Factory.StartNew(Producer); var c = Task.Factory.StartNew(Consumer); Task.WaitAll(p,c); }
ActionBlockasync
ActionBlock實現ITargetBlock,說明它是消費數據的,也就是對輸入的一些數據進行處理。它在構造函數中,容許輸入一個委託,來對每個進來的數據進行一些操做。若是使用Action(T)委託,那說明每個數據的處理完成須要等待這個委託方法結束,若是使用了Func<TInput, Task>)來構造的話,那麼數據的結束將不是委託的返回,而是Task的結束。默認狀況下,ActionBlock會FIFO的處理每個數據,並且一次只能處理一個數據,一個處理完了再處理第二個,但也能夠經過配置來並行的執行多個數據。函數
先看一個例子:post
public ActionBlock<int> abSync = new ActionBlock<int>((i) => { Thread.Sleep(1000); Console.WriteLine(i + " ThreadId:" + Thread.CurrentThread.ManagedThreadId + " Execute Time:" + DateTime.Now); } ); public void TestSync() { for (int i = 0; i < 10; i++) { abSync.Post(i); } Console.WriteLine("Post finished"); }
可見,ActionBlock是順序處理數據的,這也是ActionBlock一大特性之一。主線程在往ActionBlock中Post數據之後立刻返回,具體數據的處理是另一個線程來作的。數據是異步處理的,但處理自己是同步的,這樣在必定程度上保證數據處理的準確性。下面的例子是使用async和await。性能
public ActionBlock<int> abSync2 = new ActionBlock<int>(async (i) => { await Task.Delay(1000); Console.WriteLine(i + " ThreadId:" + Thread.CurrentThread.ManagedThreadId + " Execute Time:" + DateTime.Now); }
雖然仍是1秒鐘處理一個數據,可是處理數據的線程會有不一樣。測試
若是你想異步處理多個消息的話,ActionBlock也提供了一些接口,讓你輕鬆實現。在ActionBlock的構造函數中,能夠提供一個ExecutionDataflowBlockOptions的類型,讓你定義ActionBlock的執行選項,在下面了例子中,咱們定義了MaxDegreeOfParallelism選項,設置爲3。目的的讓ActionBlock中的Item最多能夠3個並行處理。
public ActionBlock<int> abAsync = new ActionBlock<int>((i) => { Thread.Sleep(1000); Console.WriteLine(i + " ThreadId:" + Thread.CurrentThread.ManagedThreadId + " Execute Time:" + DateTime.Now); } , new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 3 }); public void TestAsync() { for (int i = 0; i < 10; i++) { abAsync.Post(i); } Console.WriteLine("Post finished"); }
運行程序,咱們看見,每3個數據幾乎同時處理,而且他們的線程ID也是不同的。
ActionBlock也有本身的生命週期,全部繼承IDataflowBlock的類型都有Completion屬性和Complete方法。調用Complete方法是讓ActionBlock中止接收數據,而Completion屬性則是一個Task,是在ActionBlock處理完全部數據時候會執行的任務,咱們可使用Completion.Wait()方法來等待ActionBlock完成全部的任務,Completion屬性只有在設置了Complete方法後纔會有效。
public void TestAsync() { for (int i = 0; i < 10; i++) { abAsync.Post(i); } abAsync.Complete(); Console.WriteLine("Post finished"); abAsync.Completion.Wait(); Console.WriteLine("Process finished"); }
TransformBlock
TransformBlock是TDF提供的另外一種Block,顧名思義它經常在數據流中充當數據轉換處理的功能。在TransformBlock內部維護了2個Queue,一個InputQueue,一個OutputQueue。InputQueue存儲輸入的數據,而經過Transform處理之後的數據則放在OutputQueue,OutputQueue就好像是一個BufferBlock。最終咱們能夠經過Receive方法來阻塞的一個一個獲取OutputQueue中的數據。TransformBlock的Completion.Wait()方法只有在OutputQueue中的數據爲0的時候纔會返回。
舉個例子,咱們有一組網址的URL,咱們須要對每一個URL下載它的HTML數據並存儲。那咱們經過以下的代碼來完成:
public TransformBlock<string, string> tbUrl = new TransformBlock<string, string>((url) => { WebClient webClient = new WebClient(); return webClient.DownloadString(new Uri(url)); } public void TestDownloadHTML() { tbUrl.Post("www.baidu.com"); tbUrl.Post("www.sina.com.cn"); string baiduHTML = tbUrl.Receive(); string sinaHTML = tbUrl.Receive(); }
固然,Post操做和Receive操做能夠在不一樣的線程中進行,Receive操做一樣也是阻塞操做,在OutputQueue中有可用的數據時,纔會返回。
TransformManyBlock
TransformManyBlock和TransformBlock很是相似,關鍵的不一樣點是,TransformBlock對應於一個輸入數據只有一個輸出數據,而TransformManyBlock能夠有多個,及能夠從InputQueue中取一個數據出來,而後放多個數據放入到OutputQueue中。
TransformManyBlock<int, int> tmb = new TransformManyBlock<int, int>((i) => { return new int[] { i, i + 1 }; }); ActionBlock<int> ab = new ActionBlock<int>((i) => Console.WriteLine(i)); public void TestSync() { tmb.LinkTo(ab); for (int i = 0; i < 4; i++) { tmb.Post(i); } Console.WriteLine("Finished post"); }
BroadcastBlock
BroadcastBlock的做用不像BufferBlock,它是使命是讓全部和它相聯的目標Block都收到數據的副本,這點從它的命名上面就能夠看出來了。還有一點不一樣的是,BroadcastBlock並不保存數據,在每個數據被髮送到全部接收者之後,這條數據就會被後面最新的一條數據所覆蓋。如沒有目標Block和BroadcastBlock相連的話,數據將被丟棄。但BroadcastBlock總會保存最後一個數據,無論這個數據是否是被髮出去過,若是有一個新的目標Block連上來,那麼這個Block將收到這個最後一個數據。
BroadcastBlock<int> bb = new BroadcastBlock<int>((i) => { return i; }); ActionBlock<int> displayBlock = new ActionBlock<int>((i) => Console.WriteLine("Displayed " + i)); ActionBlock<int> saveBlock = new ActionBlock<int>((i) => Console.WriteLine("Saved " + i)); ActionBlock<int> sendBlock = new ActionBlock<int>((i) => Console.WriteLine("Sent " + i)); public void TestSync() { bb.LinkTo(displayBlock); bb.LinkTo(saveBlock); bb.LinkTo(sendBlock); for (int i = 0; i < 4; i++) { bb.Post(i); } Console.WriteLine("Post finished"); }
若是咱們在Post之後再添加鏈接Block的話,那些Block就只會收到最後一個數據了。
public void TestSync() { for (int i = 0; i < 4; i++) { bb.Post(i); } Thread.Sleep(5000); bb.LinkTo(displayBlock); bb.LinkTo(saveBlock); bb.LinkTo(sendBlock); Console.WriteLine("Post finished"); }
WriteOnceBlock
若是說BufferBlock是最基本的Block,那麼WriteOnceBock則是最最簡單的Block。它最多隻能存儲一個數據,一旦這個數據被髮送出去之後,這個數據仍是會留在Block中,但不會被刪除或被新來的數據替換,一樣全部的接收者都會收到這個數據的備份。
和BroadcastBlock一樣的代碼,可是結果不同:
WriteOnceBlock<int> bb = new WriteOnceBlock<int>((i) => { return i; }); ActionBlock<int> displayBlock = new ActionBlock<int>((i) => Console.WriteLine("Displayed " + i)); ActionBlock<int> saveBlock = new ActionBlock<int>((i) => Console.WriteLine("Saved " + i)); ActionBlock<int> sendBlock = new ActionBlock<int>((i) => Console.WriteLine("Sent " + i)); public void TestSync() { bb.LinkTo(displayBlock); bb.LinkTo(saveBlock); bb.LinkTo(sendBlock); for (int i = 0; i < 4; i++) { bb.Post(i); } Console.WriteLine("Post finished"); }
WriteOnceBock只會接收一次數據。並且始終保留那個數據。
一樣使用Receive方法來獲取數據也是同樣的結果,獲取到的都是第一個數據:
public void TestReceive() { for (int i = 0; i < 4; i++) { bb.Post(i); } Console.WriteLine("Post finished"); Console.WriteLine("1st Receive:" + bb.Receive()); Console.WriteLine("2nd Receive:" + bb.Receive()); Console.WriteLine("3rd Receive:" + bb.Receive()); }
BatchBlock
BatchBlock提供了可以把多個單個的數據組合起來處理的功能,如上圖。應對有些需求須要固定多個數據才能處理的問題。在構造函數中須要制定多少個爲一個Batch,一旦它收到了那個數量的數據後,會打包放在它的OutputQueue中。當BatchBlock被調用Complete告知Post數據結束的時候,會把InputQueue中餘下的數據打包放入OutputQueue中等待處理,而無論InputQueue中的數據量是否是知足構造函數的數量。
BatchBlock<int> bb = new BatchBlock<int>(3); ActionBlock<int[]> ab = new ActionBlock<int[]>((i) => { string s = string.Empty; foreach (int m in i) { s += m + " "; } Console.WriteLine(s); }); public void TestSync() { bb.LinkTo(ab); for (int i = 0; i < 10; i++) { bb.Post(i); } bb.Complete(); Console.WriteLine("Finished post"); }
BatchBlock執行數據有兩種模式:貪婪模式和非貪婪模式。貪婪模式是默認的。貪婪模式是指任何Post到BatchBlock,BatchBlock都接收,並等待個數滿了之後處理。非貪婪模式是指BatchBlock須要等到構造函數中設置的BatchSize個數的Source都向BatchBlock發數據,Post數據的時候纔會處理。否則都會留在Source的Queue中。也就是說BatchBlock可使用在每次從N個Source那個收一個數據打包處理或從1個Source那裏收N個數據打包處理。這裏的Source是指其餘的繼承ISourceBlock的,用LinkTo鏈接到這個BatchBlock的Block。
在另外一個構造參數中GroupingDataflowBlockOptions,能夠經過設置Greedy屬性來選擇是否貪婪模式和MaxNumberOfGroups來設置最大產生Batch的數量,若是到達了這個數量,BatchBlock將不會再接收數據。
JoinBlock
JoinBlock一看名字就知道是須要和兩個或兩個以上的Source Block相鏈接的。它的做用就是等待一個數據組合,這個組合須要的數據都到達了,它纔會處理數據,並把這個組合做爲一個Tuple傳遞給目標Block。舉個例子,若是定義了JoinBlock<int, string>類型,那麼JoinBlock內部會有兩個ITargetBlock,一個接收int類型的數據,一個接收string類型的數據。那只有當兩個ITargetBlock都收到各自的數據後,纔會放到JoinBlock的OutputQueue中,輸出。
JoinBlock<int, string> jb = new JoinBlock<int, string>(); ActionBlock<Tuple<int, string>> ab = new ActionBlock<Tuple<int, string>>((i) => { Console.WriteLine(i.Item1 + " " + i.Item2); }); public void TestSync() { jb.LinkTo(ab); for (int i = 0; i < 5; i++) { jb.Target1.Post(i); } for (int i = 5; i > 0; i--) { Thread.Sleep(1000); jb.Target2.Post(i.ToString()); } Console.WriteLine("Finished post"); }
BatchedJoinBlock
BatchedJoinBlock一看就是BacthBlock和JoinBlick的組合。JoinBlick是組合目標隊列的一個數據,而BatchedJoinBlock是組合目標隊列的N個數據,固然這個N能夠在構造函數中配置。若是咱們定義的是BatchedJoinBlock<int, string>, 那麼在最後的OutputQueue中存儲的是Tuple<IList<int>, IList<string>>,也就是說最後獲得的數據是Tuple<IList<int>, IList<string>>。它的行爲是這樣的,仍是假設上文的定義,BatchedJoinBlock<int, string>, 構造BatchSize輸入爲3。那麼在這個BatchedJoinBlock種會有兩個ITargetBlock,會接收Post的數據。那何時會生成一個Tuple<IList<int>,IList<string>>到OutputQueue中呢,測試下來並非咱們想的須要有3個int數據和3個string數據,而是隻要2個ITargetBlock中的數據個數加起來等於3就能夠了。3和0,2和1,1和2或0和3的組合都會生成Tuple<IList<int>,IList<string>>到OutputQueue中。能夠參看下面的例子:
BatchedJoinBlock<int, string> bjb = new BatchedJoinBlock<int, string>(3); ActionBlock<Tuple<IList<int>, IList<string>>> ab = new ActionBlock<Tuple<IList<int>, IList<string>>>((i) => { Console.WriteLine("-----------------------------"); foreach (int m in i.Item1) { Console.WriteLine(m); }; foreach (string s in i.Item2) { Console.WriteLine(s); }; }); public void TestSync() { bjb.LinkTo(ab); for (int i = 0; i < 5; i++) { bjb.Target1.Post(i); } for (int i = 5; i > 0; i--) { bjb.Target2.Post(i.ToString()); } Console.WriteLine("Finished post"); }
最後剩下的一個數據1,因爲沒有滿3個,因此一直被保留在Target2中。
TDF中最有用的功能之一就是多個Block之間能夠組合應用。ISourceBlock能夠鏈接ITargetBlock,一對一,一對多,或多對多。下面的例子就是一個TransformBlock和一個ActionBlock的組合。TransformBlock用來把數據*2,並轉換成字符串,而後把數據扔到ActionBlock中,而ActionBlock則用來最後的處理數據打印結果。
public ActionBlock<string> abSync = new ActionBlock<string>((i) => { Thread.Sleep(1000); Console.WriteLine(i + " ThreadId:" + Thread.CurrentThread.ManagedThreadId + " Execute Time:" + DateTime.Now); } ); public TransformBlock<int, string> tbSync = new TransformBlock<int, string>((i) => { i = i * 2; return i.ToString(); } ); public void TestSync() { tbSync.LinkTo(abSync); for (int i = 0; i < 10; i++) { tbSync.Post(i); } tbSync.Complete(); Console.WriteLine("Post finished"); tbSync.Completion.Wait(); Console.WriteLine("TransformBlock process finished"); }
TDF提供的一些Block,經過對這些Block配置和組合,能夠知足不少的數據處理的場景。這一篇將繼續介紹與這些Block配置的相關類,和挖掘一些高級功能。
在一些Block的構造函數中,咱們經常能夠看見須要你輸入DataflowBlockOptions 類型或者它的兩個派生類型ExecutionDataflowBlockOptions 和 GroupingDataflowBlockOptions。
DataflowBlockOptions
DataflowBlockOptions有五個屬性:BoundedCapacity,CancellationToken,MaxMessagesPerTask,NameFormat和TaskScheduler。
用BoundedCapacity來限定容量
這個屬性用來限制一個Block中最多能夠緩存數據項的數量,大多數Block都支持這個屬性,這個值默認是DataflowBlockOptions.Unbounded = -1,也就是說沒有限制。開發人員能夠制定這個屬性設置數量的上限。那後面的新數據將會延遲。好比說用一個BufferBlock鏈接一個ActionBlock,若是在ActionBlock上面設置了上限,ActionBlock處理的操做速度比較慢,留在ActionBlock中的數據到達了上限,那麼餘下的數據將留在BufferBlock中,直到ActionBlock中的數據量低於上限。這種狀況經常會發生在生產者生產的速度大於消費者速度的時候,致使的問題是內存愈來愈大,數據操做愈來愈延遲。咱們能夠經過一個BufferBlock鏈接多個ActionBlock來解決這樣的問題,也就是負載均衡。一個ActionBlock滿了,就會放到另一個ActionBlock中去了。
用CancellationToken來取消操做
TPL中經常使用的類型。在Block的構造函數中放入CancellationToken,Block將在它的整個生命週期中全程監控這個對象,只要在這個Block結束運行(調用Complete方法)前,用CancellationToken發送取消請求,該Block將會中止運行,若是Block中還有沒有處理的數據,那麼將不會再被處理。
用MaxMessagesPerTask控制公平性
每個Block內部都是異步處理,都是使用TPL的Task。TDF的設計是在保證性能的狀況下,儘可能使用最少的任務對象來完成數據的操做,這樣效率會高一些,一個任務執行完成一個數據之後,任務對象並不會銷燬,而是會保留着去處理下一個數據,直到沒有數據處理的時候,Block纔會回收掉這個任務對象。可是若是數據來自於多個Source,公平性就很難保證。從其餘Source來的數據必需要等到早前的那些Source的數據都處理完了才能被處理。這時咱們就能夠經過MaxMessagesPerTask來控制。這個屬性的默認值仍是DataflowBlockOptions.Unbounded=-1,表示沒有上限。假如這個數值被設置爲1的話,那麼單個任務只會處理一個數據。這樣就會帶來極致的公平性,可是將帶來更多的任務對象消耗。
用NameFormat來定義Block名稱
MSDN上說屬性NameFormat用來獲取或設置查詢塊的名稱時要使用的格式字符串。
Block的名字Name=string.format(NameFormat, block.GetType ().Name, block.Completion.Id)。因此當咱們輸入」{0}」的時候,名字就是block.GetType ().Name,若是咱們數據的是」{1}」,那麼名字就是block.Completion.Id。若是是「{2}」,那麼就會拋出異常。
用TaskScheduler來調度Block行爲
TaskScheduler是很是重要的屬性。一樣這個類型來源於TPL。每一個Block裏面都使用TaskScheduler來調度行爲,不管是源Block和目標Block之間的數據傳遞,仍是用戶自定義的執行數據方法委託,都是使用的TaskScheduler。若是沒有特別設置的話,將使用TaskScheduler.Default(System.Threading.Tasks.ThreadPoolTaskScheduler)來調度。咱們可使用其餘的一些繼承於TaskScheduler的類型來設置這個調度器,一旦設置了之後,Block中的全部行爲都會使用這個調度器來執行。.Net Framework 4中內建了兩個Scheduler,一個是默認的ThreadPoolTaskScheduler,另外一個是用於UI線程切換的SynchronizationContextTaskScheduler。若是你使用的Block設計到UI的話,那可使用後者,這樣在UI線程切換上面將更加方便。
.Net Framework 4.5 中,還有一個類型被加入到System.Threading.Tasks名稱空間下:ConcurrentExclusiveSchedulerPair。這個類是兩個TaskScheduler的組合。它提供兩個TaskScheduler:ConcurrentScheduler和ExclusiveScheduler;咱們能夠把這兩個TaskScheduler構造進要使用的Block中。他們保證了在沒有排他任務的時候(使用ExclusiveScheduler的任務),其餘任務(使用ConcurrentScheduler)能夠同步進行,當有排他任務在運行的時候,其餘任務都不能運行。其實它裏面就是一個讀寫鎖。這在多個Block操做共享資源的問題上是一個很方便的解決方案。
public ActionBlock<int> readerAB1; public ActionBlock<int> readerAB2; public ActionBlock<int> readerAB3; public ActionBlock<int> writerAB1; public BroadcastBlock<int> bb = new BroadcastBlock<int>((i) => { return i; }); public void Test() { ConcurrentExclusiveSchedulerPair pair = new ConcurrentExclusiveSchedulerPair(); readerAB1 = new ActionBlock<int>((i) => { Console.WriteLine("ReaderAB1 begin handling." + " Execute Time:" + DateTime.Now); Thread.Sleep(500); } , new ExecutionDataflowBlockOptions() { TaskScheduler = pair.ConcurrentScheduler }); readerAB2 = new ActionBlock<int>((i) => { Console.WriteLine("ReaderAB2 begin handling." + " Execute Time:" + DateTime.Now); Thread.Sleep(500); } , new ExecutionDataflowBlockOptions() { TaskScheduler = pair.ConcurrentScheduler }); readerAB3 = new ActionBlock<int>((i) => { Console.WriteLine("ReaderAB3 begin handling." + " Execute Time:" + DateTime.Now); Thread.Sleep(500); } , new ExecutionDataflowBlockOptions() { TaskScheduler = pair.ConcurrentScheduler }); writerAB1 = new ActionBlock<int>((i) => { Console.ForegroundColor = ConsoleColor.Red; Console.WriteLine("WriterAB1 begin handling." + " Execute Time:" + DateTime.Now); Console.ResetColor(); Thread.Sleep(3000); } , new ExecutionDataflowBlockOptions() { TaskScheduler = pair.ExclusiveScheduler }); bb.LinkTo(readerAB1); bb.LinkTo(readerAB2); bb.LinkTo(readerAB3); Task.Run(() => { while (true) { bb.Post(1); Thread.Sleep(1000); } }); Task.Run(() => { while (true) { Thread.Sleep(6000); writerAB1.Post(1); } }); }
用MaxDegreeOfParallelism來並行處理
一般,Block中處理數據都是單線程的,一次只能處理一個數據,好比說ActionBlock中自定義的代理。使用MaxDegreeOfParallelism可讓你並行處理這些數據。屬性的定義是最大的並行處理個數。若是定義成-1的話,那就是沒有限制。用戶須要在實際狀況中選擇這個值的大小,並非越大越好。若是是平行處理的話,還應該考慮是否有共享資源。
TDF中的負載均衡
咱們可使用Block很方便的構成一個生產者消費者的模式來處理數據。當生產者產生數據的速度快於消費者的時候,消費者Block的Buffer中的數據會愈來愈多,消耗大量的內存,數據處理也會延時。這時,咱們能夠用一個生產者Block鏈接多個消費者Block來解決這個問題。因爲多個消費者Block必定是並行處理,因此對共享資源的處理必定要作同步處理。
使用BoundedCapacity屬性來實現
當鏈接多個ActionBlock的時候,能夠經過設置ActionBlock的BoundedCapacity屬性。當第一個滿了,就會放到第二個,第二個滿了就會放到第三個。
public BufferBlock<int> bb = new BufferBlock<int>(); public ActionBlock<int> ab1 = new ActionBlock<int>((i) => { Thread.Sleep(1000); Console.WriteLine("ab1 handle data" + i + " Execute Time:" + DateTime.Now); } , new ExecutionDataflowBlockOptions() { BoundedCapacity = 2 }); public ActionBlock<int> ab2 = new ActionBlock<int>((i) => { Thread.Sleep(1000); Console.WriteLine("ab2 handle data" + i + " Execute Time:" + DateTime.Now); } , new ExecutionDataflowBlockOptions() { BoundedCapacity = 2 }); public ActionBlock<int> ab3 = new ActionBlock<int>((i) => { Thread.Sleep(1000); Console.WriteLine("ab3 handle data:" + i + " Execute Time:" + DateTime.Now); } , new ExecutionDataflowBlockOptions() { BoundedCapacity = 2 }); public void Test() { bb.LinkTo(ab1); bb.LinkTo(ab2); bb.LinkTo(ab3); for (int i = 0; i < 9; i++) { bb.Post(i); } }