響應式編程知多少 | Rx.NET 瞭解下

1. 引言

An API for asynchronous programming with observable streams.
ReactiveX is a combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programming.
ReactiveX 使用可觀察數據流進行異步編程的API。
ReactiveX結合了觀察者模式、迭代器模式和函數式編程的精華
html

關於Reactive(本文統一譯做響應式),有一個The Reactive Manifesto【響應式宣言】:響應式系統(Reactive System)具有如下特質:即時響應性(Responsive)、回彈性(Resilient)、彈性(Elastic)以及消息驅動(Message Driven)。react

很顯然開發一個響應式系統,並不簡單。
那本文就來說一講如何基於Rx.NET進行響應式編程,進而開發更加靈活、鬆耦合、可伸縮的響應式系統。git

2. 編程範式

在開始以前呢,咱們有必要了解下幾種編程範式:命令式編程、聲明式編程、函數式編程和響應式編程。github

命令式編程:命令式編程的主要思想是關注計算機執行的步驟,即一步一步告訴計算機先作什麼再作什麼。數據庫

//1. 聲明變量
List<int> results = new List<int>();
//2. 循環變量
foreach(var num in Enumerable.Range(1,10))
{
    //3. 添加條件
    if (num > 5)
    {  
        //4. 添加處理邏輯
        results.Add(num);
        Console.WriteLine(num);
    }
}

聲明式編程:聲明式編程是以數據結構的形式來表達程序執行的邏輯。它的主要思想是告訴計算機應該作什麼,但不指定具體要怎麼作。編程

var nums = from num in Enumerable.Range(1,10) where num > 5 select num

函數式編程:主要思想是把運算過程儘可能寫成一系列嵌套的函數調用。api

Enumerable.Range(1, 10).Where(num => num > 5).ToList().ForEach(Console.WriteLine);

響應式編程:響應式編程是一種面向數據流和變化傳播的編程範式,旨在簡化事件驅動應用的實現。響應式編程專一於如何建立依賴於變動的數據流並對變化作出響應。緩存

IObservable<int> nums = Enumerable.Range(1, 10).ToObservable();

IDisposable subscription = nums.Where(num => num > 5).Subscribe(Console.WriteLine);

subscription.Dispose();

3. Hello Rx.NET

從一個簡單的Demo開始。
假設咱們如今模擬電熱壺燒水,實時輸出當前水溫,通常咱們會這樣作:安全

Enumerable.Range(1, 100).ToList().ForEach(Console.WriteLine);
// do something else. 阻塞

假設當前程序是智能家居的中控設備,不只控制電熱壺燒水,還控制其餘設備,爲了不阻塞主線程。通常咱們會建立一個Thread或Task去作。服務器

Task.Run(() => Enumerable.Range(1, 100).ToList().ForEach(Console.WriteLine));
// do something else. 非阻塞

假設如今咱們不只要在控制檯輸出並且還要實時經過揚聲器報警。這時咱們應該想到委託和事件。

class Heater
{
    private delegate void TemperatureChanged(int temperature);
    private event TemperatureChanged TemperatureChangedEvent;
    public void BoilWater()
    {
        TemperatureChangedEvent += ShowTemperature;
        TemperatureChangedEvent += MakeAlerm;
        Task.Run(
            () =>
        Enumerable.Range(1, 100).ToList().ForEach((temperature) => TemperatureChangedEvent(temperature))
        );
    }
    private void ShowTemperature(int temperature)
    {
        Console.WriteLine($"當前溫度:{temperature}");
    }
    private void MakeAlerm(int temperature)
    {
        Console.WriteLine($"嘟嘟嘟,當前水溫{temperature}");
    }
}
class Program
{
    static void Main(string[] args)
    {
        Heater heater = new Heater();        
        heater.BoilWater();
    }
}

瞬間代碼量就上去了。可是藉助Rx.NET,咱們能夠簡化成如下代碼:

var observable = Enumerable.Range(1, 100).ToObservable(NewTheadScheduler.Default);//申明可觀察序列
Subject<int> subject = new Subject<int>();//申明Subject
subject.Subscribe((temperature) => Console.WriteLine($"當前溫度:{temperature}"));//訂閱subject
subject.Subscribe((temperature) => Console.WriteLine($"嘟嘟嘟,當前水溫:{temperature}"));//訂閱subject
observable.Subscribe(subject);//訂閱observable

