在面對 生產者-消費者
的場景下, netcore 提供了一個新的命名空間 System.Threading.Channels
來幫助咱們更高效的處理此類問題,有了這個 Channels 存在, 生產者
和 消費者
能夠各自處理本身的任務而不相互干擾,有利於兩方的併發處理,這篇文章咱們就來討論下如何使用 System.Threading.Channels
。html
在 System.Threading.Tasks.Dataflow
命名空間下提供了一個數據流庫,主要封裝了 存儲
和 處理
兩大塊,該庫專一於 pipeline 處理,而 System.Threading.Tasks.Channels
主要專一於 存儲
這塊,從單一職責上來講,在 生產者-消費者
場景下,Channels 比 Dataflow 性能要高得多。api
能夠利用 Channels 來實現 生產者和消費者
之間的解耦,大致上有兩個好處:併發
生產者 和 消費者 是相互獨立的,二者能夠並行執行。async
若是生產者不給力,能夠建立多個的生產者,若是消費者不給力,能夠建立更多的消費者。工具
總的來講,在 生產者-消費者
模式下能夠幫助咱們提升應用程序的吞吐率。性能
要想使用 Channel,須要用 nuget 引用 System.Threading.Channels
包,還能夠經過 Visual Studio 2019 的 NuGet package manager
可視化界面安裝 或者 經過 NuGet package manager
命令行工具輸入如下命令:命令行
dotnet add package System.Threading.Channels
本質上來講,你能夠建立兩種類型的 channel,一種是有限容量的 bound channel
,一種是無限容量的 unbound channel
,接下來的問題是,如何建立呢?Channels 提供了兩種 工廠方法 用於建立,以下代碼所示:code
CreateBounded<T>
建立的 channel 是一個有消息上限的通道。htm
CreateUnbounded<T>
建立的 channel 是一個無消息上限的通道。blog
下面的代碼片斷展現瞭如何建立 unbounded channel
,而且只能存放 string 類型。
static void Main(string[] args) { var channel = Channel.CreateUnbounded<string>(); }
對了,Bounded channel
還提供了一個 FullMode 屬性,用於指定當 channel 已滿時該如何對插入的 message 進行處理,一般有四種作法。
Wait
DropWrite
DropNewest
DropOldest
下面的代碼片斷展現瞭如何在 Bounded channel
上使用 FullMode。
static void Main(string[] args) { var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(1000) { FullMode = BoundedChannelFullMode.Wait }); }
要想將 message 寫入到 channel,可使用 WriteAsync()
方法,以下代碼所示:
static async Task Main(string[] args) { var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(1000) { FullMode = BoundedChannelFullMode.Wait }); await channel.Writer.WriteAsync("Hello World!"); }
要想從 channel 中讀取 message,可使用 ReadAsync()
,以下代碼所示:
static async Task Main(string[] args) { var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(1000) { FullMode = BoundedChannelFullMode.Wait }); while (await channel.Reader.WaitToReadAsync()) { if (channel.Reader.TryRead(out var message)) { Console.WriteLine(message); } } }
下面是完整的代碼清單,展現瞭如何從 channel 中讀寫 message。
class Program { static async Task Main(string[] args) { await SingleProducerSingleConsumer(); Console.ReadKey(); } public static async Task SingleProducerSingleConsumer() { var channel = Channel.CreateUnbounded<int>(); var reader = channel.Reader; for (int i = 0; i < 10; i++) { await channel.Writer.WriteAsync(i + 1); } while (await reader.WaitToReadAsync()) { if (reader.TryRead(out var number)) { Console.WriteLine(number); } } } }
能夠看到,控制檯中輸出了數字 1-10
,這些數字正是 Writer 寫入到 channel 中的,對吧。
總的來講,要想使用 生產者-消費者
場景,有幾種實現途徑,好比:BlockingCollection 和 TPL Dataflow,但本篇介紹的 Channels 要比前面的兩種性能更高,關於 Channels 更多的細節,我會在將來的文章中進行討論,若是您如今想急於瞭解的話,能夠參考MSDN: https://docs.microsoft.com/en-us/dotnet/api/system.threading.channels?view=netcore-3.0
譯文連接:https://www.infoworld.com/article/3445156/how-to-use-systemthreadingchannels-in-net-core.html