1、反應式編程(Reactive Programming)html
一、什麼是反應式編程:反應式編程(Reactive programming)簡稱Rx,他是一個使用LINQ風格編寫基於觀察者模式的異步編程模型。簡單點說Rx = Observables + LINQ + Schedulers。python
二、爲何會產生這種風格的編程模型?我在本系列文章開始的時候說過一個使用事件的例子:react
var watch = new FileSystemWatcher(); watch.Created += (s, e) => { var fileType = Path.GetExtension(e.FullPath); if (fileType.ToLower() == "jpg") { //do some thing } };
這個代碼定義了一個FileSystemWatcher,而後在Watcher事件上註冊了一個匿名函數。事件的使用是一種命令式代碼風格,有沒有辦法寫出聲明性更強的代碼風格?咱們知道使用高階函數可讓代碼更具聲明性,整個LINQ擴展就是一個高階函數庫,常見的LINQ風格代碼以下:git
var list = Enumerable.Range(1, 10) .Where(x => x > 8) .Select(x => x.ToString()) .First();
可否使用這樣的風格來編寫事件呢?github
三、事件流
LINQ是對IEnumerable<T>的一系列擴展方法,咱們能夠簡單的將IEnumerable<T>認爲是一個集合。當咱們將事件放在一個時間範圍內,事件也變成了集合。咱們能夠將這個事件集合理解爲事件流。編程
事件流的出現給了咱們一個可以對事件進行LINQ操做的靈感。bash
2、反應式編程中的兩個重要類型多線程
事件模型從本質上來講是觀察者模式,因此IObservable<T>和IObserver<T>也是該模型的重頭戲。讓咱們來看看這兩個接口的定義:架構
public interface IObservable<out T> { //Notifies the provider that an observer is to receive notifications. IDisposable Subscribe(IObserver<T> observer); }
public interface IObserver<in T> { //Notifies the observer that the provider has finished sending push-based notifications. void OnCompleted(); //Notifies the observer that the provider has experienced an error condition. void OnError(Exception error); //Provides the observer with new data. void OnNext(T value); }
這兩個名稱準確的反應出了它兩的職責:IObservable<T>-可觀察的事物,IObserver<T>-觀察者。框架
IObservable<T>只有一個方法Subscribe(IObserver<T> observer),此方法用來對事件流注冊一個觀察者。
IObserver<T>有三個回調方法。當事件流中有新的事件產生的時候會回調OnNext(T value),觀察者會獲得事件中的數據。OnCompleted()和OnError(Exception error)則分別用來通知觀察者事件流已結束,事件流發生錯誤。
顯然事件流是可觀察的事物,咱們用Rx改寫上面的例子:
Observable.FromEventPattern<FileSystemEventArgs>(watch, "Created") .Where(e => Path.GetExtension(e.EventArgs.FullPath).ToLower() == "jpg") .Subscribe(e => { //do some thing });
注:在.net下使用Rx編程須要安裝如下Nuget組件:
Install-Package Rx-main
3、UI編程中使用Rx
Rx模型不但使得代碼更加具備聲明性,Rx還能夠用在UI編程中。
一、UI編程中的第一段Rx代碼
爲了簡單的展現如何在UI編程中使用Rx,咱們以Winform中的Button爲例,看看事件模型和Rx有何不一樣。
private void BindFirstGroupButtons() { btnFirstEventMode.Click += btnFirstEventMode_Click; } void btnFirstEventMode_Click(object sender, EventArgs e) { MessageBox.Show("hello world"); }
添加了一個Button,點擊Button的時候彈出一個對話框。使用Rx作一樣的實現:
//獲得了Button的Click事件流。 var clickedStream = Observable.FromEventPattern<EventArgs>(btnFirstReactiveMode, "Click"); //在事件流上註冊了一個觀察者。 clickedStream.Subscribe(e => MessageBox.Show("Hello world"));
有朋友指出字符串「Click」很是讓人不爽,這確實是個問題。因爲Click是一個event類型,沒法用表達式樹獲取其名稱,最終我想到使用擴展方法來實現:
public static IObservable<EventPattern<EventArgs>> FromClickEventPattern(this Button button) { return Observable.FromEventPattern<EventArgs>(button, "Click"); } public static IObservable<EventPattern<EventArgs>> FromDoubleClickEventPattern(this Button button) { return Observable.FromEventPattern<EventArgs>(button, "DoubleClick"); }
咱們平時經常使用的事件類型也就那麼幾個,能夠暫時經過這種方案來實現,該方案算不上完美,可是比起直接使用字符串又能優雅很多。
btnFirstReactiveMode.FromClickEventPattern() .Subscribe(e => MessageBox.Show("hello world"));
二、UI編程中存在一個很常見的場景:當一個事件的註冊者阻塞了線程時,整個界面都處於假死狀態。.net中的異步模型也從APM,EAP,TPL不斷演化直至async/await模型的出現才使得異步編程更加簡單易用。咱們來看看界面假死的代碼:
void btnSecondEventMode_Click(object sender, EventArgs e) { btnSecondEventMode.BackColor = Color.Coral; Thread.Sleep(2000); lblMessage.Text = "event mode"; }
Thread.Sleep(2000);模擬了一個長時間的操做,當你點下Button時整個界面處於假死狀態而且此時的程序沒法響應其餘的界面事件。傳統的解決方案是使用多線程來解決假死:
BtnSecondEventAsyncModel.BackColor = Color.Coral; Task.Run(() => { Thread.Sleep(2000); Action showMessage = () => lblMessage.Text = "async event mode"; lblMessage.Invoke(showMessage); });
這個代碼的複雜點在於:普通的多線程沒法對UI進行操做,在Winform中須要用Control.BeginInvoke(Action action)通過包裝後,多線程中的UI操做才能正確執行,WPF則要使用Dispatcher.BeginInvoke(Action action)包裝。
Rx方案:
btnSecondReactiveMode.FromClickEventPattern() .Subscribe(e => { Observable.Start(() => { btnSecondReactiveMode.BackColor = Color.Coral; Thread.Sleep(2000); return "reactive mode"; }) .SubscribeOn(ThreadPoolScheduler.Instance) .ObserveOn(this) .Subscribe(x => { lblMessage.Text = x; }); });
一句SubscribeOn(ThreadPoolScheduler.Instance)將費時的操做跑在了新線程中,ObserveOn(this)讓後面的觀察者跑在了UI線程中。
注:使用ObserveOn(this)須要使用Rx-WinForms
Install-Package Rx-WinForms
這個例子雖然成功了,可是並無比BeginInvoke(Action action)的方案有明顯的進步之處。在一個事件流中再次使用Ovservable.Start()開啓新的觀察者讓人更加摸不着頭腦。這並非Rx的問題,而是事件模型在UI編程中存在侷限性:不方便使用異步,不具有可測試性等。以XMAL和MVVM爲核心的UI編程模型將在將來處於主導地位,因爲在MVVM中能夠將UI綁定到一個Command,從而解耦了事件模型。
開源項目ReactiveUI提供了一個以Rx基礎的UI編程方案,可使用在XMAL和MVVM爲核心的UI編程中,例如:Xamarin,WFP,Windows Phone8等開發中。
注:在WPF中使用ObserveOn()須要安裝Rx-WPF
Install-Package Rx-WPF
三、再來一個例子,讓咱們感覺一下Rx的魅力
界面上有兩個Button分別爲+和-操做,點擊+按鈕則+1,點擊-按鈕則-1,最終的結果顯示在一個Label中。
這樣的一個需求使用經典事件模型只須要維護一個內部變量,兩個按鈕的Click事件分別對變量作加1或減1的操做便可。
Rx做爲一種函數式編程模型講求immutable-不可變性,即不使用變量來維護內部狀態。
var increasedEventStream = btnIncreasement.FromClickEventPattern() .Select(_ => 1); var decreasedEventStream = btnDecrement.FromClickEventPattern() .Select(_ => -1); increasedEventStream.Merge(decreasedEventStream) .Scan(0, (result, s) => result + s) .Subscribe(x => lblResult.Text = x.ToString());
這個例子使用了IObservable<T>的」謂詞」來對事件流作了一些操做。
下面就讓咱們來看看IObservable<T>中經常使用的「謂詞」
4、IObservable<T>中的謂詞
IObservable<T>的靈感來源於LINQ,因此不少操做也跟LINQ中的操做差很少,例如Where、First、Last、Single、Max、Any。
還有一些「謂詞」則是新出現的,例如上面提到的」Merge」、「Scan」等,爲了理解這些「謂詞」的含義,咱們請出一個神器RxSandbox。
一、Merge操做,從下面的圖中咱們能夠清晰的看出Merge操做將三個事件流中的事件合併在了同一個時間軸上。
二、Where操做則是根據指定的條件篩選出事件。
有了這個工具咱們能夠更加方便的瞭解這些「謂詞」的用途。
5、IObservable<T>的建立
Observable類提供了不少靜態方法用來建立IObservable<T>,以前的例子咱們都使用FromEventPattern方法來將事件轉化爲IObservable<T>,接下來再看看別的方法。
Return能夠建立一個具體的IObservable<T>:
public static void UsingReturn() { var greeting = Observable.Return("Hello world"); greeting.Subscribe(Console.WriteLine); }
Create也能夠建立一個IObservable<T>,而且擁有更加豐富的重載:
public static void UsingCreate() { var greeting = Observable.Create<string>(observer => { observer.OnNext("Hello world"); return Disposable.Create(() => Console.WriteLine("Observer has unsubscribed")); }); greeting.Subscribe(Console.WriteLine); }
Range方法能夠產生一個指定範圍內的IObservable<T>
Observable.Range(1, 10) .Subscribe(x => Console.WriteLine(x.ToString()));
Generate方法是一個摺疊操做的逆向操做,又稱Unfold方法:
public static void UsingGenerate() { var range = Observable.Generate(0, x => x < 10, x => x + 1, x => x); range.Subscribe(Console.WriteLine); }
Interval方法能夠每隔必定時間產生一個IObservable<T>:
Observable.Interval(TimeSpan.FromSeconds(1)) .Subscribe(x => Console.WriteLine(x.ToString()));
Subscribe方法有一個重載,能夠分別對Observable發生異常和Observable完成定義一個回調函數。
Observable.Range(1, 10) .Subscribe(x => Console.WriteLine(x.ToString()), e => Console.WriteLine("Error" + e.Message), () => Console.WriteLine("Completed"));
還能夠將IEnumerable<T>轉化爲IObservable<T>類型:
Enumerable.Range(1, 10).ToObservable() .Subscribe(x => Console.WriteLine(x.ToString()));
也能夠將IObservable<T>轉化爲IEnumerable<T>
var list= Observable.Range(1, 10).ToEnumerable();
6、Scheduler
Rx的核心是觀察者模式和異步,Scheduler正是爲異步而生。咱們在以前的例子中已經接觸過一些具體的Scheduler了,那麼他們都具體是作什麼的呢?
一、先看下面的代碼:
public static void UsingScheduler() { Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId); var source = Observable.Create<int>( o => { Console.WriteLine("Invoked on threadId:{0}", Thread.CurrentThread.ManagedThreadId); o.OnNext(1); o.OnNext(2); o.OnNext(3); o.OnCompleted(); Console.WriteLine("Finished on threadId:{0}",Thread.CurrentThread.ManagedThreadId); return Disposable.Empty; }); source //.SubscribeOn(NewThreadScheduler.Default) //.SubscribeOn(ThreadPoolScheduler.Instance) .Subscribe( o => Console.WriteLine("Received {1} on threadId:{0}",Thread.CurrentThread.ManagedThreadId,o), () => Console.WriteLine("OnCompleted on threadId:{0}",Thread.CurrentThread.ManagedThreadId)); Console.WriteLine("Subscribed on threadId:{0}", Thread.CurrentThread.ManagedThreadId); }
當咱們不使用任何Scheduler的時候,整個Rx的觀察者和主題都跑在主線程中,也就是說並無異步執行。正以下面的截圖,全部的操做都跑在threadId=1的線程中。
當咱們使用SubscribeOn(NewThreadScheduler.Default)或者SubscribeOn(ThreadPoolScheduler.Instance)的時候,觀察者和主題都跑在了theadId=3的線程中。
這兩個Scheduler的區別在於:NewThreadScheduler用於執行一個長時間的操做,ThreadPoolScheduler用來執行短期的操做。
二、SubscribeOn和ObserveOn的區別
上面的例子僅僅展現了SubscribeOn()方法,Rx中還有一個ObserveOn()方法。stackoverflow上有一個這樣的問題:What's the difference between SubscribeOn and ObserveOn,其中一個簡單的例子很好的詮釋了這個區別。
public static void DifferenceBetweenSubscribeOnAndObserveOn() { Thread.CurrentThread.Name = "Main"; IScheduler thread1 = new NewThreadScheduler(x => new Thread(x) { Name = "Thread1" }); IScheduler thread2 = new NewThreadScheduler(x => new Thread(x) { Name = "Thread2" }); Observable.Create<int>(o => { Console.WriteLine("Subscribing on " + Thread.CurrentThread.Name); o.OnNext(1); return Disposable.Create(() => { }); }) //.SubscribeOn(thread1) //.ObserveOn(thread2) .Subscribe(x => Console.WriteLine("Observing '" + x + "' on " + Thread.CurrentThread.Name)); }
當咱們註釋掉:SubscribeOn(thread1)和ObserveOn(thread2)時的結果以下:
觀察者和主題都跑在name爲Main的thread中。
當咱們放開SubscribeOn(thread1):
主題和觀察者都跑在了name爲Thread1的線程中
當咱們註釋掉:SubscribeOn(thread1),放開ObserveOn(thread2)時的結果以下:
主題跑在name爲Main的主線程中,觀察者跑在了name=Thread2的線程中。
當咱們同時放開SubscribeOn(thread1)和ObserveOn(thread2)時的結果以下:
主題跑在name爲Thread1的線程中,觀察者跑在了name爲Thread2的線程中。
至此結論應該很是清晰了:SubscribeOn()和ObserveOn()分別控制着主題和觀察者的異步。
7、其餘Rx資源
除了.net中的Rx.net,其餘語言也紛紛推出了本身的Rx框架。
參考資源:
http://rxwiki.wikidot.com/101samples
http://introtorx.com/Content/v1.0.10621.0/01_WhyRx.html#WhyRx
http://www.codeproject.com/Articles/646361/Reactive-Programming-For-NET-And-Csharp-Developers