僅僅經過如下三步:

  1. 調用ToObservable將枚舉序列轉換爲可觀察序列。
  2. 經過指定NewTheadScheduler.Default來指定在單獨的線程進行枚舉。
  3. 調用Subscribe方法進行事件註冊。
  4. 藉助Subject進行多播傳輸

經過以上咱們能夠看到Rx.NET大大簡化了事件處理的步驟,而這只是Rx的冰山一角。

4. Rx.NET 核心

Reactive Extensions(Rx)是一個爲.NET應用提供響應式編程模型的庫,用來構建異步基於事件流的應用,經過安裝System.ReactiveNuget包進行引用。Rx將事件流抽象爲Observable sequences(可觀察序列)表示異步數據流,使用LINQ運算符查詢異步數據流,並使用Scheduler來控制異步數據流中的併發性。簡單地說:Rx = Observables + LINQ + Schedulers。

Rx layer

在軟件系統中,事件是一種消息用於指示發生了某些事情。事件由Event Source(事件源)引起並由Event Handler(事件處理程序)使用。
在Rx中,事件源能夠由observable表示,事件處理程序能夠由observer表示。
可是應用程序使用的數據如何表示呢,例如數據庫中的數據或從Web服務器獲取的數據。而在應用程序中咱們通常處理的數據無外乎兩種:靜態數據和動態數據。 但不管使用何種類型的數據,其均可以做爲流來觀察。換句話說,數據流自己也是可觀察的。也就意味着,咱們也能夠用observable來表示數據流。

Everything is stream

講到這裏,Rx.NET的核心也就一目瞭然了:

  1. 一切皆爲數據流
  2. Observable 是對數據流的抽象
  3. Observer是對Observable的響應

在Rx中,分別使用IObservable<T>IObserver<T>接口來表示可觀察序列和觀察者。它們預置在system命名空間下,其定義以下:

public interface IObservable<out T>
{
      //Notifies the provider that an observer is to receive notifications.
      IDisposable Subscribe(IObserver<T> observer);
}

public interface IObserver<in T>
{
    //Notifies the observer that the provider has finished sending push-based notifications.
    void OnCompleted();
 
    //Notifies the observer that the provider has experienced an error condition.
    void OnError(Exception error);
    
    //Provides the observer with new data.
    void OnNext(T value);
}

5. 建立IObservable

建立IObservable<T>主要有如下幾種方式:
1. 直接實現IObservable<T>接口
2. 使用Observable.Create建立

Observable.Create<int>(observer=>{
    for (int i = 0; i < 5; i++)
    {
        observer.OnNext(i);
    }
    observer.OnCompleted();
    return Disposable.Empty;
})

3. 使用Observable.Deffer進行延遲建立(當有觀察者訂閱時才建立)
好比要鏈接數據庫進行查詢,若是沒有觀察者,那麼數據庫鏈接會一直被佔用,這樣會形成資源浪費。使用Deffer能夠解決這個問題。

Observable.Defer(() =>
{
    var connection = Connect(user, password);
    return connection.ToObservable();
});

4. 使用Observable.Generate建立迭代類型的可觀察序列

IObservable<int> observable =
    Observable.Generate(
        0,              //initial state
        i => i < 10,    //condition (false means terminate)
        i => i + 1,     //next iteration step
        i => i * 2);      //the value in each iteration

5. 使用Observable.Range建立指定區間的可觀察序列

IObservable<int> observable = Observable.Range (0, 10).Select (i => i * 2);

6. 建立特殊用途的可觀察序列

Observable.Return ("Hello World");//建立單個元素的可觀察序列
Observable.Never<string> ();//建立一個空的永遠不會結束的可觀察序列
Observable.Throw<ApplicationException> (
new ApplicationException ("something bad happened"))//建立一個拋出指定異常的可觀察序列
Observable.Empty<string> ()//建立一個空的當即結束的可觀察序列

7. 使用ToObservable轉換IEnumerate和Task類型

Enumerable.Range(1, 10).ToObservable();
IObservable<IEnumerable<string>> resultsA = searchEngineA.SearchAsync(term).ToObservable();

8. 使用Observable.FromEventPattern<T>Observable.FromEvent<TDelegate, TEventArgs>進行事件的轉換

public delegate void RoutedEventHandler(object sender,
 System.Windows.RoutedEventArgs e)
