介紹部分參考博客:TPL DataFlow初探(一)
侵權請聯繫刪除html
官方解釋:git
TPL(任務並行庫) 數據流庫向具備高吞吐量和低滯後時間的佔用大量 CPU 和 I/O 操做的應用程序的並行化和消息傳遞提供了基礎。 它還能顯式控制緩存數據的方式以及在系統中移動的方式。傳統編程模型一般須要使用回調和同步對象(例如鎖)來協調任務和訪問共享數據。在數據流模型下,您能夠聲明當數據可用時的處理方式,以及數據之間的全部依賴項。 因爲運行時管理數據之間的依賴項,所以一般能夠避免這種要求來同步訪問共享數據。 此外,由於運行時計劃基於數據的異步到達,因此數據流能夠經過有效管理基礎線程提升響應能力和吞吐量。
藉助於異步消息傳遞與管道,它能夠提供比線程池更好的控制,也比手工線程方式具有更好的性能。咱們經常能夠消息傳遞,生產-消費模式或Actor-Agent模式中使用。在TDF是構建於Task Parallel Library (TPL)之上的,它是咱們開發高性能,高併發的應用程序的又一利器。github
TDP的主要做用就是Buffering Data和Processing Data,在TDF中,有兩個很是重要的接口,ISourceBlock<T> 和ITargetBlock<T>接口。繼承於ISourceBlock<T>的對象時做爲提供數據的數據源對象-生產者,而繼承於ITargetBlock<T>接口類主要是扮演目標對象-消費者。在這個類庫中,System.Threading.Tasks.Dataflow名稱空間下,提供了不少以Block名字結尾的類,ActionBlock,BufferBlock,TransformBlock,BroadcastBlock等9個Block,咱們在開發中一般使用單個或多個Block組合的方式來實現一些功能。web
BufferBlock是TDF中最基礎的Block。BufferBlock提供了一個有界限或沒有界限的Buffer,該Buffer中存儲T。該Block很像BlockingCollection<T>。能夠用過Post往裏面添加數據,也能夠經過Receive方法阻塞或異步的的獲取數據,數據處理的順序是FIFO的。它也能夠經過Link向其餘Block輸出數據。編程
private static BufferBlock<int> m_buffer = new BufferBlock<int>(); // Producer private static void Producer() { while(true) { int item = Produce(); m_buffer.Post(item); } } // Consumer private static void Consumer() { while(true) { int item = m_buffer.Receive(); Process(item); } } // Main public static void Main() { var p = Task.Factory.StartNew(Producer); var c = Task.Factory.StartNew(Consumer); Task.WaitAll(p,c); }
ActionBlock實現ITargetBlock,說明它是消費數據的,也就是對輸入的一些數據進行處理。它在構造函數中,容許輸入一個委託,來對每個進來的數據進行一些操做。若是使用Action(T)委託,那說明每個數據的處理完成須要等待這個委託方法結束,若是使用了Func<TInput, Task>)來構造的話,那麼數據的結束將不是委託的返回,而是Task的結束。默認狀況下,ActionBlock會FIFO的處理每個數據,並且一次只能處理一個數據,一個處理完了再處理第二個,但也能夠經過配置來並行的執行多個數據。緩存
public ActionBlock<int> abSync = new ActionBlock<int>((i) => { Thread.Sleep(1000); Console.WriteLine(i + " ThreadId:" + Thread.CurrentThread.ManagedThreadId + " Execute Time:" + DateTime.Now); } ); public void TestSync() { for (int i = 0; i < 10; i++) { abSync.Post(i); } Console.WriteLine("Post finished"); }
TransformBlock是TDF提供的另外一種Block,顧名思義它經常在數據流中充當數據轉換處理的功能。在TransformBlock內部維護了2個Queue,一個InputQueue,一個OutputQueue。InputQueue存儲輸入的數據,而經過Transform處理之後的數據則放在OutputQueue,OutputQueue就好像是一個BufferBlock。最終咱們能夠經過Receive方法來阻塞的一個一個獲取OutputQueue中的數據。TransformBlock的Completion.Wait()方法只有在OutputQueue中的數據爲0的時候纔會返回。服務器
舉個例子,咱們有一組網址的URL,咱們須要對每一個URL下載它的HTML數據並存儲。那咱們經過以下的代碼來完成:網絡
public TransformBlock<string, string> tbUrl = new TransformBlock<string, string>((url) => { WebClient webClient = new WebClient(); return webClient.DownloadString(new Uri(url)); } public void TestDownloadHTML() { tbUrl.Post("www.baidu.com"); tbUrl.Post("www.sina.com.cn"); string baiduHTML = tbUrl.Receive(); string sinaHTML = tbUrl.Receive(); }
其餘的Block請參考上述博客架構
咱們須要採集三個通道的數據$x$,$y$,$z$. 而後使用$x$,$y$,$z$組合通過以下公式計算獲得一個結果$m$併發
$$m=x*y+z $$
而後對$m$數列作中值濾波,每5個數值求中間值,生成一個最終的值。
獲得最終值以後,在界面中顯示波形,存二進制文件,經過網絡發送到數據服務器。
其流式處理圖以下:
按照C# Dataflow的思想,流式處理的各個節點可使用的Block以下圖:
細心的人能夠看到,最後的業務處理前面加上了一個BroadCastBlock
,是爲了同時給三個業務分發消息。
本博客採用WPF窗體框架,界面以下
因爲手上確實沒有合適的板卡作測試,我就用三個Task模擬數據生成,而後放入三個ConcurrentQueue
.
代碼以下:
/// <summary> /// 通道1隊列 /// </summary> private readonly ConcurrentQueue<double> _queue1 = new ConcurrentQueue<double>(); /// <summary> /// 通道2隊列 /// </summary> private readonly ConcurrentQueue<double> _queue2 = new ConcurrentQueue<double>(); /// <summary> /// 通道3隊列 /// </summary> private readonly ConcurrentQueue<double> _queue3 = new ConcurrentQueue<double>(); /// <summary> /// 生成通道1數據按鈕事件 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private void GenerateChannel1Button_OnClick(object sender, RoutedEventArgs e) { Task.Factory.StartNew(() => GenerateData(this._queue1)); } /// <summary> /// 生成通道2數據按鈕事件 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private void GenerateChannel2Button_OnClick(object sender, RoutedEventArgs e) { Task.Factory.StartNew(() => GenerateData(this._queue2)); } /// <summary> /// 生成通道3數據按鈕事件 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private void GenerateChannel3Button_OnClick(object sender, RoutedEventArgs e) { Task.Factory.StartNew(() => GenerateData(this._queue3)); } /// <summary> /// 生成數據 /// </summary> /// <param name="queue"></param> /// <returns></returns> private async Task GenerateData(ConcurrentQueue<double> queue) { var random = new Random(); while (this._stop) { queue.Enqueue(random.NextDouble() * 10); await Task.Delay(TimeSpan.FromMilliseconds(50)); } }
初始化各類Block
/// <summary> /// 通道1BufferBlock /// </summary> private BufferBlock<double> _bufferBlock1 = new BufferBlock<double>(); /// <summary> /// 通道2BufferBlock /// </summary> private BufferBlock<double> _bufferBlock2 = new BufferBlock<double>(); /// <summary> /// 通道3BufferBlock /// </summary> private BufferBlock<double> _bufferBlock3 = new BufferBlock<double>(); /// <summary> /// 拼接3個通道JoinBlock /// </summary> private JoinBlock<double, double,double> _joinBlock = new JoinBlock<double, double, double>(); /// <summary> /// 計算M的TransformBlock /// </summary> private TransformBlock<Tuple<double,double,double>, double> _calculateMTransformBlock = new TransformBlock<Tuple<double, double, double>, double>(t => t.Item1 * t.Item2 + t.Item3); /// <summary> /// 每5個m組成一組BatchBlock /// </summary> private BatchBlock<double> _mBatchBlock = new BatchBlock<double>(5); /// <summary> /// m的中值濾波TransformBlock /// </summary> private TransformBlock<double[], double> _mMiddleFilterTransformBlock = new TransformBlock<double[], double>( t => { Array.Sort(t); return t[2]; }); /// <summary> /// 廣播mBroadcastBlock /// </summary> private BroadcastBlock<double> _broadcastBlock = new BroadcastBlock<double>(t => t); /// <summary> /// 界面顯示ActionBlock /// </summary> private ActionBlock<double> _showPlotActionBlock; /// <summary> /// 寫入文件ActionBlock /// </summary> private ActionBlock<double> _writeFileActionBlock; /// <summary> /// 網絡上傳ActionBlock /// </summary> private ActionBlock<double> _netUpActionBlock;
因爲Lambda
須要訪問外部變量,則須要在laod
事件中初始化:
//UI顯示ActionBlock this._showPlotActionBlock = new ActionBlock<double>(t => { if (this.Datas.Count >= 10000) { this.Datas.RemoveAt(0); } this.Datas.Add(new DataPoint(_xIndex++, t)); Application.Current.Dispatcher.Invoke(() => { Plot?.InvalidatePlot(); }); }, new ExecutionDataflowBlockOptions() { TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext() }); //寫入文件 this._writeFileActionBlock = new ActionBlock<double>(t => { this._binaryWriter.Write(t); }); //上傳數據,暫時不實現 this._netUpActionBlock = new ActionBlock<double>(t => Console.WriteLine($@"Net upload value: {t}"));
連接這些Block
/// <summary> /// 連接Blocks /// </summary> private void LinkBlocks() { this._bufferBlock1.LinkTo(this._joinBlock.Target1); this._bufferBlock2.LinkTo(this._joinBlock.Target2); this._bufferBlock3.LinkTo(this._joinBlock.Target3); this._joinBlock.LinkTo(this._calculateMTransformBlock); this._calculateMTransformBlock.LinkTo(this._mBatchBlock); this._mBatchBlock.LinkTo(this._mMiddleFilterTransformBlock); this._mMiddleFilterTransformBlock.LinkTo(this._broadcastBlock); this._broadcastBlock.LinkTo(this._showPlotActionBlock); this._broadcastBlock.LinkTo(this._writeFileActionBlock); this._broadcastBlock.LinkTo(this._netUpActionBlock); }
開始測量按鈕事件:
this._stop = false; this._fileStream = new FileStream($"{DateTime.Now:yyyy_MM_dd_HH_mm_ss}.dat", FileMode.OpenOrCreate, FileAccess.Write); this._binaryWriter = new BinaryWriter(this._fileStream); Task.Factory.StartNew(async () => { while (!this._stop) { if (this._queue1.Count > 0) { double result; this._queue1.TryDequeue(out result); this._bufferBlock1.Post(result); } else { await Task.Delay(TimeSpan.FromMilliseconds(30)); } } }); Task.Factory.StartNew(async () => { while (!this._stop) { if (this._queue2.Count > 0) { double result; this._queue2.TryDequeue(out result); this._bufferBlock2.Post(result); } else { await Task.Delay(TimeSpan.FromMilliseconds(30)); } } }); Task.Factory.StartNew(async () => { while (!this._stop) { if (this._queue3.Count > 0) { double result; this._queue3.TryDequeue(out result); this._bufferBlock3.Post(result); } else { await Task.Delay(TimeSpan.FromMilliseconds(30)); } } });
結束測量事件
this._stop = true; this._binaryWriter.Flush(); this._binaryWriter.Close(); this._fileStream.Close();
最終的效果:
本文源碼已發佈到github,地址:https://github.com/spartajet/...