Esper系列(五)Order by、Limit、構建事件流、Updating an Insert Stream

Order byjava

與SQL語法相似相似,默認爲升序排列;
注意:
sql

  1. 若是order by的子句中出現了聚合函數,那麼該聚合函數必須出如今select的子句中。
  2. 出如今select中的expression或者在select中定義的expression,在order by中也有效。
  3. 若是order by所在的句子沒有join或者沒有group by,則排序結果冪等,不然爲非冪等。

 

Limit
格式一:
express

limit row_count [offset offset_count]

注意:row_count和offset_count既能夠是常量也能夠是變量.
例句:
數組

// 從結果集中第三條開始輸出5個事件流,offset 2表示跳過前面兩個事件流,limit 5表示出入的個數
String epsql = "select value as result from inEvent.win:length_batch(20) limit 5 offset 2";

格式二:dom

limit offset_count[, row_count]

例句:ide

String epsql = "select value as result from inEvent.win:length_batch(20) limit 2,5";

 

構建事件流函數

Insert(未定義的事件流)
功能:根據已有的事件流組合生成新的事件流.
this

 

格式:spa

insert [istream | irstream | rstream] into event_stream_name  [ (property_name [, property_name] ) ]

說明:xml

istream表示新事件流(New Events),rstream表示舊事件流(Old Events),irstream二者都包含event_stream_name爲新建事件流名稱,property_name爲事件流屬性。

例句:

// 建立inEvent事件流該事件流有兩個屬性字段name與salary
String insql = "insert into inEvent select name,salary from  orderEvent";
// 建立EPL
epAdmin.createEPL(insql);
// 查詢inEvent事件流中屬性salary大於150的name屬性字段
String epsql = "select name as result from inEvent where salary > 150 

 

多事件流Insert

例句:

// 將事件流orderEvent的name、salary屬性值分別賦值插給inEvent事件流的content、price屬性
String insql1 = "insert into inEvent(content,price) select name,salary from orderEvent";
epAdmin.createEPL(insql1);
 
// 將事件流orderEvent中的JavaBean,bean中的屬性key、value屬性值分別賦值插給inEvent事件流的content、price屬性
String insql2 = "insert into inEvent (content,price) select bean.key,bean.value from orderEvent ";
epAdmin.createEPL(insql2);
 
// 查詢數據流inEvent中content屬性值
10  String epsql = "select content as result from inEvent ";

注意:

Insert新建立的事件流屬性字段,可由自定義靜態函數返回,但必定要返回javabean,map,或者Object數組,且不能用as來爲轉換後的結果設置別名;

例子:

String insql = "insert into msgEvent select BaseUntil.getEvent() from orderEvent";
epAdmin.createEPL(insql);
String epsql = "select * from msgEvent";

其中BaseUntil.getEvent()返回orderEvent類型的javaBean;

Insert(已定義的事件流)

上面對事件流的構建都是新生成的,即事件流沒有預約義。在事件流有預約義的狀況下,Insert中引用該事件流時必須帶包名。
例如:

文件名:msgEvent.java

// msgEvent事件流定義
public class msgEvent {
    private int msgId;
    private String msgInfo;
    // 注意該事件流定義中沒有對應屬性字段的set方法,只能經過構造函數改變屬性值
    public msgEvent(int msgId, String msgInfo) {
        super();
        this.msgId = msgId;
        this.msgInfo = msgInfo;
10      }
11   
12      @Override
13      public String toString() {
14          return "msgId:"+msgId+",msgInfo:"+msgInfo;
15      }
16  }

 

文件名:BaseUntil.java

public class BaseUntil {
   
    public static int Add(int n){
        return n+100;
    }
   
    public static String UpdataText(String str){
        return str+",你好!";
    }
10     
11      public static orderEvent getEvent(){
12          orderEvent event = new orderEvent();
13          event.setName("張三");
14          event.setSalary(50000);
15          return event;
16      }
17  }

 

文件名:orderListener

