像Labview同樣,使用C#構建測量數據流式處理框架

1. C# DataFlow介紹

介紹部分參考博客:TPL DataFlow初探(一)
侵權請聯繫刪除html

官方解釋:git

TPL(任務並行庫) 數據流庫向具備高吞吐量和低滯後時間的佔用大量 CPU 和 I/O 操做的應用程序的並行化和消息傳遞提供了基礎。 它還能顯式控制緩存數據的方式以及在系統中移動的方式。傳統編程模型一般須要使用回調和同步對象(例如鎖)來協調任務和訪問共享數據。在數據流模型下,您能夠聲明當數據可用時的處理方式,以及數據之間的全部依賴項。 因爲運行時管理數據之間的依賴項,所以一般能夠避免這種要求來同步訪問共享數據。 此外,由於運行時計劃基於數據的異步到達,因此數據流能夠經過有效管理基礎線程提升響應能力和吞吐量。

藉助於異步消息傳遞與管道,它能夠提供比線程池更好的控制,也比手工線程方式具有更好的性能。咱們經常能夠消息傳遞,生產-消費模式或Actor-Agent模式中使用。在TDF是構建於Task Parallel Library (TPL)之上的,它是咱們開發高性能,高併發的應用程序的又一利器。github

TDP的主要做用就是Buffering Data和Processing Data,在TDF中,有兩個很是重要的接口,ISourceBlock<T> 和ITargetBlock<T>接口。繼承於ISourceBlock<T>的對象時做爲提供數據的數據源對象-生產者,而繼承於ITargetBlock<T>接口類主要是扮演目標對象-消費者。在這個類庫中,System.Threading.Tasks.Dataflow名稱空間下,提供了不少以Block名字結尾的類,ActionBlock,BufferBlock,TransformBlock,BroadcastBlock等9個Block,咱們在開發中一般使用單個或多個Block組合的方式來實現一些功能。web

1.1 9個典型Block的使用

1.1.1 BufferBlock

BufferBlock是TDF中最基礎的Block。BufferBlock提供了一個有界限或沒有界限的Buffer,該Buffer中存儲T。該Block很像BlockingCollection<T>。能夠用過Post往裏面添加數據,也能夠經過Receive方法阻塞或異步的的獲取數據,數據處理的順序是FIFO的。它也能夠經過Link向其餘Block輸出數據。編程

private static BufferBlock<int> m_buffer = new BufferBlock<int>();

// Producer
private static void Producer()
{
    while(true)
    {
        int item = Produce();
        m_buffer.Post(item);
    }
}

// Consumer
private static void Consumer()
{
    while(true)
    {
        int item = m_buffer.Receive();
        Process(item);
    }
}

// Main
public static void Main()
{
    var p = Task.Factory.StartNew(Producer);
    var c = Task.Factory.StartNew(Consumer);
    Task.WaitAll(p,c);
}

1.1.2 ActionBlock

ActionBlock實現ITargetBlock,說明它是消費數據的,也就是對輸入的一些數據進行處理。它在構造函數中,容許輸入一個委託,來對每個進來的數據進行一些操做。若是使用Action(T)委託,那說明每個數據的處理完成須要等待這個委託方法結束,若是使用了Func<TInput, Task>)來構造的話,那麼數據的結束將不是委託的返回,而是Task的結束。默認狀況下,ActionBlock會FIFO的處理每個數據,並且一次只能處理一個數據,一個處理完了再處理第二個,但也能夠經過配置來並行的執行多個數據。緩存

public ActionBlock<int> abSync = new ActionBlock<int>((i) =>
            {
                Thread.Sleep(1000);
                Console.WriteLine(i + " ThreadId:" + Thread.CurrentThread.ManagedThreadId + " Execute Time:" + DateTime.Now);
            }
        );

        public void TestSync()
        {
            for (int i = 0; i < 10; i++)
            {
                abSync.Post(i);
            }

            Console.WriteLine("Post finished");
        }

1.1.3 TransformBlock

TransformBlock是TDF提供的另外一種Block,顧名思義它經常在數據流中充當數據轉換處理的功能。在TransformBlock內部維護了2個Queue,一個InputQueue,一個OutputQueue。InputQueue存儲輸入的數據,而經過Transform處理之後的數據則放在OutputQueue,OutputQueue就好像是一個BufferBlock。最終咱們能夠經過Receive方法來阻塞的一個一個獲取OutputQueue中的數據。TransformBlock的Completion.Wait()方法只有在OutputQueue中的數據爲0的時候纔會返回。服務器

舉個例子,咱們有一組網址的URL,咱們須要對每一個URL下載它的HTML數據並存儲。那咱們經過以下的代碼來完成:網絡

public TransformBlock<string, string> tbUrl = new TransformBlock<string, string>((url) =>
        {
            WebClient webClient = new WebClient();
            return webClient.DownloadString(new Uri(url));
        }

        public void TestDownloadHTML()
        {
            tbUrl.Post("www.baidu.com");
            tbUrl.Post("www.sina.com.cn");

            string baiduHTML = tbUrl.Receive();
            string sinaHTML = tbUrl.Receive();
        }

其餘的Block請參考上述博客架構

2. 實例

2.1 測試需求

咱們須要採集三個通道的數據$x$,$y$,$z$. 而後使用$x$,$y$,$z$組合通過以下公式計算獲得一個結果$m$併發

$$m=x*y+z $$

而後對$m$數列作中值濾波,每5個數值求中間值,生成一個最終的值。

獲得最終值以後,在界面中顯示波形,存二進制文件,經過網絡發送到數據服務器。

其流式處理圖以下:

2.2 業務分析

按照C# Dataflow的思想,流式處理的各個節點可使用的Block以下圖:

細心的人能夠看到,最後的業務處理前面加上了一個BroadCastBlock,是爲了同時給三個業務分發消息。

2.3 業務實現

2.3.1 架構設計

本博客採用WPF窗體框架,界面以下

2.3.2 代碼實現

因爲手上確實沒有合適的板卡作測試,我就用三個Task模擬數據生成,而後放入三個ConcurrentQueue.

代碼以下:

/// <summary>
/// 通道1隊列
/// </summary>
private readonly ConcurrentQueue<double> _queue1 = new ConcurrentQueue<double>();

/// <summary>
/// 通道2隊列
/// </summary>
private readonly ConcurrentQueue<double> _queue2 = new ConcurrentQueue<double>();

/// <summary>
/// 通道3隊列
/// </summary>
private readonly ConcurrentQueue<double> _queue3 = new ConcurrentQueue<double>();


/// <summary>
/// 生成通道1數據按鈕事件
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void GenerateChannel1Button_OnClick(object sender, RoutedEventArgs e)
{
    Task.Factory.StartNew(() => GenerateData(this._queue1));
}

/// <summary>
/// 生成通道2數據按鈕事件
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void GenerateChannel2Button_OnClick(object sender, RoutedEventArgs e)
{
    Task.Factory.StartNew(() => GenerateData(this._queue2));
}

/// <summary>
/// 生成通道3數據按鈕事件
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void GenerateChannel3Button_OnClick(object sender, RoutedEventArgs e)
{
    Task.Factory.StartNew(() => GenerateData(this._queue3));
}

/// <summary>
/// 生成數據
/// </summary>
/// <param name="queue"></param>
/// <returns></returns>
private async Task GenerateData(ConcurrentQueue<double> queue)
{
    var random = new Random();
    while (this._stop)
    {
        queue.Enqueue(random.NextDouble() * 10);
        await Task.Delay(TimeSpan.FromMilliseconds(50));
    }
}

初始化各類Block

/// <summary>
/// 通道1BufferBlock
/// </summary>
private BufferBlock<double> _bufferBlock1 = new BufferBlock<double>();

/// <summary>
/// 通道2BufferBlock
/// </summary>
private BufferBlock<double> _bufferBlock2 = new BufferBlock<double>();

/// <summary>
/// 通道3BufferBlock
/// </summary>
private BufferBlock<double> _bufferBlock3 = new BufferBlock<double>();

/// <summary>
/// 拼接3個通道JoinBlock
/// </summary>
private JoinBlock<double, double,double> _joinBlock = new JoinBlock<double, double, double>();

/// <summary>
/// 計算M的TransformBlock
/// </summary>
private TransformBlock<Tuple<double,double,double>, double> _calculateMTransformBlock =
    new TransformBlock<Tuple<double, double, double>, double>(t => t.Item1 * t.Item2 + t.Item3);

/// <summary>
/// 每5個m組成一組BatchBlock
/// </summary>
private BatchBlock<double> _mBatchBlock = new BatchBlock<double>(5);

/// <summary>
/// m的中值濾波TransformBlock
/// </summary>
private TransformBlock<double[], double> _mMiddleFilterTransformBlock = new TransformBlock<double[], double>(
    t =>
    {
        Array.Sort(t);
        return t[2];
    });

/// <summary>
/// 廣播mBroadcastBlock
/// </summary>
private BroadcastBlock<double> _broadcastBlock = new BroadcastBlock<double>(t => t);

/// <summary>
/// 界面顯示ActionBlock
/// </summary>
private ActionBlock<double> _showPlotActionBlock;

/// <summary>
/// 寫入文件ActionBlock
/// </summary>
private ActionBlock<double> _writeFileActionBlock;
/// <summary>
/// 網絡上傳ActionBlock
/// </summary>
private ActionBlock<double> _netUpActionBlock;

因爲Lambda 須要訪問外部變量,則須要在laod事件中初始化:

//UI顯示ActionBlock
this._showPlotActionBlock = new ActionBlock<double>(t =>
{
    if (this.Datas.Count >= 10000)
    {
        this.Datas.RemoveAt(0);
    }

    this.Datas.Add(new DataPoint(_xIndex++, t));
    Application.Current.Dispatcher.Invoke(() => { Plot?.InvalidatePlot(); });
}, new ExecutionDataflowBlockOptions()
{
    TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext()
});
//寫入文件
this._writeFileActionBlock = new ActionBlock<double>(t =>
{
    this._binaryWriter.Write(t);
});
//上傳數據,暫時不實現
this._netUpActionBlock = new ActionBlock<double>(t => Console.WriteLine($@"Net upload value: {t}"));

連接這些Block

/// <summary>
/// 連接Blocks
/// </summary>
private void LinkBlocks()
{
    this._bufferBlock1.LinkTo(this._joinBlock.Target1);
    this._bufferBlock2.LinkTo(this._joinBlock.Target2);
    this._bufferBlock3.LinkTo(this._joinBlock.Target3);
    this._joinBlock.LinkTo(this._calculateMTransformBlock);
    this._calculateMTransformBlock.LinkTo(this._mBatchBlock);
    this._mBatchBlock.LinkTo(this._mMiddleFilterTransformBlock);
    this._mMiddleFilterTransformBlock.LinkTo(this._broadcastBlock);
    this._broadcastBlock.LinkTo(this._showPlotActionBlock);
    this._broadcastBlock.LinkTo(this._writeFileActionBlock);
    this._broadcastBlock.LinkTo(this._netUpActionBlock);
}

開始測量按鈕事件:

this._stop = false;
this._fileStream = new FileStream($"{DateTime.Now:yyyy_MM_dd_HH_mm_ss}.dat", FileMode.OpenOrCreate,
    FileAccess.Write);
this._binaryWriter = new BinaryWriter(this._fileStream);
Task.Factory.StartNew(async () =>
{
    while (!this._stop)
    {
        if (this._queue1.Count > 0)
        {
            double result;
            this._queue1.TryDequeue(out result);
            this._bufferBlock1.Post(result);
        }
        else
        {
            await Task.Delay(TimeSpan.FromMilliseconds(30));
        }

    }
});
Task.Factory.StartNew(async () =>
{
    while (!this._stop)
    {
        if (this._queue2.Count > 0)
        {
            double result;
            this._queue2.TryDequeue(out result);
            this._bufferBlock2.Post(result);
        }
        else
        {
            await Task.Delay(TimeSpan.FromMilliseconds(30));
        }

    }
});
Task.Factory.StartNew(async () =>
{
    while (!this._stop)
    {
        if (this._queue3.Count > 0)
        {
            double result;
            this._queue3.TryDequeue(out result);
            this._bufferBlock3.Post(result);
        }
        else
        {
            await Task.Delay(TimeSpan.FromMilliseconds(30));
        }

    }
});

結束測量事件

this._stop = true;
this._binaryWriter.Flush();
this._binaryWriter.Close();
this._fileStream.Close();

最終的效果:

2.3.3 效果體驗:

  1. 先點擊三個生個數據按鈕,而後開始測量,能夠看到數據立刻就會顯示,同時會保存,也會上傳數據。中止測量後能夠看到數據保存到了文件中。
  2. 先點擊生成通道1和2生成數據,不點擊通道3,而後點擊開始測量,能夠看到沒有反應,再點擊通道3,就有數據了,根據咱們的測量邏輯,這個是對的。

本文源碼已發佈到github,地址:https://github.com/spartajet/...

相關文章
相關標籤/搜索