互操做

>>返回《C# 併發編程》html

異步封裝編程

1. 用 async 代碼封裝異步方法與 Completed 事件

public static void MyDownloadStringTaskAsyncRun()
{
    WebClient client = new WebClient();
    string res = client.MyDownloadStringTaskAsync(new Uri("http://www.baidu.com")).Result;
    System.Console.WriteLine(res);
}

public static Task<string> MyDownloadStringTaskAsync(this WebClient client, Uri address)
{
    var tcs = new TaskCompletionSource<string>();
    // 這個事件處理程序會完成 Task 對象,並自行註銷。
    DownloadStringCompletedEventHandler handler = null;
    handler = (_, e) =>
    {
        client.DownloadStringCompleted -= handler;
        if (e.Cancelled)
            tcs.TrySetCanceled();
        else if (e.Error != null)
            tcs.TrySetException(e.Error);
        else
            tcs.TrySetResult(e.Result);
    };
    // 登記事件,而後開始操做。
    client.DownloadStringCompleted += handler;
    client.DownloadStringAsync(address);
    return tcs.Task;
}

輸出:併發

<!DOCTYPE html><!--STATUS OK-->
<html>
... ...
</html>

2. 用 async 代碼封裝 Begin/End 方法

public static void GetResponseAsyncRun()
{
    WebRequest request = WebRequest.Create("http://www.baidu.com");
    var response = request.MyGetResponseAsync().Result;

    System.Console.WriteLine($"WebResponse.ContentLength:{response.ContentLength}");
}
public static Task<WebResponse> MyGetResponseAsync(this WebRequest client)
{
    return Task<WebResponse>.Factory.FromAsync(client.BeginGetResponse, client.EndGetResponse, null);
}

輸出:異步

WebResponse.ContentLength:14615
  • 建議: 要在調用 FromAsync 以前調用 BeginOperation
  • 調用 FromAsync ,並讓用 BeginOperation 方法返回的 IAsyncOperation 做爲參數,這樣也是能夠的,可是 FromAsync 會採用效率較低的實現方式。

3. 用 async 代碼封裝並行代碼

await Task.Run(() => Parallel.ForEach(...));async

經過使用 Task.Run ,全部的並行處理過程都推給了線程池this

Task.Run 返回一個表明並行任務的 Task 對象線程

  • UI 線程能夠(異步地)等待它完成(非阻塞)

4. 用 async 代碼封裝 Rx Observable 對象

事件流中幾種可能關注的狀況:code

  • 事件流結束前的最後一個事件;
  • 下一個事件;
  • 全部事件。
public delegate void HelloEventHandler(object sender, HelloEventArgs e);
public class HelloEventArgs : EventArgs
{
    public string Name { get; set; }
    public HelloEventArgs(string name)
    {
        Name = name;
    }
    public int SayHello()
    {
        System.Console.WriteLine(Name + " Hello.");
        return DateTime.Now.Millisecond;
    }
}

public static event HelloEventHandler HelloHandlerEvent;
public static void FirstLastRun()
{
    var task = Task.Run(() =>
    {
        Thread.Sleep(500);
        HelloHandlerEvent?.Invoke(new object(), new HelloEventArgs("lilei"));
        HelloHandlerEvent?.Invoke(new object(), new HelloEventArgs("HanMeimei"));
        HelloHandlerEvent?.Invoke(new object(), new HelloEventArgs("Tom"));
        HelloHandlerEvent?.Invoke(new object(), new HelloEventArgs("Jerry"));

    });

    var observable = Observable.FromEventPattern<HelloEventHandler, HelloEventArgs>(
     handler => (s, a) => handler.Invoke(s, a), handler => HelloHandlerEvent += handler, handler => HelloHandlerEvent -= handler)
     .Select(evt => evt.EventArgs.SayHello()).ObserveOn(Scheduler.Default)
     .Select(s =>
     {
        // 複雜的計算過程。
        Thread.Sleep(100);
        var result = s;
        Console.WriteLine("Now Millisecond result " + result + " on thread " + Environment.CurrentManagedThreadId);
        return result;
     })
     .Take(3)//這個標識3個就結束了
     ;

    var res =
        Task.Run(async () => await observable
    // //4個hello,3個result,res爲最後一個的結果
    //.FirstAsync()//4個hello,1個result,res爲第一個的結果
    //.LastAsync()//4個hello,3個result,res爲最後一個的結果
    //.ToList()//4個hello,3個result,res爲3個的結果
    ).Result;
    System.Console.WriteLine($"Res:{string.Join(',', res)},ResType:{res.GetType().Name}");

    task.Wait();
}

輸出:server

lilei Hello.
HanMeimei Hello.
Tom Hello.
Jerry Hello.
Now Millisecond result 534 on thread 7
Now Millisecond result 544 on thread 7
Now Millisecond result 544 on thread 7
Res:544,ResType:Int32

