Esper學習之八:EPL語法(四)

關於EPL,已經寫了三篇了,預估計了一下,除了今天這篇,後面還有5篇左右。你們可別嫌多,官方的文檔對EPL的講解有將近140頁,我已經儘可能將廢話都幹掉了,再配合我附上的例子,看個人10篇文章比那140頁英文文檔確定舒服多了吧。也請各位原諒我一週一篇的速度,畢竟我還要學習,生活,工做,一個都不能少。java

        今天講解的內容包括三塊:Order by,Limit,Insert into。你們會SQL的應該很熟悉這三個東西,前兩個比較簡單,Insert into會有一些差異,篇幅也相對多些。express

 

1.Order by數組

EPL的Order by和SQL的幾乎如出一轍,做用都是對輸出結果進行排序,可是也有一些須要注意的地方。語法以下:app

 

[plain]  view plain copy
 
  1. order by expression [asc | desc] [, expression [asc | desc]] [, ...]  

expreession表示要排序的字段,asc表示升序排列(從小到大),desc表示降序排列(從大到小)。舉個例子:ide

 

[plain]  view plain copy
 
  1. // 每進入5個事件輸出一次,而且先按照name升序排列,再按照age降序排列。  
  2. select * from User output every 5 events order by name, age desc  

使用方法很簡單,除了和SQL類似的特色外,還有他本身須要注意的幾點:函數

 

a. 若是不特別說明是升序仍是降序,默認狀況下按照升序排列。學習

b. 若是order by的子句中出現了聚合函數,那麼該聚合函數必須出如今select的子句中。this

c. 出如今select中的expression或者在select中定義的expression,在order by中也有效。spa

d. 若是order by所在的句子沒有join或者沒有group by,則排序結果冪等,不然爲非冪等。.net

 

2. Limit

Limit在EPL中和在SQL中也基本同樣,不過SQL中是用具體的數字來表示限制範圍,而EPL能夠是常量或者變量來表示限制範圍。語法以下:

 

[plain]  view plain copy
 
  1. limit row_count [offset offset_count]  

row_count表示輸出多少行,能夠是一個整型常量,也能夠是一個整型變量,以方便運行時修改。

 

offset_count表示在當前結果集中跳過n行而後再輸出,一樣也能夠是一個整型變量。若是不使用此參數,則表示跳過0行,即從第一行輸出。舉例以下:

 

[plain]  view plain copy
 
  1. // 輸出結果集的第3行到第10行  
  2. select uri, count(*) from WebEvent group by uri output snapshot every 1 minute order by count(*) desc limit 8 offset 2  

除了以上的語法,limit還有一種簡化的寫法,其實是參照SQL的標準。

 

 

[plain]  view plain copy
 
  1. limit offset_count[, row_count]  

兩個參數的含義和上面的同樣,而且咱們將上面的例子改寫一下:

 

 

[plain]  view plain copy
 
  1. // 輸出結果集的第3行到第10行  
  2. select uri, count(*) from WebEvent group by uri output snapshot every 1 minute order by count(*) desc limit 2, 8  

若是這個兩個參數是負數會怎麼樣呢?

 

row_count爲負數,則無限制輸出,若爲0,則不輸出。當row_count是變量表示而且變量爲null,則無限制輸出。

offset _count是不容許負數的,若是是變量表示,而且變量值爲null或者負數,則EPL會把他假設爲0。

 

3. Insert into

3.1 簡單用法

        EPL的Insert into和SQL的有比較大的區別。SQL是往一張表裏插入數據,而EPL是把一個事件流的計算結果放入另外一個事件流,而後能夠對這個事件流進行別的計算。因此Insert into的一個好處就是能夠將是事件流的計算結果不斷級聯,對於那種須要將上一個業務的結果數據放到下一個業務處理的場景再適合不過了。除此以外,Insert into還有合併多個計算結果的做用。到這裏相信你們已經對他愈來愈好奇了,不急,我們先來看看語法:

 

