package com.framework_technology.esper.views; import com.framework_technology.esper.javabean.Apple; import com.java.annotation.document.Undigested; import com.java.annotation.document.Unfinished; import com.java.annotation.document.Unsolved; /** * Created by IntelliJ IDEA. * User: wei.Li * Date: 14-8-8 * <p> * 視圖 Views窗口語法 * <p> * esper/doc/reference - Chapter 12. EPL Reference: Views */ public class View { /** * Table 12.1. Built-in Data Window Views * <p> * View Syntax Description * ========================================================================================================== * win:length(size) 滑動窗口,存最近的 size 條數據 * win:length_batch(size) 批處理-間隔窗口,存滿 size 條觸發查詢 * win:time(time period) 滑動窗口,存最近 size 秒內的數據 * win:ext_timed(timestamp expression, time period) 滑動窗口,存最近 time period 內的數據,再也不基於EPL引擎,而是系統時間戳[timestamp] * win:time_batch(time period[,optional reference point] [, flow control]) 批次事件和釋放他們指定的時間間隔,與流控制選項 * win:ext_timed_batch(timestamp expression, time period[,optional reference point]) 批處理事件並釋放它們的基礎上由表達式提供的毫秒值每隔指定的時間間隔 * win:time_length_batch(time period, size [, flow control]) 多策略的時間和長度的批處理窗口,當時間或數量任意一個知足條件時,觸發查詢 * win:time_accum(time period) 阻塞輸出,直到 period 內沒有新進入的數據時,才輸出並觸發查詢. 這些數據視爲已移出窗口 * win:keepall() keep-all 無參數,記錄全部進入的數據,除非使用delete操做,才能從窗口移出數據 * ext:sort(size, sort criteria) 排序的排序標準表達式返回的值,僅保留事件到給定的大小。 Q:select sum(amount) from userbuy.ext:sort(3, amount desc) 將3個最高金額求和 * ext:rank(unique criteria(s), size, sort criteria(s)) 只保留最近的事件中有相同的值爲標準表達式(s)按某種標準表達式和僅保留頂部事件到給定的大小。 Q:select * from userbuy.ext:sort(3,amount desc,id asc) 找出最高金額的3條事件,並按id排序 * ext:time_order(timestamp expression, time period) Orders events that arrive out-of-order, using an expression providing timestamps to be ordered. * std:unique(unique criteria(s)) 對不一樣的unique[例如:id]保留其最近的一條事件 * std:groupwin(grouping criteria(s)) 通常與其它窗口組合使用,將進入的數據按表達式 id 分組 * 關於分組的時間窗口:當使用grouped-window時間窗口,注意,是否保留5分鐘的事件或每組保留5分鐘的事件, * 結果都是同樣的從保留的角度事件既保留政策,考慮全部組,同一組的事件。所以請單獨指定時間窗 * std:lastevent() 保留最後一個事件,長度爲1 * std:firstevent() 保留第一個事件,不顧後續的事件 * std:firstunique(unique criteria(s)) Retains only the very first among events having the same value for the criteria expression(s), disregarding all subsequent events for same value(s). * win:firstlength(size) 保留一批數據的前 size 條,需配合delete操做 * win:firsttime(time period) 保留窗口初始化後 period 內的全部數據 * * @return epl */ protected static String dataWindowViews() { //每進入3個事件後統計輸出一次newEvents[3] String epl1 = "select price from " + Apple.CLASSNAME + ".win:length_batch(3)"; //每進入3個事件後,按price分組統計3個事件中每組 price 出現的次數 // TODO 默認記錄了之前的分組統計信息,當前10個事件中不存在之前分組統計信息,則 count(price)爲0 String epl2 = "select rstream price , count(price) from " + Apple.CLASSNAME + ".win:length_batch(3) group by price"; //[istream | rstream]的規則,在咱們插入一批數據後,istream在3秒後輸出進入窗口的數據,3秒事後,rstream會輸出一樣的內容 String epl3 = "select rstream price , count(price) from " + Apple.CLASSNAME + ".win:time(5 seconds) group by price"; @Unfinished(Description = "ERROR-> Caused by: com.espertech.esper.view.ViewParameterException: Invalid parameter expression 0: Property named 'timestamp' is not valid in any stream") String epl4 = "select price from " + Apple.CLASSNAME + ".win:ext_timed(timestamp, 10 sec)"; //win:time_batch(10 sec,"FORCE_UPDATE, START_EAGER") 加上這兩個參數後,會在窗口初始化時就執行查詢,而且在沒有數據進入和移出時,強制查詢出結果 String epl5 = "select * from " + Apple.CLASSNAME + ".win:time_batch(10 sec, \"FORCE_UPDATE, START_EAGER\")"; // 時間和數量知足任意一個則輸出 String epl6 = "select price from " + Apple.CLASSNAME + ".win:time_length_batch(3 sec, 5, \"FORCE_UPDATE, START_EAGER\")"; //3s 內沒有數據進入則輸出結果,並移除數據 String epl7 = " select rstream price from " + Apple.CLASSNAME + ".win:time_accum(3 sec)"; //只輸出前10個事件 String epl8 = "select price from " + Apple.CLASSNAME + ".win:firstlength(10)"; //對 id 和 price 分組,保留最後一條事件記錄 等同於std:groupwin(id, price).win:length(1) String epl9 = "select price from " + Apple.CLASSNAME + ".std:unique(id, price)"; //進入的數據按id分組,相同id的數據條目數不大於3 @Unfinished(Description = "輸出結果不明確") String epl10 = "select sum(price) from " + Apple.CLASSNAME + ".std:groupwin(id).win:length(3)"; //統計每組id最近3次消費的總price String epl11 = "select sum(price) from " + Apple.CLASSNAME + ".std:groupwin(id).win:length(3) group by id"; //當前窗口中 最大的3個 price 的sum 值 String epl12 = "select sum(price) from " + Apple.CLASSNAME + ".ext:sort(3, price desc)"; //當前窗口中 按 price 升序 id 降序 的price sum 值 String epl13 = "select sum(price) from " + Apple.CLASSNAME + ".ext:sort(3, price desc, id asc)"; //按 size 分組,只保留最後一個分組的 size 數據,取前3個 price 的 sum 值 String epl14 = "select sum(price) from " + Apple.CLASSNAME + ".ext:rank(size, 3, price desc)"; //create_time 時間排序,保留4s 的數據 String epl15 = "select rstream * from " + Apple.CLASSNAME + ".ext:time_order(create_time, 4 sec)"; return epl15; } /** * win:expr(expiry expression) 窗口的事件是否保留取決於過時表達式expression的結果。 * expression至關於開關,若是一直是true,那麼時間進入view不移除,直到expression爲false,將以前保留的事件從view中所有刪掉 * <p> * Table 12.3. Built-in Properties of the Expiry Expression Data Window View * <p> * Name Type Description * ================================================================================ * current_count int 當前數據窗口包括到達的事件的數量 * expired_count int 評估過時的事件數量 * newest_event (same event type as arriving events) 最後一個到達的事件 * newest_timestamp long 引擎的時間戳與last-arriving事件有關。 * oldest_event (same event type as arriving events) currently-evaluated事件自己。 * oldest_timestamp long 引擎的時間戳與currently-evaluated事件有關。 * view_reference Object 當前視圖處理的對象 * * @return epl */ protected static String expr() { //保留最後進入的2個事件 String epl1 = "select rstream price from " + Apple.CLASSNAME + ".win:expr(current_count <= 2)"; //已過時事件數量 String epl2 = "select rstream price from " + Apple.CLASSNAME + ".win:expr(expired_count = 2)"; //保留最後5秒進入的事件 String epl3 = "select rstream price from " + Apple.CLASSNAME + ".win:expr(oldest_timestamp > newest_timestamp - 5000)"; // id 相同標誌的事件被保留,若是 id 值變化則移除保留的事件 String epl4 = "select rstream price from " + Apple.CLASSNAME + ".win:expr(newest_event.id = oldest_event.id)"; return epl2; } /** * win:expr_batch(expiry expression) 事件窗口,批次處理,基於過時表達式的結果做爲一個參數傳遞判斷是否移除他們 * 參考{@link #expr()} * <p> * Table 12.4. Built-in Properties of the Expiry Expression Data Window View * <p> * Name Type Description * ================================================================================ * current_count int 在數據窗口包括當前到達的事件的事件的數量 * newest_event (same event type as arriving events) 最後一個到達的事件 * newest_timestamp long 引擎的時間戳與last-arriving事件有關。 * oldest_event (same event type as arriving events) currently-evaluated事件自己。 * oldest_timestamp long 引擎的時間戳與currently-evaluated事件有關。 * view_reference Object 當前視圖處理的對象 * * @return epl */ protected static String expr_batch() { //按current_count保留的事件數量分組,批量處理輸出 String epl1 = "select rstream price from " + Apple.CLASSNAME + ".win:expr_batch(current_count > 2)"; //保留最後5秒進入的事件分組- 5秒內的事件,批量輸出 String epl2 = "select rstream price from " + Apple.CLASSNAME + ".win:expr_batch(oldest_timestamp > newest_timestamp - 5000)"; return epl1; } /** * 如下爲數據公式計算 */ /** * View Syntax Description * ============================================================================================================================= * Size std:size([expression, ...]) Derives a count of the number of events in a data window, or in an insert stream if used without a data window, and optionally provides additional event properties as listed in parameters. * * @return epl */ @Unsolved protected static String size_Views() { //統計按price分組 事件總數 String epl1 = "select size from " + Apple.CLASSNAME + ".std:groupwin(price).std:size()"; @Unfinished String epl2 = "select size ,id ,price from " + Apple.CLASSNAME + ".win:time(3 sec).std:size(id, price)"; @Unfinished String epl3 = "select size from " + Apple.CLASSNAME + ".win:time(3 sec).std:size()"; return epl3; } /** * stat:uni屬性 * <p> * View Syntax Description * ============================================================================================================ * 單變量統計數據 stat:uni(value expression [,expression, ...]) 統計由單變量統計出的值 * <p> * <p> * Property Name Description * ========================================================= * datapoints 值得數量, 至關於 count(*) * total 總計 , 至關於sum() * average 平均值 * variance 方差 * stddev 樣本的標準誤差(方差的平方根) * stddevpa 整體標準誤差 * * @return epl */ protected static String stat_uni_Views() { String epl1 = "select stddev from " + Apple.CLASSNAME + ".win:length(3).stat:uni(price)"; String epl2 = "select total from " + Apple.CLASSNAME + ".win:length(3).stat:uni(price)"; String epl3 = "select average from " + Apple.CLASSNAME + ".win:length(3).stat:uni(price)"; String epl4 = "select datapoints from " + Apple.CLASSNAME + ".win:length(3).stat:uni(price)"; return epl3; } /** * stat:linest 計算兩個表達式的返回值的迴歸和相關的中間結果。 * <p> * View Syntax Description * ============================================================================================================ * Regression stat:linest(value expression, value expression [,expression, ...]) Calculates regression on the values returned by two expressions. * <p> * 參考文檔 12.4.2. Regression (stat:linest) 部分 * * @return epl */ @Undigested protected static String stat_linest_Views() { //下面的示例選擇全部的派生值加上全部事件屬性: String epl3 = "select * from " + Apple.CLASSNAME + ".win:time(10 seconds).stat:linest(price, offer, *)"; return epl3; } /** * stat:correl 計算兩個事件的相關性 * <p> * View Syntax Description * ============================================================================================================ * Correlation stat:correl(value expression, value expression [,expression, ...]) 計算2個表達式返回的值的相關值 * <p> * Property Name Description * ========================================================= * correlation 兩個事件的相關性 * * @return epl */ @Undigested protected static String stat_correl_Views() { //計算價格的相關性 String epl1 = "select correlation from " + Apple.CLASSNAME + ".stat:correl(size,price)"; String epl2 = "select * from " + Apple.CLASSNAME + ".stat:correl(price, size, *)"; return epl2; } /** * stat:weighted_avg 加權平均數 * <p> * View Syntax Description * ============================================================================================================ * Weighted average stat:weighted_avg(value expression, value expression [,expression, ...]) 返回給返回值來計算的平均值和表達式返回重量表達式的加權平均值 * <p> * Property Name Description * ========================================================= * average 加權平均數 * * @return epl */ @Undigested protected static String stat_weighted_avg_Views() { //計算價格的相關性 String epl1 = "select average " + "from " + Apple.CLASSNAME + ".win:time(3 seconds).stat:weighted_avg(price, size)"; return epl1; } }