在處理實時流數據時,不只須要按照時間作縱向聚合計算(時間序列聚合引擎),還須要對最新的數據作橫向比較和計算,如金融裏對全部股票的最新報價求百分位、工業物聯網中計算一批設備的溫度均值等。DolphinDB database 提供了橫截面聚合引擎,能夠對流數據中全部分組的最新數據作聚合運算。git
橫截面引擎的主體分爲兩部分:橫截面數據表和計算引擎。橫截面數據是橫截面引擎的內部表,保存了全部分組最新的截面數據。計算引擎是一組聚合計算表達式以及觸發器,系統會按照指定的方式觸發聚合運算,計算結果會輸出到另一個表中。github
1. 基本用法
在DolphinDB database中,經過createCrossSectionalAggregator建立橫截面聚合引擎。它返回一個橫截面數據表,保存了全部分組最新的截面數據,往這個表寫入數據意味着這些數據進入橫截面聚合引擎進行計算。具體用法以下:編程
createCrossSectionalAggregator(name, [metrics], dummyTable, [outputTable], keyColumn, [triggeringPattern="perBatch"], [triggeringInterval=1000])
- name是一個字符串,表示橫截面聚合引擎的名稱,是橫截面聚合引擎的惟一標識。它能夠包含字母,數字和下劃線,但必須以字母開頭。
- metrics是元代碼。它能夠是系統內置或用戶自定義的函數,如<[sum(qty), avg(price)]>,能夠對聚合結果使用表達式,如<[avg(price1)-avg(price2)]>,也能夠對計算列進行聚合運算,如<[std(price1-price2)]>。詳情可參考元編程。
- dummyTable是表對象,它能夠不包含數據,但它的結構必須與訂閱的流數據表相同。
- outputTable是表對象,用於保存計算結果。輸出表的列數爲metrics數量+1,第一列爲TIMESTAMP類型,用於存放發生計算的時間戳,,其餘列的數據類型必須與metrics返回結果的數據類型一致。
- keyColumn是一個字符串,指定dummyTable的某列爲橫截面聚合引擎的key。keyColumn指定列中的每個key對應表中的惟一一行。
- triggeringPattern是一個字符串,表示觸發計算的方式。它能夠是如下取值:
- "perRow": 每插入一行數據觸發一次計算
- "perBatch": 每插入一次數據觸發一次計算
- "interval": 按必定的時間間隔觸發計算
- triggeringInterval是一個整數。只有當triggeringPattern的取值爲interval時才生效,表示觸發計算的時間間隔。默認值爲1000毫秒。
2. 示例
下面經過一個例子說明橫截面聚合引擎的應用。在金融交易中,每每須要實時瞭解全部股票最新的報價均值、最近一次成交量總和以及最近一次交易的交易量。DolphinDB的橫截面聚合引擎結合流數據訂閱功能能夠方便地完成這些工做。app
(1)建立實時交易表函數
股票的實時交易表trades,包含如下主要字段:url
sym:股票代碼 time:時間 price:成交價 qty:成交量
每當交易發生時,實時數據會寫入trades表。建立trades表的腳本以下:spa
share streamTable(10:0,`time`sym`price`qty,[TIMESTAMP,SYMBOL,DOUBLE,INT]) as trades
(2)建立橫截面聚合引擎.net
tradesCrossAggregator=createCrossSectionalAggregator("CrossSectionalDemo", <[avg(price), sum(qty), sum(price*qty)]>, trades, outputTable, `sym, `perRow)
tradesCrossAggregator是橫截面數據表,它按股票代碼分組,每一個股票有且僅有一行。當數據進入該表時,會計算每一個股票的avg(price), sum(qty)和sum(price*qty)。每插入一條數據觸發一次計算。code
(3)橫截面數據表訂閱實時交易表對象
subscribeTable(,"trades","tradesCrossAggregator",-1,append!{tradesCrossAggregator},true)
經過流數據訂閱功能,把實時數據寫入橫截面數據表。
(4)模擬數據產生
def writeData(n){ timev = 2000.10.08T01:01:01.001 + timestamp(1..n) symv = take(`A`B, n) pricev = take(102.1 33.4 73.6 223,n) qtyv = take(60 74 82 59, n) insert into trades values(timev, symv, pricev,qtyv) } writeData(4);
查看實時交易表,共有4條數據。
select * from trades time sym price qty ----------------------- --- ----- --- 2000.10.08T01:01:01.002 A 102.1 60 2000.10.08T01:01:01.003 B 33.4 74 2000.10.08T01:01:01.004 A 73.6 82 2000.10.08T01:01:01.005 B 223 59
查看橫截面數據表,裏面保存了A、B兩隻股票最近的兩筆交易記錄。
select * from tradesCrossAggregator time sym price qty ----------------------- --- ----- --- 2000.10.08T01:01:01.004 A 73.6 82 2000.10.08T01:01:01.005 B 223 59
查看橫截面引擎的輸出表,因爲橫截面引擎採用了perRow每行觸發計算的頻率,因此每往橫截面表寫入一行數據,聚合引擎都會作一次計算,所以一共有4條記錄。
select * from outputTable time avgPrice sumqty Total ----------------------- -------- ------ ------- 2019.07.08T10:04:41.731 102.1 60 6126 2019.07.08T10:04:41.732 67.75 134 8597.6 2019.07.08T10:04:41.732 53.5 156 8506.8 2019.07.08T10:04:41.732 148.3 141 19192.2
經過getAggregatorStat函數查看橫截面引擎的狀態。
getAggregatorStat().CrossSectionalAggregator name user status lastErrMsg numRows numMetrics metrics triggeringPattern triggeringInterval ------------------ ----- ------ ---------- ------- ---------- ------------------ ----------------- ------------------ CrossSectionalDemo guest OK 2 3 [ avg(price), su...perRow 1000
經過removeAggregator函數刪除橫截面引擎。
removeAggregator("CrossSectionalDemo")
3. 觸發計算的幾種方式
橫截面引擎一共有三種觸發計算的方式:perRow、perBatch和interval。上面的例子中採用的是每插入一行數據觸發一次計算。下面介紹另外兩種觸發計算的方式。
- perBatch
perBatch參數表示每追加一批數據就觸發一次寫入,下例按perBatch模式啓用橫截面引擎,腳本一共生成12條記錄,分三批寫入,輸出表中預期有3條記錄。
share streamTable(10:0,`time`sym`price`qty,[TIMESTAMP,SYMBOL,DOUBLE,INT]) as trades outputTable = table(1:0, `time`avgPrice`sumqty`Total, [TIMESTAMP,DOUBLE,INT,DOUBLE]) tradesCrossAggregator=createCrossSectionalAggregator("CrossSectionalDemo", <[avg(price), sum(qty), sum(price*qty)]>, trades, outputTable, `sym, `perBatch) subscribeTable(,"trades","tradesCrossAggregator",-1,append!{tradesCrossAggregator},true) def writeData(n){ timev = 2000.10.08T01:01:01.001 + timestamp(1..n) symv = take(`A`B, n) pricev = take(102.1 33.4 73.6 223,n) qtyv = take(60 74 82 59, n) insert into trades values(timev, symv, pricev,qtyv) } //寫入三批數據,預期會觸發三次計算,輸出三次聚合結果。 writeData(4); writeData(4); writeData(4);
查看橫截面數據表。
select * from tradesCrossAggregator time sym price qty ----------------------- --- ----- --- 2000.10.08T01:01:01.002 A 73.6 82 2000.10.08T01:01:01.003 B 33.4 59
查看輸出表。插入了三批數據,所以輸出表中有3條記錄。
select * from outputTable time avgPrice sumqty Total ----------------------- -------- ------ ------- 2019.07.08T10:14:54.446 148.3 141 19192.2 2019.07.08T10:14:54.446 148.3 141 19192.2 2019.07.08T10:14:54.446 148.3 141 19192.2
- interval
當觸發計算的方式爲interval時,須要指定triggeringInterval,表示每隔triggeringInterval毫秒觸發一次計算。下面的例子中,分6次寫入12條記錄,每次間隔500毫秒。設置橫截面引擎每1000毫秒觸發一次計算,預期最終輸出3條記錄。
share streamTable(10:0,`time`sym`price`qty,[TIMESTAMP,SYMBOL,DOUBLE,INT]) as trades outputTable = table(1:0, `time`avgPrice`sumqty`Total, [TIMESTAMP,DOUBLE,INT,DOUBLE]) tradesCrossAggregator=createCrossSectionalAggregator("CrossSectionalDemo", <[avg(price), sum(qty), sum(price*qty)]>, trades, outputTable, `sym, `interval,1000) subscribeTable(,"trades","tradesCrossAggregator",-1,append!{tradesCrossAggregator},true) def writeData(n){ timev = 2000.10.08T01:01:01.001 + timestamp(1..n) symv = take(`A`B, n) pricev = take(102.1 33.4 73.6 223,n) qtyv = take(60 74 82 59, n) insert into trades values(timev, symv, pricev,qtyv) } a = now() writeData(2); sleep(500) writeData(2); sleep(500) writeData(2); sleep(500) writeData(2); sleep(500) writeData(2); sleep(500) writeData(2); sleep(500) b = now() select count(*) from outputTable 3
若是再次執行select count(*) from outputTable,會發現隨着時間的推移,輸出表的記錄數會不斷增加。這是由於在interval模式下,計算是按照現實時間定時觸發,並不依賴因而否有新的數據進來。
4. 橫截面數據表的獨立使用
從上面的例子中能夠看出,橫截面表雖然是爲聚合計算提供的一箇中間數據表,但其實在不少場合仍是能獨立發揮做用的。好比咱們須要定時刷新某隻股票的最新交易價格,按照常規思路是從實時交易表中按代碼篩選股票並拿出最後一條記錄,而交易表的數據量是隨着時間快速增加的,若是頻繁作這樣的查詢,不管從系統的資源消耗仍是從查詢的效能來看都不是很好的作法。而橫截面表永遠只保存全部股票的最近一次交易數據,數據量是穩定的,對於這種定時輪詢的場景很是合適。
若是要單獨使用橫截面表,須要在建立橫截面引擎時,把metrics,outputTable這兩個參數設置爲空。
tradesCrossAggregator=createCrossSectionalAggregator("CrossSectionalDemo", , trades,, `sym, `perRow)
相關連接: