Dotnet Core下的Channel, 你用了嗎?

今天給你們分享一個微軟官方的好東西:Channel。html

前言

今天給你們分享一個微軟官方的生產者/消費者方案的特性解決:Channel。git

Channel在System.Threading.Channels命名空間下,Core 2.1使用時,須要從Nuget上安裝。github

% dotnet add package System.Threading.Channels

而在Core 3.0 preview 7開始,就直接包含在框架中了。c#

這是一個相對較新的特性。從Core 2.1開始加入,如今版本是5.0.0(嗯,這個版本號有點騙人,Channel的第一個版本就是4.5.0)。安全

Channel能作什麼?bash

邏輯上,Channel實際就是一個高效的、線程安全的隊列,支持在生產者和消費者之間傳遞數據。服務器

利用Channel,經過發佈和訂閱,能夠將生產者和消費者分開。生產者Producer負責接收請求,並寫入Channel,而消費者Consumer爲每一個進入Channel的數據執行處理。這樣作,一方面可使生產者和消費者並行工做來提升性能,另外一方面,能夠經過建立更多的生產者或消費者來提升應用的吞吐量。微信

    爲防止非受權轉發,這兒給出本文的原文連接:http://www.javashuo.com/article/p-cbvfbxke-nv.html併發

下面,咱們以一個實際例子,來解釋這個特性。app

建立Channel

Channel提供了一個靜態Channel類,提供了兩個公開方法來建立兩種類型的Channel。

  • CreateUnbounded - 建立一個具備無限容量的Channel。

  • CreateBounded - 建立一個具備有限容量的Channel。

人一般來講,這兩種方式使用上沒有太大的區別。實際應用中,具體要看生產和消費的速度,以及指望產生的結果。有限容量的Channel,容量是有上限的,到達上限後,可讓生產者非阻塞等待消費者使用並釋放Channel容量後再繼續。這種方式,好處是能夠控制生產的速度,控制系統資源的使用,缺點也是。由於控制速度意味着生產速度會被限制,甚至中止。而無限容量,生產者能夠全速進行生產。但也有缺點,若是消費者的消費速度低於生產者,Channel的資源使用會無限增長,會有服務器資源耗盡的可能。

今天的例子,咱們使用無限Channel。

var channel = Channel.CreateUnbounded<string>();

很是簡單的一行代碼,就建立了一個無限容量的Channel。

咱們定義這個Channel用來保存字符串對象。

建立方法是一個通用的工廠方法,因此咱們能夠爲須要使用的任何類型的對象和數據建立Channel。

Channel有兩個屬性:閱讀器返回ChannelReader,寫入器返回ChannelWriter。

寫入Channel

使用寫入器ChannelWriter,能夠對Channel進行寫入操做。ChannelWriter提供瞭如下幾個方法:

  • WriteAsync - 異步寫入
  • WaitToWriteAsync - 非阻塞等待,直到有空間可寫入時或Channel關閉時,返回true/false
  • TryWrite - 嘗試寫入
  • Complete - 標記Channel爲關閉,並再也不寫入數據到該Channel
  • TryComplete - 嘗試標記Channel爲關閉。

這幾個方法很容易理解,就不解釋了。

在本文的例子裏,我用了:

await channel.Writer.WriteAsync("New message");

讀取Channel

使用閱讀器ChannelReader從Channel進行數據的讀取。也提供了幾個方法:

  • ReadAsync - 異步讀取
  • ReadAllAsync - 異步讀取Channel中的全部數據
  • TryRead - 嘗試讀取
  • WaitToReadAsync - 非阻塞等待,直到有數據可讀取或Channel關閉時,返回true/false

不一樣的消費者模式,會用到不一樣的讀取方法。這個根據經驗來寫就好。

本文的例子中,我是採用WaitToReadAsync和ReadAsync配合來使用的:

while (await ChannelReader.WaitToReadAsync())
{
    if (ChannelReader.TryRead(out var timeString))
    {
          /***/
    }
}

WaitToReadAsync是一個非阻塞等待,在有消息可讀或Channel關閉時,纔會喚醒並繼續。

考慮到有多個消費者的狀況,有可能別的線程已經進行了讀取,這兒使用TryRead進行讀取操做。

要注意:數據的同步工做是由Channel進行管理的。Channel會確保多個消費者不會讀到相同的數據。Channel同時也管理數據的次序。

示例代碼

今天的示例代碼我放到了Github上。連接是文章最後。

這個例子中,我作了三個場景。

首先是Channel。我使用了無限Channel。而後是建立生產者和消費者。數據傳輸過程就簡單化了,生產者只簡單將一個字符串寫入到Channel。消費者也是,簡單等待並從Channel讀取數據字符串,寫入控制檯。

三個場景分別是:

單一輩子產者/單一消費者

這個例子中,建立了一個生產者和一個消費者。二者的任務都是併發啓動的。

裏面的延時,是用來模擬工做負載的。

多個生產者/單一消費者

這個例子中有兩個生產者。一般在應用中有多個生產者時,咱們須要確保生產與單個消費者所能處理的消息數量大體至關,這樣能更好地利用服務器資源。

單一輩子產者/多個消費者

這個實際上是應用中最多見的狀況,就是產生消息很快,但處理工做相關較慢,並且工做也更密集。這種狀況,實際應用中咱們能夠經過擴大消費者數量來知足生產的需求。

總結

最近的項目在作一個大數據的採集,用到了一些Channel的技術。而後發現網上這部份內容不多,就作了個例子,寫了這個文章。

Channel內容自己並很少,但用着很方便,並且實際應用中,比想像的更強大。它能夠簡化不少生產者/消費者模式的使用,並且,任務間交換數據,使用Channel會更方便,更直接。

示例代碼在:https://github.com/humornif/Demo-Code/tree/master/0033/demo

 

 


 

微信公衆號:老王Plus

掃描二維碼,關注我的公衆號,能夠第一時間獲得最新的我的文章和內容推送

本文版權歸做者全部,轉載請保留此聲明和原文連接

相關文章
相關標籤/搜索