IObservable<EventPattern<RoutedEventArgs>> clicks =
                Observable.FromEventPattern<RoutedEventHandler, RoutedEventArgs>(
                    h => theButton.Click += h,
                    h => theButton.Click -= h);
clicks.Subscribe(eventPattern => output.Text += "button clicked" + Environment.NewLine);

9. 使用Observable.Using進行資源釋放

IObservable<string> lines =
    Observable.Using (
        () => File.OpenText ("TextFile.txt"), // opens the file and returns the stream we work with
        stream =>
        Observable.Generate (
            stream, //initial state
            s => !s.EndOfStream, //we continue until we reach the end of the file
            s => s, //the stream is our state, it holds the position in the file 
            s => s.ReadLine ()) //each iteration will emit the current line (and moves to the next)
    );

10. 使用Observable.Interval建立指定間隔可觀察序列

11. 使用Observable.Timer建立可觀察的計時器

6. RX 操做符

建立完IObservable 後,咱們能夠對其應用系列Linq操做符,對其進行查詢、過濾、聚合等等。Rx內置瞭如下系列操做符:
Rx 操做符
下面經過圖示來解釋經常使用操做符的做用:
操做符解釋

7. 多播傳輸靠:Subject

基於以上示例,咱們瞭解到,藉助Rx能夠簡化事件模型的實現,而其實質上就是對觀察者模式的擴展。提到觀察者模式,咱們知道一個Subject能夠被多個觀察者訂閱,從而完成消息的多播。一樣,在Rx中,也引入了Subject用於多播消息傳輸,不過Rx中的Subject具備雙重身份——便是觀察者也是被觀察者。

interface ISubject<in TSource, out TResult> : IObserver<TSource>,IObservable<TResult>
{
}

Rx中默認提供瞭如下四種實現:

  • Subject - 向全部觀察者廣播每一個通知

  • AsyncSubject - 當可觀察序列完成後有且僅發送一個通知

  • ReplaySubject - 緩存指定通知以對後續訂閱的觀察者進行重放

  • BehaviorSubject - 推送默認值或最新值給觀察者

但對於第一種Subject<T>有一點須要指出,當其有多個觀察者序列時,一旦其中一箇中止發送消息,則Subject就中止廣播全部其餘序列後續發送的任何消息。

8. 有溫度的可觀察者序列

對於Observable,它們是有溫度的,有冷熱之分。它們的區別以下圖所示:
冷熱觀察者序列的區別

Cold Observable:有且僅當有觀察者訂閱時才發送通知,且每一個觀察者獨享一份完整的觀察者序列。
Hot Observable:無論有無觀察者訂閱都會發送通知,且全部觀察者共享同一份觀察者序列。

9. 一切皆在掌控:Scheduler

在Rx中,使用Scheduler來控制併發。而對於Scheduler咱們能夠理解爲程序調度,經過Scheduler來規定在什麼時間什麼地點執行什麼事情。Rx提供瞭如下幾種Scheduler:

  1. NewThreadScheduler:即在新線程上執行
  2. ThreadPoolScheduler:即在線程池中執行
  3. TaskPoolScheduler:與ThreadPoolScheduler相似
  4. CurrentThreadScheduler:在當前線程執行
  5. ImmediateScheduler:在當前線程當即執行
  6. EventLoopScheduler:建立一個後臺線程按序執行全部操做

舉例而言:

Observable.Return("Hello",NewThreadScheduler.Default)
.Subscribe(str=>Console.WriteLine($"{str} on ThreadId:{Thread.CurrentThread.ManagedThreadId}")
);
Console.WriteLine($"Current ThreadId:{Thread.CurrentThread.ManagedThreadId}");

以上輸出:
Current ThreadId:1
Hello on ThreadId:4

10. 最後

羅裏吧嗦的總算把《Rx.NET In Action》這本書的內容大體梳理了一遍,對Rx也有了一個更深的認識,Rx擴展了觀察者模式用於支持數據和事件序列,內置系列操做符容許咱們以聲明式的方式組合這些序列,且無需關注底層的實現進行事件驅動開發:如線程、同步、線程安全、併發數據結構和非阻塞IO。

但事無鉅細,不免疏漏。對響應式編程有興趣的不妨拜讀下此書,相信對你會大有裨益。

參考資料:
Rx.NET in Action.pdf
ReactiveX
.Net中的反應式編程(Reactive Programming)

相關文章
相關標籤/搜索