【CEP】調試StreamInsight官方Demo——HelloInsight

原文連接:http://my.oschina.net/SnifferApache/blog/338550html

StreamInsight開發指南(https://technet.microsoft.com/zh-cn/library/ee391564(v=sql.111).aspx)在「輸入和輸出適配器」後面標註了「舊模型」。react

適配器模型遵循以下的狀態轉移圖:web

imageimage

其體系結構如圖1:sql

{能夠看出,StreamInsight主要包括三個部分:輸入適配器(Input Adapter)、輸出適配器(Output Adapter)以及CEP服務器。}數據庫

StreamInsight示例採用的新模型。該示例中建立和使用五個基本實體類型:源、接收器、主題、綁定和處理。其體系結構如圖2:c#

CEP 體系結構概述

{對比圖1,能夠看出新舊模型的原理沒變}設計模式

新模型「使用事件源和事件接收器」過程當中用到可枚舉和可觀察的源和接收器,其中IQbservable能夠理解爲Observable的「遠程查詢」版本{by趙姐夫blog.zhaojie.me/2010/09/async-programming-and-reactive-framework.html}服務器

HelloInsight系列樣例是基於適配器模型的,包括三個項目:
app

HelloInsight:流中只有一個簡單事件(其payload爲字符串類型)便中止了,輸出這個事件payload的內容;異步

HelloInsightObservable:事件流中payload爲數字,找出流中大於某個值的事件;

HelloInsightEnumerable:在HelloInsight基礎上,InputAdapter把txt文件中每行字符串「包裝」爲事件push到流中,OutputAdapter輸出流中每一個事件的payload;

1. 運行樣例

手札系列的內容相對權威、深刻,在這裏本文會從初學者角度來提到一些點。

至於開發環境請參考上一篇(http://my.oschina.net/SnifferApache/blog/324541

Demo下載地址(http://pan.baidu.com/s/1qWqKe5Y的CEP目錄

HelloInsight和HelloInsightObservable本地直接運行成功,而HelloInsightObservable運行出現以下問題:

類型「System.Reactive.Linq.IQbservable`1<T0>」在未被引用的程序集中定義。必須添加對程序集「System.Reactive.Providers, Version=1.0.10621.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35」的引用。 HelloInsightObservable

按照「使用IObservable接口建立StreamInsight程序」這篇教程,第一步須要安裝Reactive Extension for .Net 4,接着添加System.CoreEx和System.Reactive程序集引用。可是最新版Rx安裝以後,程序集列表中沒有System.CoreEx,在只導入System.Reactive .dll的狀況下運行出現上面的問題。{!安裝Rx的目的僅僅是提供兩個dll文件,略顯逗逼;}

事實上,安裝StreamInsight2.1以後,Reactive會自動安裝上,VS中下文program.cs原本出現的錯誤提示和引用中的黃色歎號當即消失了,這個項目調試正常輸出;

推測五星教程以及其餘教程中之因此提到的單獨安裝Reactive Extension for .Net 4,多是其StreamInsight版本較舊;

BTW,安裝Reactive Extension for .Net 4安裝以後默認在C盤建立兩個文件夾Microsoft Reactive Extensions SDK和Microsoft SDKs,VS2013添加引用欄會自動識別到這些東西,Reactive主要路徑以下

路徑1:C:\Program Files (x86)\Microsoft Reactive Extensions SDK\v1.0.10621\Binaries\.NETFramework\v4.0

路徑2:C:\Program Files (x86)\Microsoft SDKs\Reactive Extensions\v2.0\Binaries\.NETFramework\v4.5

從下圖可看出VS自動將V2.0和V1.0的dll混到一塊兒了,因此引用的時候不要重複,並且最好同一版本(不然兩個版本的方法名相同,可能會出現方法不明確的錯誤)。

前面提到的System.CoreEx在V2.0中已經集成到SystemCore中了,因此沒有這個dll文件


2. HelloInsight

該項目是基於適配器模型的。回顧前面的狀態轉移圖,適配器就是將一種格式的數據轉換成另外一個格式的數據。輸入適配器把傳感器、web服務器和數據庫等等多樣數據轉換成事件(Event)格式,push到事件流(Event Stream)中,輸出適配器則與之進行相反的操做。

咱們首先看這個項目的InputAdapters包括:

HelloInputConfig.cs    //配置類

HelloInputFactory.cs    //適配器工廠

HelloPointInput.cs    //點事件適配器

理論上,適配器工廠會根據狀況把任務交給不一樣類型的適配器,好比Point,Interval或者Edge適配器。樣例爲了簡單期間,只有點事件一種適配器,要否則怎麼叫「麻雀雖小,五臟俱全」呢,哈哈;

適配器處理的各種信息確定要有必定的格式才規範,咱們把格式信息抽象爲配置類,這即是HelloInputConfig;

具體到HelloPointInput.cs,其處理流程則和狀態轉移圖對照起來了,其中最重要的「生產」環節

private void ProduceEvents()
        {
            var pendingEvent = CreateInsertEvent();
            pendingEvent.StartTime = DateTime.Now;
            pendingEvent.Payload = new HelloPayload
            {
                str = _config.inputString
            };

            EnqueueOperationResult result = Enqueue(ref pendingEvent);
            EnqueueCtiEvent(DateTime.Now);

            Stopped();
         }

構造Insert事件,配置其StartTime和Payload,Enqueue到StreamInsight引擎,在Enqueue一個Cti事件,遍把引擎關掉了,關掉了,關掉了。。。。。

因此呢,功能很是簡單,咱們再來看看輸出適配器(暫時不關心引擎到底會對stream作什麼事情。。),一樣的,只看最核心的如何消費流中的事件

private void ConsumeEvents()
        {
            PointEvent<HelloPayload> currEvent;
            DequeueOperationResult result;
            while (true)
            {
                if (AdapterState.Running == base.AdapterState)
                {
                    result = Dequeue(out currEvent);
                    if (result == DequeueOperationResult.Empty)
                    {
                        Ready();
                        return;
                    }
                    else
                    {
                        if (currEvent.EventKind == EventKind.Insert)
                        {
                            Console.WriteLine("Output: " +
                            currEvent.Payload.str
                            );

                        }
                        ReleaseEvent(ref currEvent);
                    }
                }
                else if (AdapterState.Stopping == AdapterState)
                {
                    Stopped();
                }
                else
                {
                    return;
                }

            }

        }

大致上,只要適配器還在Running狀態,就Dequeue一個事件,輸出Insert類型事件的payload。適配器要不在Running狀態了,那就結束了。

最後主程序要作的就是搭建Query模版,啓動Query

Query query = filteredCepStream.ToQuery(application,//Guid.NewGuid().ToString()
                "HelloInsightQuery", "建立查詢,並綁定到輸出適配器",
                typeof(HelloOutputFactory), outputConfig, 
                EventShape.Point, StreamEventOrder.ChainOrdered);
            query.Start();

沒錯,就是這兩句,其中application是CEP服務器上的一個應用程序,定義以下

Server server = Server.Create("Default");

Application application = server.CreateApplication("HelloInsight");

filteredCepStream爲Microsoft.ComplexEventProcessing.Linq.CepStream<TPayload>類型;

這個類用到了泛型,就比如你明天有一個快遞到,快遞具體是什麼不重要,咱們關注你和快遞員的溝通、協做過程。

這個強大的類型可使用linq表達式來定義好流的處理方式(這裏就一個字符串沒什麼好處理的),咱們如今是在構建Query模版,啓動以後纔會生效

var filteredCepStream = from e in cepStream
                                    select e;

cepStream怎麼構造呢?

var inputConfig = new HelloInputConfig
                    {
                        inputString = "Hello StreamInsight!"
                    };                
Microsoft.ComplexEventProcessing.Linq.CepStream<HelloPayload> inputStream = 
            CepStream<HelloPayload>.Create("InputStream", typeof(HelloInputFactory), inputConfig, EventShape.Point);

BTW,CepStream<TPayload>是一個牛逼的靜態類,有不少靜態方法能夠用~~

查詢啓動以後,輸入適配器用另外的線程執行相應的方法(ProduceEvents),主線程須要等待適配器線程執行結束。結束以後必定要記得關閉Query哦

那問題來了,咱們怎麼在主線程獲取適配器線程的運行狀態呢?

答案是使用DiagnosticView 診斷報告每,具體原理你不用管,你只要知道咱們必定會成功就行了,科科

 //流隊列爲空的時候,適配器會中止運行
            DiagnosticView dv = query.Application.Server.GetDiagnosticView(query.Name);

            //Start()異步啓動以後,主程序每隔一秒就詢問一下輸出適配器的狀態,若是不是「Running」,就能夠輸出統計結果了
            while ((string)dv[DiagnosticViewProperty.QueryState] == "Running")
            {
                Thread.Sleep(1000);
                dv = query.Application.Server.GetDiagnosticView(query.Name);
            }
            // 此函數輸出監視到的參數,好比總事件數、查詢時間等
            RetrieveDiagnostics(query.Application.Server.GetDiagnosticView(new Uri("cep:/Server/EventManager")), Console.Out);
            
            query.Stop();

讓咱們看一下效果:

建立Query模版,啓動Query的方法除了上面提到的,還可使用QueryTemplate來替代(上一篇提到的複雜樣例TrafficJoinQuery就是使用這種方法)

var queryTemplate = application.CreateQueryTemplate("ExampleTemplate", "Description...", cepStream);
var queryBinder = new QueryBinder(queryTemplate);
queryBinder.BindProducer<HelloPayload>("input", inputAdapter, inputConfig, EventShape.Point);
queryBinder.AddConsumer<HelloPayload>("output", outputAdapter, outputConfig, EventShape.Point, StreamEventOrder.ChainOrdered);

var query = application.CreateQuery("ExampleQuery", "Description...", queryBinder);


3. HelloInsightEnumerable

和HelloInsight相比,你只要修改一下輸入適配器就ok了(配置類字段改爲了txt路徑)

構造函數改爲讀完txt所有內容。。

 private HelloInputConfig _config;
        private List<string> strings;
        private IEnumerator<string> stringEnumerator;

        public HelloPointInput(HelloInputConfig config)
        {
            _config = config;

            var streamReader = new StreamReader(config.fileName);
            strings = new List<string>();
            
            while (!streamReader.EndOfStream)
            {
                strings.Add(streamReader.ReadLine());
            }
            stringEnumerator = strings.GetEnumerator();
            
            streamReader.Close();
        }

再改改ProduceEvents

 private void ProduceEvents()
        {
            while (AdapterState != AdapterState.Stopping)
            {
                if (stringEnumerator.MoveNext())
                {
                    try
                    {
                        var line = stringEnumerator.Current;
                        var pendingEvent = CreateInsertEvent();
                        pendingEvent.StartTime = DateTime.Now;
                        pendingEvent.Payload = new HelloPayload
                        {
                            str = line
                        };
                        EnqueueOperationResult result= Enqueue(ref pendingEvent);
                        if (result == EnqueueOperationResult.Full)
                        {
                            Thread.Sleep(1000);
                            //Ready();
                            return;
                        }
                        EnqueueCtiEvent(DateTime.Now);
                        //Thread.Sleep(1000);
                    }
                    catch
                    {
                        //error handling should go here
                    }
                }
                else
                {
                    break;
                }
            }
            Stopped();

         }


搞定

手札四沒法解決海量數據讀取的問題,若是某一行文本沒有換行,超長,有可能塞爆string空間。還有一次性所有讀取可能也不太合適。

4. HelloInsightObservable


5. 拾遺

5.1 Rx和LINQ

LINQ和Rx都是用來對集合進行操做。LINQ操做的集合實現了IEnumerable接口,可以使用foreach語句遍歷集合。而Rx操做的集合實現了IEnumerable,IQueryable集合,這樣的集合稱之爲Observable集合。你們對Enumerable集合可能很熟悉,他是foreach語句的基礎,個人另外一篇文章對這個有詳細介紹,這裏就很少說了,下面主要來看看Observable集合。

    Rx對Observable集合進行操做,這個集合的命名是從觀察者設計模式得來的,觀察者模式的基礎是委託和事件,因此要了解這一模式須要理解委託和事件,在這裏推薦張子陽的文章C#中的委託和事件。Enumebrable集合中全部的元素在集合中都已經填充好了,是靜態的,用戶可使用「拉」的方式從集合中遍歷元素進行處理。而Observable集合則不一樣,在建立該集合時,集合中的元素可能會在之後的某個時間才能添加進去。因爲集合註冊了事件,一旦集合中的元素到達,就會觸發這一事件,將信息「推」到註冊者哪裏去。


5.2 LINQ查詢表達式

HelloInsightObservable項目中,過濾這一步使用了LINQ中的查詢表達式

   var query =    from e in stream

         where e.value > 50

         select e;


參考(LINQ之路http://www.cnblogs.com/lifepoem/archive/2011/10/28/2227735.html

方法語法和查詢表達式語法互爲補充。

方法語法中,Where產生一個通過過濾的sequence;OrderBy生成輸入sequence的排序版本;Select獲得的序列中的每一個元素都通過了給定lambda表達式的轉換。

查詢表達式語法中,查詢表達式老是以from子句開始,以select或者group子句結束。From子句定義了查詢的範圍變量(range variable),能夠認爲該變量是對輸入sequence的一個遍歷,就像foreach作的那樣。下面這幅圖描述了查詢表達式的完整語法:



6. 對XX項目CEP Project的調試

嗯,理論上說會有不少篇……

①重構和改進HelloInsightObservable

http://my.oschina.net/SnifferApache/blog/360563

相關文章
相關標籤/搜索