[plain]  view plain copy
 
  1. insert [istream | irstream | rstream] into event_stream_name [ (property_name [, property_name] ) ]  

event_stream_name定義了事件流的名稱,在執行完insert的定義以後,咱們可使用select對這個事件流進行別的計算。

istream | irstream | rstream表示該事件流容許另外一個事件的輸入/輸入和輸出/輸出數據可以進入(解釋好像很繞。。一下子看例子就能明白了)

property_name表示該事件流裏包含的屬性名稱,多個屬性名之間用逗號分割,而且用小括號括起來。

上面的說明可能不是很好理解,我們先看個例子:

 

[plain]  view plain copy
 
  1. // 將新進入的Asus事件傳遞到Computer,且Asus的id,size和Computer的cid,csize對應  
  2. insert into Computer(cid,csize) select id,size from Asus  
  3.   
  4. // 第二種寫法  
  5. insert into Computer select id as cid, size as csize Asus  

        從例子中能夠看到,insert into須要配合select進行使用,以代表前一個事件流有哪些計算結果將進入insert into定義的事件流。而且在select中的字段要和insert裏的事件流的屬性要對應(這裏指的對應是數據類型對應,並且屬性數量也必須同樣)。若是說insert定義的事件流名稱在以前已經定義過(insert into中定義的除外),重名是不容許的。

 

        我我的推薦第二種寫法,經過as設置的別名即爲insert定義的事件流的屬性,這樣能夠避免屬性的個數不一致的錯誤。

 

剛纔說了istream | irstream | rstream的用法,可能有點表述不清楚,這裏看一個完整的例子。

 

[java]  view plain copy
 
  1. /** 
  2.  *  
  3.  * @author luonanqin 
  4.  * 
  5.  */  
  6. class Asus  
  7. {  
  8.     private int id;  
  9.     private int size;  
  10.   
  11.     public int getId()  
  12.     {  
  13.         return id;  
  14.     }  
  15.   
  16.     public void setId(int id)  
  17.     {  
  18.         this.id = id;  
  19.     }  
  20.   
  21.   
  22.     public int getSize()  
  23.     {  
  24.         return size;  
  25.     }  
  26.   
  27.     public void setSize(int size)  
  28.     {  
  29.         this.size = size;  
  30.     }  
  31.   
  32.     public String toString()  
  33.     {  
  34.         return "id: " + id + ", size: " + size;  
  35.     }  
  36. }  
  37.   
  38.   
  39. class InsertRstreamListener implements UpdateListener  
  40. {  
  41.     public void update(EventBean[] newEvents, EventBean[] oldEvents)  
  42.     {  
  43.         if (newEvents != null)  
  44.         {  
  45.             for (int i = 0; i < newEvents.length; i++)  
  46.             {  
  47.                 Object id = newEvents[i].get("cid");  
  48.                 System.out.println("Insert Asus: cid: " + id);  
  49.             }  
  50.         }  
  51.         if (oldEvents != null)  
  52.         {  
  53.             for (int i = 0; i < oldEvents.length; i++)  
  54.             {  
  55.                 Object id = oldEvents[i].get("cid");  
  56.                 System.out.println("Remove Asus: cid: " + id);  
  57.             }  
  58.         }  
  59.         System.out.println();  
  60.     }  
  61. }  
  62.   
  63. public class InsertRstreamTest {  
  64.   
  65.     public static void main(String[] args) throws InterruptedException {  
  66.         EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();  
  67.   
  68.         EPAdministrator admin = epService.getEPAdministrator();  
  69.   
  70.         String asus = Asus.class.getName();  
  71.         String insertEPL = "insert rstream into Computer(cid,csize) select id,size from " + asus + ".win:length(1)";  
  72.         String insertSelectEPL = "select cid from Computer.win:length_batch(2)";  
  73.   
  74.         EPStatement state = admin.createEPL(insertEPL);  
  75.         EPStatement state1 = admin.createEPL(insertSelectEPL);  
  76.         state1.addListener(new InsertRstreamListener());  
  77.   
  78.         EPRuntime runtime = epService.getEPRuntime();  
  79.   
  80.         Asus apple1 = new Asus();  
  81.         apple1.setId(1);  
  82.         apple1.setSize(1);  
  83.         System.out.println("Send Asus: " + apple1);  
  84.         runtime.sendEvent(apple1);  
  85.   
  86.         Asus apple2 = new Asus();  
  87.         apple2.setId(2);  
  88.         apple2.setSize(1);  
  89.         System.out.println("Send Asus: " + apple2);  
  90.         runtime.sendEvent(apple2);  
  91.   
  92.         Asus apple3 = new Asus();  
  93.         apple3.setId(3);  
  94.         apple3.setSize(3);  
  95.         System.out.println("Send Asus: " + apple3);  
  96.         runtime.sendEvent(apple3);  
  97.   
  98.         Asus apple4 = new Asus();  
  99.         apple4.setId(4);  
  100.         apple4.setSize(4);  
  101.         System.out.println("Send Asus: " + apple4);  
  102.         runtime.sendEvent(apple4);  
  103.   
  104.         Asus apple5 = new Asus();  
  105.         apple5.setId(5);  
  106.         apple5.setSize(3);  
  107.         System.out.println("Send Asus: " + apple5);  
  108.         runtime.sendEvent(apple5);  
  109.   
  110.         Asus apple6 = new Asus();  
  111.         apple6.setId(6);  
  112.         apple6.setSize(4);  
  113.         System.out.println("Send Asus: " + apple6);  
  114.         runtime.sendEvent(apple6);  
  115.     }  
  116. }  

