Esper簡介

 

1. CEP(Complex Event Processing, 復瑣事件處理)

事件(Event)通常狀況下指的是一個系統中正在發生的事,事件可能發生在系統的各個層面上,它能夠是某個動做,例如客戶下單,發送消息,提交報告等,也能夠是某種狀態的改變,例如溫度的變化,超時等等。經過對這些事件進行分析,能夠提取出其中有效的信息。 根據維基百科的定義,事件處理(Event processing)指的是跟蹤系統中發生的事件,分析事件中的信息並從中獲得某種結論。而復瑣事件處理,則是結合多個事件源中的事件,從中推斷出更加複雜的狀況下的事件。數據庫

因而可知,CEP的目的包括:(1)識別所須要的事件;(2)快速地對這些事件進行處理。 一般狀況下,咱們想要利用CEP達到的目的是掌握當前的某種狀況或者說狀態,所以CEP感興趣的不是事件自己給出的信息,而是經過這些信息所能推導出的某種結論,經過CEP,咱們可以讓這些事件變得有意義。數組

要實現一個CEP引擎,須要考慮的事情包括: (1)吞吐量; (2)低延遲,從事件到達到事件被處理,不能有太大的延遲; (3)複雜的邏輯處理,CEP須要可以對事件進行較爲複雜的操做,例如,檢測事件之間的相關性,過濾,加窗,鏈接等。數據結構

2. Esper

Esper是一個開源的復瑣事件處理引擎,它的目的是讓用戶可以經過它提供的接口,構建一個用於處理復瑣事件的應用程序。 這裏寫圖片描述 架構

從Esper的架構圖中,能夠看出,Esper主要包括了三個部分:Input adapter,Esper engine,Output adapter。ide

2.1 Input adapter & Output adapter

輸入適配器和輸出適配器的主要目的是接收來自不一樣事件源的事件,並向不一樣的目的地輸出事件。 目前,Esper提供的適配器包括File Input and Output adpter, Spring JMS Input and Output Adapter, AMQP Input and Output Adapter, Kafka Adapter等等。這些適配器提供了一系列接口,可讓用戶從不一樣的數據源讀取數據,並將數據發送給不一樣的目的數據源,用戶能夠不用本身單獨編寫客戶端代碼來鏈接這些數據源,感受至關於對這些數據源提供了一層封裝。函數

2.2 Esper engine

Esper引擎是處理事件的核心,它容許用戶定義須要接收的事件以及對這些事件的處理方式。測試

2.2.1 Esper支持的事件表現形式

Esper支持多種事件表現形式,包括遵循JavaBean方式的含有getter方法的Java POJO(普通Java對象),實現了Map接口的對象,對象數組,XML文檔對象,以及Apache Avro(一個支持JSON和Schema的數據序列化系統,能夠將數據結構或對象轉化成便於存儲和傳輸的格式)。 這些事件表現形式的共同之處在於,它們都提供了事件類型的元數據,也就是說可以表示事件的一系列屬性,例如,一個Java對象能夠經過其成員變量來表示其事件屬性,一個Map對象可以經過鍵值對來表示屬性。因而可知,本質上事件是一系列屬性值的集合,對事件的操做即對事件中的部分或所有屬性的操做。this

2.2.2 Esper事件處理模型

Esper的事件處理模型主要包括兩個部分:Statement和Listener。 (1)Statement 利用Esper的事件處理語言EPL聲明對事件進行的操做,Esper中提供了多種類型的事件操做,包括過濾、加窗、事件聚合等等。EPL是一種相似於SQL的語言,從這一點上來看,Esper剛好與數據庫相反,數據庫時保存數據,並在數據上運行查詢語句,而Esper是保存查詢語句,在這些查詢上運行數據,只要事件與查詢條件匹配,Esper就會實時進行處理,而不是隻有在查詢提交的時候才處理。 假設如今要處理的事件是用戶註冊事件,註冊時用戶須要提供用戶名和年齡,那麼事件中將包含用戶名和年齡兩個屬性,而咱們要作的事是計算用戶的平均年齡,那麼,首先應該定義一個事件類PersonEvent,並加上getter方法:spa

public class PersonEvent extends Event {
    
    private String name;
    private int age;
    
    public PersonEvent(String name, int age) {
        
        this.name = name;
        this.age = age;
        
    }

    public String getName() {
        return name;
    }

    public int getAge() {
        return age;
    }    

}

 

而後,經過EPL語言聲明對事件的操做,此處爲取平均值:code

import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPServiceProviderManager;
import com.espertech.esper.client.EPStatement;
import com.events.Event;
import com.events.OrderEvent;
import com.events.PersonEvent;
import com.listener.OrderEventListener;
import com.listener.PersonEventListener;

public class EsperClient {
    
    private EPServiceProvider engine;
    
    public EsperClient() {
        //obtain an engine instance
        this.engine = EPServiceProviderManager.getDefaultProvider();
        //System.out.println(engine.getURI());
    }
    
    public void personEventProcess() {
        
        //tell the engine about the event type
        engine.getEPAdministrator().getConfiguration().addEventType(PersonEvent.class);
        //create an epl statement
        String epl = "select name, age from PersonEvent";
        EPStatement statement = engine.getEPAdministrator().createEPL(epl);
        
    }
    
    public void send(Event event) {
        
        engine.getEPRuntime().sendEvent(event);
        
    }

}

 

其中,send方法用於向Esper引擎發送一個事件,當引擎接收到這個事件後,即可根據事件的類型進行相應的處理。 (2)Listener Listener用於監聽事件的處理狀況,接收事件處理的結果,經過UpdateListener接口來實現,它至關於一個回調函數,當事件處理完成以後,能夠經過該回調函數向結果發送到目的地。此處將處理結果打印到控制檯:

import com.espertech.esper.client.EventBean;
import com.espertech.esper.client.UpdateListener;

public class PersonEventListener implements UpdateListener {


    @Override
    public void update(EventBean[] newEvents, EventBean[] oldEvents) {
        // TODO Auto-generated method stub
        EventBean event = newEvents[0];
        System.out.println(String.format("Name: %s, Age: %d", event.get("name"), event.get("age")));
    }


}

 

而後將對事件的操做聲明和監聽器關聯起來:

public void personEventProcess() {
        
        //tell the engine about the event type
        engine.getEPAdministrator().getConfiguration().addEventType(PersonEvent.class);
        //create an epl statement
        String epl = "select avg(age) from PersonEvent";
        EPStatement statement = engine.getEPAdministrator().createEPL(epl);
        //attach a callback to receive the results
        statement.addListener(new PersonEventListener());
    }

 

測試:

import com.esper.client.EsperClient;
import com.events.PersonEvent;

public class Test {

    @SuppressWarnings("static-access")
    public static void main(String[] args) throws InterruptedException {
        // TODO Auto-generated method stub
        
        EsperClient ec = new EsperClient();
        ec.personEventProcess();
        ec.send(new PersonEvent("name", 10));    
        ec.send(new PersonEvent("name", 20));    
}
}
相關文章
相關標籤/搜索