建立和訂閱簡單的觀察隊列

你沒有必要本身手動實現Iobservable<T>接口來建立觀察隊列,一樣的,你也沒有必要實現Iobserver<T>接口來訂閱這個隊列。經過安裝RX庫,RX提供請多靜態的方法來建立帶有一個參數或多個或沒有參數的簡單隊列。你能夠很方便的使用這些靜態方法。另外,RX還提供了訂閱擴展方法來實現多種多樣的OnNext,OnError,OnCompleted句柄委託。
建立和訂閱簡單的觀察隊列
下面的例子使用觀察者類型的範圍操做來建立簡單的觀察集合,經過Observable類訂閱方法來訂閱這個觀察隊列集合,而且提供要處理OnNext,OnError,OnCompleted事件的委託Action.
這個範圍操做有不少重載的版本,在咱們的例子中,建立一個整數隊列從x開始並並生y個數。
一旦開始訂閱觀察者,值就會被髮送給訂閱者,那麼OnNext的委拖就會被執行。異步

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace SimpleSequence
{
class Program
{
static void Main(string[] args)
{
IObservable<int> source = Observable.Range(1, 10);
IDisposable subscription = source.Subscribe(
x => Console.WriteLine("OnNext: {0}", x),
ex => Console.WriteLine("OnError: {0}", ex.Message),
() => Console.WriteLine("OnCompleted"));
Console.WriteLine("Press ENTER to unsubscribe...");
Console.ReadLine();
subscription.Dispose();
}
}
}
View Code

 

當一個觀察者訂閱另外一個觀察隊列時,線程調用訂閱的方法可能來源與不一樣的線程,直到這些線程隊列執行完成。所以,在觀察隊列執行完成前訂閱調用是異步的不會阻塞其它線程。咱們會在後面講計劃任務裏詳細講到。ide

注意訂閱的方法返回的是Idisposable,因此你能夠解除訂閱關係而且釋放資源很是容易。當你調用觀察隊列的Dispose的方法時,這時觀察隊列就會中止監聽數據。通常狀況下你除非要提早結束訂閱不然你沒有必要手動調用Dispose這個方法,當觀察源的壽命比觀察隊列更長時,RX的設計能夠去失去相關的關係情節並不用使用終結器。當IDispose被GC回收時,RX不會自動的釋放訂閱關係,然而,咱們應該注意到當觀察者操做釋放訂閱關係時,訂閱關係會被當即釋放。如當OnCompeleted或者OnError消息發生時還有多是var x=Observable.Zip(a,b).Subscribe(),x訂閱了a,b若是a異常了那麼a,b與x的訂閱關係都 會當即被釋放。this

你能夠調整上面的代碼經過使用Observable類型的Create方法來返回一個observe,p 在這個方法中定義好OnNext,OnError,OnCompleted的委託,以後你能夠經過observer來訂閱到觀察者類型上代碼以下:spa

 

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace SimpleSequence
{
class Program
{
static void Main(string[] args)
{
IObservable<int> source = Observable.Range(1, 10);
IObserver<int> obsvr = Observer.Create<int>(
x => Console.WriteLine("OnNext: {0}", x),
ex => Console.WriteLine("OnError: {0}", ex.Message),
() => Console.WriteLine("OnCompleted"));
IDisposable subscription = source.Subscribe(obsvr);
Console.WriteLine("Press ENTER to unsubscribe...");
Console.ReadLine();
subscription.Dispose();
}
}
}
View Code

 

除了本身建立observable隊列你能夠轉換現存的.net事件和異步到觀察隊列中。其它的主題中會詳細說到。
使用一個計時器
接下來咱們使用一個計時器來建立一個隊列,這個隊列將在5秒後開推送數據,以後每1秒推送一次,爲了說明問題,咱們將在每一個操做推出值 的加上時間戳,經過這樣,當咱們訂閱這個數據源隊列時咱們能夠收到它的值 和時間戳。.net

Console.WriteLine(「Current Time: 「 + DateTime.Now);

var source = Observable.Timer(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(1))
.Timestamp();
using (source.Subscribe(x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
{
Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
}
Console.WriteLine("Press any key to exit");
Console.ReadKey();
View Code

 

把普通的隊列轉換爲觀察者隊列線程

IEnumerable<int> e = new List<int> { 1, 2, 3, 4, 5 };

IObservable<int> source = e.ToObservable();
IDisposable subscription = source.Subscribe(
x => Console.WriteLine("OnNext: {0}", x),
ex => Console.WriteLine("OnError: {0}", ex.Message),
() => Console.WriteLine("OnCompleted"));
Console.ReadKey();
View Code

 

冷與熱觀察者
冷觀察者在開始訂閱後,觀察者只會像訂閱的對象發送流,訂閱之間是不能共享值 的。
熱觀察者是能夠共享訂閱的值 的。每個訂閱者都 會獲得推送的值設計

IObservable<int> source = Observable.Interval(TimeSpan.FromSeconds(1)); 
IDisposable subscription1 = source.Subscribe(
x => Console.WriteLine("Observer 1: OnNext: {0}", x),
ex => Console.WriteLine("Observer 1: OnError: {0}", ex.Message),
() => Console.WriteLine("Observer 1: OnCompleted"));
IDisposable subscription2 = source.Subscribe(
x => Console.WriteLine("Observer 2: OnNext: {0}", x),
ex => Console.WriteLine("Observer 2: OnError: {0}", ex.Message),
() => Console.WriteLine("Observer 2: OnCompleted"));
Console.WriteLine("Press any key to unsubscribe");
Console.ReadLine();
subscription1.Dispose();
subscription2.Dispose();
View Code

 


熱的代碼以下:code

Console.WriteLine("Current Time: " + DateTime.Now);
var source = Observable.Interval(TimeSpan.FromSeconds(1)); //creates a sequence
IConnectableObservable<long> hot = Observable.Publish<long>(source); // convert the sequence into a hot sequence
IDisposable subscription1 = hot.Subscribe( // no value is pushed to 1st subscription at this point
x => Console.WriteLine("Observer 1: OnNext: {0}", x),
ex => Console.WriteLine("Observer 1: OnError: {0}", ex.Message),
() => Console.WriteLine("Observer 1: OnCompleted"));
Console.WriteLine("Current Time after 1st subscription: " + DateTime.Now);
Thread.Sleep(3000); //idle for 3 seconds
hot.Connect(); // hot is connected to source and starts pushing value to subscribers 
Console.WriteLine("Current Time after Connect: " + DateTime.Now);
Thread.Sleep(3000); //idle for 3 seconds
Console.WriteLine("Current Time just before 2nd subscription: " + DateTime.Now);
IDisposable subscription2 = hot.Subscribe( // value will immediately be pushed to 2nd subscription
x => Console.WriteLine("Observer 2: OnNext: {0}", x),
ex => Console.WriteLine("Observer 2: OnError: {0}", ex.Message),
() => Console.WriteLine("Observer 2: OnCompleted"));
Console.ReadKey();
View Code

 

源文server

相關文章
相關標籤/搜索