StreamInsight開發指南(https://technet.microsoft.com/zh-cn/library/ee391564(v=sql.111).aspx)在「輸入和輸出適配器」後面標註了「舊模型」。react
適配器模型遵循以下的狀態轉移圖:web
其體系結構如圖1:sql
{能夠看出,StreamInsight主要包括三個部分:輸入適配器(Input Adapter)、輸出適配器(Output Adapter)以及CEP服務器。}數據庫
StreamInsight示例採用的新模型。該示例中建立和使用五個基本實體類型:源、接收器、主題、綁定和處理。其體系結構如圖2:c#
{對比圖1,能夠看出新舊模型的原理沒變}設計模式
新模型「使用事件源和事件接收器」過程當中用到可枚舉和可觀察的源和接收器,其中IQbservable能夠理解爲Observable的「遠程查詢」版本{by趙姐夫blog.zhaojie.me/2010/09/async-programming-and-reactive-framework.html}服務器
HelloInsight系列樣例是基於適配器模型的,包括三個項目:
app
HelloInsight:流中只有一個簡單事件(其payload爲字符串類型)便中止了,輸出這個事件payload的內容;異步
HelloInsightObservable:事件流中payload爲數字,找出流中大於某個值的事件;
HelloInsightEnumerable:在HelloInsight基礎上,InputAdapter把txt文件中每行字符串「包裝」爲事件push到流中,OutputAdapter輸出流中每一個事件的payload;
手札系列的內容相對權威、深刻,在這裏本文會從初學者角度來提到一些點。
至於開發環境請參考上一篇(http://my.oschina.net/SnifferApache/blog/324541)
Demo下載地址(http://pan.baidu.com/s/1qWqKe5Y的CEP目錄)
HelloInsight和HelloInsightObservable本地直接運行成功,而HelloInsightObservable運行出現以下問題:
類型「System.Reactive.Linq.IQbservable`1<T0>」在未被引用的程序集中定義。必須添加對程序集「System.Reactive.Providers, Version=1.0.10621.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35」的引用。 HelloInsightObservable
按照「使用IObservable接口建立StreamInsight程序」這篇教程,第一步須要安裝Reactive Extension for .Net 4,接着添加System.CoreEx和System.Reactive程序集引用。可是最新版Rx安裝以後,程序集列表中沒有System.CoreEx,在只導入System.Reactive .dll的狀況下運行出現上面的問題。{!安裝Rx的目的僅僅是提供兩個dll文件,略顯逗逼;}
事實上,安裝StreamInsight2.1以後,Reactive會自動安裝上,VS中下文program.cs原本出現的錯誤提示和引用中的黃色歎號當即消失了,這個項目調試正常輸出;
推測五星教程以及其餘教程中之因此提到的單獨安裝Reactive Extension for .Net 4,多是其StreamInsight版本較舊;
BTW,安裝Reactive Extension for .Net 4安裝以後默認在C盤建立兩個文件夾Microsoft Reactive Extensions SDK和Microsoft SDKs,VS2013添加引用欄會自動識別到這些東西,Reactive主要路徑以下
路徑1:C:\Program Files (x86)\Microsoft Reactive Extensions SDK\v1.0.10621\Binaries\.NETFramework\v4.0
路徑2:C:\Program Files (x86)\Microsoft SDKs\Reactive Extensions\v2.0\Binaries\.NETFramework\v4.5
從下圖可看出VS自動將V2.0和V1.0的dll混到一塊兒了,因此引用的時候不要重複,並且最好同一版本(不然兩個版本的方法名相同,可能會出現方法不明確的錯誤)。
前面提到的System.CoreEx在V2.0中已經集成到SystemCore中了,因此沒有這個dll文件
該項目是基於適配器模型的。回顧前面的狀態轉移圖,適配器就是將一種格式的數據轉換成另外一個格式的數據。輸入適配器把傳感器、web服務器和數據庫等等多樣數據轉換成事件(Event)格式,push到事件流(Event Stream)中,輸出適配器則與之進行相反的操做。
咱們首先看這個項目的InputAdapters包括:
HelloInputConfig.cs //配置類
HelloInputFactory.cs //適配器工廠
HelloPointInput.cs //點事件適配器
理論上,適配器工廠會根據狀況把任務交給不一樣類型的適配器,好比Point,Interval或者Edge適配器。樣例爲了簡單期間,只有點事件一種適配器,要否則怎麼叫「麻雀雖小,五臟俱全」呢,哈哈;
適配器處理的各種信息確定要有必定的格式才規範,咱們把格式信息抽象爲配置類,這即是HelloInputConfig;
具體到HelloPointInput.cs,其處理流程則和狀態轉移圖對照起來了,其中最重要的「生產」環節
private void ProduceEvents() { var pendingEvent = CreateInsertEvent(); pendingEvent.StartTime = DateTime.Now; pendingEvent.Payload = new HelloPayload { str = _config.inputString }; EnqueueOperationResult result = Enqueue(ref pendingEvent); EnqueueCtiEvent(DateTime.Now); Stopped(); }
構造Insert事件,配置其StartTime和Payload,Enqueue到StreamInsight引擎,在Enqueue一個Cti事件,遍把引擎關掉了,關掉了,關掉了。。。。。
因此呢,功能很是簡單,咱們再來看看輸出適配器(暫時不關心引擎到底會對stream作什麼事情。。),一樣的,只看最核心的如何消費流中的事件
private void ConsumeEvents() { PointEvent<HelloPayload> currEvent; DequeueOperationResult result; while (true) { if (AdapterState.Running == base.AdapterState) { result = Dequeue(out currEvent); if (result == DequeueOperationResult.Empty) { Ready(); return; } else { if (currEvent.EventKind == EventKind.Insert) { Console.WriteLine("Output: " + currEvent.Payload.str ); } ReleaseEvent(ref currEvent); } } else if (AdapterState.Stopping == AdapterState) { Stopped(); } else { return; } } }
大致上,只要適配器還在Running狀態,就Dequeue一個事件,輸出Insert類型事件的payload。適配器要不在Running狀態了,那就結束了。
最後主程序要作的就是搭建Query模版,啓動Query
Query query = filteredCepStream.ToQuery(application,//Guid.NewGuid().ToString() "HelloInsightQuery", "建立查詢,並綁定到輸出適配器", typeof(HelloOutputFactory), outputConfig, EventShape.Point, StreamEventOrder.ChainOrdered); query.Start();
沒錯,就是這兩句,其中application是CEP服務器上的一個應用程序,定義以下
Server server = Server.Create("Default");
Application application = server.CreateApplication("HelloInsight");
filteredCepStream爲Microsoft.ComplexEventProcessing.Linq.CepStream<TPayload>類型;
這個類用到了泛型,就比如你明天有一個快遞到,快遞具體是什麼不重要,咱們關注你和快遞員的溝通、協做過程。
這個強大的類型可使用linq表達式來定義好流的處理方式(這裏就一個字符串沒什麼好處理的),咱們如今是在構建Query模版,啓動以後纔會生效
var filteredCepStream = from e in cepStream select e;cepStream怎麼構造呢?
var inputConfig = new HelloInputConfig { inputString = "Hello StreamInsight!" }; Microsoft.ComplexEventProcessing.Linq.CepStream<HelloPayload> inputStream = CepStream<HelloPayload>.Create("InputStream", typeof(HelloInputFactory), inputConfig, EventShape.Point);BTW,CepStream<TPayload>是一個牛逼的靜態類,有不少靜態方法能夠用~~
查詢啓動以後,輸入適配器用另外的線程執行相應的方法(ProduceEvents),主線程須要等待適配器線程執行結束。結束以後必定要記得關閉Query哦
那問題來了,咱們怎麼在主線程獲取適配器線程的運行狀態呢?
答案是使用DiagnosticView 診斷報告每,具體原理你不用管,你只要知道咱們必定會成功就行了,科科
//流隊列爲空的時候,適配器會中止運行 DiagnosticView dv = query.Application.Server.GetDiagnosticView(query.Name); //Start()異步啓動以後,主程序每隔一秒就詢問一下輸出適配器的狀態,若是不是「Running」,就能夠輸出統計結果了 while ((string)dv[DiagnosticViewProperty.QueryState] == "Running") { Thread.Sleep(1000); dv = query.Application.Server.GetDiagnosticView(query.Name); } // 此函數輸出監視到的參數,好比總事件數、查詢時間等 RetrieveDiagnostics(query.Application.Server.GetDiagnosticView(new Uri("cep:/Server/EventManager")), Console.Out); query.Stop();
讓咱們看一下效果:
建立Query模版,啓動Query的方法除了上面提到的,還可使用QueryTemplate來替代(上一篇提到的複雜樣例TrafficJoinQuery就是使用這種方法)
var queryTemplate = application.CreateQueryTemplate("ExampleTemplate", "Description...", cepStream); var queryBinder = new QueryBinder(queryTemplate); queryBinder.BindProducer<HelloPayload>("input", inputAdapter, inputConfig, EventShape.Point); queryBinder.AddConsumer<HelloPayload>("output", outputAdapter, outputConfig, EventShape.Point, StreamEventOrder.ChainOrdered); var query = application.CreateQuery("ExampleQuery", "Description...", queryBinder);
和HelloInsight相比,你只要修改一下輸入適配器就ok了(配置類字段改爲了txt路徑)
構造函數改爲讀完txt所有內容。。
private HelloInputConfig _config; private List<string> strings; private IEnumerator<string> stringEnumerator; public HelloPointInput(HelloInputConfig config) { _config = config; var streamReader = new StreamReader(config.fileName); strings = new List<string>(); while (!streamReader.EndOfStream) { strings.Add(streamReader.ReadLine()); } stringEnumerator = strings.GetEnumerator(); streamReader.Close(); }
再改改ProduceEvents
private void ProduceEvents() { while (AdapterState != AdapterState.Stopping) { if (stringEnumerator.MoveNext()) { try { var line = stringEnumerator.Current; var pendingEvent = CreateInsertEvent(); pendingEvent.StartTime = DateTime.Now; pendingEvent.Payload = new HelloPayload { str = line }; EnqueueOperationResult result= Enqueue(ref pendingEvent); if (result == EnqueueOperationResult.Full) { Thread.Sleep(1000); //Ready(); return; } EnqueueCtiEvent(DateTime.Now); //Thread.Sleep(1000); } catch { //error handling should go here } } else { break; } } Stopped(); }
搞定
手札四沒法解決海量數據讀取的問題,若是某一行文本沒有換行,超長,有可能塞爆string空間。還有一次性所有讀取可能也不太合適。
LINQ和Rx都是用來對集合進行操做。LINQ操做的集合實現了IEnumerable接口,可以使用foreach語句遍歷集合。而Rx操做的集合實現了IEnumerable,IQueryable集合,這樣的集合稱之爲Observable集合。你們對Enumerable集合可能很熟悉,他是foreach語句的基礎,個人另外一篇文章對這個有詳細介紹,這裏就很少說了,下面主要來看看Observable集合。
Rx對Observable集合進行操做,這個集合的命名是從觀察者設計模式得來的,觀察者模式的基礎是委託和事件,因此要了解這一模式須要理解委託和事件,在這裏推薦張子陽的文章C#中的委託和事件。Enumebrable集合中全部的元素在集合中都已經填充好了,是靜態的,用戶可使用「拉」的方式從集合中遍歷元素進行處理。而Observable集合則不一樣,在建立該集合時,集合中的元素可能會在之後的某個時間才能添加進去。因爲集合註冊了事件,一旦集合中的元素到達,就會觸發這一事件,將信息「推」到註冊者哪裏去。
HelloInsightObservable項目中,過濾這一步使用了LINQ中的查詢表達式
var query = from e in stream
where e.value > 50
select e;
參考(LINQ之路http://www.cnblogs.com/lifepoem/archive/2011/10/28/2227735.html)
方法語法和查詢表達式語法互爲補充。
方法語法中,Where產生一個通過過濾的sequence;OrderBy生成輸入sequence的排序版本;Select獲得的序列中的每一個元素都通過了給定lambda表達式的轉換。
查詢表達式語法中,查詢表達式老是以from子句開始,以select或者group子句結束。From子句定義了查詢的範圍變量(range variable),能夠認爲該變量是對輸入sequence的一個遍歷,就像foreach作的那樣。下面這幅圖描述了查詢表達式的完整語法:
嗯,理論上說會有不少篇……
①重構和改進HelloInsightObservable