上一篇 提到的官方源碼也能夠在個人共享(http://pan.baidu.com/s/1qWqKe5Y)下載。不瞭解觀察者設計模式的讀者閱讀源碼會有意義點困難(好比我),能夠參考(.NET設計模式(19):觀察者模式(Observer Pattern))入門。html
HelloInsightObservable官方源碼利用toToPointStream方法將觀察者的實例轉化爲點事件流,接着在點事件流中使用linq查詢e>50的輸入,並將其輸出c#
運行結果以下:設計模式
其不足之處在於代碼有點混亂,並且只有一個觀察者。網絡
接下來本文就逐步修改,而且實現多個觀察者的狀況。app
program.cs中定義觀察者,將觀察者「訂閱」到目標對象的語句以下:dom
var outputObserver = new OutputObserver();ide
var outputObservable = query.ToObservable();//將事件流轉化爲可觀察的輸出 函數
outputObservable.Subscribe(outputObserver);//提供通知信息到outputObserverthis
那容易想到的思路是直接在program.cs中添加多個觀察者,再使用Subscribe方法訂閱多個觀察者。可是輸出每次都有變更,因爲不一樣觀察者輸出同樣也看不出明顯規律,偶爾還會因爲枚舉觀察者的過程當中觀察者集合變更而產生異常spa
這是由於InputObservable.cs中模擬輸入流的GenerateInput是Timer的回調函數。每個觀察者在運行以後都會將Timer設爲中止狀態,別的觀察者在Timer已經啓動的狀況下加入不是很恰當。使人奇怪的是官方源碼在InputObservable. cs的構造函數中啓動了Timer,既然沒打算添加多個觀察者,那在GenerateInput中遍歷觀察者集合Observers的語句有什麼意義?
添加以下引用:
Microsoft.ComplexEventProcessing;
Microsoft.ComplexEventProcessing.Observable;
System.Reactive;
System.Reactive.Providers;
namespace HelloInsight_edit { public class OutputObserver:IObserver<int>//實現IObserver接口 { private string name; public OutputObserver(string name){ this.name = name; } public virtual void OnCompleted() { Console.WriteLine("Stopping query..."); } public virtual void OnError(Exception e) { Console.WriteLine("Unexpected error occured"); } public virtual void OnNext(int value) { Console.WriteLine("{0}觀察到的value: {1}", this.name,value); } } }
爲簡單起見,IObserver的抽象類型都使用int型,之後Main方法建立事件流的時候也會相應修改。
咱們要刪掉構造方法中的timer.change(timeSpan,timeSpan),新建了update方法,用來調用這句話。這樣可使得多個observer都添加到observers中以後再啓動Timer。
public class EventSource:IObservable<int> { private List<IObserver<int>> observers = new List<IObserver<int>>(); private readonly int dataNumber; private int generatedNumber; private Random random; private readonly Timer timer; private readonly int timeSpan; //add private int _randomNumber; public EventSource(int dataNumber) { Console.WriteLine("我是構造方法"); this.random = new Random(); this.dataNumber = dataNumber; this.generatedNumber = 0; this.timer = new Timer(GenerateInput);//callback是一個委託,表示要執行的方法 this.timeSpan = 100;//每一個隨機數字產生的時間間隔 1000ms //timer.Change(timeSpan, timeSpan);//此語句控制數據 this._randomNumber = -1;//初始化隨機數字 } public int RandomNumber { get { return _randomNumber; } set { this._randomNumber = value; } } public void Update() { timer.Change(timeSpan, timeSpan); } private void GenerateInput(object _) { foreach (var observer in observers) { _randomNumber= random.Next(100); Console.WriteLine("Random generated data {0} : {1}", generatedNumber, _randomNumber); observer.OnNext(_randomNumber); generatedNumber++; if (generatedNumber >= dataNumber) { observer.OnCompleted(); timer.Change(Timeout.Infinite, timeSpan); return; } } timer.Change(timeSpan, timeSpan); } public void AddObserver(IObserver<int> observer) { observers.Add(observer); } public void RemoveObserver(IObserver<int> observer) { observers.Remove(observer); } //必須實現的方法 public IDisposable Subscribe(IObserver<int> observer) { if (observer != null && !observers.Contains(observer)) { observers.Add(observer); } Console.WriteLine("我是subscriber"); return observer as IDisposable; } }
將輸入源的實例es轉化爲點事件流stream,query過濾獲得stream中大於50的事件流,query2過濾獲得stream大於70的事件流。創建了3個觀察者roger、luffy和nami,咱們用luffy觀察query,用nami觀察query2。
修好program.cs以後就能夠調試了噢耶……
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; //add using Microsoft.ComplexEventProcessing; using Microsoft.ComplexEventProcessing.Linq; namespace HelloInsight_edit { class Program { static void Main(string[] args) { //將EventSource類做爲CEP引擎的輸入。 EventSource es = new EventSource(10); var server = Server.Create("Default"); var application = server.CreateApplication("Observable Application"); //注意如下4行,這裏與適配器方式的程序不一樣的是,沒有插入CTI事件。 var stream = es.ToPointStream(application, e => PointEvent.CreateInsert(DateTime.Now, e), AdvanceTimeSettings.StrictlyIncreasingStartTime, "Observable Stream"); var query = from e in stream where e > 50 select e; OutputObserver roger = new OutputObserver("roger"); OutputObserver luffy = new OutputObserver("luffy"); OutputObserver nami = new OutputObserver("nami"); Console.WriteLine("Starting query..."); //直接對原始流添加觀察者 //es.AddObserver(roger); es.AddObserver(luffy); es.AddObserver(nami); //對newStream添加觀察者 var newStream = query.ToObservable(); newStream.Subscribe(luffy); //newStream.Subscribe(nami);//添加多個訂閱者可能會有異常 //對newStream2添加觀察者 var query2 = from e in stream where e > 70 select e; var newStream2 = query2.ToObservable(); newStream2.Subscribe(nami); //調用timer.change(定義callback的等待時間和時間間隔) es.Update(); Console.ReadLine(); } } }
運行結果:
能夠看出,Subscribe兩個觀察者的操做先執行。
在遍歷觀察者集合observers的過程當中,每組顯示2個隨機數。luffy和nami依次觀察第一個和第二個。
{?我的理解爲newStream.Subscribe(luffy);的功能相似於一個綁定了luffy的線程,遍歷結束以後所有用戶開始依次輸出。全局變量generatedNumber負責總體次數}
這不是咱們要的功能。
對於流中每一個事件,不一樣觀察者都觀察到才行。
將生成隨機數的語句放到遍歷操做foreach以前
private void GenerateInput(object _) { _randomNumber = random.Next(100); if (generatedNumber <= dataNumber) { Console.WriteLine("Random generated data {0} : {1}", generatedNumber, _randomNumber); foreach (var observer in observers) observer.OnNext(_randomNumber);//使用最大程度實現的OnNext } else { observers.ElementAt(0).OnCompleted(); timer.Change(Timeout.Infinite, timeSpan); } generatedNumber++; timer.Change(timeSpan, timeSpan); }
運行結果:
能夠看出,對於流中每一個事件,luffy檢測到了大於50的事件,nami檢測到了大於70的事件,實現了預約的目標。
{!接下來咱們要將觀察者模式、點事件流檢測和WCF(Windows Communication Foundation)相結合,實現事件源和觀察者WCF通訊,便於接下來部署到網絡中}
[1]IObserver<T>接口
(http://msdn.microsoft.com/zh-cn/library/dd783449(v=vs.110).aspx)
[2]IDisposable接口
(http://msdn.microsoft.com/zh-cn/library/system.idisposable(v=vs.110).aspx)
[3]virtual方法
(http://www.cnblogs.com/hacker/archive/2004/08/10/31774.html)