執行結果:

 

 

[plain]  view plain copy
 
  1. Send Asus: id: 1, size: 1  
  2. Send Asus: id: 2, size: 1  
  3. Send Asus: id: 3, size: 3  
  4. Insert Asus: cid: 1  
  5. Insert Asus: cid: 2  
  6.   
  7. Send Asus: id: 4, size: 4  
  8. Send Asus: id: 5, size: 3  
  9. Insert Asus: cid: 3  
  10. Insert Asus: cid: 4  
  11.   
  12. Send Asus: id: 6, size: 4  

        這個例子中,insertEPL表示當Asus事件從length爲1的view中移除時,把移除的事件放入Computer。insertSelectEPL是對Computer的事件流進行計算,這裏只是在每進入兩個事件時才輸出這兩個事件的cid。而rstream在這裏的表現,從執行結果中能夠看到,在進入id爲1 2 3的事件後,insertSelectEPL的監聽器被觸發,由於id爲1和2的事件是在發送了Asus的id爲2和3的事件以後被移除了,以後就進入了Computer,並知足了length=2,所以在監聽器裏看到有id爲1和2的事件進入了Computer。

 

        若是不顯示指定rstream,則insert into只容許istream的事件流進入Computer。若是指定爲irstream,那麼進入的和移除的Asus都會進入到Computer。

 

上面的例子都是指定了insert into裏事件流會有什麼屬性,若是不指定會是什麼結果呢?請看例句:

 

[plain]  view plain copy
 
  1. insert into Computer select * from Asus  

        很容易想到,這裏其實是把進入引擎的Asus事件都傳遞到Computer定義的事件流中,而且屬性什麼的徹底和Asus同樣,能夠說是Asus的一個複製版本,只是名字不同。也許有人以爲這麼作沒什麼意思,直接計算Asus事件流不就能夠了麼,實際上在業務處理數據時,這種作法就能夠屏蔽掉外部的數據來源,作到業務層上的隔離。

 

        假設Asus中還包含其餘的JavaBean,一樣也能夠將這個Bean的數據傳遞到另外一個事件流中。例句以下:

 

[plain]  view plain copy
 
  1. // Lenovo中包含了thinkpad這個JavaBean  
  2. insert into Computer select thinkpad.* from Lenovo  

 


