生產者和消費者隊列, 生產者有多個, 消費者也有多個, 生產到消費須要異步.web
建立兩個API, 一個Get(), 一個Set(), Get返回一個字符串, Set放入一個字符串, Get返回的就是Set進去的字符串.api
實現以下: 瀏覽器
[Route("api/[controller]/[action]")] public class FooController : Control { IMessageQueue _mq; public FooController(IMessageQueue mq) { _mq = mq; } [HttpGet] public string Get() { string str = _mq.ReadOne<string>(); return str; } [HttpGet] public void Set(string v) { _mq.WriteOne(v); } } public interface IMessageQueue { T ReadOne<T>(); void WriteOne<T>(T value); } public class MessageQueue: IMessageQueue { private object _value; public T ReadOne<T>() { return (T)_value; } public void WriteOne<T>(T value) { _value = value; } }
接着在StartUp中把IMessageQueue給注入了.安全
services.AddSingleton<IMessageQueue, MessageQueue>();
運行後, 先調用/api/foo/set/?v=xxx, 再調用/api/foo/get/異步
能夠看到成功返回了xxxasync
使set進去的值不會被下一個覆蓋, get取隊列最前的值分佈式
爲了線程安全, 這裏使用了ConcurrentQueue<T>post
代碼以下:測試
public class MessageQueue: IMessageQueue { private readonly ConcurrentQueue<object> _queue = new ConcurrentQueue<object>(); public T ReadOne<T>() { _queue.TryDequeue(out object str); return (T)str ; } public void WriteOne<T>(Tvalue) { _queue.Enqueue(value); } }
那麼此時, 只要get不斷地輪詢, 就能夠取到set生產出來的數據了.spa
調用/api/foo/set/
再增長需求, 調換get和set的順序,先get後set模擬異步, (我這裏的demo是個web-api會有http請求超時之類的...僞裝不存在)我想要get調用等待有數據時才返回.
也就是說我想要在瀏覽器地址欄輸入http://localhost:5000/api/foo/get/以後會不斷地轉圈直到我用set接口放入一個值
方案A: while(true), 根本無情簡直無敵, 死等Read() != null時break; 爲防單核滿轉加個Thread.Sleep();
方案B: Monitor, 一個Wait()一個Exit/Release();
可是以上兩個方案都是基於Thread的, .Net4.0以後伴隨ConcurrentQueue一塊兒來的還有個BlockingCollection<T>至關好用
方案C: 修改後代碼以下:
public class MessageQueue : IMessageQueue { private readonly BlockingCollection<object> _queue = new BlockingCollection<object>(new ConcurrentQueue<object>()); public T ReadOne<T>() { var obj = _queue.Take(); return (T)obj; } public void WriteOne<T>(T value) { _queue.Add(value); } }
此時, 若是先get, 會阻塞等待set; 若是已經有set過數據就會直接返回隊列中的數據. get不會無功而返了. 基於這個類型, 能夠實現更像樣的訂閱模型.
這裏的set是生產者, get是消費者, 那若是個人這個生產者並不單純產生數據返回void而是須要等待一個結果的呢? 此時訂閱模型不夠用了, 我須要一個異步的RPC .
好比有個Ask請求會攜帶參數發起請求, 並等待, 知道另外有個地方處理了這個任務產生結果, ask結束等待返回這個結果answer.
我能夠回頭繼續用方案A或B, 但連.net4.0都已通過去好久了, 因此應該用更好的基於Task的異步方案.
代碼以下, 首先新增兩個接口:
public interface IMessageQueue { void Respond<TRequest, TResponse>(Func<TRequest, TResponse> func); Task<TResponse> Rpc<TRequest, TResponse>(TRequest req); T ReadOne<T>(); void WriteOne<T>(T data); }
接着定義一個特殊的任務類:
public class RpcTask<TRequest, TResponse> { public TaskCompletionSource<TResponse> Tcs { get; set; } public TRequest Request { get; set; } }
實現剛纔新加的兩個接口:
public Task<TResponse> Rpc<TRequest, TResponse>(TRequest req) { TaskCompletionSource<TResponse> tcs = new TaskCompletionSource<TResponse>(); _queue.Add(new RpcTask<TRequest, TResponse> { Request = req, Tcs = tcs}); return tcs.Task; } public void Respond<TRequest, TResponse>(Func<TRequest, TResponse> func) { var obj = _queue.Take(); if(obj is RpcTask<TRequest, TResponse> t) { var response = func(t.Request); t.Tcs.SetResult(response); } }
一樣的, 寫兩個Web API接口, 一個請求等待結果 一個負責處理工做
[HttpGet] public async Task<string> Ask(string v) { var response = await _mq.Rpc<MyRequest, MyResponse>(new MyRequest { Id = v }); return $"[{response.DoneTime}] {response.Id}"; } [HttpGet] public void Answer() { _mq.Respond<MyRequest, MyResponse>((req)=> new MyResponse { Id = req.Id, DoneTime = DateTime.Now }); }
上面還隨便寫了兩個class做爲請求和返回
public class MyRequest { public string Id { get; set; } } public class MyResponse { public string Id { get; set; } public DateTime DoneTime { get; set; } }
測試一下, 用瀏覽器或postman打開三個選項卡, 各發起一個Ask接口的請求, 參數v分別爲1 2 3, 三個選項卡都開始轉圈等待
而後再打開一個選項卡訪問answer接口, 處理剛纔放進隊列的任務, 發起一次以前的三個選項卡之中就有一箇中止等待並顯示返回數據. 需求實現.
這裏用到的關鍵類型是TaskCompletionSource<T>.
若是是個分佈式系統, 請求和處理邏輯不是在一個程序裏呢? 那麼這個隊列可能也是一個單獨的服務. 此時就要再加個返回隊列了, 給隊列中傳輸的每個任務打上Id, 返回隊列中取出返回以後再找到Id對於的TCS.SetResult()