乾貨丨DolphinDB元編程教程

元編程是指使用程序代碼來生成能夠動態運行的程序代碼。元編程的目的通常是延遲執行代碼或動態建立代碼。html

1. DolphinDB實現元編程的方法

DolphinDB支持使用元編程來動態建立表達式,包括函數調用的表達式、SQL查詢表達式等。DolphinDB有兩種實現元編程的方法:sql

(1)使用一對尖括號<>來表示須要延遲執行的動態代碼。例如,數據庫

a = <1 + 2 * 3>
typestr(a);
CODE
//a是元代碼,它的數據類型是CODE

eval(a);
7
//eval函數用於執行元代碼

(2)使用函數來建立各類表達式。經常使用的元編程函數包括exprparseExprpartialsqlColsqlColAliassqlevalmakeCall. 下面介紹這幾個函數的用法。編程

  • expr函數根據輸入的對象、運算符或其餘元代碼生成元代碼。例如:
a = expr(1, +, 2, *, 3)
a.typestr();
CODE

a;
< 1 + 2 * 3 >
  • parseExpr函數能夠把字符串轉換爲元代碼。例如:
parseExpr("1+2")

< 1 + 2 >
  • partial函數能夠固定一個函數的部分參數,產生一個參數較少的函數。例如:
partial(add,1)(2)
3

def f(a,b):a pow b
g=partial(f, 2)
g(3)

8
  • sqlColsqlColAliassql函數用於動態生成SQL表達式。sqlCol函數能夠將列名轉換成表達式,sqlColAlias經常使用於生成計算列的元代碼,sql函數能夠動態地生成SQL語句。
sym = take(`GE,6) join take(`MSFT,6) join take(`F,6)
date=take(take(2017.01.03,2) join take(2017.01.04,4), 18)
PRC=31.82 31.69 31.92 31.8  31.75 31.76 63.12 62.58 63.12 62.77 61.86 62.3 12.46 12.59 13.24 13.41 13.36 13.17
vol=2300 3500 3700 2100 1200 4600 8800 7800 6400 4200 2300 6800 4200 5600 8900 2300 6300 9600
t1 = table(sym, date, PRC, vol);

sql(sqlCol("*"),t1)
< select * from t1 >

sql(sqlCol("*"),t1,[<sym="MSFT">,<PRC>=5000>])
< select * from t1 where sym == "MSFT",PRC >= 5000 >

sql(sqlColAlias(<avg(vol)>,"avg_vol"),t1,<sym="MSFT">,sqlCol("date"))
< select avg(vol) as avg_vol from t1 where sym == "MSFT" group by date >

sql(sqlColAlias(<avg(vol)>,"avg_vol"),t1,<sym="MSFT">,sqlCol("date"),,,,<avg(vol)>3000>)
< select avg(vol) as avg_vol from t1 where sym == "MSFT" group by date having avg(vol) > 3000 >

sql(sqlColAlias(<avg(vol)>,"avg_vol"),t1,<sym="MSFT">,sqlCol("date"),0)
< select avg(vol) as avg_vol from t1 where sym == "MSFT" context by date >

sql(sqlColAlias(<avg(vol)>,"avg_vol"),t1,<sym="MSFT">,sqlCol("date"),0,sqlCol("avg_vol"),0)
< select avg(vol) as avg_vol from t1 where sym == "MSFT" context by date csort avg_vol desc >

sql(sqlCol("*"),t1,,,,,,,sqlCol(`vol),0,5)
< select top 5 * from t1 order by vol desc >
  • eval函數能夠執行元代碼。例如:
a = <1 + 2 * 3>
eval(a);
7

sql(sqlColAlias(<avg(vol)>,"avg_vol"),t1,,sqlCol(["sym","date"])).eval();
sym  date       avg_vol
---- ---------- -------
F    2017.01.03 4900   
F    2017.01.04 6775   
GE   2017.01.03 2900   
GE   2017.01.04 2900   
MSFT 2017.01.03 8300   
MSFT 2017.01.04 4925   
//這裏使用的t1是第(2)部分的t1
  • makeCall函數能夠根據指定的函數和輸入參數生成元代碼。例如,查詢表t1時,把date列輸出爲字符串,並以相似於03/01/2017的形式顯示。