3.2 Merge Event Stream

        insert into除了接收一個流的事件,同時也支持多個流的合併。通俗一點來講,合併的流數據要一致才能夠合併。並且在第一次定義insert的事件流之後,別的事件流想要被合併就必須和以前定義的屬性數量和數據類型對應。舉例以下:

 

[plain]  view plain copy
 
  1. // 定義Computer並把Asus的數據輸入  
  2. insert into Computer(cid, csize) select aid,asize from Asus  
  3.   
  4. // 根據以前的Computer定義將Lenovo對應的屬性輸入  
  5. insert into Computer(cid, csize) select lid,lsize from Lenovo  

若是說select了多個事件流,可是你只想輸入其中一個,應該像下面這樣寫:

[plain]  view plain copy
 
  1. insert into Computer select l.* from Asus as a, Lenovo as l  

除此以外,EPL還支持調用函數轉換事件後再輸入insert into:

[plain]  view plain copy
 
  1. // 將Lenovo事件轉換後輸入Computer  
  2. insert into Computer select Converter.convert(l) from Lenovo as l  

注意,使用自定義函數必定要返回javabean,map,或者Object數組,且不能用as來爲轉換後的結果設置別名。

 

3.3 Decorated Events

以前所見到的不是將事件流總體輸入insert,就是將事件流的部分屬性輸入insert。實際上能夠將事件流總體和事件流屬性組成的複雜表達式一塊兒放入insert。示例以下:

 

[plain]  view plain copy
 
  1. insert into Computer select *, size*price as sp from Asus  
  2. // 第一個*表示Asus,size*price的*表示乘法,二者互不影響  

若是說別的事件流想進入此insert,那麼事件流屬性必定要和第一個*表示的全部屬性相同。

 

 

3.4 Event Objects Instantiated by insert into

        前面的全部例子中,對於insert into的事件結構都是在insert子句中配合select子句進行定義的。若是咱們想用已經定義好的事件結構是否能夠呢?答案是確定的。可是若是事件是javabean,而且事先沒有註冊到引擎,則須要insert子句中寫上類的全名。例如:

 

[plain]  view plain copy
 
  1. insert into test.computer.Computer ...  

固然,若是在使用以前有註冊過,那麼使用註冊時的名稱也是能夠的。

 

        由於事件結構是早就定義好的,因此在寫select的時候就必須符合insert事件中的屬性了,若是屬性名稱不同須要使用as加上別名,同樣的能夠不用設置別名,且數據類型也要一一對應。例如:

 

[plain]  view plain copy
 
  1. // Computer中包含cid和csize屬性  
  2. insert into test.computer.Computer select aid as cid, asize as csize from Dell  

可是這種用法在Computer存在包含了參數的構造方法時就顯得沒那麼必須了。先看一個完整例子,你也許就會明白了。

 

 

