The Reactive Extensions (Rx) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators.app
Reactive Extensions represents all these data sequences as observable sequences. An application can subscribe to these observable sequences to receive asynchronous notifications as new data arrive.async
IObservable<T>
/IObserver<T>
IObservable<T>
就是observable sequences 的抽象,就像Pull-based中的IEnumerable<T>
相似,用來承載數據,表明了一個能夠被觀察的數據源,他能夠在將sequences中的數據,推送給任何一個有興趣的listener,咱們用IObserver<T>
來表明這種對IObservable<T>
感興趣的listener。spa
一個IObservable<T>
的實現能夠被視爲一個T類型數據的集合, 例如IObservable<int>
能夠被視爲一個int類型數據的集合,他們會被推送給IObserver<T>
這樣的訂閱者。
咱們看一下接口的定義:code
public interface IObservable<out T>{ IDisposable Subscribe(IObserver<T> observer);}public interface IObserver<in T>{ void OnCompleted(); // Notifies the observer that the source has finished sending messages. void OnError(Exception error); // Notifies the observer about any exception or error. void OnNext(T value); // Pushes the next data value from the source to the observer.}
Rx還經過擴展的方式提供了一些訂閱更方便的方式,不須要實現本身的IObserver<T>
,只須要實現對應的訂閱事件(OnNext, OnError, OnComplete)對應的delegate方法便可,以下面的例子:server
IObservable<int> source = Observable.Range(1, 5); //creates an observable sequence of 5 integers, starting from 1IDisposable subscription = source.Subscribe( x => Console.WriteLine("OnNext: {0}", x), //prints out the value being pushed ex => Console.WriteLine("OnError: {0}", ex.Message), () => Console.WriteLine("OnCompleted"));
上面這只是兩個例子,在實際的開發應用中,我沒幾乎沒有必要本身去實現這兩個接口,Rx經過Observable
和Observer
爲咱們提供了豐富的足夠的實現,後面將繼續說明。接口