.Net Core中利用TPL(任務並行庫)構建Pipeline處理Dataflow

在學習的過程當中,看一些一線的技術文檔很吃力,並且考慮到國內那些技術牛人英語都不差的,要向他們看齊,因此天天下班都在瘋狂地背單詞,博客有些日子沒有更新了,見諒見諒 Smile with tongue outgit

什麼是TPL?

Task Parallel Library (TPL), 在.NET Framework 4微軟推出TPL,並把TPL做爲編寫多線程和並行代碼的首選方式,可是,在國內,到目前爲止好像用的人並很少。(TPL)是System.ThreadingSystem.Threading.Tasks命名空間中的一組公共類型和API 。TPL的目的是經過簡化嚮應用程序添加並行性和併發性的過程來提升開發人員的工做效率,TPL動態地擴展併發度,以最有效地使用全部可用的處理器。經過使用TPL,您能夠最大限度地提升代碼的性能,讓咱們專一於程序自己而不用去關注負責的多線程管理。github

出自: https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/task-parallel-library-tplapi

爲何使用TPL?

在上面介紹了什麼是TPL,可能你們仍是雲裏霧裏,不知道TPL的好處究竟是什麼。多線程

我在youtube上找到了一個優秀的視頻,講述的是TPL和Thread的區別,我以爲對比一下,TPL的優點很快就能體現出來,若是你們能打開的話建議你們必定要看看。併發

地址是:https://www.youtube.com/watch?v=No7QqSc5cl8性能

現現在,咱們的電腦的CPU怎麼也是2核以上,下面假設個人電腦是四核的,咱們來作一個實驗。學習

使用Thread

代碼中,若是使用Thread來處理任務,若是不作特出的處理,只是thread.Start(),監測電腦的核心的使用狀況是下面這樣的。測試

TIM截圖20181003221820

每一條線表明CPU某個核心的使用狀況,明顯,隨着代碼Run起來,其實只有某一個核心的使用率迅速提高,其餘核心並沒有明顯波動,爲何會這樣呢?ui

 

TIM截圖20181003221925

原來,默認狀況下,操做系統並不會調用全部的核心來處理任務,即便咱們使用多線程,其實也是在一個核內心面運行這些Thread,並且Thread之間涉及到線程同步等問題,其實,效率也不會明顯提升。spa

使用TPL

在代碼中,引入了TPL來處理相同的任務,再次監視各個核心的使用狀況,效果就變得大相徑庭,以下。

TIM截圖20181003222605

能夠看到各個核心的使用狀況都同時有了明顯的提升。

TIM截圖20181003222044

說明使用TPL後,再也不是使用CPU的某個核心來處理任務了,而是TPL自動把任務分攤給每一個核心來處理,處理效率可想而知,理論上會有明顯提高的(爲何說理論上?和使用多線程同樣,各個核心之間的同步管理也是要佔用必定的效率的,因此對於並不複雜的任務,使用TPL可能拔苗助長)。

實驗結果出自https://www.youtube.com/watch?v=No7QqSc5cl8

看了這個實驗講解,是否是理解了上面所說的這句。

TPL的目的是經過簡化嚮應用程序添加並行性和併發性的過程來提升開發人員的工做效率,TPL動態地擴展併發度,以最有效地使用全部可用的處理器。

 

因此說,使用TPL 來處理多線程任務可讓你沒必要吧把精力放在如何提升多線程處理效率上,由於這一切,TPL 能自動地幫你完成。

TPL Dataflow?

TPL處理Dataflow是TPL強大功能中的一種,它提供一套完整的數據流組件,這些數據流組件統稱爲TPL Dataflow Library,那麼,在什麼場景下適合使用TPL Dataflow Library呢?

官方舉的一個 栗子 再恰當不過:

例如,經過TPL Dataflow提供的功能來轉換圖像,執行光線校訂或防紅眼,能夠建立管道數據流組件,管道中的每一個功能能夠並行執行,而且TPL能自動控制圖像流在不一樣線程之間的同步,再也不須要Thread 中的Lock。

TPL數據流庫由Block組成,Block是緩衝和處理數據的單元,TPL定義了三種最基礎的Block。

source blocksSystem.Threading.Tasks.Dataflow.ISourceBlock <TOutput>),源塊充當數據源而且能夠從中讀取。

target blocksSystem.Threading.Tasks.Dataflow.ITargetBlock <TInput>,目標塊充當數據接收器並能夠寫入。

propagator blocksSystem.Threading.Tasks.Dataflow.IPropagatorBlock <TInput,TOutput>),傳播器塊充當源塊和目標塊,而且能夠被讀取和寫入。它繼承自ISourceBlock <TOutput>ITargetBlock <TInput>

 

還有其餘一些個性化的Block,但其實他們都是對這三種Block進行一些擴充,能夠結合下面的代碼來理解這三種Block.

Code Show

1.source block 和 target block 合併成propagator block.

 

