元編程是指使用程序代碼來生成能夠動態運行的程序代碼。元編程的目的通常是延遲執行代碼或動態建立代碼。html
DolphinDB支持使用元編程來動態建立表達式,包括函數調用的表達式、SQL查詢表達式等。DolphinDB有兩種實現元編程的方法:sql
(1)使用一對尖括號<>來表示須要延遲執行的動態代碼。例如,數據庫
a = <1 + 2 * 3> typestr(a); CODE //a是元代碼,它的數據類型是CODE eval(a); 7 //eval函數用於執行元代碼
(2)使用函數來建立各類表達式。經常使用的元編程函數包括expr
, parseExpr
, partial
, sqlCol
, sqlColAlias
, sql
, eval
, makeCall
. 下面介紹這幾個函數的用法。編程
a = expr(1, +, 2, *, 3) a.typestr(); CODE a; < 1 + 2 * 3 >
parseExpr("1+2") < 1 + 2 >
partial(add,1)(2) 3 def f(a,b):a pow b g=partial(f, 2) g(3) 8
sqlCol
函數能夠將列名轉換成表達式,sqlColAlias
經常使用於生成計算列的元代碼,sql
函數能夠動態地生成SQL語句。sym = take(`GE,6) join take(`MSFT,6) join take(`F,6) date=take(take(2017.01.03,2) join take(2017.01.04,4), 18) PRC=31.82 31.69 31.92 31.8 31.75 31.76 63.12 62.58 63.12 62.77 61.86 62.3 12.46 12.59 13.24 13.41 13.36 13.17 vol=2300 3500 3700 2100 1200 4600 8800 7800 6400 4200 2300 6800 4200 5600 8900 2300 6300 9600 t1 = table(sym, date, PRC, vol); sql(sqlCol("*"),t1) < select * from t1 > sql(sqlCol("*"),t1,[<sym="MSFT">,<PRC>=5000>]) < select * from t1 where sym == "MSFT",PRC >= 5000 > sql(sqlColAlias(<avg(vol)>,"avg_vol"),t1,<sym="MSFT">,sqlCol("date")) < select avg(vol) as avg_vol from t1 where sym == "MSFT" group by date > sql(sqlColAlias(<avg(vol)>,"avg_vol"),t1,<sym="MSFT">,sqlCol("date"),,,,<avg(vol)>3000>) < select avg(vol) as avg_vol from t1 where sym == "MSFT" group by date having avg(vol) > 3000 > sql(sqlColAlias(<avg(vol)>,"avg_vol"),t1,<sym="MSFT">,sqlCol("date"),0) < select avg(vol) as avg_vol from t1 where sym == "MSFT" context by date > sql(sqlColAlias(<avg(vol)>,"avg_vol"),t1,<sym="MSFT">,sqlCol("date"),0,sqlCol("avg_vol"),0) < select avg(vol) as avg_vol from t1 where sym == "MSFT" context by date csort avg_vol desc > sql(sqlCol("*"),t1,,,,,,,sqlCol(`vol),0,5) < select top 5 * from t1 order by vol desc >
a = <1 + 2 * 3> eval(a); 7 sql(sqlColAlias(<avg(vol)>,"avg_vol"),t1,,sqlCol(["sym","date"])).eval(); sym date avg_vol ---- ---------- ------- F 2017.01.03 4900 F 2017.01.04 6775 GE 2017.01.03 2900 GE 2017.01.04 2900 MSFT 2017.01.03 8300 MSFT 2017.01.04 4925 //這裏使用的t1是第(2)部分的t1
sql([sqlColAlias(makeCall(temporalFormat,sqlCol(`date),"dd/MM/yyyy"),"date"),sqlCol(`sym),sqlCol(`PRC),sqlCol(`vol)],t1) < select temporalFormat(date, "dd/MM/yyyy") as date,sym,PRC,vol from t1 >
2.1 更新分區內存表併發
分區內存表的更新、刪除等操做不只能夠經過SQL語句完成,也能夠經過元編程完成。建立分區內存表:mvc
n=1000000 sym=rand(`IBM`MSFT`GOOG`FB`IBM`MSFT,n) date=rand(2018.01.02 2018.01.02 2018.01.02 2018.01.03 2018.01.03 2018.01.03,n) price=rand(1000.0,n) qty=rand(10000,n) t=table(sym,date,price,qty) db=database("",VALUE,`IBM`MSFT`GOOG`FB`IBM`MSFT) trades=db.createPartitionedTable(t,`trades,`sym).append!(t)
2.1.1 更新數據app
例如,更新股票代碼爲IBM的交易數量:ide
trades[`qty,<sym=`IBM>]=<qty+100> //等價於update trades set qty=qty+100 where sym=`IBM
2.1.2 新增一個列函數
例如,添加一個新的列volume,用於保存交易量:oop
trades[`volume]=<price*qty> //等價於update trades set volume=price*qty
2.1.3 刪除數據
例如,刪除qty爲0的數據:
trades.erase!(<qty=0>) //等價於delete from trades where qty=0
2.1.4 動態生成過濾條件並更新數據
本例使用瞭如下數據表。
ind1=rand(100,10) ind2=rand(100,10) ind3=rand(100,10) ind4=rand(100,10) ind5=rand(100,10) ind6=rand(100,10) ind7=rand(100,10) ind8=rand(100,10) ind9=rand(100,10) ind10=rand(100,10) indNum=1..10 t=table(ind1,ind2,ind3,ind4,ind5,ind6,ind7,ind8,ind9,ind10,indNum)
咱們須要對數據表進行更新操做,SQL語句以下:
update t set ind1=1 where indNum=1 update t set ind2=1 where indNum=2 update t set ind3=1 where indNum=3 update t set ind4=1 where indNum=4 update t set ind5=1 where indNum=5 update t set ind6=1 where indNum=6 update t set ind7=1 where indNum=7 update t set ind8=1 where indNum=8 update t set ind9=1 where indNum=9 update t set ind10=1 where indNum=10
若是數據表的列數較多,須要手工編寫很是多的SQL語句。觀察以上語句能夠發現,列名和過濾條件是有必定關係的。使用元編程能夠很是方便地完成以上操做。
for(i in 1..10){ t["ind"+i,<indNum=i>]=1 }
2.2 在內置函數中使用元編程
DolphinDB的一些內置函數會使用到元編程。
2.2.1 窗口鏈接
在窗口鏈接(window join)中,須要爲右表的窗口數據集指定一個或多個聚合函數以及這些函數運行時須要的參數。因爲問題的描述和執行在兩個不一樣的階段,咱們採用元編程來實現延後執行。
t = table(take(`ibm, 3) as sym, 10:01:01 10:01:04 10:01:07 as time, 100 101 105 as price) q = table(take(`ibm, 8) as sym, 10:01:01+ 0..7 as time, 101 103 103 104 104 107 108 107 as ask, 98 99 102 103 103 104 106 106 as bid) wj(t, q, -2 : 1, < [max(ask), min(bid), avg((bid+ask)*0.5) as avg_mid]>, `time) sym time price max_ask min_bid avg_mid --- -------- ----- ------- ------- ------- ibm 10:01:01 100 103 98 100.25 ibm 10:01:04 101 104 99 102.625 ibm 10:01:07 105 108 103 105.625
2.2.2 流計算引擎
DolphinDB有三種類型的流計算引擎:時間序列聚合引擎(createTimeSeriesAggregator)、橫截面引擎(createCrossSectionalAggregator)和異常檢測引擎(createAnomalyDetectionEngine)。在使用這些流計算引擎時,須要爲數據窗口中的數據集指定聚合函數或表達式以及它們運行時所需的參數。這種狀況下,咱們採用元編程來表示聚合函數或表達式以及它們所需的參數。以時間序列聚合引擎的應用爲例:
share streamTable(1000:0, `time`sym`qty, [DATETIME, SYMBOL, INT]) as trades output1 = table(10000:0, `time`sym`sumQty, [DATETIME, SYMBOL, INT]) agg1 = createTimeSeriesAggregator("agg1",60, 60, <[sum(qty)]>, trades, output1, `time, false,`sym, 50,,false) subscribeTable(, "trades", "agg1", 0, append!{agg1}, true) insert into trades values(2018.10.08T01:01:01,`A,10) insert into trades values(2018.10.08T01:01:02,`B,26) insert into trades values(2018.10.08T01:01:10,`B,14) insert into trades values(2018.10.08T01:01:12,`A,28) insert into trades values(2018.10.08T01:02:10,`A,15) insert into trades values(2018.10.08T01:02:12,`B,9) insert into trades values(2018.10.08T01:02:30,`A,10) insert into trades values(2018.10.08T01:04:02,`A,29) insert into trades values(2018.10.08T01:04:04,`B,32) insert into trades values(2018.10.08T01:04:05,`B,23) select * from output1 time sym sumQty ------------------- --- ------ 2018.10.08T01:02:00 A 38 2018.10.08T01:03:00 A 25 2018.10.08T01:02:00 B 40 2018.10.08T01:03:00 B 9
2.3 定製報表
元編程能夠用於定製報表。下例定義了一個用於生成報表的自定義函數,用戶只須要輸入數據表、字段名稱以及字段相應的格式字符串便可。
def generateReport(tbl, colNames, colFormat, filter){ colCount = colNames.size() colDefs = array(ANY, colCount) for(i in 0:colCount){ if(colFormat[i] == "") colDefs[i] = sqlCol(colNames[i]) else colDefs[i] = sqlCol(colNames[i], format{,colFormat[i]}) } return sql(colDefs, tbl, filter).eval() }
建立模擬的歷史數據庫:
if(existsDatabase("dfs://historical_db")){ dropDatabase("dfs://historical_db") } n=5000000 dates=2012.09.01..2012.09.30 syms=symbol(`IBM`MSFT`GOOG`FB`AAPL) t=table(rand(dates,n) as date, rand(syms,n) as sym, rand(200.0,n) as price, rand(1000..2000,n) as qty) db1=database("",VALUE,dates) db2=database("",VALUE,syms) db=database("dfs://historical_db",COMPO,[db1,db2]) stock=db.createPartitionedTable(t,`stock,`date`sym).append!(t)
選擇2012年9月1日股票代碼爲IBM的數據生成報表:
generateReport(stock,`date`sym`price`qty,["MM/dd/yyyy","","###.00","#,###"],<date=2012.09.01 and sym=`IBM >) date sym price qty ---------- --- ------ ----- 09/01/2012 IBM 90.97 1,679 09/01/2012 IBM 22.36 1,098 09/01/2012 IBM 133.42 1,404 09/01/2012 IBM 182.08 1,002 09/01/2012 IBM 144.67 1,468 09/01/2012 IBM 6.59 1,256 09/01/2012 IBM 73.09 1,149 09/01/2012 IBM 83.35 1,415 09/01/2012 IBM 93.13 1,006 09/01/2012 IBM 88.05 1,406 ...
上面的語句等價於如下SQL語句:
select format(date,"MM/dd/yyyy") as date, sym, format(price,"###.00") as price, format(qty,"#,###") as qty from stock where date=2012.09.01 and sym=`IBM
2.4 物聯網中動態生成計算指標
在物聯網的實時流計算中,數據源包含tag, timestamp和value三個字段。如今須要對輸入的原始數據進行實時的指標計算。因爲每次收到的原始數據的tag數量和種類有可能不一樣,而且每次計算的指標也可能不一樣,咱們沒法將計算指標固定下來,所以這種狀況下咱們能夠採用元編程的方法。咱們須要定義一個配置表,將計算的指標放到該表中,能夠根據實際增長、刪除或修改計算指標。每次實時計算時,從配置表中動態地讀取須要計算的指標,並把計算的結果輸出到另一個表中。
如下是示例代碼。pubTable是流數據的發佈表。config表是存儲計算指標的配置表,因爲計算指標有可能每次都不相同,這裏採用的是併發版本控制表(mvccTable)。subTable經過訂閱pubTable,對流數據進行實時計算。
t1=streamTable(1:0,`tag`value`time,[STRING,DOUBLE,DATETIME]) share t1 as pubTable config = mvccTable(`index1`index2`index3`index4 as targetTag, ["tag1 + tag2", "sqrt(tag3)", "floor(tag4)", "abs(tag5)"] as formular) subTable = streamTable(100:0, `targetTag`value, [STRING, FLOAT]) def calculateTag(mutable subTable,config,msg){ pmsg = select value from msg pivot by time, tag for(row in config){ try{ insert into subTable values(row.targetTag, sql(sqlColAlias(parseExpr(row.formular), "value"), pmsg).eval().value) } catch(ex){print ex} } } subscribeTable(,`pubTable,`calculateTag,-1,calculateTag{subTable,config},true) //模擬寫入數據 tmp = table(`tag1`tag2`tag3`tag4 as tag, 1.2 1.3 1.4 1.5 as value, take(2019.01.01T12:00:00, 4) as time) pubTable.append!(tmp) select * from subTable targetTag value --------- -------- index1 2.5 index2 1.183216 index3 1
2.5 執行一組查詢,合併查詢結果
在數據分析中,有時咱們須要對同一個數據集執行一組相關的查詢,並將查詢結果合併展現出來。若是每次都手動編寫所有SQL語句,工做量大,而且擴展性差。經過元編程動態生成SQL能夠解決這個問題。
本例使用的數據集結構以下(以第一行爲例):
mt vn bc cc stt vt gn bk sc vas pm dls dt ts val vol -------- ------- -- --- --- -- -- ---- -- --- -- ---------- ---------- ------ ----- ----- 52354955 50982208 25 814 11 2 1 4194 0 0 0 2020.02.05 2020.02.05 153234 5.374 18600
咱們須要對天天的數據都執行一組相關的查詢。好比:
select * from t where vn=50982208,bc=25,cc=814,stt=11,vt=2, dsl=2020.02.05, mt<52355979 order by mt desc limit 1 select * from t where vn=50982208,bc=25,cc=814,stt=12,vt=2, dsl=2020.02.05, mt<52355979 order by mt desc limit 1 select * from t where vn=51180116,bc=25,cc=814,stt=12,vt=2, dsl=2020.02.05, mt<52354979 order by mt desc limit 1 select * from t where vn=41774759,bc=1180,cc=333,stt=3,vt=116, dsl=2020.02.05, mt<52355979 order by mt desc limit 1
能夠觀察到,這一組查詢中,過濾條件包含的列和排序列都相同,而且都是取排序後的第一行記錄,還有部分過濾條件的值相同。爲此,咱們編寫了自定義函數bundleQuery:
def bundleQuery(tbl, dt, dtColName, mt, mtColName, filterColValues, filterColNames){ cnt = filterColValues[0].size() filterColCnt =filterColValues.size() orderByCol = sqlCol(mtColName) selCol = sqlCol("*") filters = array(ANY, filterColCnt + 2) filters[filterColCnt] = expr(sqlCol(dtColName), ==, dt) filters[filterColCnt+1] = expr(sqlCol(mtColName), <, mt) queries = array(ANY, cnt) for(i in 0:cnt) { for(j in 0:filterColCnt){ filters[j] = expr(sqlCol(filterColNames[j]), ==, filterColValues[j][i]) } queries.append!(sql(select=selCol, from=tbl, where=filters, orderBy=orderByCol, ascOrder=false, limit=1)) } return loop(eval, queries).unionAll(false) }
bundleQuery中各個參數的含義以下:
上面一組SQL語句,至關於執行如下代碼:
dt = 2020.02.05 dtColName = "dls" mt = 52355979 mtColName = "mt" colNames = `vn`bc`cc`stt`vt colValues = [50982208 50982208 51180116 41774759, 25 25 25 1180, 814 814 814 333, 11 12 12 3, 2 2 2 116] bundleQuery(t, dt, dtColName, mt, mtColName, colValues, colNames)
咱們能夠執行如下腳本把bundleQuery函數定義爲函數視圖,這樣在集羣的任何節點或者重啓系統以後,均可以直接使用該函數。
//please login as admin first addFunctionView(bundleQuery)
DolphinDB database的元編程功能強大,使用簡單,可以極大地提升程序開發效率。