>>返回《C# 併發編程》html
異步封裝編程
public static void MyDownloadStringTaskAsyncRun() { WebClient client = new WebClient(); string res = client.MyDownloadStringTaskAsync(new Uri("http://www.baidu.com")).Result; System.Console.WriteLine(res); } public static Task<string> MyDownloadStringTaskAsync(this WebClient client, Uri address) { var tcs = new TaskCompletionSource<string>(); // 這個事件處理程序會完成 Task 對象,並自行註銷。 DownloadStringCompletedEventHandler handler = null; handler = (_, e) => { client.DownloadStringCompleted -= handler; if (e.Cancelled) tcs.TrySetCanceled(); else if (e.Error != null) tcs.TrySetException(e.Error); else tcs.TrySetResult(e.Result); }; // 登記事件,而後開始操做。 client.DownloadStringCompleted += handler; client.DownloadStringAsync(address); return tcs.Task; }
輸出:併發
<!DOCTYPE html><!--STATUS OK--> <html> ... ... </html>
public static void GetResponseAsyncRun() { WebRequest request = WebRequest.Create("http://www.baidu.com"); var response = request.MyGetResponseAsync().Result; System.Console.WriteLine($"WebResponse.ContentLength:{response.ContentLength}"); } public static Task<WebResponse> MyGetResponseAsync(this WebRequest client) { return Task<WebResponse>.Factory.FromAsync(client.BeginGetResponse, client.EndGetResponse, null); }
輸出:異步
WebResponse.ContentLength:14615
FromAsync
以前調用 BeginOperation
。FromAsync
,並讓用 BeginOperation
方法返回的 IAsyncOperation
做爲參數,這樣也是能夠的,可是 FromAsync
會採用效率較低的實現方式。await Task.Run(() => Parallel.ForEach(...));
async
經過使用 Task.Run
,全部的並行處理過程都推給了線程池。this
Task.Run
返回一個表明並行任務的 Task
對象線程
事件流中幾種可能關注的狀況:code
public delegate void HelloEventHandler(object sender, HelloEventArgs e); public class HelloEventArgs : EventArgs { public string Name { get; set; } public HelloEventArgs(string name) { Name = name; } public int SayHello() { System.Console.WriteLine(Name + " Hello."); return DateTime.Now.Millisecond; } } public static event HelloEventHandler HelloHandlerEvent; public static void FirstLastRun() { var task = Task.Run(() => { Thread.Sleep(500); HelloHandlerEvent?.Invoke(new object(), new HelloEventArgs("lilei")); HelloHandlerEvent?.Invoke(new object(), new HelloEventArgs("HanMeimei")); HelloHandlerEvent?.Invoke(new object(), new HelloEventArgs("Tom")); HelloHandlerEvent?.Invoke(new object(), new HelloEventArgs("Jerry")); }); var observable = Observable.FromEventPattern<HelloEventHandler, HelloEventArgs>( handler => (s, a) => handler.Invoke(s, a), handler => HelloHandlerEvent += handler, handler => HelloHandlerEvent -= handler) .Select(evt => evt.EventArgs.SayHello()).ObserveOn(Scheduler.Default) .Select(s => { // 複雜的計算過程。 Thread.Sleep(100); var result = s; Console.WriteLine("Now Millisecond result " + result + " on thread " + Environment.CurrentManagedThreadId); return result; }) .Take(3)//這個標識3個就結束了 ; var res = Task.Run(async () => await observable // //4個hello,3個result,res爲最後一個的結果 //.FirstAsync()//4個hello,1個result,res爲第一個的結果 //.LastAsync()//4個hello,3個result,res爲最後一個的結果 //.ToList()//4個hello,3個result,res爲3個的結果 ).Result; System.Console.WriteLine($"Res:{string.Join(',', res)},ResType:{res.GetType().Name}"); task.Wait(); }
輸出:server
lilei Hello. HanMeimei Hello. Tom Hello. Jerry Hello. Now Millisecond result 534 on thread 7 Now Millisecond result 544 on thread 7 Now Millisecond result 544 on thread 7 Res:544,ResType:Int32
在 await
調用 Observable 對象或 LastAsync
時,代碼(異步地)等待事件流完成,而後返 回最後一個元素。htm
cs IObservable<int> observable = ...; int lastElement = await observable.LastAsync(); // 或者 int lastElement = await observable;
使用 FirstAsync
可捕獲事件流中, FirstAsync
方法執行後的下一個事件。
await
訂閱事件流,而後在第一個事件到達後當即結束(並退訂):cs IObservable<int> observable = ...; int nextElement = await observable.FirstAsync();
使用 ToList
可捕獲事件流中的全部事件:
IObservable<int> observable = ...; IList<int> allElements = await observable.ToList();
任何異步操做都可看做一個知足如下條件之一的可觀察流:
ToObservable
和 StartAsync
都會當即啓動異步操做,而不會等待訂閱
若是要讓 observable 對象在接受訂閱後才啓動操做,可以使用 FromAsync
StartAsync
同樣,它也支持使用 CancellationToken
取消public static void AsyncObservableRun() { var client = new HttpClient(); IObservable<int> response1 = Task.Run(() => { System.Console.WriteLine("Run 1."); return 1; }).ToObservable();//直接執行 IObservable<int> response2 = Observable.StartAsync(token => Task.Run(() => { System.Console.WriteLine("Run 2."); return 2; }, token));//直接執行 IObservable<int> response3 = Observable.FromAsync(token => Task.Run(() => { System.Console.WriteLine("Run 3."); return 3; }, token));//訂閱後執行 var res = Task.Run(async () => await response1 //await response2 //await response3 ).Result; System.Console.WriteLine($"Res:{res}"); }
輸出(response1):
Run 1. Run 2. Res:1
輸出(response2):
Run 1. Run 2. Res:2
輸出(response1):
Run 1. Run 2. Run 3. Res:3
ToObservable
和 StartAsync
都返回一個 observable 對象,表示一個已經啓動的異步操做FromAsync
在每次被訂閱時都會啓動一個全新獨立的異步操做。下面的例子使用一個已有的 URL 事件流,在每一個 URL 到達時發出一個請求:
public static void SelectManyRun() { IObservable<int> nums = new int[] { 1, 2, 3 }.ToObservable(); IObservable<int> observable = nums.SelectMany((n, token) => Task.Run<int>(() => { System.Console.WriteLine($"Run {n}."); return n + 1; }, token)); var res = Task.Run(async () => await observable.LastAsync()).Result; System.Console.WriteLine($"Res:{res}"); }
輸出:
Run 1. Run 2. Run 3. Res:3
同一個項目中
如今須要它們能互相溝通。
網格轉可觀察流
public static void BlockToObservableRun() { var buffer = new BufferBlock<int>(); IObservable<int> integers = buffer.AsObservable(); integers.Subscribe( data => Console.WriteLine(data), ex => Console.WriteLine(ex), () => Console.WriteLine("Done")); buffer.Post(1); buffer.Post(2); buffer.Complete(); buffer.Completion.Wait(); }
輸出:
1 2
AsObservable
方法會把數據流塊的完成信息(或出錯信息)轉化爲可觀察流的完成信息。
AggregateException
對象中。可觀察流轉網格
public static void ObservableToBlockRun() { IObservable<DateTimeOffset> ticks = Observable.Interval(TimeSpan.FromSeconds(1)) .Timestamp() .Select(x => x.Timestamp) .Take(5); var display = new ActionBlock<DateTimeOffset>(x => Console.WriteLine(x)); ticks.Subscribe(display.AsObserver()); try { display.Completion.Wait(); Console.WriteLine("Done."); } catch (Exception ex) { Console.WriteLine(ex); } }
輸出:
2020/2/1 上午1:42:24 +00:00 2020/2/1 上午1:42:25 +00:00 2020/2/1 上午1:42:26 +00:00 2020/2/1 上午1:42:27 +00:00 2020/2/1 上午1:42:28 +00:00 Done.