[java]  view plain copy
 
  1. /** 
  2.  *  
  3.  * @author luonanqin 
  4.  * 
  5.  */  
  6. class Car  
  7. {  
  8.     private int size;  
  9.     private String name;  
  10.     private int price;  
  11.   
  12.     public void setSize(int size)  
  13.     {  
  14.         this.size = size;  
  15.     }  
  16.   
  17.     public void setName(String name)  
  18.     {  
  19.         this.name = name;  
  20.     }  
  21.   
  22.     public void setPrice(int price)  
  23.     {  
  24.         this.price = price;  
  25.     }  
  26.   
  27.     public int getSize()  
  28.     {  
  29.         return size;  
  30.     }  
  31.   
  32.     public String getName()  
  33.     {  
  34.         return name;  
  35.     }  
  36.   
  37.     public int getPrice()  
  38.     {  
  39.         return price;  
  40.     }  
  41.   
  42. }  
  43.   
  44. class Auto  
  45. {  
  46.     private int autoSize;  
  47.     private String autoName;  
  48.   
  49.     public Auto(int s, String n)  
  50.     {  
  51.         this.autoSize = s;  
  52.         this.autoName = n;  
  53.     }  
  54.   
  55.     public String toString()  
  56.     {  
  57.         return "AutoSize: " + autoSize + ", AutoName: " + autoName;  
  58.     }  
  59. }  
  60.   
  61. class Benz  
  62. {  
  63.     private int benzSize;  
  64.     private String benzName;  
  65.   
  66.     public void setBenzSize(int benzSize)  
  67.     {  
  68.         this.benzSize = benzSize;  
  69.     }  
  70.   
  71.     public void setBenzName(String benzName)  
  72.     {  
  73.         this.benzName = benzName;  
  74.     }  
  75.   
  76.     public String toString()  
  77.     {  
  78.         return "BenzSize: " + benzSize + ", BenzName: " + benzName;  
  79.     }  
  80. }  
  81.   
  82. class InstantiatePopulateListener implements UpdateListener  
  83. {  
  84.     public void update(EventBean[] newEvents, EventBean[] oldEvents)  
  85.     {  
  86.         if (newEvents != null)  
  87.         {  
  88.             Object car = newEvents[0].getUnderlying();  
  89.             System.out.println(car);  
  90.         }  
  91.     }  
  92. }  
  93.   
  94. public class InstantiatePopulateTest  
  95. {  
  96.     public static void main(String[] args) throws InterruptedException  
  97.     {  
  98.         EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();  
  99.   
  100.         EPAdministrator admin = epService.getEPAdministrator();  
  101.   
  102.         String car = Car.class.getName();  
  103.         String auto = Auto.class.getName();  
  104.         String benz = Benz.class.getName();  
  105.   
  106.         String cartToAutoEpl = "insert into " + auto + " select size, name from " + car;  
  107.         String autoEpl = "select * from " + auto;  
  108.         String cartToBenzEpl = "insert into " + benz + " select size as benzSize, name as benzName from " + car;  
  109.         // String benzEpl2 = "insert into " + benz + "(benzSize,benzName) select size, name from " + car;<pre name="code" class="java">     String benzEpl = "select * from " + benz;</pre>admin.createEPL(cartToAutoEpl);EPStatement state1 = admin.createEPL(autoEpl);state1.addListener(new InstantiatePopulateListener());admin.createEPL(cartToBenzEpl);EPStatement state2 = admin.createEPL(benzEpl);state2.addListener(new InstantiatePopulateListener());EPRuntime runtime = epService.getEPRuntime();Car c1 = new Car();c1.setSize(1);c1.setName("car1");c1.setPrice(11);runtime.sendEvent(c1);Car c2 = new Car();c2.setSize(2);c2.setName("car2");c2.setPrice(22);runtime.sendEvent(c2);}}  

執行結果:

 

 

[plain]  view plain copy
 
  1. AutoSize: 1, AutoName: car1  
  2. BenzSize: 1, BenzName: car1  
  3. AutoSize: 2, AutoName: car2  
  4. BenzSize: 2, BenzName: car2  

這裏的執行結果很容易理解,關鍵是carToAutoEpl和carToBenzEpl兩個句子。

 

        對於Auto的JavaBean,咱們能夠發現它包含一個有參數的構造函數且沒有屬性對應的set方法,在carToAutoEpl中,select的內容並無和屬性名稱對應起來。這種寫法確實是正確的,正由於Auto中定了含參的構造函數,才使得select能夠寫的更隨意。可是必定要記住,構造函數裏的參數順序必定要和select中的屬性的數據類型對應起來,若是這裏把name和size互換位置,一定報錯!

        對於Benz的JavaBean,能夠看到每一個屬性都有對應的set方法,而沒有含參的構造函數,因此select中屬性的名稱須要as來設置別名。固然,像benzEpl2那種寫法,一樣能夠避免select中設置別名。

 

        一句話總結,若是JavaBean中有含參的構造函數,EPL中不須要顯示寫出屬性名稱。若是沒有構造函數,那麼必須包含set方法,且select中要寫出具體的屬性。這幾種寫法各有各的好處,你們使用時可針對具體的狀況選擇性使用。

相關文章
相關標籤/搜索