await 調用 Observable 對象或 LastAsync 時,代碼(異步地)等待事件流完成,而後返 回最後一個元素。htm

  • 在內部,await 實際是在訂閱事件流,完成後退訂
    cs IObservable<int> observable = ...; int lastElement = await observable.LastAsync(); // 或者 int lastElement = await observable;

使用 FirstAsync 可捕獲事件流中, FirstAsync 方法執行後的下一個事件。

  • 本例中 await 訂閱事件流,而後在第一個事件到達後當即結束(並退訂):
    cs IObservable<int> observable = ...; int nextElement = await observable.FirstAsync();

使用 ToList 可捕獲事件流中的全部事件:

IObservable<int> observable = ...;     
IList<int> allElements = await observable.ToList();

5. 用 Rx Observable 對象封裝 async 代碼

任何異步操做都可看做一個知足如下條件之一的可觀察流:

  • 生成一個元素後就完成;
  • 發生錯誤,不生成任何元素。

ToObservableStartAsync 都會當即啓動異步操做,而不會等待訂閱

  • 但以後訂閱呢,或等待執行完再訂閱呢,能獲得結果嗎
    • 能夠,後面例子中的「輸出」中有體現

若是要讓 observable 對象在接受訂閱後才啓動操做,可以使用 FromAsync

  • StartAsync 同樣,它也支持使用 CancellationToken取消
public static void AsyncObservableRun()
{
    var client = new HttpClient();

    IObservable<int> response1 = Task.Run(() => { System.Console.WriteLine("Run 1."); return 1; }).ToObservable();//直接執行

    IObservable<int> response2 = Observable.StartAsync(token => Task.Run(() => { System.Console.WriteLine("Run 2."); return 2; }, token));//直接執行

    IObservable<int> response3 = Observable.FromAsync(token => Task.Run(() => { System.Console.WriteLine("Run 3."); return 3; }, token));//訂閱後執行

    var res = Task.Run(async () =>
        await response1
        //await response2
        //await response3
    ).Result;
    System.Console.WriteLine($"Res:{res}");
}

輸出(response1):

Run 1.
Run 2.
Res:1

輸出(response2):

Run 1.
Run 2.
Res:2

輸出(response1):

Run 1.
Run 2.
Run 3.
Res:3
  • ToObservableStartAsync 都返回一個 observable 對象,表示一個已經啓動的異步操做
  • FromAsync 在每次被訂閱時都會啓動一個全新獨立的異步操做。

下面的例子使用一個已有的 URL 事件流,在每一個 URL 到達時發出一個請求:

public static void SelectManyRun()
{
    IObservable<int> nums = new int[] { 1, 2, 3 }.ToObservable();
    IObservable<int> observable = nums.SelectMany((n, token) => Task.Run<int>(() => { System.Console.WriteLine($"Run {n}."); return n + 1; }, token));

    var res = Task.Run(async () => await observable.LastAsync()).Result;

    System.Console.WriteLine($"Res:{res}");
}

輸出:

Run 1.
Run 2.
Run 3.
Res:3

6. Rx Observable 對象和數據流網格

同一個項目中

  • 一部分使用了 Rx Observable 對象
  • 一部分使用了數據流網格

如今須要它們能互相溝通。

網格轉可觀察流

public static void BlockToObservableRun()
{
    var buffer = new BufferBlock<int>();
    IObservable<int> integers = buffer.AsObservable();
    integers.Subscribe(
        data => Console.WriteLine(data),
        ex => Console.WriteLine(ex),
        () => Console.WriteLine("Done"));

    buffer.Post(1);
    buffer.Post(2);
    buffer.Complete();

    buffer.Completion.Wait();
}

輸出:

1
2

AsObservable 方法會把數據流塊的完成信息(或出錯信息)轉化可觀察流的完成信息。

  • 若是數據流塊出錯並拋出異常,這個異常信息在傳遞給可觀察流時,會被封裝在 AggregateException 對象中。

可觀察流轉網格

public static void ObservableToBlockRun()
{
    IObservable<DateTimeOffset> ticks = Observable.Interval(TimeSpan.FromSeconds(1))
        .Timestamp()
        .Select(x => x.Timestamp)
        .Take(5);

    var display = new ActionBlock<DateTimeOffset>(x => Console.WriteLine(x));
    ticks.Subscribe(display.AsObserver());

    try
    {
        display.Completion.Wait();
        Console.WriteLine("Done.");
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex);
    }
}

輸出:

2020/2/1 上午1:42:24 +00:00
2020/2/1 上午1:42:25 +00:00
2020/2/1 上午1:42:26 +00:00
2020/2/1 上午1:42:27 +00:00
2020/2/1 上午1:42:28 +00:00
Done.
  • 跟前面同樣,可觀察流的完成信息會轉化爲塊的完成信息
  • 可觀察流的錯誤信息會轉化爲 塊的錯誤信息。
相關文章
相關標籤/搜索