private IPropagatorBlock<string, Dictionary<int, string>> Process1()
        {
            var bufferBlock = new BufferBlock<Dictionary<int, string>>();
            var actionBlock = new ActionBlock<string>(x =>
              {
                  Console.WriteLine($"Process1 處理中:{x}");
                  Thread.Sleep(5000);
                  var dic = new Dictionary<int, string> { { 0, x } };
                  dic.Add(1, "Process1");
                  bufferBlock.Post(dic);
              }, new ExecutionDataflowBlockOptions
              {
                  MaxDegreeOfParallelism = _maxDegreeOfParallelism
              });
            actionBlock.Completion.ContinueWith(_ =>
            {
                Console.WriteLine($"Process1 Complete,State{_.Status}");
                bufferBlock.Complete();
            });
            return DataflowBlock.Encapsulate(actionBlock, bufferBlock);
        }

 

能夠看到,我定義了BufferBlock和ActionBlock,它們分別繼承於ISourceBlock 和 ITargetBlock ,因此說,他們其實就是源塊和目標塊,在new actionBlock()中傳入了一個Action<String>,該Action就是該Block所執行的任務。 最後,DataflowBlock.Encapsulate(actionBlock, bufferBlock)把源塊和目標塊合併成了一個傳遞塊。

2.TransformBlock

private IPropagatorBlock<Dictionary<int, string>, Dictionary<int, string>> Process2()
        {
            var block = new TransformBlock<Dictionary<int, string>, Dictionary<int, string>>(dic =>
                  {
                      Console.WriteLine($"Process2 處理中:{dic.First().Value}");
                      Thread.Sleep(5000);
                      dic.Add(2, "Process2");
                      return dic;
                  }, new ExecutionDataflowBlockOptions
                  {
                      MaxDegreeOfParallelism = _maxDegreeOfParallelism
                  }
               );

            block.Completion.ContinueWith(_ =>
            {
                Console.WriteLine($"Process2 Complete,State{_.Status}");
            });

            return block;
        }

TransfromBlock繼承了IPropagatorBlock,因此它自己就是一個傳遞塊,因此它除了要處理出入數據,還要返回數據,因此給new TransformBlock()中傳入的是Func<TInput, TOutput>而不是Action<TInput>.

 

3.TargetBlock來收尾

private ITargetBlock<Dictionary<int, string>> Process3()
        {
            var actionBlock = new ActionBlock<Dictionary<int, string>>(dic =>
               {
                   Console.WriteLine($"Process3 處理中:{dic.First().Value}");
                   Thread.Sleep(5000);
                   dic.Add(3, "Process3");
                   Console.WriteLine("Dic中的內容以下:");
                   foreach (var item in dic)
                   {
                       Console.Write($"{item.Key}:{item.Value}||");
                   }
                   Console.WriteLine();
               }, new ExecutionDataflowBlockOptions
               {
                   MaxDegreeOfParallelism = _maxDegreeOfParallelism
               });
            return actionBlock;
        }

TargetBlock只能寫入並處理數據,不能讀取,因此TargetBlock適合做爲Pipeline的最後一個Block。

 

4.控制每一個Block的並行度

在在構造TargetBlock(包括其子類)的時候,能夠傳入ExecutionDataflowBlockOptions參數,ExecutionDataflowBlockOptions對象裏面有一個MaxDegreeOfParallelism屬性,經過改制,能夠控制該Block的同時處理任務的數量(能夠理解成線程數)。

new ExecutionDataflowBlockOptions
               {
                   MaxDegreeOfParallelism = _maxDegreeOfParallelism
               }

 

5.構建Pipeline,鏈接Block

public Task Builder()
        {
            _startBlock = Process1();
            var process2Block = Process2();
            var process3Block = Process3();

            _startBlock.LinkTo(process2Block, new DataflowLinkOptions() { PropagateCompletion = true });

            process2Block.LinkTo(process3Block, new DataflowLinkOptions() { PropagateCompletion = true });

            process3Block.Completion.ContinueWith(_ =>
            {
                Console.WriteLine($"Process3 Complete,State{_.Status}");
                Console.WriteLine("全部任務處理完成");
            });

            return process3Block.Completion;
        }

經過

ISourceBlock<TOutput>.LinkTo(ITargetBlock<TOutput> target, DataflowLinkOptions linkOption)

方法,能夠把Block鏈接起來,即構建Pipeline,當DataflowLinkOptions對象的PropagateCompletion屬性爲true時,SorceBlock任務處理完成是,會把TargetBlock也標記爲完成。

 

Block被標記爲Complete 後,沒法傳入新的數據了,即不能再處理新的任務了。

 

6.Pipeline的運行

public void Process(string[] inputs)
        {
            if (inputs == null)
                return;
            foreach (var input in inputs)
            {
                _startBlock.Post(input);
            }
            _startBlock.Complete();
        }

Pipeline構建好後,咱們只須要給第一個Block傳入數據,該數據就會在管道內流動起來了,全部數據傳入完成後,調用Block的Complete方法,把該Block標記爲完成,就不能夠再往裏面Post數據了。

 