sql([sqlColAlias(makeCall(temporalFormat,sqlCol(`date),"dd/MM/yyyy"),"date"),sqlCol(`sym),sqlCol(`PRC),sqlCol(`vol)],t1)
< select temporalFormat(date, "dd/MM/yyyy") as date,sym,PRC,vol from t1 >

2.DolphinDB元編程應用

2.1 更新分區內存表併發

分區內存表的更新、刪除等操做不只能夠經過SQL語句完成,也能夠經過元編程完成。建立分區內存表:mvc

n=1000000
sym=rand(`IBM`MSFT`GOOG`FB`IBM`MSFT,n)
date=rand(2018.01.02 2018.01.02 2018.01.02 2018.01.03 2018.01.03 2018.01.03,n)
price=rand(1000.0,n)
qty=rand(10000,n)
t=table(sym,date,price,qty)

db=database("",VALUE,`IBM`MSFT`GOOG`FB`IBM`MSFT)
trades=db.createPartitionedTable(t,`trades,`sym).append!(t)

2.1.1 更新數據app

例如,更新股票代碼爲IBM的交易數量:ide

trades[`qty,<sym=`IBM>]=<qty+100>
//等價於update trades set qty=qty+100 where sym=`IBM

2.1.2 新增一個列函數

例如,添加一個新的列volume,用於保存交易量:oop

trades[`volume]=<price*qty>
//等價於update trades set volume=price*qty

2.1.3 刪除數據

例如,刪除qty爲0的數據:

trades.erase!(<qty=0>)
//等價於delete from trades where qty=0

2.1.4 動態生成過濾條件並更新數據

本例使用瞭如下數據表。

ind1=rand(100,10)
ind2=rand(100,10)
ind3=rand(100,10)
ind4=rand(100,10)
ind5=rand(100,10)
ind6=rand(100,10)
ind7=rand(100,10)
ind8=rand(100,10)
ind9=rand(100,10)
ind10=rand(100,10)
indNum=1..10
t=table(ind1,ind2,ind3,ind4,ind5,ind6,ind7,ind8,ind9,ind10,indNum)

咱們須要對數據表進行更新操做,SQL語句以下:

update t set ind1=1 where indNum=1
update t set ind2=1 where indNum=2
update t set ind3=1 where indNum=3
update t set ind4=1 where indNum=4
update t set ind5=1 where indNum=5
update t set ind6=1 where indNum=6
update t set ind7=1 where indNum=7
update t set ind8=1 where indNum=8
update t set ind9=1 where indNum=9
update t set ind10=1 where indNum=10

若是數據表的列數較多,須要手工編寫很是多的SQL語句。觀察以上語句能夠發現,列名和過濾條件是有必定關係的。使用元編程能夠很是方便地完成以上操做。

for(i in 1..10){
	t["ind"+i,<indNum=i>]=1
}

2.2 在內置函數中使用元編程

DolphinDB的一些內置函數會使用到元編程。

2.2.1 窗口鏈接

在窗口鏈接(window join)中,須要爲右表的窗口數據集指定一個或多個聚合函數以及這些函數運行時須要的參數。因爲問題的描述和執行在兩個不一樣的階段,咱們採用元編程來實現延後執行。

t = table(take(`ibm, 3) as sym, 10:01:01 10:01:04 10:01:07 as time, 100 101 105 as price)
q = table(take(`ibm, 8) as sym, 10:01:01+ 0..7 as time, 101 103 103 104 104 107 108 107 as ask, 98 99 102 103 103 104 106 106 as bid)
wj(t, q, -2 : 1, < [max(ask), min(bid), avg((bid+ask)*0.5) as avg_mid]>, `time)

sym time     price max_ask min_bid avg_mid
--- -------- ----- ------- ------- -------
ibm 10:01:01 100   103     98      100.25
ibm 10:01:04 101   104     99      102.625
ibm 10:01:07 105   108     103     105.625

2.2.2 流計算引擎

DolphinDB有三種類型的流計算引擎:時間序列聚合引擎(createTimeSeriesAggregator)、橫截面引擎(createCrossSectionalAggregator)和異常檢測引擎(createAnomalyDetectionEngine)。在使用這些流計算引擎時,須要爲數據窗口中的數據集指定聚合函數或表達式以及它們運行時所需的參數。這種狀況下,咱們採用元編程來表示聚合函數或表達式以及它們所需的參數。以時間序列聚合引擎的應用爲例:

share streamTable(1000:0, `time`sym`qty, [DATETIME, SYMBOL, INT]) as trades
output1 = table(10000:0, `time`sym`sumQty, [DATETIME, SYMBOL, INT])
agg1 = createTimeSeriesAggregator("agg1",60, 60, <[sum(qty)]>, trades, output1, `time, false,`sym, 50,,false)
subscribeTable(, "trades", "agg1",  0, append!{agg1}, true)

insert into trades values(2018.10.08T01:01:01,`A,10)
insert into trades values(2018.10.08T01:01:02,`B,26)
insert into trades values(2018.10.08T01:01:10,`B,14)
insert into trades values(2018.10.08T01:01:12,`A,28)
insert into trades values(2018.10.08T01:02:10,`A,15)
insert into trades values(2018.10.08T01:02:12,`B,9)
insert into trades values(2018.10.08T01:02:30,`A,10)
insert into trades values(2018.10.08T01:04:02,`A,29)
insert into trades values(2018.10.08T01:04:04,`B,32)
insert into trades values(2018.10.08T01:04:05,`B,23)

select * from output1

time                sym sumQty
------------------- --- ------
2018.10.08T01:02:00 A   38    
2018.10.08T01:03:00 A   25    
2018.10.08T01:02:00 B   40    
2018.10.08T01:03:00 B   9

2.3 定製報表

元編程能夠用於定製報表。下例定義了一個用於生成報表的自定義函數,用戶只須要輸入數據表、字段名稱以及字段相應的格式字符串便可。

def generateReport(tbl, colNames, colFormat, filter){
	colCount = colNames.size()
	colDefs = array(ANY, colCount)
	for(i in 0:colCount){
		if(colFormat[i] == "") 
			colDefs[i] = sqlCol(colNames[i])
		else
			colDefs[i] = sqlCol(colNames[i], format{,colFormat[i]})
	}
	return sql(colDefs, tbl, filter).eval()
}

建立模擬的歷史數據庫:

if(existsDatabase("dfs://historical_db")){
	dropDatabase("dfs://historical_db")
}
n=5000000
dates=2012.09.01..2012.09.30
syms=symbol(`IBM`MSFT`GOOG`FB`AAPL)
t=table(rand(dates,n) as date, rand(syms,n) as sym, rand(200.0,n) as price, rand(1000..2000,n) as qty)

db1=database("",VALUE,dates)
db2=database("",VALUE,syms)
db=database("dfs://historical_db",COMPO,[db1,db2])
stock=db.createPartitionedTable(t,`stock,`date`sym).append!(t)

選擇2012年9月1日股票代碼爲IBM的數據生成報表:

generateReport(stock,`date`sym`price`qty,["MM/dd/yyyy","","###.00","#,###"],<date=2012.09.01 and sym=`IBM >)
date       sym price  qty  
---------- --- ------ -----
09/01/2012 IBM 90.97  1,679
09/01/2012 IBM 22.36  1,098
09/01/2012 IBM 133.42 1,404
09/01/2012 IBM 182.08 1,002
09/01/2012 IBM 144.67 1,468
09/01/2012 IBM 6.59   1,256
09/01/2012 IBM 73.09  1,149
09/01/2012 IBM 83.35  1,415
09/01/2012 IBM 93.13  1,006
09/01/2012 IBM 88.05  1,406
...

上面的語句等價於如下SQL語句:

select format(date,"MM/dd/yyyy") as date, sym, format(price,"###.00") as price, format(qty,"#,###") as qty  from stock where date=2012.09.01 and sym=`IBM

2.4 物聯網中動態生成計算指標

在物聯網的實時流計算中,數據源包含tag, timestamp和value三個字段。如今須要對輸入的原始數據進行實時的指標計算。因爲每次收到的原始數據的tag數量和種類有可能不一樣,而且每次計算的指標也可能不一樣,咱們沒法將計算指標固定下來,所以這種狀況下咱們能夠採用元編程的方法。咱們須要定義一個配置表,將計算的指標放到該表中,能夠根據實際增長、刪除或修改計算指標。每次實時計算時,從配置表中動態地讀取須要計算的指標,並把計算的結果輸出到另一個表中。

如下是示例代碼。pubTable是流數據的發佈表。config表是存儲計算指標的配置表,因爲計算指標有可能每次都不相同,這裏採用的是併發版本控制表(mvccTable)。subTable經過訂閱pubTable,對流數據進行實時計算。

t1=streamTable(1:0,`tag`value`time,[STRING,DOUBLE,DATETIME])
share t1 as pubTable

config = mvccTable(`index1`index2`index3`index4 as targetTag, ["tag1 + tag2", "sqrt(tag3)", "floor(tag4)", "abs(tag5)"] as formular)

subTable = streamTable(100:0, `targetTag`value, [STRING, FLOAT])

def calculateTag(mutable subTable,config,msg){
	pmsg = select value from msg pivot by time, tag
	for(row in config){
		try{
			insert into subTable values(row.targetTag, sql(sqlColAlias(parseExpr(row.formular), "value"), pmsg).eval().value)
		}
			catch(ex){print ex}
	}
}

subscribeTable(,`pubTable,`calculateTag,-1,calculateTag{subTable,config},true)

//模擬寫入數據
tmp = table(`tag1`tag2`tag3`tag4 as tag, 1.2 1.3 1.4 1.5 as value, take(2019.01.01T12:00:00, 4) as time)
pubTable.append!(tmp)

select * from subTable

targetTag value   
--------- --------
index1    2.5     
index2    1.183216
index3    1

2.5 執行一組查詢,合併查詢結果

在數據分析中,有時咱們須要對同一個數據集執行一組相關的查詢,並將查詢結果合併展現出來。若是每次都手動編寫所有SQL語句,工做量大,而且擴展性差。經過元編程動態生成SQL能夠解決這個問題。

本例使用的數據集結構以下(以第一行爲例):

mt         vn       bc  cc   stt  vt  gn  bk    sc  vas  pm  dls         dt          ts      val    vol
--------   -------  --  ---  ---  --  --  ----  --  ---  --  ----------  ----------  ------  -----  -----
52354955  50982208  25  814  11   2   1   4194  0   0    0   2020.02.05  2020.02.05  153234  5.374  18600

咱們須要對天天的數據都執行一組相關的查詢。好比:

select * from t where vn=50982208,bc=25,cc=814,stt=11,vt=2, dsl=2020.02.05, mt<52355979 order by mt desc limit 1
select * from t where vn=50982208,bc=25,cc=814,stt=12,vt=2, dsl=2020.02.05, mt<52355979 order by mt desc limit 1
select * from t where vn=51180116,bc=25,cc=814,stt=12,vt=2, dsl=2020.02.05, mt<52354979 order by mt desc limit 1
select * from t where vn=41774759,bc=1180,cc=333,stt=3,vt=116, dsl=2020.02.05, mt<52355979 order by mt desc limit 1

能夠觀察到,這一組查詢中,過濾條件包含的列和排序列都相同,而且都是取排序後的第一行記錄,還有部分過濾條件的值相同。爲此,咱們編寫了自定義函數bundleQuery:

def bundleQuery(tbl, dt, dtColName, mt, mtColName, filterColValues, filterColNames){
	cnt = filterColValues[0].size()
	filterColCnt =filterColValues.size()
	orderByCol = sqlCol(mtColName)
	selCol = sqlCol("*")
	filters = array(ANY, filterColCnt + 2)
	filters[filterColCnt] = expr(sqlCol(dtColName), ==, dt)
	filters[filterColCnt+1] = expr(sqlCol(mtColName), <, mt)
	
	queries = array(ANY, cnt)
	for(i in 0:cnt)	{
		for(j in 0:filterColCnt){
			filters[j] = expr(sqlCol(filterColNames[j]), ==, filterColValues[j][i])
		}
		queries.append!(sql(select=selCol, from=tbl, where=filters, orderBy=orderByCol, ascOrder=false, limit=1))
	}
	return loop(eval, queries).unionAll(false)
}

bundleQuery中各個參數的含義以下:

  • tbl是數據表
  • dt是過濾條件中日期的值
  • dtColName是過濾條件中日期列的名稱
  • mt是過濾條件中mt的值
  • mtColName是過濾條件中mt列的名稱,以及排序列的名稱
  • filterColValues是其餘過濾條件中的值,用元組表示,其中的每一個向量表示一個過濾條件,每一個向量中的元素表示該過濾條件的值
  • filterColNames是其餘過濾條件中的列名,用向量表示

上面一組SQL語句,至關於執行如下代碼:

dt = 2020.02.05 
dtColName = "dls" 
mt = 52355979 
mtColName = "mt"
colNames = `vn`bc`cc`stt`vt
colValues = [50982208 50982208 51180116 41774759, 25 25 25 1180, 814 814 814 333, 11 12 12 3, 2 2 2 116]

bundleQuery(t, dt, dtColName, mt, mtColName, colValues, colNames)

咱們能夠執行如下腳本把bundleQuery函數定義爲函數視圖,這樣在集羣的任何節點或者重啓系統以後,均可以直接使用該函數。

//please login as admin first
addFunctionView(bundleQuery)

3.小結

DolphinDB database的元編程功能強大,使用簡單,可以極大地提升程序開發效率。

相關文章
相關標籤/搜索