在.NET Core中使用Channel(二)

在咱們以前的文章中,看了一些很是簡單的例子來講明Channel是如何工做的,咱們看到了一些很是漂亮的特性,但大多數狀況下它與其餘某某Queue實現很是類似。讓咱們進入一些更高級的話題。我說的是高級,但其中不少都很是簡單。app

讀/寫分離async

若是你曾經在兩個類之間共享隊列,你就會知道任何一個類均可以讀/寫,即便它們本不該該這樣作。例如:spa

class MyProducer
{
    private readonly Queue<int> _queue;

    public MyProducer(Queue<int> queue)
    {
        _queue = queue;
    }
}

class MyConsumer
{
    private readonly Queue<int> _queue;

    public MyConsumer(Queue<int> queue)
    {
        _queue = queue;
    }
}

所以,生產者應該只寫隊列,消費者應該只讀隊列,在這兩種狀況下,它們能夠對隊列執行全部操做。雖然你可能在本身的腦海中但願消費者只讀取,但另外一個開發人員可能會出現調用Enqueue,除了代碼審查以外,沒有什麼能夠阻止他們犯這個錯誤。.net

可是有了Channel,咱們能夠作不一樣的事情。線程

class Program
{
    static async Task Main(string[] args)
    {
        var myChannel = Channel.CreateUnbounded<int>();
        var producer = new MyProducer(myChannel.Writer);
        var consumer = new MyConsumer(myChannel.Reader);
    }
}

class MyProducer
{
    private readonly ChannelWriter<int> _channelWriter;

    public MyProducer(ChannelWriter<int> channelWriter)
    {
        _channelWriter = channelWriter;
    }
}

class MyConsumer
{
    private readonly ChannelReader<int> _channelReader;

    public MyConsumer(ChannelReader<int> channelReader)
    {
        _channelReader = channelReader;
    }
}

在這個例子中,我添加了一個main方法來向你展現如何建立writer/reader,但它很是簡單。這裏咱們能夠看到,對於咱們的生產者,我只傳遞給它一個ChannelWriter,因此它只能作寫操做。對於咱們的消費者,咱們傳遞給它一個ChannelReader,因此它只能讀取。code

固然,這並不意味着其餘開發人員不能修改代碼並開始注入根Channel對象,或者同時傳入ChannelWriter/ChannelReader,但這至少比以前的狀況要好得多。對象

完成一個Channelblog

咱們在前面看到,當在通道上調用ReadAsync()時,它實際上會在那裏等待消息,可是若是沒有更多的消息到來呢?對於.net中的其餘隊列,咱們一般須要傳遞某種共享的布爾值或一個CancellationToken。但有了Channel,就更容易了。隊列

考慮如下幾點:ip

static async Task Main(string[] args)
{
    var myChannel = Channel.CreateUnbounded<int>();

    _ = Task.Factory.StartNew(async () =>
    {
        for (int i = 0; i < 10; i++)
        {
            await myChannel.Writer.WriteAsync(i);
        }

        myChannel.Writer.Complete();
    });

    try
    {
        while (true)
        {
            var item = await myChannel.Reader.ReadAsync();
            Console.WriteLine(item);
            await Task.Delay(1000);
        }
    }catch(ChannelClosedException e)
    {
        Console.WriteLine("Channel was closed!");
    }
}

我讓第二個線程儘量快地寫入咱們的Channel,而後完成它。而後咱們的讀取器緩慢讀取,每次讀取之間有1秒的延遲。注意,咱們捕獲了ChannelClosedExecption,當你嘗試從關閉通道讀取最後消息以後時將調用它。

我只是想說清楚。在Channel上調用Complete()不會當即關閉通道並殺死讀取該通道的全部人。而是通知全部服務,一旦最後一條消息被讀取,咱們就完成了。這很重要,由於這意味着當咱們等待新條目時,當隊列是空的時,當隊列是滿的時,是否調用Complete()都可有可無。咱們能夠確定,咱們將完成全部可獲得的工做。

在Channel中使用IAsyncEnumerable

以咱們試圖關閉一個Channel爲例,有兩件事引發了個人注意。

咱們有一個while(true)循環。這並非真的那麼糟糕,但它有點礙眼。

爲了打破這個循環,並知道Channel已經完成,咱們必須捕獲異常並將其吞下。

使用命令「ReadAllAsync()」來解決這些問題,它返回一個IAsyncEnumerable。代碼看起來有點像這樣:

static async Task Main(string[] args)
{
    var myChannel = Channel.CreateUnbounded<int>();

    _ = Task.Factory.StartNew(async () =>
    {
        for (int i = 0; i < 10; i++)
        {
            await myChannel.Writer.WriteAsync(i);
        }

        myChannel.Writer.Complete();
    });

    await foreach(var item in myChannel.Reader.ReadAllAsync())
    {
        Console.WriteLine(item);
        await Task.Delay(1000);
    }
}

如今代碼讀起來好多了,而且去掉了捕獲異常的一些多餘的東西。由於咱們使用的是IAsyncEnumerable,因此咱們仍然能夠像之前那樣等待每一項,可是咱們再也不須要捕獲異常,由於當Channel完成時,它只是簡單地說沒有其餘東西了,而後循環退出。

一樣,這消除了在處理隊列時必須編寫的一些混亂代碼。之前你必須編寫某種無限循環,而如今它只是一個真正整潔的循環,能夠處理底層的全部東西。

接下來是什麼

到目前爲止,咱們一直在使用「無限的」通道。你可能已經猜到了,固然也能夠選擇使用BoundedChannel。查看本系列的下一部分,更好地理解這些東西。

 歡迎關注個人公衆號,若是你有喜歡的外文技術文章,能夠經過公衆號留言推薦給我。

 

原文連接:https://dotnetcoretutorials.com/2020/11/24/using-channels-in-net-core-part-2-advanced-channels/

相關文章
相關標籤/搜索