完整代碼以下:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace Tpl.Dataflow
{
    public class Pipeline
    {
        IPropagatorBlock<string, Dictionary<int, string>> _startBlock;
        private int _maxDegreeOfParallelism;

        public Pipeline(int maxDegreeOfParallelism)
        {
            _maxDegreeOfParallelism = maxDegreeOfParallelism;
        }

        public void Process(string[] inputs)
        {
            if (inputs == null)
                return;
            foreach (var input in inputs)
            {
                _startBlock.Post(input);
            }
            _startBlock.Complete();
        }

        public Task Builder()
        {
            _startBlock = Process1();
            var process2Block = Process2();
            var process3Block = Process3();

            _startBlock.LinkTo(process2Block, new DataflowLinkOptions() { PropagateCompletion = true });

            process2Block.LinkTo(process3Block, new DataflowLinkOptions() { PropagateCompletion = true });

            process3Block.Completion.ContinueWith(_ =>
            {
                Console.WriteLine($"Process3 Complete,State{_.Status}");
                Console.WriteLine("全部任務處理完成");
            });

            return process3Block.Completion;
        }

        private IPropagatorBlock<string, Dictionary<int, string>> Process1()
        {
            var bufferBlock = new BufferBlock<Dictionary<int, string>>();
            var actionBlock = new ActionBlock<string>(x =>
              {
                  Console.WriteLine($"Process1 處理中:{x}");
                  Thread.Sleep(5000);
                  var dic = new Dictionary<int, string> { { 0, x } };
                  dic.Add(1, "Process1");
                  bufferBlock.Post(dic);
              }, new ExecutionDataflowBlockOptions
              {
                  MaxDegreeOfParallelism = _maxDegreeOfParallelism
              });
            actionBlock.Completion.ContinueWith(_ =>
            {
                Console.WriteLine($"Process1 Complete,State{_.Status}");
                bufferBlock.Complete();
            });
            return DataflowBlock.Encapsulate(actionBlock, bufferBlock);
        }

        private IPropagatorBlock<Dictionary<int, string>, Dictionary<int, string>> Process2()
        {
            var block = new TransformBlock<Dictionary<int, string>, Dictionary<int, string>>(dic =>
                  {
                      Console.WriteLine($"Process2 處理中:{dic.First().Value}");
                      Thread.Sleep(5000);
                      dic.Add(2, "Process2");
                      return dic;
                  }, new ExecutionDataflowBlockOptions
                  {
                      MaxDegreeOfParallelism = _maxDegreeOfParallelism
                  }
               );

            block.Completion.ContinueWith(_ =>
            {
                Console.WriteLine($"Process2 Complete,State{_.Status}");
            });

            return block;
        }

        private ITargetBlock<Dictionary<int, string>> Process3()
        {
            var actionBlock = new ActionBlock<Dictionary<int, string>>(dic =>
               {
                   Console.WriteLine($"Process3 處理中:{dic.First().Value}");
                   Thread.Sleep(5000);
                   dic.Add(3, "Process3");
                   Console.WriteLine("Dic中的內容以下:");
                   foreach (var item in dic)
                   {
                       Console.Write($"{item.Key}:{item.Value}||");
                   }
                   Console.WriteLine();
               }, new ExecutionDataflowBlockOptions
               {
                   MaxDegreeOfParallelism = _maxDegreeOfParallelism
               });
            return actionBlock;
        }
    }
}

 

Main方法以下:

static void Main(string[] args)
        {
            Console.WriteLine("請輸入管道併發數:");
            if (int.TryParse(Console.ReadLine(), out int max))
            {
                var pipeline = new Pipeline(max);
                var task = pipeline.Builder();
                pipeline.Process(new[] { "", "", "", "" });
                task.Wait();
                Console.ReadKey();
            }
        }

 

測試運行如圖:

image

我來解釋一下,爲何是這麼運行的,由於把管道的並行度設置爲2,因此每一個Block能夠同時處理兩個任務,因此,若是給管道傳入四個字符 ,每一個字符做爲一個任務,假設傳入  「碼農阿宇」四個任務,會時這樣的一個過程…..

  1. 碼   農  兩個首先進入Process1,
  2. 處理完成後,碼  農   兩個任務流出,
  3. Process1位置空出來, 阿  宇 兩個任務流入 Process1,
  4. 碼  農 兩個任務流向 Process2,
  5. 阿  宇 從 Process1 處理完成後流出,此時Process1任務完成
  6. 碼  農 流出 Process2 ,同時 阿 宇  流入 Process2 ……
  7. 依此類推….

 

該項目Github地址: https://github.com/liuzhenyulive/Tpl-Dataflow-Demo

參考文獻:https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/dataflow-task-parallel-library

碼字不易,若是對您有用,歡迎推薦和關注,謝謝Flirt male

相關文章
相關標籤/搜索