TPL DataFlow初探(一)

屬性TPL Dataflow是微軟面向高併發應用而推出的一個類庫。藉助於異步消息傳遞與管道,它能夠提供比線程池更好的控制,也比手工線程方式具有更好的性能。咱們經常能夠消息傳遞,生產-消費模式或Actor-Agent模式中使用。在TDF是構建於Task Parallel Library (TPL)之上的,它是咱們開發高性能,高併發的應用程序的又一利器。您能夠在NuGet中下載使用,目前最新的版本只支持.net framework 4.5。最先支持.net framework 4.0是做爲Microsoft Visual Studio Async CTP中的一部分發布的,你能夠在這裏下載到。html

TDP的主要做用就是Buffering Data和Processing Data,在TDF中,有兩個很是重要的接口,ISourceBlock<T> 和ITargetBlock<T>接口。繼承於ISourceBlock<T>的對象時做爲提供數據的數據源對象-生產者,而繼承於ITargetBlock<T>接口類主要是扮演目標對象-消費者。在這個類庫中,System.Threading.Tasks.Dataflow名稱空間下,提供了不少以Block名字結尾的類,ActionBlock,BufferBlock,TransformBlock,BroadcastBlock等9個Block,咱們在開發中一般使用單個或多個Block組合的方式來實現一些功能。如下咱們逐個來簡單介紹一下。web

 

BufferBlock併發

BufferBlock是TDF中最基礎的Block。BufferBlock提供了一個有界限或沒有界限的Buffer,該Buffer中存儲T。該Block很像BlockingCollection<T>。能夠用過Post往裏面添加數據,也能夠經過Receive方法阻塞或異步的的獲取數據,數據處理的順序是FIFO的。它也能夠經過Link向其餘Block輸出數據。異步

image

 

 

簡單的同步的生產者消費者代碼示例:async

 
private static BufferBlock<int> m_buffer = new BufferBlock<int>();

// Producer
private static void Producer()
{
    while(true)
    {
        int item = Produce();
        m_buffer.Post(item);
    }
}

// Consumer
private static void Consumer()
{
    while(true)
    {
        int item = m_buffer.Receive();
        Process(item);
    }
}

// Main
public static void Main()
{
    var p = Task.Factory.StartNew(Producer);
    var c = Task.Factory.StartNew(Consumer);
    Task.WaitAll(p,c);
}
 

 

 

 

 

ActionBlock函數

 

ActionBlock實現ITargetBlock,說明它是消費數據的,也就是對輸入的一些數據進行處理。它在構造函數中,容許輸入一個委託,來對每個進來的數據進行一些操做。若是使用Action(T)委託,那說明每個數據的處理完成須要等待這個委託方法結束,若是使用了Func<TInput, Task>)來構造的話,那麼數據的結束將不是委託的返回,而是Task的結束。默認狀況下,ActionBlock會FIFO的處理每個數據,並且一次只能處理一個數據,一個處理完了再處理第二個,但也能夠經過配置來並行的執行多個數據。高併發

 

image

先看一個例子:post

 
public ActionBlock<int> abSync = new ActionBlock<int>((i) =>
            {
                Thread.Sleep(1000);
                Console.WriteLine(i + " ThreadId:" + Thread.CurrentThread.ManagedThreadId + " Execute Time:" + DateTime.Now);
            }
        );

        public void TestSync()
        {
            for (int i = 0; i < 10; i++)
            {
                abSync.Post(i);
            }

            Console.WriteLine("Post finished");
        }
 

 

