Order byjava
與SQL語法相似相似,默認爲升序排列;
注意:sql
Limit
格式一:express
1 | limit row_count [offset offset_count] |
注意:row_count和offset_count既能夠是常量也能夠是變量.
例句:數組
1 | // 從結果集中第三條開始輸出5個事件流,offset 2表示跳過前面兩個事件流,limit 5表示出入的個數 |
2 | String epsql = "select value as result from inEvent.win:length_batch(20) limit 5 offset 2"; |
格式二:dom
1 | limit offset_count[, row_count] |
例句:ide
1 | String epsql = "select value as result from inEvent.win:length_batch(20) limit 2,5"; |
構建事件流函數
Insert(未定義的事件流)
功能:根據已有的事件流組合生成新的事件流.this
格式:spa
1 | insert [istream | irstream | rstream] into event_stream_name [ (property_name [, property_name] ) ] |
說明:xml
istream表示新事件流(New Events),rstream表示舊事件流(Old Events),irstream二者都包含event_stream_name爲新建事件流名稱,property_name爲事件流屬性。
例句:
1 | // 建立inEvent事件流該事件流有兩個屬性字段name與salary |
2 | String insql = "insert into inEvent select name,salary from orderEvent"; |
3 | // 建立EPL |
4 | epAdmin.createEPL(insql); |
5 | // 查詢inEvent事件流中屬性salary大於150的name屬性字段 |
6 | String epsql = "select name as result from inEvent where salary > 150 |
多事件流Insert
例句:
1 | // 將事件流orderEvent的name、salary屬性值分別賦值插給inEvent事件流的content、price屬性 |
2 | String insql1 = "insert into inEvent(content,price) select name,salary from orderEvent"; |
3 | epAdmin.createEPL(insql1); |
4 | |
5 | // 將事件流orderEvent中的JavaBean,bean中的屬性key、value屬性值分別賦值插給inEvent事件流的content、price屬性 |
6 | String insql2 = "insert into inEvent (content,price) select bean.key,bean.value from orderEvent "; |
7 | epAdmin.createEPL(insql2); |
8 | |
9 | // 查詢數據流inEvent中content屬性值 |
10 | String epsql = "select content as result from inEvent "; |
注意:
Insert新建立的事件流屬性字段,可由自定義靜態函數返回,但必定要返回javabean,map,或者Object數組,且不能用as來爲轉換後的結果設置別名;
例子:
1 | String insql = "insert into msgEvent select BaseUntil.getEvent() from orderEvent"; |
2 | epAdmin.createEPL(insql); |
3 | String epsql = "select * from msgEvent"; |
其中BaseUntil.getEvent()返回orderEvent類型的javaBean;
Insert(已定義的事件流)
上面對事件流的構建都是新生成的,即事件流沒有預約義。在事件流有預約義的狀況下,Insert中引用該事件流時必須帶包名。
例如:
文件名:msgEvent.java
1 | // msgEvent事件流定義 |
2 | public class msgEvent { |
3 | private int msgId; |
4 | private String msgInfo; |
5 | // 注意該事件流定義中沒有對應屬性字段的set方法,只能經過構造函數改變屬性值 |
6 | public msgEvent(int msgId, String msgInfo) { |
7 | super(); |
8 | this.msgId = msgId; |
9 | this.msgInfo = msgInfo; |
10 | } |
11 | |
12 | @Override |
13 | public String toString() { |
14 | return "msgId:"+msgId+",msgInfo:"+msgInfo; |
15 | } |
16 | } |
文件名:BaseUntil.java
1 | public class BaseUntil { |
2 | |
3 | public static int Add(int n){ |
4 | return n+100; |
5 | } |
6 | |
7 | public static String UpdataText(String str){ |
8 | return str+",你好!"; |
9 | } |
10 | |
11 | public static orderEvent getEvent(){ |
12 | orderEvent event = new orderEvent(); |
13 | event.setName("張三"); |
14 | event.setSalary(50000); |
15 | return event; |
16 | } |
17 | } |
文件名:orderListener
1 | /** |
2 | * 用於監聽某個EPL在引擎中的運行狀況,事件進入併產生結果後會回調UpdateListener |
3 | * 必須實現 UpdateListener 接口 |
4 | */ |
5 | public class orderListener implements UpdateListener { |
6 | |
7 | /** |
8 | * arg0對應newEvent,arg1對應oldEvent |
9 | */ |
10 | @Override |
11 | public void update(EventBean[] arg0, EventBean[] arg1) { |
12 | if (null != arg0) { |
13 | for (int i=0;i<arg0.length;i++){ |
14 | System.out.println("orderEvent Count is "+arg0.length+",EventBean is "+arg0[i].getUnderlying()); |
15 | } |
16 | } else { |
17 | System.out.println("EventBean is Null "); |
18 | } |
19 | } |
20 | } |
文件名:orderMainTest.java
1 | public class orderMainTest { |
2 | |
3 | public static void main(String[] args) throws InterruptedException { |
4 | |
5 | // 添加配置(包所在路勁),方面後面的引用自動添加包名前綴 |
6 | Configuration config = new Configuration(); |
7 | config.addEventTypeAutoName("cn.chenx.esper.insert"); |
8 | // |
9 | EPServiceProvider epServiceProvider = EPServiceProviderManager |
10 | .getDefaultProvider(config); |
11 | EPAdministrator epAdmin = epServiceProvider.getEPAdministrator(); |
12 | |
13 | ConfigurationOperations configOper = epAdmin.getConfiguration(); |
14 | configOper.addVariable("ifbool", Boolean.class, false); |
15 | configOper.addImport(BaseUntil.class); |
16 | |
17 | // 事件流名稱 |
18 | String className = "orderEvent";// orderEvent.class.getName(); |
19 | System.out.println("className is " + className); |
20 | |
21 | String insql = "insert into msgEvent select BaseUntil.getEvent() from orderEvent"; |
22 | epAdmin.createEPL(insql); |
23 | String epsql = "select * from msgEvent"; |
24 | |
25 | System.out.println("epsql:" + epsql); |
26 | EPStatement epstate = epAdmin.createEPL(epsql); |
27 | epstate.addListener(new orderListener()); |
28 | EPRuntime epRuntime = epServiceProvider.getEPRuntime(); |
29 | // |
30 | for (int i = 0; i < 5; i++) { |
31 | int seed = (int) (Math.random() * 100); |
32 | orderEvent event = new orderEvent("張" + seed, 100 + seed); |
33 | System.out.println("seed name:" + event.getName() + ",salary:" |
34 | + event.getSalary()); |
35 | orderBean bean = new orderBean(); |
36 | bean.setKey("BeanKey:" + i); |
37 | bean.setValue(seed+i); |
38 | event.setBean(bean); |
39 | |
40 | List<orderBean> list = new ArrayList<orderBean>(); |
41 | for (int j = 0; j < 10; j++) { |
42 | bean = new orderBean(); |
43 | bean.setKey("ListKey:" + j); |
44 | bean.setValue(seed+j); |
45 | list.add(bean); |
46 | } |
47 | event.setOrderBeans(list); |
48 | |
49 | Map<Integer, orderBean> map = new HashMap<Integer, orderBean>(); |
50 | for (int k = 0; k < 10; k++) { |
51 | bean = new orderBean(); |
52 | bean.setKey("MapKey" + k); |
53 | bean.setValue(seed+k); |
54 | map.put(k, bean); |
55 | } |
56 | event.setOrderMap(map); |
57 | |
58 | epRuntime.sendEvent(event); |
59 | } |
60 | } |
61 | } |
Updating an Insert Stream
功能:
在事件即將被用於計算前,改變其自身的屬性值,而後再將其用於後面的計算.
格式:
1 | update istream event_type [as stream_name] |
2 | set property_name = set_expression [, property_name = set_expression] [,...] |
3 | [where where_expression] |
說明:
注意: