The TPL Dataflow library is powerful: you can use it to build meshes and pipelines, and send data through them asynchronously.
Main namespace: System.Threading.Tasks.Dataflow
When building a mesh, you need to link the dataflow blocks to one another.

    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    public static void LinkBlockRun()
    {
        System.Console.WriteLine("Building Block link.");
        TransformBlock<int, int> multiplyBlock = new TransformBlock<int, int>(item =>
        {
            System.Console.WriteLine("first block.");
            Thread.Sleep(500);
            return item * 2;
        });
        var subtractBlock = new TransformBlock<int, int>(item =>
        {
            System.Console.WriteLine("last block.");
            Thread.Sleep(500);
            return item - 2;
        });
        var options = new DataflowLinkOptions { PropagateCompletion = true };
        multiplyBlock.LinkTo(subtractBlock, options);
        System.Console.WriteLine("Builded Block link.");
        var task = Task.Run(async () =>
        {
            System.Console.WriteLine("Posting");
            for (int i = 0; i < 3; i++)
            {
                multiplyBlock.Post(i);
            }
            System.Console.WriteLine("Posted");
            // The first block's completion is propagated to the second block automatically.
            // After Complete, further Post calls are rejected.
            multiplyBlock.Complete();
            // Note: nothing consumes subtractBlock's output, so awaiting
            // subtractBlock.Completion here would never finish.
            await multiplyBlock.Completion;
            // Done with the link.
            System.Console.WriteLine("Block link Ended.");
        });
        task.Wait();
    }

Output:

    Building Block link.
    Builded Block link.
    Posting
    Posted
    first block.
    first block.
    last block.
    first block.
    last block.
    last block.
    Block link Ended.
    using System;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    public static void BlockErrorRun()
    {
        Task.Run(async () =>
        {
            try
            {
                // Exception from a single block: the original exception type is rethrown.
                var block = new TransformBlock<int, int>(item =>
                {
                    if (item == 1)
                        throw new InvalidOperationException("Blech.");
                    return item * 2;
                });
                block.Post(1);
                await block.Completion;
            }
            catch (InvalidOperationException ex)
            {
                System.Console.WriteLine(ex.GetType().Name);
            }

            try
            {
                // Exception from linked blocks: when the fault is propagated through
                // the link, it arrives wrapped in an AggregateException.
                var multiplyBlock = new TransformBlock<int, int>(item =>
                {
                    if (item == 1)
                        throw new InvalidOperationException("Blech.");
                    return item * 2;
                });
                var subtractBlock = new TransformBlock<int, int>(item => item - 2);
                multiplyBlock.LinkTo(subtractBlock, new DataflowLinkOptions { PropagateCompletion = true });
                multiplyBlock.Post(1);
                await subtractBlock.Completion;
            }
            catch (AggregateException ex)
            {
                System.Console.WriteLine(ex.GetType().Name);
            }
        }).Wait();
    }

Output:

    InvalidOperationException
    AggregateException
    using System;
    using System.Threading;
    using System.Threading.Tasks.Dataflow;

    public static void BlockDisposeRun()
    {
        var multiplyBlock = new TransformBlock<int, int>(item =>
        {
            System.Console.WriteLine("first block.");
            Thread.Sleep(500);
            return item * 2;
        });
        var subtractBlock = new TransformBlock<int, int>(item =>
        {
            System.Console.WriteLine("last block.");
            Thread.Sleep(500);
            return item - 2;
        });
        IDisposable link = multiplyBlock.LinkTo(subtractBlock);
        multiplyBlock.Post(1);
        multiplyBlock.Post(2);
        // Unlink the dataflow blocks.
        // At this point the posted data may or may not have passed through the link already.
        // In real code, consider a using block instead of calling Dispose directly.
        link.Dispose();
        Thread.Sleep(1200);
    }

Output:

    first block.
    first block.
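Use a dataflow block's BoundedCapacity property to throttle the flow into a target block. The default value of BoundedCapacity is DataflowBlockOptions.Unbounded.

Problem it solves: when a source block is linked to several targets, an unbounded first target accepts every item and the other targets receive nothing. Bounding the targets' capacity lets the data spread across the branches.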
    using System;
    using System.Collections.Generic;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    public static void BlockBoundedCapacityRun()
    {
        var sourceBlock = new BufferBlock<int>();
        var options = new DataflowBlockOptions
        {
            BoundedCapacity = 10
            //BoundedCapacity = DataflowBlockOptions.Unbounded
        };
        var targetBlockA = new BufferBlock<int>(options);
        var targetBlockB = new BufferBlock<int>(options);
        sourceBlock.LinkTo(targetBlockA);
        sourceBlock.LinkTo(targetBlockB);
        for (int i = 0; i < 31; i++)
        {
            System.Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} Post:{i % 10}");
            sourceBlock.Post(i % 10);
        }
        // 31 droplets are poured into the pipe.
        // Because each branch is throttled, targetBlockA and targetBlockB hold 10 droplets each;
        // the remaining 11 wait in sourceBlock.
        var task = Task.Run(() =>
        {
            int i = 0;
            System.Console.WriteLine("Draining targetBlockA first: this receive loop takes every droplet that reaches A, but not the ones held in targetBlockB");
            do
            {
                IList<int> res;
                if (targetBlockA.TryReceiveAll(out res))
                {
                    i += res.Count;
                    System.Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} RevcA:{string.Join(",", res)} {i}");
                }
                else
                {
                    break;
                }
                Thread.Sleep(100);
            } while (true);

            i = 0;
            System.Console.WriteLine("Draining targetBlockB: only its buffered droplets remain");
            do
            {
                IList<int> res;
                if (targetBlockB.TryReceiveAll(out res))
                {
                    i += res.Count;
                    System.Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} RevcB:{string.Join(",", res)} {i}");
                }
                else
                {
                    break;
                }
                Thread.Sleep(100);
            } while (true);
        });
        task.Wait();
    }

Output:

    40:28.026 Post:0
    40:28.038 Post:1
    40:28.038 Post:2
    40:28.038 Post:3
    40:28.038 Post:4
    40:28.038 Post:5
    40:28.038 Post:6
    40:28.038 Post:7
    40:28.038 Post:8
    40:28.038 Post:9
    40:28.038 Post:0
    40:28.038 Post:1
    40:28.038 Post:2
    40:28.038 Post:3
    40:28.038 Post:4
    40:28.038 Post:5
    40:28.038 Post:6
    40:28.038 Post:7
    40:28.038 Post:8
    40:28.038 Post:9
    40:28.038 Post:0
    40:28.038 Post:1
    40:28.038 Post:2
    40:28.038 Post:3
    40:28.038 Post:4
    40:28.038 Post:5
    40:28.038 Post:6
    40:28.038 Post:7
    40:28.038 Post:8
    40:28.038 Post:9
    40:28.038 Post:0
    Draining targetBlockA first: this receive loop takes every droplet that reaches A, but not the ones held in targetBlockB
    40:28.043 RevcA:0,1,2,3,4,5,6,7,8,9 10
    40:28.149 RevcA:0,1,2,3,4,5,6,7,8,9 20
    40:28.249 RevcA:0 21
    Draining targetBlockB: only its buffered droplets remain
    40:28.350 RevcB:0,1,2,3,4,5,6,7,8,9 10
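Throttling example: when you fill a dataflow mesh with data from I/O operations, you can set the BoundedCapacity property on the dataflow blocks. Then, when the mesh cannot keep up, it neither reads too much I/O data nor buffers all of it.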
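The I/O throttling idea above can be sketched roughly as follows. This is a minimal illustration, not code from the original article: the class name `ThrottledReaderSketch`, the method `ProcessLinesAsync`, and the `Task.Delay` stand-in for slow processing are all hypothetical. It relies on `SendAsync`, which waits asynchronously while a bounded block is full, whereas `Post` would return false immediately.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ThrottledReaderSketch
{
    // Hypothetical helper: reads lines from `reader`, feeds them to a bounded
    // block, and returns the length of each processed line in arrival order.
    public static async Task<List<int>> ProcessLinesAsync(TextReader reader, int capacity)
    {
        var lengths = new List<int>();

        // ActionBlock processes one item at a time by default,
        // so the list is never modified concurrently.
        var worker = new ActionBlock<string>(async text =>
        {
            await Task.Delay(10); // stand-in for slow processing (assumption)
            lengths.Add(text.Length);
        },
        new ExecutionDataflowBlockOptions { BoundedCapacity = capacity });

        string line;
        while ((line = await reader.ReadLineAsync()) != null)
        {
            // Waits while the block is full, so reading pauses
            // instead of buffering the whole input in memory.
            await worker.SendAsync(line);
        }

        worker.Complete();
        await worker.Completion;
        return lengths;
    }
}
```

For example, `await ThrottledReaderSketch.ProcessLinesAsync(new StringReader("a\nbb\nccc"), 2)` reads three lines while keeping roughly two of them buffered in the block at any time.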
    using System;
    using System.Collections.Generic;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    public static void BlockParalleRun()
    {
        var multiplyBlock = new TransformBlock<int, int>(
            item =>
            {
                System.Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} first block.");
                Thread.Sleep(100);
                return item * 2;
            },
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
            });
        var subtractBlock = new TransformBlock<int, int>(item =>
        {
            System.Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} last block.");
            Thread.Sleep(100);
            return item - 2;
        });
        multiplyBlock.LinkTo(subtractBlock, new DataflowLinkOptions { PropagateCompletion = true });
        var task = Task.Run(async () =>
        {
            for (int i = 0; i < 7; i++)
            {
                multiplyBlock.Post(i);
            }
            multiplyBlock.Complete();
            await multiplyBlock.Completion;
            var tk = Task.Run(() =>
            {
                IList<int> recvResList;
                // Delay so that TryReceiveAll gets every item;
                // otherwise subtractBlock may still be processing some of them.
                Thread.Sleep(1500);
                if (subtractBlock.TryReceiveAll(out recvResList))
                {
                    System.Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} Revc {string.Join(",", recvResList)}.");
                }
                else
                {
                    System.Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} Revc null.");
                }
            });
            await tk;
            // multiplyBlock has completed; whether subtractBlock completes
            // depends on the link's PropagateCompletion option.
            await subtractBlock.Completion;
        });
        task.Wait();
    }

Output:

    44:16.023 first block.
    44:16.023 first block.
    44:16.023 first block.
    44:16.023 first block.
    44:16.023 first block.
    44:16.023 first block.
    44:16.023 first block.
    44:16.146 last block.
    44:16.250 last block.
    44:16.351 last block.
    44:16.452 last block.
    44:16.552 last block.
    44:16.652 last block.
    44:16.753 last block.
    44:17.656 Revc -2,0,2,4,6,8,10.
The real difficulty is working out which dataflow blocks actually need parallel processing.
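One way to explore that question is to measure a block in isolation. The sketch below is an assumption-laden probe rather than anything from the original article: `ParallelismProbe` and `MeasurePeakAsync` are hypothetical names, and `Thread.Sleep(50)` merely stands in for real work. It records how many delegate invocations run at the same time under a given `MaxDegreeOfParallelism`; in practice you would profile each block with its real workload.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ParallelismProbe
{
    // Hypothetical helper: pushes `itemCount` simulated work items through a
    // block limited to `maxParallelism` and returns the highest number of
    // delegate invocations observed running at the same moment.
    public static async Task<int> MeasurePeakAsync(int maxParallelism, int itemCount)
    {
        int running = 0;
        int peak = 0;

        var block = new ActionBlock<int>(_ =>
        {
            int now = Interlocked.Increment(ref running);

            // Raise `peak` to `now` with a compare-and-swap loop.
            int seen;
            while (now > (seen = Volatile.Read(ref peak)) &&
                   Interlocked.CompareExchange(ref peak, now, seen) != seen)
            {
            }

            Thread.Sleep(50); // stand-in for CPU-bound work (assumption)
            Interlocked.Decrement(ref running);
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxParallelism });

        for (int i = 0; i < itemCount; i++)
        {
            block.Post(i);
        }
        block.Complete();
        await block.Completion;
        return peak;
    }
}
```

With `maxParallelism` set to 1 the peak is always 1; with a larger value the peak never exceeds that value, and how close it gets depends on the thread pool and the machine.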
    using System;
    using System.Collections.Generic;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    public static void BlockCustomRun()
    {
        var block = CreateMyCustomBlock();
        for (int i = 0; i < 7; i++)
        {
            block.Post(i); // target side
        }
        var task = Task.Run(async () =>
        {
            var tk = Task.Run(() =>
            {
                List<int> recvResList = new List<int>();
                // Keep receiving until the block has completed and is empty;
                // Receive then throws InvalidOperationException.
                while (true)
                {
                    try
                    {
                        var recvRes = block.Receive(); // source side
                        recvResList.Add(recvRes);
                    }
                    catch (System.InvalidOperationException)
                    {
                        break;
                    }
                }
                Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} Revc {string.Join(",", recvResList)}.");
            });
            block.Complete();       // target side
            await block.Completion; // source side
            await tk;
        });
        task.Wait();
    }

    static IPropagatorBlock<int, int> CreateMyCustomBlock()
    {
        var multiplyBlock = new TransformBlock<int, int>(item =>
        {
            int res = item * 2;
            System.Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} first block {res}.");
            Thread.Sleep(100);
            return res;
        });
        var addBlock = new TransformBlock<int, int>(item =>
        {
            int res = item + 2;
            System.Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} next block {res}.");
            Thread.Sleep(100);
            return res;
        });
        var divideBlock = new TransformBlock<int, int>(item =>
        {
            int res = item / 2;
            System.Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} last block {res}.");
            Thread.Sleep(100);
            return res;
        });
        var flowCompletion = new DataflowLinkOptions { PropagateCompletion = true };
        multiplyBlock.LinkTo(addBlock, flowCompletion);
        addBlock.LinkTo(divideBlock, flowCompletion);
        // Expose the first block as the target and the last block as the source.
        return DataflowBlock.Encapsulate(multiplyBlock, divideBlock);
    }

Output:

    45:00.528 first block 0.
    45:00.639 first block 2.
    45:00.641 next block 2.
    45:00.739 first block 4.
    45:00.746 next block 4.
    45:00.747 last block 1.
    45:00.844 first block 6.
    45:00.847 next block 6.
    45:00.848 last block 2.
    45:00.947 first block 8.
    45:00.951 next block 8.
    45:00.951 last block 3.
    45:01.049 first block 10.
    45:01.055 next block 10.
    45:01.056 last block 4.
    45:01.152 first block 12.
    45:01.159 next block 12.
    45:01.160 last block 5.
    45:01.264 next block 14.
    45:01.265 last block 6.
    45:01.365 last block 7.
    45:01.472 Revc 1,2,3,4,5,6,7.
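DataflowBlock.Encapsulate only wraps a mesh that has exactly one input block and one output block. If a reusable mesh has multiple inputs or outputs, wrap it in a custom object instead.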
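As a rough illustration of such a custom object, the sketch below wraps a mesh with two inputs and one output. The class `PairSumMesh` and its members are hypothetical names chosen for this example, not part of the library or the original article. It uses a `JoinBlock<int, int>` to pair items from the two inputs and a `TransformBlock` to add each pair.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical wrapper: two inputs, one output that emits the sum of each pair.
public sealed class PairSumMesh
{
    private readonly JoinBlock<int, int> _join = new JoinBlock<int, int>();
    private readonly TransformBlock<Tuple<int, int>, int> _sum =
        new TransformBlock<Tuple<int, int>, int>(pair => pair.Item1 + pair.Item2);

    public PairSumMesh()
    {
        // Completion of the join flows on to the summing block.
        _join.LinkTo(_sum, new DataflowLinkOptions { PropagateCompletion = true });
    }

    // The two inputs of the mesh.
    public ITargetBlock<int> Left => _join.Target1;
    public ITargetBlock<int> Right => _join.Target2;

    // The single output of the mesh.
    public ISourceBlock<int> Output => _sum;

    // Signals that no more input will arrive on either side.
    public void Complete() => _join.Complete();

    // Completes once all sums have been produced and consumed.
    public Task Completion => _sum.Completion;
}
```

A caller posts to `Left` and `Right` separately and reads from `Output`; for instance, posting 1 and 2 to `Left` and 10 and 20 to `Right` yields 11 and then 22 from `Output.Receive()`. The wrapper exposes only interfaces, so the internal blocks can change without affecting callers.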