如何在 C# 中使用 Channels

在面對 生產者-消費者 的場景下, netcore 提供了一個新的命名空間 System.Threading.Channels 來幫助咱們更高效的處理此類問題,有了這個 Channels 存在, 生產者消費者 能夠各自處理本身的任務而不相互干擾,有利於兩方的併發處理,這篇文章咱們就來討論下如何使用 System.Threading.Channelshtml

Dataflow vs Channel

System.Threading.Tasks.Dataflow 命名空間下提供了一個數據流庫,主要封裝了 存儲處理 兩大塊,該庫專一於 pipeline 處理,而 System.Threading.Tasks.Channels 主要專一於 存儲 這塊,從單一職責上來講,在 生產者-消費者 場景下,Channels 比 Dataflow 性能要高得多。api

爲何要使用 Channels

能夠利用 Channels 來實現 生產者和消費者 之間的解耦,大致上有兩個好處:併發

  • 生產者 和 消費者 是相互獨立的,二者能夠並行執行。async

  • 若是生產者不給力,能夠建立多個的生產者,若是消費者不給力,能夠建立更多的消費者。工具

總的來講,在 生產者-消費者 模式下能夠幫助咱們提升應用程序的吞吐率。性能

安裝 System.Threading.Channels

要想使用 Channel,須要用 nuget 引用 System.Threading.Channels 包,還能夠經過 Visual Studio 2019 的 NuGet package manager 可視化界面安裝 或者 經過 NuGet package manager 命令行工具輸入如下命令:命令行

dotnet add package System.Threading.Channels

建立 channel

本質上來講,你能夠建立兩種類型的 channel,一種是有限容量的 bound channel,一種是無限容量的 unbound channel,接下來的問題是,如何建立呢?Channels 提供了兩種 工廠方法 用於建立,以下代碼所示:code

  • CreateBounded<T> 建立的 channel 是一個有消息上限的通道。htm

  • CreateUnbounded<T> 建立的 channel 是一個無消息上限的通道。blog

下面的代碼片斷展現瞭如何建立 unbounded channel,而且只能存放 string 類型。

static void Main(string[] args)
        {
            var channel = Channel.CreateUnbounded<string>();
        }

對了,Bounded channel 還提供了一個 FullMode 屬性,用於指定當 channel 已滿時該如何對插入的 message 進行處理,一般有四種作法。

  • Wait

  • DropWrite

  • DropNewest

  • DropOldest

下面的代碼片斷展現瞭如何在 Bounded channel 上使用 FullMode。

static void Main(string[] args)
        {
            var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(1000)
            {
                FullMode = BoundedChannelFullMode.Wait
            });
        }

將消息寫入到 channel

要想將 message 寫入到 channel,可使用 WriteAsync() 方法,以下代碼所示:

static async Task Main(string[] args)
        {
            var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(1000)
            {
                FullMode = BoundedChannelFullMode.Wait
            });

            await channel.Writer.WriteAsync("Hello World!");
        }

從 channel 中讀取消息

要想從 channel 中讀取 message,可使用 ReadAsync(),以下代碼所示:

static async Task Main(string[] args)
        {
            var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(1000)
            {
                FullMode = BoundedChannelFullMode.Wait
            });

            while (await channel.Reader.WaitToReadAsync())
            {
                if (channel.Reader.TryRead(out var message))
                {
                    Console.WriteLine(message);
                }
            }
        }

System.Threading.Channels 例子

下面是完整的代碼清單,展現瞭如何從 channel 中讀寫 message。

class Program
    {
        static async Task Main(string[] args)
        {
            await SingleProducerSingleConsumer();

            Console.ReadKey();
        }

        public static async Task SingleProducerSingleConsumer()
        {
            var channel = Channel.CreateUnbounded<int>();
            var reader = channel.Reader;
            for (int i = 0; i < 10; i++)
            {
                await channel.Writer.WriteAsync(i + 1);
            }

            while (await reader.WaitToReadAsync())
            {
                if (reader.TryRead(out var number))
                {
                    Console.WriteLine(number);
                }
            }
        }
    }

能夠看到,控制檯中輸出了數字 1-10,這些數字正是 Writer 寫入到 channel 中的,對吧。

總的來講,要想使用 生產者-消費者 場景,有幾種實現途徑,好比:BlockingCollection 和 TPL Dataflow,但本篇介紹的 Channels 要比前面的兩種性能更高,關於 Channels 更多的細節,我會在將來的文章中進行討論,若是您如今想急於瞭解的話,能夠參考MSDN: https://docs.microsoft.com/en-us/dotnet/api/system.threading.channels?view=netcore-3.0

更多精彩,歡迎訂閱 👇👇👇

譯文連接:https://www.infoworld.com/article/3445156/how-to-use-systemthreadingchannels-in-net-core.html

相關文章
相關標籤/搜索