C#異步案例一則

場景

  生產者和消費者隊列, 生產者有多個, 消費者也有多個, 生產到消費須要異步.web

下面用一個Asp.NetCore Web-API項目來模擬

  建立兩個API, 一個Get(), 一個Set(), Get返回一個字符串, Set放入一個字符串, Get返回的就是Set進去的字符串.api

  實現以下:  瀏覽器

[Route("api/[controller]/[action]")]
public class FooController : Control
{
    IMessageQueue _mq;
    public FooController(IMessageQueue mq)
    {
        _mq = mq;
    }

    [HttpGet]
    public string Get()
    {
        string str = _mq.ReadOne<string>();
        return str;
    }

    [HttpGet]
    public void Set(string v)
    {
        _mq.WriteOne(v);
    }
}

public interface IMessageQueue
{
    T ReadOne<T>();
    void WriteOne<T>(T value);
}

public class MessageQueue: IMessageQueue
{
    private object _value;

    public T ReadOne<T>()
    {
        return (T)_value;
    }

    public void WriteOne<T>(T value)
    {
        _value = value;

    }
}

接着在StartUp中把IMessageQueue給注入了.安全

services.AddSingleton<IMessageQueue, MessageQueue>();

運行後, 先調用/api/foo/set/?v=xxx, 再調用/api/foo/get/異步

能夠看到成功返回了xxxasync

第二步, value字段改成隊列:

使set進去的值不會被下一個覆蓋, get取隊列最前的值分佈式

爲了線程安全, 這裏使用了ConcurrentQueue<T>post

代碼以下:測試

public class MessageQueue: IMessageQueue
{
    private readonly ConcurrentQueue<object> _queue = new ConcurrentQueue<object>();

    public T ReadOne<T>()
    {
        _queue.TryDequeue(out object str);
        return (T)str ;
    }

    public void WriteOne<T>(Tvalue)
    {
        _queue.Enqueue(value);
    }
}

那麼此時, 只要get不斷地輪詢, 就能夠取到set生產出來的數據了.spa

調用/api/foo/set/

三, 異步阻塞

再增長需求, 調換get和set的順序,先get後set模擬異步, (我這裏的demo是個web-api會有http請求超時之類的...僞裝不存在)我想要get調用等待有數據時才返回.

也就是說我想要在瀏覽器地址欄輸入http://localhost:5000/api/foo/get/以後會不斷地轉圈直到我用set接口放入一個值

方案A: while(true), 根本無情簡直無敵, 死等Read() != null時break; 爲防單核滿轉加個Thread.Sleep();

方案B: Monitor, 一個Wait()一個Exit/Release();

可是以上兩個方案都是基於Thread的, .Net4.0以後伴隨ConcurrentQueue一塊兒來的還有個BlockingCollection<T>至關好用

方案C: 修改後代碼以下:

public class MessageQueue : IMessageQueue
{
    private readonly BlockingCollection<object> _queue = new BlockingCollection<object>(new ConcurrentQueue<object>());

    public T ReadOne<T>()
    {
        var obj = _queue.Take();
        return (T)obj;
    }

    public void WriteOne<T>(T value)
    {
        _queue.Add(value);
    }
}

此時, 若是先get, 會阻塞等待set; 若是已經有set過數據就會直接返回隊列中的數據. get不會無功而返了. 基於這個類型, 能夠實現更像樣的訂閱模型.

擴展RPC

這裏的set是生產者, get是消費者, 那若是個人這個生產者並不單純產生數據返回void而是須要等待一個結果的呢? 此時訂閱模型不夠用了, 我須要一個異步的RPC .

好比有個Ask請求會攜帶參數發起請求, 並等待, 知道另外有個地方處理了這個任務產生結果, ask結束等待返回這個結果answer. 

我能夠回頭繼續用方案A或B, 但連.net4.0都已通過去好久了, 因此應該用更好的基於Task的異步方案.

代碼以下, 首先新增兩個接口:

public interface IMessageQueue
{
    void Respond<TRequest, TResponse>(Func<TRequest, TResponse> func); Task<TResponse> Rpc<TRequest, TResponse>(TRequest req);

    T ReadOne<T>();
    void WriteOne<T>(T data);
}

接着定義一個特殊的任務類:

public class RpcTask<TRequest, TResponse>
{
    public TaskCompletionSource<TResponse> Tcs { get; set; }
    public TRequest Request { get; set; }
}

實現剛纔新加的兩個接口:

public Task<TResponse> Rpc<TRequest, TResponse>(TRequest req)
{
    TaskCompletionSource<TResponse> tcs = new TaskCompletionSource<TResponse>();
    _queue.Add(new RpcTask<TRequest, TResponse> { Request = req, Tcs = tcs});
    return tcs.Task;
}

public void Respond<TRequest, TResponse>(Func<TRequest, TResponse> func)
{
    var obj = _queue.Take();
    if(obj is RpcTask<TRequest, TResponse> t)
    {
        var response = func(t.Request);
        t.Tcs.SetResult(response);
    }
}

一樣的, 寫兩個Web API接口, 一個請求等待結果 一個負責處理工做

[HttpGet]
public async Task<string> Ask(string v)
{
    var response = await _mq.Rpc<MyRequest, MyResponse>(new MyRequest { Id = v });
    return $"[{response.DoneTime}] {response.Id}";
}

[HttpGet]
public void Answer()
{
    _mq.Respond<MyRequest, MyResponse>((req)=> new MyResponse { Id = req.Id, DoneTime = DateTime.Now });
}

上面還隨便寫了兩個class做爲請求和返回

public class MyRequest
{
    public string Id { get; set; }
}
public class MyResponse
{
    public string Id { get; set; }
    public DateTime DoneTime { get; set; }
}

測試一下, 用瀏覽器或postman打開三個選項卡, 各發起一個Ask接口的請求, 參數v分別爲1 2 3, 三個選項卡都開始轉圈等待

而後再打開一個選項卡訪問answer接口, 處理剛纔放進隊列的任務, 發起一次以前的三個選項卡之中就有一箇中止等待並顯示返回數據. 需求實現.

這裏用到的關鍵類型是TaskCompletionSource<T>.

再擴展

若是是個分佈式系統, 請求和處理邏輯不是在一個程序裏呢? 那麼這個隊列可能也是一個單獨的服務. 此時就要再加個返回隊列了, 給隊列中傳輸的每個任務打上Id, 返回隊列中取出返回以後再找到Id對於的TCS.SetResult()

相關文章
相關標籤/搜索