>>返回《C# 併發編程》html
要在應用中安裝一個 NuGet 包 System.Reactive
。編程
LINQ to events
(基於 IObservable<T>
)public static void ProgressRun() { var progress = new Progress<int>(); var progressReports = Observable.FromEventPattern<int>( handler => progress.ProgressChanged += handler, handler => progress.ProgressChanged -= handler) //.Where(u => u.EventArgs % 2 == 0) ; progressReports.Subscribe(data => Console.WriteLine($"OnNext:{data.EventArgs},ThreadId:{Thread.CurrentThread.ManagedThreadId}.")); Reports(progress); } private static void Reports(IProgress<int> progress) { System.Console.WriteLine($"Reporting ThreadId:{Thread.CurrentThread.ManagedThreadId}."); for (int i = 0; i < 10; i++) { progress.Report(i); } System.Console.WriteLine($"Reported ThreadId:{Thread.CurrentThread.ManagedThreadId}."); }
輸出:併發
Reporting ThreadId:1. Reported ThreadId:1. OnNext:5,ThreadId:9. OnNext:0,ThreadId:4. OnNext:6,ThreadId:10. OnNext:1,ThreadId:5. OnNext:2,ThreadId:6. OnNext:4,ThreadId:8. OnNext:3,ThreadId:7. OnNext:7,ThreadId:11. OnNext:9,ThreadId:13. OnNext:8,ThreadId:12.
public static void TimerRun() { var timer = new System.Timers.Timer(interval: 300) { Enabled = true }; var ticks = Observable.FromEventPattern<ElapsedEventHandler, ElapsedEventArgs>( handler => (s, a) => handler(s, a), handler => timer.Elapsed += handler, handler => timer.Elapsed -= handler); ticks.Subscribe(data => Console.WriteLine($"OnNext:{data.EventArgs.SignalTime.Millisecond}, ThreadId:{Thread.CurrentThread.ManagedThreadId}.")); System.Console.WriteLine($"Timer start ThreadId:{Thread.CurrentThread.ManagedThreadId}."); timer.Start(); Thread.Sleep(2000); timer.Stop(); System.Console.WriteLine($"Timer stop ThreadId:{Thread.CurrentThread.ManagedThreadId}."); }
輸出:ui
Timer start ThreadId:1. OnNext:473, ThreadId:4. OnNext:772, ThreadId:5. OnNext:73, ThreadId:5. OnNext:373, ThreadId:5. OnNext:673, ThreadId:5. OnNext:975, ThreadId:5. Timer stop ThreadId:1.
public static void ObErrorRun() { var tcs = new TaskCompletionSource<string>(); var client = new WebClient(); var downloadedStrings = Observable.FromEventPattern(client, "DownloadStringCompleted"); downloadedStrings.Subscribe( data => { var eventArgs = (DownloadStringCompletedEventArgs)data.EventArgs; if (eventArgs.Error != null) { Console.WriteLine("OnNext: (Error) " + eventArgs.Error.GetType()); } else { Console.WriteLine("OnNext: " + eventArgs.Result); } }, ex => Console.WriteLine("OnError: " + ex.GetType()), () => Console.WriteLine("OnCompleted")); client.DownloadStringAsync(new Uri("http://invalid.example.com/")); //client.DownloadStringAsync(new Uri("http://www.baidu.com/")); Thread.Sleep(3000); }
輸出:.net
OnNext: (Error) System.Net.WebException
把事件封裝進 Observable
對象後,每次引起該事件都會調用 OnNext
。在處理 AsyncCompletedEventArgs
時會發生使人奇怪的現象,全部的異常信息都是經過數據形式傳遞的(OnNext
),而不是經過錯誤傳遞(OnError
)。線程
如 UI 元素只能被它所屬的 UI 線程控制,所以,若是要根據 Rx 的通知來修改 UI,就應該把通知「轉移」到 UI 線程。code
ObserveOn
把通知移動到一個線程池線程,在那裏進行計算,而後再把表示結果的通知返回給 UI 線程public delegate void HelloEventHandler(object sender, HelloEventArgs e); public class HelloEventArgs : EventArgs { public string Name { get; set; } public HelloEventArgs(string name) { Name = name; } public int SayHello() { System.Console.WriteLine(Name + " Hello."); return DateTime.Now.Millisecond; } } public static event HelloEventHandler HelloHandlerEvent; public static void ObservableEventRun() { IDisposable ob = null; var task = Task.Run(() => { Thread.Sleep(500); HelloHandlerEvent?.Invoke(new object(), new HelloEventArgs("lilei")); HelloHandlerEvent?.Invoke(new object(), new HelloEventArgs("HanMeimei")); HelloHandlerEvent?.Invoke(new object(), new HelloEventArgs("Tom")); HelloHandlerEvent?.Invoke(new object(), new HelloEventArgs("Jerry")); Thread.Sleep(2000); ob?.Dispose(); HelloHandlerEvent?.Invoke(new object(), new HelloEventArgs("NoConsole")); // 因爲 }); // AsyncContext 好比就是 UI上下文 AsyncContext.Run(() => { var uiContext = SynchronizationContext.Current; Console.WriteLine("UI thread is " + Environment.CurrentManagedThreadId); //Observable.FromEvent() ob = Observable.FromEventPattern<HelloEventHandler, HelloEventArgs>( handler => (s, a) => handler.Invoke(s, a), handler => HelloHandlerEvent += handler, handler => HelloHandlerEvent -= handler) .Select(evt => evt.EventArgs.SayHello()).ObserveOn(Scheduler.Default) .Select(s => { // 複雜的計算過程。 Thread.Sleep(100); var result = s; Console.WriteLine("Now Millisecond result " + result + " on thread " + Environment.CurrentManagedThreadId); return result; }) .ObserveOn(uiContext) .Subscribe(s => Console.WriteLine("Subscribe Result " + s + " on thread " + Environment.CurrentManagedThreadId)); //此處不能 task.Wait(); ,會和 Subscribe 中的委託發生死鎖 System.Console.WriteLine("AsyncContext.Run Done on thread " + Environment.CurrentManagedThreadId); }); task.Wait(); }
輸出:htm
UI thread is 1 AsyncContext.Run Done on thread 1 lilei Hello. HanMeimei Hello. Tom Hello. Jerry Hello. Now Millisecond result 36 on thread 6 Subscribe Result 36 on thread 1 Now Millisecond result 44 on thread 6 Subscribe Result 44 on thread 1 Now Millisecond result 44 on thread 6 Subscribe Result 44 on thread 1 Now Millisecond result 44 on thread 6 Subscribe Result 44 on thread 1
下面的例子使用 Interval
,每秒建立 1 個 OnNext
通知,而後, 使用 Buffer
, 每 2 個通知作一次緩衝:對象
public static void BufferRun() { System.Console.WriteLine($"Buffer start ThreadId:{Thread.CurrentThread.ManagedThreadId}."); var ob = Observable.Interval(TimeSpan.FromMilliseconds(10)) .Buffer(2) .Subscribe(x => Console.WriteLine($"{DateTime.Now.Millisecond}: Got {string.Join(",", x)}({Thread.CurrentThread.ManagedThreadId})")); Thread.Sleep(100); ob.Dispose(); System.Console.WriteLine($"Buffer end ThreadId:{Thread.CurrentThread.ManagedThreadId}."); }
輸出:事件
Buffer start ThreadId:1. 459: Got 0,1(5) 478: Got 2,3(5) 498: Got 4,5(5) 516: Got 6,7(5) 536: Got 8,9(5) Buffer end ThreadId:1.
下面的例子有些相似,使用 Window
建立一些事件組,每組包含 2 個事件:
public static void WindowRun() { System.Console.WriteLine($"Window start ThreadId:{Thread.CurrentThread.ManagedThreadId}."); var ob = Observable.Interval(TimeSpan.FromMilliseconds(10)) .Window(2) .Subscribe(group => { Console.WriteLine($"{DateTime.Now.Millisecond}: Starting new group({Thread.CurrentThread.ManagedThreadId})"); group.Subscribe(x => Console.WriteLine($"{DateTime.Now.Millisecond}: Saw {x},(TID:{Thread.CurrentThread.ManagedThreadId})"), () => Console.WriteLine($"{DateTime.Now.Millisecond}: Ending group")); }); Thread.Sleep(100); ob.Dispose(); System.Console.WriteLine($"Window end ThreadId:{Thread.CurrentThread.ManagedThreadId}."); }
輸出:
Window start ThreadId:1. 959: Starting new group(1) 987: Saw 0,(TID:4) 991: Saw 1,(TID:4) 992: Ending group 994: Starting new group(4) 0: Saw 2,(TID:4) 11: Saw 3,(TID:4) 11: Ending group 11: Starting new group(4) 21: Saw 4,(TID:4) 30: Saw 5,(TID:4) 30: Ending group 30: Starting new group(4) 40: Saw 6,(TID:4) 50: Saw 7,(TID:4) 50: Ending group 51: Starting new group(4) 60: Saw 8,(TID:4) 70: Saw 9,(TID:4) 70: Ending group 70: Starting new group(4) Window end ThreadId:1.
這幾個例子說明了 Buffer
和 Window
的區別:
Buffer
等待組內的全部事件,而後把全部事件做爲一個集合發佈Window
用一樣的方法進行分組,但它是在每一個事件到達時就發佈下面的例子也是監視鼠標移動, 但使用了 Throttle
,在鼠標保持靜止 1 秒後才報告最近一條移動事件。
public delegate void MouseEventHandler(object sender, MouseEventArgs e); public class MouseEventArgs : EventArgs { public (int, int) XY { get; set; } public MouseEventArgs((int, int) xy) { XY = xy; } public (int, int) GetPosition() { return XY; } } public static event MouseEventHandler MouseMove; public static void ThrottleRun() { IDisposable ob = null; var task = Task.Run(() => { Thread.Sleep(200); //不觸發 MouseMoveProcess((1, 1)); MouseMoveProcess((1, 11)); MouseMoveProcess((1, 111)); MouseMoveProcess((1, 1111)); //觸發 MouseMoveProcess((2, 2), 2000); //超時結束 MouseMoveProcess((2, 22)); ob?.Dispose(); }); ob = Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>( handler => (s, a) => handler(s, a), handler => MouseMove += handler, handler => MouseMove -= handler) .Select(x => x.EventArgs.GetPosition()) .Throttle(TimeSpan.FromMilliseconds(500)) .Subscribe(x => Console.WriteLine($"{DateTime.Now.Millisecond}: Saw {x.Item1},{x.Item2}")); task.Wait(); } private static void MouseMoveProcess((int, int) xy, int sleepMillsecond = 200) { System.Console.WriteLine($"Mouse Move {xy.Item1},{xy.Item2},After sleep {sleepMillsecond}."); MouseMove?.Invoke(new object(), new MouseEventArgs(xy)); Thread.Sleep(sleepMillsecond); }
輸出:
Mouse Move 1,1,After sleep 200. Mouse Move 1,11,After sleep 200. Mouse Move 1,111,After sleep 200. Mouse Move 1,1111,After sleep 200. Mouse Move 2,2,After sleep 2000. 251: Saw 2,2 Mouse Move 2,22,After sleep 200.
Throttle
經常使用於相似「文本框自動填充」這樣的場合
爲抑制快速運動的事件序列, Sample 創建了一個有規律的超時時間段, 每一個時間段結束時,它就發佈該時間段內最後的一條數據。若是這個時間段沒有數據,就不發佈。
每隔一秒採樣一次
public static void SampleRun() { IDisposable ob = null; var task = Task.Run(() => { Thread.Sleep(200); //不觸發 MouseMoveProcess((1, 1)); MouseMoveProcess((1, 11)); MouseMoveProcess((1, 111)); MouseMoveProcess((1, 1111)); //觸發 MouseMoveProcess((2, 2), 2000); //超時結束 MouseMoveProcess((2, 22)); ob?.Dispose(); }); ob = Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>( handler => (s, a) => handler(s, a), handler => MouseMove += handler, handler => MouseMove -= handler) .Select(x => x.EventArgs.GetPosition()) .Sample(TimeSpan.FromMilliseconds(500)) .Subscribe(x => Console.WriteLine($"{DateTime.Now.Millisecond}: Saw {x.Item1},{x.Item2}")); task.Wait(); }
輸出:
Mouse Move 1,1,After sleep 200. Mouse Move 1,11,After sleep 200. 498: Saw 1,11 Mouse Move 1,111,After sleep 200. Mouse Move 1,1111,After sleep 200. Mouse Move 2,2,After sleep 2000. 991: Saw 2,2 Mouse Move 2,22,After sleep 200. 992: Saw 2,22
Throttle
和 Sample
操做符與 Where
基本差很少,惟一的區別是:
Throttle
、 Sample
根據時間段過濾Where
根據事件的數據過濾在抑制快速涌來的輸入流時,這三種操做符提供了三種不一樣的方法。
Timeout 操做符在輸入流上創建一個可調節的超時窗口。一旦新的事件到達,就重置超時窗口。若是超過時限後事件仍沒到達, Timeout 操做符就結束流,併產生一個包含TimeoutException 的 OnError 通知。
public static void TimeoutRun() { IDisposable ob = null; var task = Task.Run(() => { Thread.Sleep(200); //不觸發 MouseMoveProcess((1, 1)); MouseMoveProcess((1, 11)); MouseMoveProcess((1, 111)); MouseMoveProcess((1, 1111)); //觸發 MouseMoveProcess((2, 2), 1100); //超時結束 MouseMoveProcess((2, 22)); ob?.Dispose(); }); ob = Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>( handler => (s, a) => handler(s, a), handler => MouseMove += handler, handler => MouseMove -= handler) .Select(x => x.EventArgs.GetPosition()) .Timeout(TimeSpan.FromSeconds(1))//Subscribe後相對一秒超時(連續觸發則不會超時) .Subscribe( x => Console.WriteLine($"{DateTime.Now.Millisecond}: Saw {x.Item1},{x.Item2}"), ex => Console.WriteLine($"{DateTime.Now.Millisecond}: {ex.GetType().Name}"), // onCompleted 不會執行 () => System.Console.WriteLine($"{DateTime.Now.Millisecond}: Finished.") ); System.Console.WriteLine($"{DateTime.Now.Millisecond} Subscribe Done"); task.Wait(); }
輸出:
138 Subscribe Done Mouse Move 1,1,After sleep 200. 313: Saw 1,1 Mouse Move 1,11,After sleep 200. 517: Saw 1,11 Mouse Move 1,111,After sleep 200. 722: Saw 1,111 Mouse Move 1,1111,After sleep 200. 923: Saw 1,1111 Mouse Move 2,2,After sleep 1100. 124: Saw 2,2 139: TimeoutException Mouse Move 2,22,After sleep 200.
在超時以前觀察鼠標移動,超時發生後進行切換
public static event MouseEventHandler OtherMouseMove; public static void TimeoutMoveRun() { IDisposable ob = null; var task = Task.Run(() => { Thread.Sleep(200); //不觸發 MouseMoveProcess((1, 1), 400); MouseMoveProcess((1, 11), 0); //爲了觸發超時 Thread.Sleep(1100); System.Console.WriteLine("sleep: 1100"); //因爲超時,時間流被遷移到other,下面不會觸發 MouseMoveProcess((2, 2), 400); MouseMoveProcess((2, 22), 400); //other的事件,能夠觸發 OtherMouseMoveProcess((3, 3), 400); OtherMouseMoveProcess((3, 33), 400); ob?.Dispose(); }); var other = Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>( handler => (s, a) => handler(s, a), handler => OtherMouseMove += handler, handler => OtherMouseMove -= handler) .Select(x => x.EventArgs.GetPosition()); ob = Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>( handler => (s, a) => handler(s, a), handler => MouseMove += handler, handler => MouseMove -= handler) .Select(x => x.EventArgs.GetPosition()) .Timeout(TimeSpan.FromSeconds(1), other) .Subscribe( x => Console.WriteLine($"{DateTime.Now.Millisecond}: Saw {x.Item1},{x.Item2}"), ex => Console.WriteLine($"{DateTime.Now.Millisecond}: {ex.GetType().Name}")); System.Console.WriteLine($"{DateTime.Now.Millisecond} Subscribe Done"); task.Wait(); } private static void OtherMouseMoveProcess((int, int) xy, int sleepMillsecond = 200) { System.Console.WriteLine($"Other Mouse Move {xy.Item1},{xy.Item2},After sleep {sleepMillsecond}."); OtherMouseMove?.Invoke(new object(), new MouseEventArgs(xy)); Thread.Sleep(sleepMillsecond); }
輸出:
793 Subscribe Done Mouse Move 1,1,After sleep 400. 970: Saw 1,1 Mouse Move 1,11,After sleep 0. 373: Saw 1,11 sleep: 1100 Mouse Move 2,2,After sleep 400. Mouse Move 2,22,After sleep 400. Other Mouse Move 3,3,After sleep 400. 281: Saw 3,3 Other Mouse Move 3,33,After sleep 400. 684: Saw 3,33