原做者:CoffeeOneSugarphp
翻譯:劉斌華java
在我以前發表的文章中,我提到我最近熱衷於Complex Event Processing (CEP) (復瑣事件處理)。簡單來講,CEP把數據流做爲輸入,根據一系列預約義的規則,把數據(或部分數據)重定向給監聽者們;又或者是當發現數據中的隱含的模式(Pattern)時,觸發事件。在大量數據被產生出來並須要進行實時地分析的場景下,CEP特別有用。程序員
有一個很不錯的軟件項目,可讓你作到這一點,叫作ESPER。你能夠在這裏找到該項目的網站。Esper向程序員提供一個稱爲EPL的語言,有些相似於SQL語言,它能夠很好地用於對規則和模式的配置,這些規則和模式將被應用到數據流上。apache
Esper附帶的文檔是至關完整的,但缺少實際的例子,這使得它看起來難以被使用。因此這裏給出一個Esper的5分鐘指導。雖然下面的例子是用Java編寫,可是其實Esper是同時支持Java和C#的。我這裏假設你已經下載了Esper,若是沒有,能夠經過點擊這個連接來完成。解壓剛纔下載的文件後,你應該在你磁盤的某個地方能夠找到一個叫esper-3.1.0的文件夾。(譯者:這篇文章比較早了,如今esper最新版本是5.2.0)數據結構
在開始前,你須要添加幾個庫文件到你的工程中,固然,esper-3.1.0.jar是其中之一,你也須要另外其餘4個第三方的庫文件,他們能夠在esper-3.1.0.jar/esper/lib文件夾中找到。app
如今開始咱們的5分鐘吧。在你把須要分析的數據丟到CEP引擎的管道中以前,須要把這些數據結構化到對象當中去。讓咱們用一個簡單的例子,寫一個類(或者叫它bean)來描述給定時間的某個股票的價格:dom
import java.util.Date; public static class Tick { String symbol; Double price; Date timeStamp; public Tick(String s, double p, long t) { symbol = s; price = p; timeStamp = new Date(t); } public double getPrice() {return price;} public String getSymbol() {return symbol;} public Date getTimeStamp() {return timeStamp;} @Override public String toString() { return "Price: " + price.toString() + " time: " + timeStamp.toString(); } }
它有3個屬性:股票代碼,價格和時間戳。在咱們開始生成數以億計的數據前,咱們須要通知引擎它須要處理哪些對象,這是經過在實例化CEP引擎時,使用一個Configuration對象來實現的:ide
import com.espertech.esper.client.*; public class main { public static void main(String [] args){ //The Configuration is meant only as an initialization-time object. Configuration cepConfig = new Configuration(); // We register Ticks as objects the engine will have to handle cepConfig.addEventType("StockTick",Tick.class.getName()); // We setup the engine EPServiceProvider cep = EPServiceProviderManager.getProvider("myCEPEngine",cepConfig); } }
做爲測試目的,咱們如今建立一個函數來生成隨機的數據,而且把它們丟到CEP引擎當中,咱們把這個函數叫作「GenerateRandomTick」,它把EPRuntime對象做爲參數,這個對象用於把事件傳遞給CEP引擎:wordpress
import java.util.Random; import com.espertech.esper.client.*; public class exampleMain { private static Random generator=new Random(); public static void GenerateRandomTick(EPRuntime cepRT){ double price = (double) generator.nextInt(10); long timeStamp = System.currentTimeMillis(); String symbol = "AAPL"; Tick tick= new Tick(symbol,price,timeStamp); System.out.println("Sending tick:" + tick); cepRT.sendEvent(tick); } public static void main(String[] args) { //The Configuration is meant only as an initialization-time object. Configuration cepConfig = new Configuration(); cepConfig.addEventType("StockTick",Tick.class.getName()); EPServiceProvider cep=EPServiceProviderManager.getProvider("myCEPEngine",cepConfig); EPRuntime cepRT = cep.getEPRuntime(); } }
如今,咱們有了一個能夠工做的CEP引擎,和不斷輸入的虛假的數據,是時候來建立咱們的第一條規則了,用Esper的說法,咱們的第一條EPL語句。要這麼作,咱們須要請引擎的管理員記錄咱們的語句。而後,CEP引擎會根據EPL語句的定義,過濾它收到的數據,當數據知足語句中的選擇條件或者模式時,觸發事件。函數
public static void main(String[] args) { //The Configuration is meant only as an initialization-time object. Configuration cepConfig = new Configuration(); cepConfig.addEventType("StockTick",Tick.class.getName()); EPServiceProvider cep = EPServiceProviderManager.getProvider("myCEPEngine",cepConfig); EPRuntime cepRT = cep.getEPRuntime(); // We register an EPL statement EPAdministrator cepAdm = cep.getEPAdministrator(); EPStatement cepStatement = cepAdm.createEPL("select * from " + "StockTick(symbol='AAPL').win:length(2) " + "having avg(price) > 6.0"); }
這裏,咱們的規則設置爲:每當最近的2次數據的平均值大於6.0時,觸發事件。
下一步,主要是建立一個監聽者並把它和咱們的選擇規則產生的事件關聯起來。能夠這麼作:
cepStatement.addListener(new CEPListener());
這裏有不一樣的方式來實現監聽者,下面的是其中最簡單的一種。這裏監聽者只是簡單地把它從引擎中收到的對象打印出來:
public static class CEPListener implements UpdateListener { public void update(EventBean[] newData, EventBean[] oldData) { System.out.println("Event received: " + newData[0].getUnderlying()); } }
到目前爲止,看上去還不錯。如今是測試咱們的代碼的時候了。讓咱們生成一些數據,看一切可否正常工做。能夠在main函數中添加一下行來作到:
for(int i = 0; i< 5; i++) GenerateRandomTick(cepRT);
如今全部的代碼看上去以下所示(我把Tick類和入口函數放在一塊兒,這樣你就能把它們複製粘貼到一個文件並運行它們)
import com.espertech.esper.client.*; import java.util.Random; import java.util.Date; public class exampleMain { public static class Tick { String symbol; Double price; Date timeStamp; public Tick(String s, double p, long t) { symbol = s; price = p; timeStamp = new Date(t); } public double getPrice() {return price;} public String getSymbol() {return symbol;} public Date getTimeStamp() {return timeStamp;} @Override public String toString() { return "Price: " + price.toString() + " time: " + timeStamp.toString(); } } private static Random generator = new Random(); public static void GenerateRandomTick(EPRuntime cepRT) { double price = (double) generator.nextInt(10); long timeStamp = System.currentTimeMillis(); String symbol = "AAPL"; Tick tick = new Tick(symbol, price, timeStamp); System.out.println("Sending tick:" + tick); cepRT.sendEvent(tick); } public static class CEPListener implements UpdateListener { public void update(EventBean[] newData, EventBean[] oldData) { System.out.println("Event received: " + newData[0].getUnderlying()); } } public static void main(String[] args) { //The Configuration is meant only as an initialization-time object. Configuration cepConfig = new Configuration(); cepConfig.addEventType("StockTick", Tick.class.getName()); EPServiceProvider cep = EPServiceProviderManager.getProvider("myCEPEngine", cepConfig); EPRuntime cepRT = cep.getEPRuntime(); EPAdministrator cepAdm = cep.getEPAdministrator(); EPStatement cepStatement = cepAdm.createEPL("select * from " + "StockTick(symbol='AAPL').win:length(2) " + "having avg(price) > 6.0"); cepStatement.addListener(new CEPListener()); // We generate a few ticks... for (int i = 0; i < 5; i++) { GenerateRandomTick(cepRT); } } }
Output:
log4j:WARN No appenders could be found for logger (com.espertech.esper.epl.metric.MetricReportingPath). log4j:WARN Please initialize the log4j system properly. Sending tick:Price: 6.0 time: Tue Jul 21 01:11:15 CEST 2009 Sending tick:Price: 0.0 time: Tue Jul 21 01:11:15 CEST 2009 Sending tick:Price: 7.0 time: Tue Jul 21 01:11:15 CEST 2009 Sending tick:Price: 4.0 time: Tue Jul 21 01:11:15 CEST 2009 Sending tick:Price: 9.0 time: Tue Jul 21 01:11:15 CEST 2009 Event received: Price: 9.0 time: Tue Jul 21 01:11:15 CEST 2009
正如你看到的,只有最後兩行數據平均值大於6,所以只有一個事件最終被引擎觸發了。至關不錯!
Oh,你或許擔憂輸出中的第一行,是的,這裏還有一點小問題。事實上,Esper使用的日誌生成包log4j致使了這個警告。它是能夠經過一個叫log4j.xml的文件來配置的,你能夠在esper-3.1.0/examples下的某個例子中的/etc目錄下找到它。我不認爲給咱們全部的程序都弄一個xml配置文件是個好主意,因此咱們在如下代碼中,用點技巧來配置咱們的logger,在你的文件開始處加入些import和代碼:
import org.apache.log4j.ConsoleAppender; import org.apache.log4j.SimpleLayout; import org.apache.log4j.Level; import org.apache.log4j.Logger; //and this in the main function before the rest of your code: public static void main(String [] args){ SimpleLayout layout = new SimpleLayout(); ConsoleAppender appender = new ConsoleAppender(new SimpleLayout()); Logger.getRootLogger().addAppender(appender); Logger.getRootLogger().setLevel((Level) Level.WARN); (...)
5分鐘到此結束。
下一篇文章中,我將更深刻一些來探索EPL語句,提供一些代碼來鏈接兩個引擎實現所謂的「事件精化」(event refinement)(譯者:好像以後做者再也沒有更新過了,因此,不要期望後續了:))
原文地址:https://coffeeonesugar.wordpress.com/2009/07/21/getting-started-with-esper-in-5-minutes/。劉斌華原創翻譯,轉載請註明出處