Siddhi is a lightweight, easy-to-use, open-source CEP (Complex Event Processing) engine developed by WSO2 (http://wso2.com/about/).
Like most CEP systems, Siddhi supports SQL-like queries over streaming data; the SQL-style queries are translated into Java code by a compiler.
When one or more data streams flow in, Siddhi Core checks in real time whether the incoming events match the defined queries, and if they do, triggers a Callback that executes the corresponding logic.
How does Siddhi differ from a traditional CEP system such as Esper?
Mainly in being lighter-weight and more efficient; several design decisions account for the higher performance.
The most important are its threading model and pipelined execution: traditional CEP systems such as Esper use a single thread to handle all query matching, which is simple but inefficient and cannot exploit multiple CPU cores.
Siddhi therefore uses multiple threads combined with a pipeline mechanism, as shown in the figure below.
Siddhi splits each query into independent stages, called processors. This has two benefits: it makes multithreading straightforward, and identical processors can be reused.
Processors are connected to one another through queues. I won't go into further detail here; interested readers can consult the Siddhi paper and documentation.
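To make the pipeline idea concrete, here is a conceptual sketch of processors connected by queues, each stage running on its own thread. This is purely illustrative and does not use Siddhi's actual internal classes.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Conceptual illustration only -- not Siddhi's real internals.
// Each stage ("processor") runs on its own thread and hands its output
// to the next stage through a queue, forming a pipeline.
public class PipelineSketch {

    interface Processor {
        Object process(Object event); // return null to drop the event
    }

    static void stage(BlockingQueue<Object> in, BlockingQueue<Object> out, Processor p) {
        Thread t = new Thread(() -> {
            try {
                while (true) {
                    Object result = p.process(in.take());
                    if (result != null && out != null) {
                        out.put(result);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        t.setDaemon(true);
        t.start();
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<Object> input = new LinkedBlockingQueue<>();
        BlockingQueue<Object> filtered = new LinkedBlockingQueue<>();

        // Stage 1 filters events; stage 2 acts as a sink that prints matches.
        stage(input, filtered, e -> ((double) e > 30.0) ? e : null);
        stage(filtered, null, e -> { System.out.println("match: " + e); return null; });

        input.put(28.2d);
        input.put(33.5d);
        Thread.sleep(200); // give the daemon threads time to drain the queues
    }
}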
Now for the key question: what can Siddhi do for us?
A few concrete examples below illustrate typical scenarios in which Siddhi is used.
Let's start with the simplest possible example of how to run a real Siddhi program.
As mentioned above, Siddhi uses a SQL-like query language.
First, define the stream schema:
define stream TempStream (deviceID long, roomNo int, temp double);
Then define the query:
from TempStream select roomNo, temp * 9/5 + 32 as temp, 'F' as scale, roomNo >= 100 and roomNo < 110 as isServerRoom insert into RoomTempStream;
Together these implement a complete ETL process:
extraction: the required fields are selected out of TempStream;
transform: Celsius temperatures are converted to Fahrenheit;
loading: the result is written into the RoomTempStream stream.
Very convenient: there is no extra logic to write, only the SQL-like query.
To make this more concrete, here is a complete Java test example:
import org.wso2.siddhi.core.ExecutionPlanRuntime;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.query.output.callback.QueryCallback;
import org.wso2.siddhi.core.stream.input.InputHandler;
import org.wso2.siddhi.core.util.EventPrinter;

SiddhiManager siddhiManager = new SiddhiManager();

String executionPlan = ""
        + "define stream TempStream (deviceID int, roomNo int, temp float);"
        + ""
        + "@info(name = 'query1') "
        + "from TempStream "
        + "select roomNo, temp * 9/5 + 32 as temp, 'F' as scale, roomNo >= 100 and roomNo < 110 as isServerRoom "
        + "insert into RoomTempStream;";

ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);

// Print every event matched by query1
executionPlanRuntime.addCallback("query1", new QueryCallback() {
    @Override
    public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
        EventPrinter.print(timeStamp, inEvents, removeEvents);
    }
});

InputHandler inputHandler = executionPlanRuntime.getInputHandler("TempStream");
executionPlanRuntime.start();

inputHandler.send(new Object[] {12344, 201, 28.2f});
inputHandler.send(new Object[] {12345, 202, 22.2f});
inputHandler.send(new Object[] {12346, 203, 24.2f});

// Shutting down the runtime
executionPlanRuntime.shutdown();

// Shutting down Siddhi
siddhiManager.shutdown();
Siddhi supports many kinds of windows; see https://docs.wso2.com/display/CEP400/Inbuilt+Windows#InbuiltWindows-time for the full list.
Here is the most basic example, using a time-based window:
from TempStream#window.time(1 min) select roomNo, avg(temp) as avgTemp group by roomNo insert all events into AvgRoomTempStream ;
This query computes, per room, the average temperature over a 1-minute sliding window.
Siddhi time windows can also aggregate by an externally supplied timestamp (for example via the inbuilt externalTime window), but they require the timestamps to be monotonically increasing; in this respect our Brain aggregation library is more general, since it also handles non-monotonically-increasing timestamps.
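For reference, a small sketch of how the output of this windowed query could be consumed directly with a StreamCallback, assuming the query (and the AvgRoomTempStream it produces) is part of the execution plan compiled into executionPlanRuntime, as in the earlier Java example:

// Receive the aggregated events emitted into AvgRoomTempStream
executionPlanRuntime.addCallback("AvgRoomTempStream", new StreamCallback() {
    @Override
    public void receive(Event[] events) {
        for (Event event : events) {
            System.out.println(event);   // each event carries roomNo and avgTemp
        }
    }
});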
Siddhi supports real-time, window-based joins across multiple streams:
from TempStream[temp > 30.0]#window.time(1 min) as T join RegulatorStream[isOn == false]#window.length(1) as R on T.roomNo == R.roomNo select T.roomNo, T.temp, R.deviceID, 'start' as action insert into RegulatorActionStream ;
The query above joins TempStream and RegulatorStream on roomNo: whenever a temperature reading above 30 arrives for a room whose most recent regulator event says it is off, a 'start' action is emitted.
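A minimal sketch of feeding the two streams so the join fires; the RegulatorStream definition (and its attribute types) is my assumption, inferred from the query:

SiddhiManager siddhiManager = new SiddhiManager();

String plan = ""
        + "define stream TempStream (deviceID long, roomNo int, temp double);"
        + "define stream RegulatorStream (deviceID long, roomNo int, isOn bool);"   // assumed definition
        + "from TempStream[temp > 30.0]#window.time(1 min) as T "
        + "  join RegulatorStream[isOn == false]#window.length(1) as R "
        + "  on T.roomNo == R.roomNo "
        + "select T.roomNo, T.temp, R.deviceID, 'start' as action "
        + "insert into RegulatorActionStream;";

ExecutionPlanRuntime runtime = siddhiManager.createExecutionPlanRuntime(plan);
InputHandler tempHandler = runtime.getInputHandler("TempStream");
InputHandler regulatorHandler = runtime.getInputHandler("RegulatorStream");
runtime.start();

// Regulator reported off in room 201, then a temperature above 30 arrives for the same
// room within the 1-minute window, so a 'start' action goes into RegulatorActionStream.
regulatorHandler.send(new Object[] {1L, 201, false});
tempHandler.send(new Object[] {12344L, 201, 33.5d});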
Pattern queries are where CEP really shows its power. So what is a Pattern query?
「Pattern allows event streams to be correlated over time and detect event patterns based on the order of event arrival.」
Let's go straight to an example: a Pattern query used to detect credit card / ATM transaction fraud:
from every a1 = atmStatsStream[amountWithdrawed < 100] -> b1 = atmStatsStream[amountWithdrawed > 10000 and a1.cardNo == b1.cardNo] within 1 day select a1.cardNo as cardNo, a1.cardHolderName as cardHolderName, b1.amountWithdrawed as amountWithdrawed, b1.location as location, b1.cardHolderMobile as cardHolderMobile insert into possibleFraudStream;
Note the '->' symbol: it expresses the order in which events must occur.
The query above means: if, within one day, a withdrawal of less than 100 is followed by a withdrawal of more than 10000 on the same card, it is flagged as possible fraud.
Of course this is only an illustration; it would not really catch fraud on its own, but you can use it as a template for more sophisticated queries.
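For completeness, here is a sketch of how this pattern could be exercised end to end; the atmStatsStream definition is my assumption, with attribute names taken from the query and the types guessed:

SiddhiManager siddhiManager = new SiddhiManager();

// Assumed stream definition -- the names come from the query above, the types are guesses.
String plan = ""
        + "define stream atmStatsStream (cardNo string, cardHolderName string, "
        + "cardHolderMobile string, amountWithdrawed double, location string);"
        + "@info(name = 'fraudQuery') "
        + "from every a1 = atmStatsStream[amountWithdrawed < 100] "
        + "  -> b1 = atmStatsStream[amountWithdrawed > 10000 and a1.cardNo == b1.cardNo] "
        + "  within 1 day "
        + "select a1.cardNo as cardNo, a1.cardHolderName as cardHolderName, "
        + "  b1.amountWithdrawed as amountWithdrawed, b1.location as location, "
        + "  b1.cardHolderMobile as cardHolderMobile "
        + "insert into possibleFraudStream;";

ExecutionPlanRuntime runtime = siddhiManager.createExecutionPlanRuntime(plan);
InputHandler atm = runtime.getInputHandler("atmStatsStream");
runtime.start();

// A small withdrawal followed by a very large one on the same card within a day
// matches the pattern, and the combined record lands in possibleFraudStream.
atm.send(new Object[] {"1234-5678", "Alice", "555-0100", 50.0d, "Hangzhou"});
atm.send(new Object[] {"1234-5678", "Alice", "555-0100", 20000.0d, "Shanghai"});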
Siddhi also supports Sequence queries. The difference from Pattern is that the events matched by a Pattern may be non-contiguous, whereas the events in a Sequence must arrive contiguously.
Let's look at an example that uses a Sequence to detect peaks in a stock price:
from every e1=FilteredStockStream[price>20],
    e2=FilteredStockStream[((e2[last].price is null) and price>=e1.price) or ((not (e2[last].price is null)) and price>=e2[last].price)],
    e3=FilteredStockStream[price<e2[last].price]
select e1.price as priceInitial, e2[last].price as pricePeak, e3.price as priceAfterPeak
insert into PeakStream;
The query above means:
e1: an event arrives with price > 20;
e2: every subsequent event has a price greater than or equal to that of the previous event;
e3: finally an event arrives whose price is lower than the previous event's price.
At that point we have found a peak. For example, given the price sequence 25, 26, 28, 27, the query emits priceInitial = 25, pricePeak = 28, priceAfterPeak = 27.
Siddhi has plenty of other features that I won't go through one by one here.
Finally, let's look at how to fit Siddhi into our current framework so that it complements Brain.
I wrapped Siddhi Core as a Siddhi Bolt, so that within a JStorm topology we can flexibly choose which approach to use for each part: some statistics can be computed with Brain and others with Siddhi. It is very simple.
Without further ado, here is the source code for reference:
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.wso2.siddhi.core.ExecutionPlanRuntime;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.query.output.callback.QueryCallback;
import org.wso2.siddhi.core.stream.input.InputHandler;
import org.wso2.siddhi.core.stream.output.StreamCallback;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class SiddhiBolt implements IRichBolt {

    private static final Logger LOG = LoggerFactory.getLogger(SiddhiBolt.class);

    protected OutputCollector collector;
    protected SiddhiManager siddhiManager = null;
    protected String executionPlan = null;
    protected ExecutionPlanRuntime executionPlanRuntime = null;
    protected HashMap<String, InputHandler> handlers = null;

    public SiddhiBolt(String plan) {
        this.executionPlan = plan;
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.siddhiManager = new SiddhiManager();
        this.executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
        addCallbacks();
        handlers = new HashMap<String, InputHandler>();
        executionPlanRuntime.start();
    }

    @Override
    public void execute(Tuple tuple) {
        // The source stream id of the tuple is used as the Siddhi stream name.
        String inputStream = tuple.getSourceStreamId();
        InputHandler inputHandler = getInputHandler(inputStream);
        List<Object> values = tuple.getValues();
        Object[] objects = values.toArray();
        try {
            inputHandler.send(objects);
        } catch (Exception e) {
            LOG.error("Send stream event error: ", e);
        }
        // collector.fail(tuple); // test replay
        collector.ack(tuple); // remember to ack the tuple
        // Make sure to add the anchor tuple if you want to track it:
        // collector.emit(streamid, tuple, new Values(counters, now));
    }

    public InputHandler getInputHandler(String streamName) {
        InputHandler handler;
        if (handlers.containsKey(streamName)) {
            handler = handlers.get(streamName);
        } else {
            handler = executionPlanRuntime.getInputHandler(streamName);
            if (handler != null) {
                handlers.put(streamName, handler);
            }
        }
        return handler;
    }

    // Override this to register the callbacks that match your execution plan.
    public void addCallbacks() {
        // StreamCallback example
        executionPlanRuntime.addCallback("outputStream", new StreamCallback() {
            @Override
            public void receive(Event[] events) {
                LOG.info("receive events: " + events.length);
                for (Event e : events) {
                    LOG.info(e.toString());
                }
            }
        });
        // QueryCallback example
        executionPlanRuntime.addCallback("query1", new QueryCallback() {
            @Override
            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
                printEvents(timeStamp, inEvents, removeEvents);
            }
        });
    }

    public void printEvents(long timeStamp, Event[] inEvents, Event[] removeEvents) {
        StringBuilder sb = new StringBuilder();
        sb.append("Events{ @timeStamp = ").append(timeStamp)
          .append(", inEvents = ").append(Arrays.deepToString(inEvents))
          .append(", RemoveEvents = ").append(Arrays.deepToString(removeEvents))
          .append(" }");
        LOG.info(sb.toString());
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // No output streams declared; results are consumed in the Siddhi callbacks above.
        // Override this (and emit from the callbacks) if downstream bolts should receive results.
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    @Override
    public void cleanup() {
        // Shutting down the runtime
        executionPlanRuntime.shutdown();
        // Shutting down Siddhi
        siddhiManager.shutdown();
    }
}
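Finally, a sketch of how the bolt might be wired into a JStorm topology. TempSpout here is hypothetical: it has to declare a stream named "TempStream" (SiddhiBolt looks up the Siddhi input handler via tuple.getSourceStreamId()) and emit tuples whose field order matches the Siddhi stream definition; addCallbacks would also need to be adapted to this plan (for example, registering a callback on query1 or AvgRoomTempStream).

String plan = ""
        + "define stream TempStream (deviceID int, roomNo int, temp float);"
        + "@info(name = 'query1') "
        + "from TempStream#window.time(1 min) "
        + "select roomNo, avg(temp) as avgTemp group by roomNo "
        + "insert into AvgRoomTempStream;";

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("temp-spout", new TempSpout(), 1);   // hypothetical spout emitting on stream "TempStream"
// Parallelism 1 so a single SiddhiBolt task owns the Siddhi runtime and its window state.
builder.setBolt("siddhi-bolt", new SiddhiBolt(plan), 1)
       .shuffleGrouping("temp-spout", "TempStream");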
1. Siddhi paper, https://people.apache.org/~hemapani/research/papers/siddi-gce2011.pdf
2. Siddhi doc, https://docs.wso2.com/display/CEP400/SiddhiQL+Guide+3.0