/**
 * 用於監聽某個EPL在引擎中的運行狀況,事件進入併產生結果後會回調UpdateListener
 * 必須實現 UpdateListener 接口
 */
public class orderListener implements UpdateListener {
 
    /**
     * arg0對應newEvent,arg1對應oldEvent
     */
10      @Override
11      public void update(EventBean[] arg0, EventBean[] arg1) {
12          if (null != arg0) {
13              for (int i=0;i<arg0.length;i++){
14                  System.out.println("orderEvent Count is "+arg0.length+",EventBean is "+arg0[i].getUnderlying());
15              }
16          } else {
17              System.out.println("EventBean is Null ");
18          }
19      }
20  }

 

文件名:orderMainTest.java

public class orderMainTest {
 
    public static void main(String[] args) throws InterruptedException {
 
        // 添加配置(包所在路勁),方面後面的引用自動添加包名前綴
        Configuration config = new Configuration();
        config.addEventTypeAutoName("cn.chenx.esper.insert");
        //
        EPServiceProvider epServiceProvider = EPServiceProviderManager
10                  .getDefaultProvider(config);
11          EPAdministrator epAdmin = epServiceProvider.getEPAdministrator();
12         
13          ConfigurationOperations configOper = epAdmin.getConfiguration();
14          configOper.addVariable("ifbool", Boolean.class, false);
15          configOper.addImport(BaseUntil.class);
16   
17          // 事件流名稱
18          String className = "orderEvent";// orderEvent.class.getName();
19          System.out.println("className is " + className);
20     
21          String insql = "insert into msgEvent select BaseUntil.getEvent() from orderEvent";
22          epAdmin.createEPL(insql);
23          String epsql = "select * from msgEvent";
24   
25          System.out.println("epsql:" + epsql);
26          EPStatement epstate = epAdmin.createEPL(epsql);
27          epstate.addListener(new orderListener());
28          EPRuntime epRuntime = epServiceProvider.getEPRuntime();
29          //
30          for (int i = 0; i < 5; i++) {
31              int seed = (int) (Math.random() * 100);
32              orderEvent event = new orderEvent("" + seed, 100 + seed);
33              System.out.println("seed name:" + event.getName() + ",salary:"
34                      + event.getSalary());
35              orderBean bean = new orderBean();
36              bean.setKey("BeanKey:" + i);
37              bean.setValue(seed+i);
38              event.setBean(bean);
39   
40              List<orderBean> list = new ArrayList<orderBean>();
41              for (int j = 0; j < 10; j++) {
42                  bean = new orderBean();
43                  bean.setKey("ListKey:" + j);
44                  bean.setValue(seed+j);
45                  list.add(bean);
46              }
47              event.setOrderBeans(list);
48   
49              Map<Integer, orderBean> map = new HashMap<Integer, orderBean>();
50              for (int k = 0; k < 10; k++) {
51                  bean = new orderBean();
52                  bean.setKey("MapKey" + k);
53                  bean.setValue(seed+k);
54                  map.put(k, bean);
55              }
56              event.setOrderMap(map);
57   
58              epRuntime.sendEvent(event);
59          }
60      }
61  }

 

Updating an Insert Stream

功能:
在事件即將被用於計算前,改變其自身的屬性值,而後再將其用於後面的計算.


格式:

update istream event_type [as stream_name]
  set property_name = set_expression [, property_name = set_expression] [,...]
  [where where_expression]

說明:

  • 由於istream的限制,因此該語法只支持新輸入的事件.
  • event_type表明要更新的事件,set以後的property_name是要更新的事件屬性,最後能夠用where子句進行簡單的過濾.
  • update句首用@Priority這個註解,使更新事件的順序是以優先級最高的最早更新;

注意:

  • 若是事件是POJO,那麼要實現java.io.Serializable接口。由於引擎內部的update操做其實是要先深複製原事件再更新拷貝後的事件,不會對原事件做出任何修改。
  • 設置屬性的表達式不能用聚合函數.
  • 若是事件是xml,update語法則不適用.
  • update操做不可用於嵌套的事件.
相關文章
相關標籤/搜索