6{%7WGLQU90CW4[(OF)H6DC

 

可見,ActionBlock是順序處理數據的,這也是ActionBlock一大特性之一。主線程在往ActionBlock中Post數據之後立刻返回,具體數據的處理是另一個線程來作的。數據是異步處理的,但處理自己是同步的,這樣在必定程度上保證數據處理的準確性。下面的例子是使用async和await。性能

public ActionBlock<int> abSync2 = new ActionBlock<int>(async (i) =>
        {
            await Task.Delay(1000);
            Console.WriteLine(i + " ThreadId:" + Thread.CurrentThread.ManagedThreadId + " Execute Time:" + DateTime.Now);
        }

U55C4LS4`0SY0O)}[5W]{%C

雖然仍是1秒鐘處理一個數據,可是處理數據的線程會有不一樣。測試

 

若是你想異步處理多個消息的話,ActionBlock也提供了一些接口,讓你輕鬆實現。在ActionBlock的構造函數中,能夠提供一個ExecutionDataflowBlockOptions的類型,讓你定義ActionBlock的執行選項,在下面了例子中,咱們定義了MaxDegreeOfParallelism選項,設置爲3。目的的讓ActionBlock中的Item最多能夠3個並行處理。

 
public ActionBlock<int> abAsync = new ActionBlock<int>((i) =>
        {
            Thread.Sleep(1000);
            Console.WriteLine(i + " ThreadId:" + Thread.CurrentThread.ManagedThreadId + " Execute Time:" + DateTime.Now);
        }
        , new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 3 });

public void TestAsync()
        {
            for (int i = 0; i < 10; i++)
            {
                abAsync.Post(i);
            }
            Console.WriteLine("Post finished");
        }
 

XVGW}JJK7YY7(%E}])11J7V

 

 

運行程序,咱們看見,每3個數據幾乎同時處理,而且他們的線程ID也是不同的。

 

 

ActionBlock也有本身的生命週期,全部繼承IDataflowBlock的類型都有Completion屬性和Complete方法。調用Complete方法是讓ActionBlock中止接收數據,而Completion屬性則是一個Task,是在ActionBlock處理完全部數據時候會執行的任務,咱們可使用Completion.Wait()方法來等待ActionBlock完成全部的任務,Completion屬性只有在設置了Complete方法後纔會有效。

 

 

 
public void TestAsync()
        {
            for (int i = 0; i < 10; i++)
            {
                abAsync.Post(i);
            }
            abAsync.Complete();
            Console.WriteLine("Post finished");
            abAsync.Completion.Wait();
            Console.WriteLine("Process finished");
        }
 

 

 

$WSKZ$6M1`[J7T_W@~Y~WZ3

 

TransformBlock

TransformBlock是TDF提供的另外一種Block,顧名思義它經常在數據流中充當數據轉換處理的功能。在TransformBlock內部維護了2個Queue,一個InputQueue,一個OutputQueue。InputQueue存儲輸入的數據,而經過Transform處理之後的數據則放在OutputQueue,OutputQueue就好像是一個BufferBlock。最終咱們能夠經過Receive方法來阻塞的一個一個獲取OutputQueue中的數據。TransformBlock的Completion.Wait()方法只有在OutputQueue中的數據爲0的時候纔會返回。

image

舉個例子,咱們有一組網址的URL,咱們須要對每一個URL下載它的HTML數據並存儲。那咱們經過以下的代碼來完成:

 
public TransformBlock<string, string> tbUrl = new TransformBlock<string, string>((url) =>
        {
            WebClient webClient = new WebClient();
            return webClient.DownloadString(new Uri(url));
        }

        public void TestDownloadHTML()
        {
            tbUrl.Post("www.baidu.com");
            tbUrl.Post("www.sina.com.cn");

            string baiduHTML = tbUrl.Receive();
            string sinaHTML = tbUrl.Receive();
        }
 

固然,Post操做和Receive操做能夠在不一樣的線程中進行,Receive操做一樣也是阻塞操做,在OutputQueue中有可用的數據時,纔會返回。

 

TransformManyBlock

TransformManyBlock和TransformBlock很是相似,關鍵的不一樣點是,TransformBlock對應於一個輸入數據只有一個輸出數據,而TransformManyBlock能夠有多個,及能夠從InputQueue中取一個數據出來,而後放多個數據放入到OutputQueue中。

image

 

 
TransformManyBlock<int, int> tmb = new TransformManyBlock<int, int>((i) => { return new int[] { i, i + 1 }; });

        ActionBlock<int> ab = new ActionBlock<int>((i) => Console.WriteLine(i));

        public void TestSync()
        {
            tmb.LinkTo(ab);

            for (int i = 0; i < 4; i++)
            {
                tmb.Post(i);
            }

            Console.WriteLine("Finished post");
        }
 

GC(K]J4DB4UKP$S@8C9ZVMV

 

BroadcastBlock

BroadcastBlock的做用不像BufferBlock,它是使命是讓全部和它相聯的目標Block都收到數據的副本,這點從它的命名上面就能夠看出來了。還有一點不一樣的是,BroadcastBlock並不保存數據,在每個數據被髮送到全部接收者之後,這條數據就會被後面最新的一條數據所覆蓋。如沒有目標Block和BroadcastBlock相連的話,數據將被丟棄。但BroadcastBlock總會保存最後一個數據,無論這個數據是否是被髮出去過,若是有一個新的目標Block連上來,那麼這個Block將收到這個最後一個數據。

image

 
        BroadcastBlock<int> bb = new BroadcastBlock<int>((i) => { return i; });

        ActionBlock<int> displayBlock = new ActionBlock<int>((i) => Console.WriteLine("Displayed " + i));

        ActionBlock<int> saveBlock = new ActionBlock<int>((i) => Console.WriteLine("Saved " + i));

        ActionBlock<int> sendBlock = new ActionBlock<int>((i) => Console.WriteLine("Sent " + i));

        public void TestSync()
        {
            bb.LinkTo(displayBlock);
            bb.LinkTo(saveBlock);
            bb.LinkTo(sendBlock);

            for (int i = 0; i < 4; i++)
            {
                bb.Post(i);
            }

            Console.WriteLine("Post finished");
        }
 

A][PVWN1@4UMGZ[YTEV$[E9

 

若是咱們在Post之後再添加鏈接Block的話,那些Block就只會收到最後一個數據了。

 
public void TestSync()
        {
            for (int i = 0; i < 4; i++)
            {
                bb.Post(i);
            }

            Thread.Sleep(5000);

            bb.LinkTo(displayBlock);
            bb.LinkTo(saveBlock);
            bb.LinkTo(sendBlock);
            Console.WriteLine("Post finished");
        }
 

 

AC}VT(NM__HO1@UJ948)$@W

 

WriteOnceBlock

若是說BufferBlock是最基本的Block,那麼WriteOnceBock則是最最簡單的Block。它最多隻能存儲一個數據,一旦這個數據被髮送出去之後,這個數據仍是會留在Block中,但不會被刪除或被新來的數據替換,一樣全部的接收者都會收到這個數據的備份。

image

和BroadcastBlock一樣的代碼,可是結果不同:

 
WriteOnceBlock<int> bb = new WriteOnceBlock<int>((i) => { return i; });

        ActionBlock<int> displayBlock = new ActionBlock<int>((i) => Console.WriteLine("Displayed " + i));

        ActionBlock<int> saveBlock = new ActionBlock<int>((i) => Console.WriteLine("Saved " + i));

        ActionBlock<int> sendBlock = new ActionBlock<int>((i) => Console.WriteLine("Sent " + i));

        public void TestSync()
        {
            bb.LinkTo(displayBlock);
            bb.LinkTo(saveBlock);
            bb.LinkTo(sendBlock);
            for (int i = 0; i < 4; i++)
            {
                bb.Post(i);
            }

            Console.WriteLine("Post finished");
        }
 

@2[203}OL`G6VH2K}}9}DNE

WriteOnceBock只會接收一次數據。並且始終保留那個數據。

一樣使用Receive方法來獲取數據也是同樣的結果,獲取到的都是第一個數據:

 
public void TestReceive()
        {
            for (int i = 0; i < 4; i++)
            {
                bb.Post(i);
            }
            Console.WriteLine("Post finished");

            Console.WriteLine("1st Receive:" + bb.Receive());
            Console.WriteLine("2nd Receive:" + bb.Receive());
            Console.WriteLine("3rd Receive:" + bb.Receive());
        }
 

7M5Q]MH5K82OVQ}N]E(J8MV

 

 BatchBlock

 

 image

BatchBlock提供了可以把多個單個的數據組合起來處理的功能,如上圖。應對有些需求須要固定多個數據才能處理的問題。在構造函數中須要制定多少個爲一個Batch,一旦它收到了那個數量的數據後,會打包放在它的OutputQueue中。當BatchBlock被調用Complete告知Post數據結束的時候,會把InputQueue中餘下的數據打包放入OutputQueue中等待處理,而無論InputQueue中的數據量是否是知足構造函數的數量。

 
        BatchBlock<int> bb = new BatchBlock<int>(3);

        ActionBlock<int[]> ab = new ActionBlock<int[]>((i) => 
            {
                string s = string.Empty;

                foreach (int m in i)
                {
                    s += m + " ";
                }
                Console.WriteLine(s);
            });

        public void TestSync()
        {
            bb.LinkTo(ab);

            for (int i = 0; i < 10; i++)
            {
                bb.Post(i);
            }
            bb.Complete();

            Console.WriteLine("Finished post");
        }
 

@D__1{B5V72~T7`AGM74D_0

BatchBlock執行數據有兩種模式:貪婪模式和非貪婪模式。貪婪模式是默認的。貪婪模式是指任何Post到BatchBlock,BatchBlock都接收,並等待個數滿了之後處理。非貪婪模式是指BatchBlock須要等到構造函數中設置的BatchSize個數的Source都向BatchBlock發數據,Post數據的時候纔會處理。否則都會留在Source的Queue中。也就是說BatchBlock可使用在每次從N個Source那個收一個數據打包處理或從1個Source那裏收N個數據打包處理。這裏的Source是指其餘的繼承ISourceBlock的,用LinkTo鏈接到這個BatchBlock的Block。

在另外一個構造參數中GroupingDataflowBlockOptions,能夠經過設置Greedy屬性來選擇是否貪婪模式和MaxNumberOfGroups來設置最大產生Batch的數量,若是到達了這個數量,BatchBlock將不會再接收數據。

 

JoinBlock

image

JoinBlock一看名字就知道是須要和兩個或兩個以上的Source Block相鏈接的。它的做用就是等待一個數據組合,這個組合須要的數據都到達了,它纔會處理數據,並把這個組合做爲一個Tuple傳遞給目標Block。舉個例子,若是定義了JoinBlock<int, string>類型,那麼JoinBlock內部會有兩個ITargetBlock,一個接收int類型的數據,一個接收string類型的數據。那只有當兩個ITargetBlock都收到各自的數據後,纔會放到JoinBlock的OutputQueue中,輸出。

 

 

 

 
JoinBlock<int, string> jb = new JoinBlock<int, string>();
        ActionBlock<Tuple<int, string>> ab = new ActionBlock<Tuple<int, string>>((i) =>
            {
                Console.WriteLine(i.Item1 + " " + i.Item2);
            });
            
        public void TestSync()
        {
            jb.LinkTo(ab);

            for (int i = 0; i < 5; i++)
            {
                jb.Target1.Post(i);
            }

            for (int i = 5; i > 0; i--)
            {
                Thread.Sleep(1000);
                jb.Target2.Post(i.ToString());
            }

            Console.WriteLine("Finished post");
        }
 

)DNMCJE%H41G[2YBPD%W4%B

 

BatchedJoinBlock

image

BatchedJoinBlock一看就是BacthBlock和JoinBlick的組合。JoinBlick是組合目標隊列的一個數據,而BatchedJoinBlock是組合目標隊列的N個數據,固然這個N能夠在構造函數中配置。若是咱們定義的是BatchedJoinBlock<int, string>, 那麼在最後的OutputQueue中存儲的是Tuple<IList<int>, IList<string>>,也就是說最後獲得的數據是Tuple<IList<int>, IList<string>>。它的行爲是這樣的,仍是假設上文的定義,BatchedJoinBlock<int, string>, 構造BatchSize輸入爲3。那麼在這個BatchedJoinBlock種會有兩個ITargetBlock,會接收Post的數據。那何時會生成一個Tuple<IList<int>,IList<string>>到OutputQueue中呢,測試下來並非咱們想的須要有3個int數據和3個string數據,而是隻要2個ITargetBlock中的數據個數加起來等於3就能夠了。3和0,2和1,1和2或0和3的組合都會生成Tuple<IList<int>,IList<string>>到OutputQueue中。能夠參看下面的例子:

 
BatchedJoinBlock<int, string> bjb = new BatchedJoinBlock<int, string>(3);

        ActionBlock<Tuple<IList<int>, IList<string>>> ab = new ActionBlock<Tuple<IList<int>, IList<string>>>((i) =>
            {
                Console.WriteLine("-----------------------------");

                foreach (int m in i.Item1)
                {
                    Console.WriteLine(m);
                };

                foreach (string s in i.Item2)
                {
                    Console.WriteLine(s);
                };
            });

        public void TestSync()
        {
            bjb.LinkTo(ab);

            for (int i = 0; i < 5; i++)
            {
                bjb.Target1.Post(i);
            }

            for (int i = 5; i > 0; i--)
            {
                bjb.Target2.Post(i.ToString());
            }

            Console.WriteLine("Finished post");
        }
 

GZ}X_[]DM}42_()PXL05A(T

最後剩下的一個數據1,因爲沒有滿3個,因此一直被保留在Target2中。

 

TDF中最有用的功能之一就是多個Block之間能夠組合應用。ISourceBlock能夠鏈接ITargetBlock,一對一,一對多,或多對多。下面的例子就是一個TransformBlock和一個ActionBlock的組合。TransformBlock用來把數據*2,並轉換成字符串,而後把數據扔到ActionBlock中,而ActionBlock則用來最後的處理數據打印結果。

 

 

 

 

 
public ActionBlock<string> abSync = new ActionBlock<string>((i) =>
        {
            Thread.Sleep(1000);
            Console.WriteLine(i + " ThreadId:" + Thread.CurrentThread.ManagedThreadId + " Execute Time:" + DateTime.Now);
        }
);

        public TransformBlock<int, string> tbSync = new TransformBlock<int, string>((i) =>
            {
                i = i * 2;
                return i.ToString();
            }
        );

        public void TestSync()
        {
            tbSync.LinkTo(abSync);

            for (int i = 0; i < 10; i++)
            {
                tbSync.Post(i);
            }
            tbSync.Complete();
            Console.WriteLine("Post finished");

            tbSync.Completion.Wait();
            Console.WriteLine("TransformBlock process finished");
        }
 

7S`N)T79TI4~0X8${XF8[PB


測試代碼能夠在這裏下載。下一篇將介紹Block的一些配置,來應對一些高級應用。

相關文章
相關標籤/搜索