開發大數據應用,不只須要一個能支撐海量數據的分佈式數據庫,一個能高效利用多核多節點的分佈式計算框架,更須要一門能與分佈式數據庫和分佈式計算有機融合,高性能易擴展,表達能力強,知足快速開發和建模須要的編程語言。DolphinDB從流行的SQL和Python語言汲取了靈感,設計了大數據處理腳本語言。本教程講解如何經過混合範式編程,快速開發大數據分析的應用。從中你也能夠了解DolpinDB的編程語言(如下簡稱DolphinDB)如何與數據庫和分佈式計算融合。html
向量化編程是DolphinDB中最基本的編程範式。DolphinDB中絕大部分函數支持向量做爲函數的輸入參數。根據函數的返回值的不一樣,函數可分爲兩種:一種是聚合函數(aggregate function),返回標量(scalar);另外一種是向量函數,返回與輸入向量等長的向量。node
向量化操做有三個主要優勢:ios
時間序列數據一般能夠用一個向量來表示,用於數據分析的列式數據庫的每個列也均可以用向量來表示。DolphinDB做爲一個內存計算引擎或者做爲一個分析型的數據倉庫,在進行時間序列數據分析時,特別適合使用向量化編程。算法
以兩個長度爲一千萬的向量相加做爲一個簡單例子。用命令式編程的for語句,不只語句冗長,並且耗時是向量化編程的百倍以上。sql
n = 10000000 a = rand(1.0, n) b = rand(1.0, n) //採用for語句編程: c = array(DOUBLE, n) for(i in 0 : n) c[i] = a[i] + b[i] //採用向量化編程: c = a + b
向量化編程其實是對一組同質數據的批處理,不只在編譯階段能夠利用vectorization對指令進行優化,在不少算法上也能夠優化。以常用的時間序列數據滑動窗口(sliding window)指標之一的移動平均(moving average)爲例。假設總的數據量是n,窗口大小爲k,若是不採用批量計算,時間複雜度是O(nk)。可是由於計算完一個窗口的移動平均後,計算下一個窗口時,只有一個數據點發生了變化,因此只要調整這一個點的值,就能夠算出新窗口的移動平均,因此批量計算的時間複雜度是O(n)。DolphinDB中,大部分計算滑動窗口指標的函數都通過了優化,性能近似於O(n)。這些函數包括mmax, mmin, mimax, mimin, mavg, msum, mcount, mstd, mvar, mrank, mcorr, mcovar, mbeta和 mmed。在下例中,通過優化的mavg函數的性能超過對每個窗口使用avg函數300倍。數據庫
n = 10000000 a = rand(1.0, n) window = 60 //對每個窗口分別使用avg計算: timer moving(avg, a, window); Time elapsed: 4039.23 ms //採用mavg函數批量計算: timer mavg(a, window); Time elapsed: 12.968 ms
向量化編程也有其侷限性。首先,不是全部的操做均可以用向量化計算來完成。在機器學習和統計分析中,在某些場景下,咱們只能對逐行數據進行迭代處理,沒法向量化計算。針對這種場景,DolphinDB計劃在後續的版本中推出即時編譯技術(JIT),能將用for語句編寫的逐行處理代碼在運行時動態編譯成機器碼來執行,顯著提高性能。編程
其次,向量化計算一般要將整個向量所有加載到一段連續內存中,Matlab和R都有這樣的要求。有時候由於內存碎片緣由,沒法找到大段的連續內存。DolphinDB針對內存碎片,特別引入了big array,能夠將物理上不連續的內存塊組成一個邏輯上連續的向量。系統是否採用big array是動態決定的,對用戶透明。一般,對big array進行掃描,性能損耗對於連續內存而言,在1%~5%之間;對big array進行隨機訪問,性能損耗在20%~30%左右。在此方面,DolphinDB是以能夠接受的少許性能損失來換取系統的更高可用性。數組
SQL是一個面向問題的語言。用戶只須要給出問題的描述,SQL引擎會產生結果。一般SQL引擎屬於數據庫的一部分,其它系統經過JDBC,ODBC或Native API 與數據庫交流。DolphinDB腳本語言的SQL語句不只支持SQL的標準功能,並且爲大數據的分析,尤爲是時間序列大數據的分析作了不少擴展,可極大簡化代碼,方便用戶使用。緩存
2.1 SQL與編程語言的融合安全
在DolphinDB中,腳本語言與SQL語言是無縫融合在一塊兒的。這種融合主要體如今幾個方面:
請注意,DolphinDB編程語言區分大小寫。在DolphinDB中全部的SQL關鍵詞均必須使用小寫。
下面例子中,首先生成一個員工工資表:
empWages = table(take(1..10, 100) as id, take(2017.10M + 1..10, 100).sort() as month, take(5000 5500 6000 6500, 100) as wage);
而後計算給定的一組員工的平均工資。員工列表存儲在一個本地變量empIds中。
empIds = 3 4 6 7 9 select avg(wage) from empWages where id in empIds group by id; id avg_wage -- -------- 3 5500 4 6000 6 6000 7 5500 9 5500
除計算平均工資外,同時顯示員工的姓名。員工姓名使用一個字典empName來獲取。
empNames = dict(1..10, `Alice`Bob`Jerry`Jessica`Mike`Tim`Henry`Anna`Kevin`Jones) select empNames[first(id)] as name, avg(wage) from empWages where id in empIds group by id; id name avg_wage -- ------- -------- 3 Jerry 5500 4 Jessica 6000 6 Tim 6000 7 Henry 5500 9 Kevin 5500
上面的兩個例子中,SQL語句的where子句和select子句分別用到了上下文中定義的數組和字典,使得原本須要經過子查詢和多表聯結來解決的問題,經過簡單的hash table就解決了。若是SQL涉及到分佈式數據庫,這些上下文變量會自動序列化到相應的節點。這不只讓代碼看上去更簡潔,有更好的可讀性,並且提高了性能。
SQL的select語句返回的數據表能夠直接賦給一個本地變量,作進一步的處理分析。DolphinDB還引入了exec關鍵詞,與select相比,EXEC語句返回的結果能夠是一個matrix,vector或scalar,更便於數據分析。下面的例子中,exec與pivot by配合使用,直接返回一個矩陣。
exec first(wage) from emp_wage pivot by month, id; 1 2 3 4 5 6 7 8 9 10 ---- ---- ---- ---- ---- ---- ---- ---- ---- ---- 2017.11M|5000 5500 6000 6500 5000 5500 6000 6500 5000 5500 2017.12M|6000 6500 5000 5500 6000 6500 5000 5500 6000 6500 2018.01M|5000 5500 6000 6500 5000 5500 6000 6500 5000 5500 2018.02M|6000 6500 5000 5500 6000 6500 5000 5500 6000 6500 2018.03M|5000 5500 6000 6500 5000 5500 6000 6500 5000 5500 2018.04M|6000 6500 5000 5500 6000 6500 5000 5500 6000 6500 2018.05M|5000 5500 6000 6500 5000 5500 6000 6500 5000 5500 2018.06M|6000 6500 5000 5500 6000 6500 5000 5500 6000 6500 2018.07M|5000 5500 6000 6500 5000 5500 6000 6500 5000 5500 2018.08M|6000 6500 5000 5500 6000 6500 5000 5500 6000 6500
2.2 對面板數據(Panel Data)的友好支持
SQL的group by子句將數據分紅多組,每組產生一個值,也就是一行。所以使用group by子句後,行數通常會大大減小。
在對面板數據進行分組後,每一組數據一般是時間序列數據,譬如按股票分組,每個組內的數據是一個股票的價格序列。處理面板數據時,有時候但願保持每一個組的數據行數,也就是爲組內的每一行數據生成一個值。例如,根據一個股票的價格序列生成回報序列,或者根據價格序列生成一個移動平均價格序列。其它數據庫系統(例如SQL Server,PostGreSQL),用窗口函數(window function)來解決這個問題。DolpinDB引入了context by子句來處理面板數據。context by與窗口函數相比,除了語法更簡潔,設計更系統化(與group by和pivot by一塊兒組成對分組數據處理的三個子句)之外,表達能力上也更強大,具體表如今下面三個方面:
假定trades數據表記錄了每一個股票天天的日終價格,咱們能夠用context by方便的計算每一個股票天天的回報以及天天的排名。首先按股票代碼進行分組,計算每一個股票天天的回報。咱們這裏假設數據是時間順序排列的。
update trades set ret = ratios(price) - 1.0 context by sym;
按日期進行分組,計算天天每一個股票的回報降序排名:
select date, symbol, ret, rank(ret, false) + 1 as rank from trades where isValid(ret) context by date;
選擇天天回報排名前10的股票:
select date, symbol, ret from trades where isValid(ret) context by date having rank(ret, false) < 10;
下面咱們以一個更爲複雜的實際例子演示context by子句如何高效的解決面板數據問題。一篇論文101 Formulaic Alphas介紹了華爾街的頂級量化對衝基金WorldQuant所使用的101個量化Alpha因子。某基金公司用C#來計算這些因子,其中表明性的98號因子既用到了縱向時間序列數據的多個指標的嵌套,又用到了橫向截面數據的排序信息,實現使用了幾百行代碼。使用中國股市3000多個股票10年近9百萬行的歷史數據,計算98號Alpha因子耗時約30分鐘。而改用DolphinDB實現,以下圖所示只用了4行核心代碼,耗時僅2秒鐘,達到了接近三個數量級的性能提高。
def alpha98(stock){ t = select code, valueDate, adv5, adv15, open, vwap from stock order by valueDate update t set rank_open = rank(open), rank_adv15 = rank(adv15) context by valueDate update t set decay7 = mavg(mcorr(vwap, msum(adv5, 26), 5), 1..7), decay8 = mavg(mrank(9 - mimin(mcorr(rank_open, rank_adv15, 21), 9), true, 7), 1..8) context by code return select code, valueDate, rank(decay7)-rank(decay8) as A98 from t context by valueDate }
2.3 對時間序列數據的友好支持
DolphinDB的數據庫採用列式數據存儲,計算的時候又採用向量化的編程,對時間序列數據自然友好。
bar
函數和group by子句的配合使用,轉換成任意時間間隔的數據。咱們以一個簡單的例子來解釋window join。譬如要統計一組人員在某些時間點前三個月的平均工資。咱們能夠簡單的用window join(wj)來實現。window join函數的具體解釋請參考用戶手冊
p = table(1 2 3 as id, 2018.06M 2018.07M 2018.07M as month) s = table(1 2 1 2 1 2 as id, 2018.04M + 0 0 1 1 2 2 as month, 4500 5000 6000 5000 6000 4500 as wage) select * from wj(p, s, -3:-1,<avg(wage)>,`id`month) id month avg_wage -- -------- ----------- 1 2018.06M 5250 2 2018.07M 4833.333333 3 2018.07M
上面的問題,在其它數據庫系統中,可使用equal join(id字段)和 non-equal join(month字段),以及group by子句來解決。但除了寫法更爲複雜外,與DolphinDB的window join相比,其它系統性能落後兩個數量級以上。
window join在金融領分析領域有着普遍的應用。一個經典的應用就是將交易(trades)表和報價(quotes)表進行關聯,計算交易成本。
如下爲交易表(trades),不分區或者按日期和股票代碼分區:
sym date time price qty ---- ---------- ------------ ------ --- IBM 2018.06.01 10:01:01.005 143.19 100 MSFT 2018.06.01 10:01:04.006 107.94 200
如下爲報價表(quotes),不分區或者按日期和股票代碼分區:
sym date time bid ask bidSize askSize ---- ---------- ------------ ------ ------ ------- ------- IBM 2018.06.01 10:01:01.006 143.18 143.21 400 200 MSFT 2018.06.01 10:01:04.010 107.92 107.97 800 100
使用asof join爲每個交易找到最近的一個報價,並利用報價的中間價做爲交易成本的基準:
dateRange = 2018.05.01 : 2018.08.01 select sum(abs(price - (bid+ask)/2.0)*qty)/sum(price*qty) as cost from aj(trades, quotes, `date`sym`time) where date between dateRange group by sym;
使用window join爲每個交易找到前10毫秒的報價,計算平均中間價做爲交易成本的基準:
select sum(abs(price - mid)*qty)/sum(price*qty) as cost from pwj(trades, quotes, -10:0, <avg((bid + ask)/2.0) as mid>,`date`sym`time) where date between dateRange group by sym;
爲知足大數據分析的要求,DolphinDB對SQL還作了不少其餘擴展。這兒咱們例舉一些經常使用功能。
若是要在SQL語句中使用組合字段,函數的輸出結果必須是簡單的鍵值對(key-value pair)或者數組。若是不是這兩種類型,能夠用自定義函數進行轉換。組合字段的詳細用法請參考用戶手冊。
factor1=3.2 1.2 5.9 6.9 11.1 9.6 1.4 7.3 2.0 0.1 6.1 2.9 6.3 8.4 5.6 factor2=1.7 1.3 4.2 6.8 9.2 1.3 1.4 7.8 7.9 9.9 9.3 4.6 7.8 2.4 8.7 t=table(take(1 2 3, 15).sort() as id, 1..15 as y, factor1, factor2);
爲每一個id運行ols,y = alpha + beta1 * factor1 + beta2 * factor2, 輸出參數alpha, beta1, beta2。
select ols(y, [factor1,factor2], true, 0) as `alpha`beta1`beta2 from t group by id; id alpha beta1 beta2 -- --------- --------- --------- 1 1.063991 -0.258685 0.732795 2 6.886877 -0.148325 0.303584 3 11.833867 0.272352 -0.065526
在輸出參數的同時,輸出R2。使用自定義函數包裝輸出結果。
def myols(y,x){ r=ols(y,x,true,2) return r.Coefficient.beta join r.RegressionStat.statistics[0] } select myols(y,[factor1,factor2]) as `alpha`beta1`beta2`R2 from t group by id; id alpha beta1 beta2 R2 -- --------- --------- --------- -------- 1 1.063991 -0.258685 0.732795 0.946056 2 6.886877 -0.148325 0.303584 0.992413 3 11.833867 0.272352 -0.065526 0.144837
DolphinDB與主流的腳本語言(Python和JavaScript等)和編譯型強類型語言(C++,C和Java)同樣,支持命令式編程,即一步一步告訴計算機先作什麼再作什麼。DolphinDB目前支持18種語句(詳細參考用戶手冊第五章),包括最經常使用的賦值語句,分支語句if..else,以及循環語句for和do..while等。
DolphinDB支持對單變量和多變量進行賦值。
x = 1 2 3 y = 4 5 y += 2 x, y = y, x //swap the value of x and y x, y =1 2 3, 4 5
DolphinDB目前支持的循環語句包括for語句和do..while語句。for語句的循環體能夠包括數據對(pair)(左閉右開區間)、數組(vector)、矩陣(matrix)和表(table)。
1到100累加求和:
s = 0 for(x in 1:101) s += x print s
數組中的元素求和:
s = 0; for(x in 1 3 5 9 15) s += x print s
打印矩陣每一列的均值:
m = matrix(1 2 3, 4 5 6, 7 8 9) for(c in m) print c.avg()
計算數據表中每個行兩列之乘積:
t= table(["TV set", "Phone", "PC"] as productId, 1200 600 800 as price, 10 20 7 as qty) for(row in t) print row.productId + ": " + row.price * row.qty
DolphinDB的分支語句if..else與其它語言一致。
if(condition){ <true statements> } else{ <false statements> }
對處理海量數據時,不推薦利用控制語句(for語句,if..else語句)對數據逐行處理。這些控制語句通常用於上層模塊的處理和調度,比較底層的數據處理模塊建議使用向量編程,函數編程,SQL編程等方式來處理。
DolphinDB支持函數式編程的大部分功能,包括:
詳細請參考用戶手冊第七章。
4.1 自定義函數和lambda函數
DolphinDB中能夠建立自定義函數,函數能夠有名稱或者沒有名稱(一般是lambda函數)。建立的函數符合純函數的要求,也就是說只有函數的輸入參數能夠影響函數的輸出結果。DolphinDB與Python不一樣,函數體內只能引用函數參數和函數內的局部變量,不能使用函數體外定義的變量。從軟件工程的角度看,這犧牲了一部分語法糖的靈活性,但對提升軟件質量大有裨益。
def getWeekDays(dates){ return dates[def(x):weekday(x) between 1:5] } getWeekDays(2018.07.01 2018.08.01 2018.09.01 2018.10.01) [2018.08.01, 2018.10.01]
上面的例子中,咱們定義了一個函數getWeekDays,該函數接受一組日期,返回其在週一和週五之間的日期。函數的實現採用了向量的過濾功能,也就是接受一個布爾型單目函數用於數據的過濾。咱們定義了一個lambda函數用於數據過濾。
4.2 高階函數
高階函數是指能夠接受另外一個函數做爲參數的函數。在DolphinDB中,高階函數主要用做數據處理的模板函數,一般第一個參數是另一個函數,用於具體的數據處理。譬如說,A對象有m個元素,B對象有n個元素,一種常見的處理模式是,A中的任意一個元素和B中的任意一個元素兩兩計算,最後產生一個m*n的矩陣。DolphinDB將這種數據處理模式抽象成一個高階函數cross。DolphinDB提供了不少相似的模板函數,包括all, any, each, loop, eachLeft, eachRight, eachPre, eachPost, accumulate, reduce, groupby, contextby, pivot, cross, moving, rolling等。
下面的一個例子咱們使用三個高階函數,只用三行代碼,根據股票日內tick級別的交易數據,計算出每兩隻股票之間的相關性。
模擬生成10000000個數據點(股票代碼,交易時間和價格):
n=10000000 syms = rand(`FB`GOOG`MSFT`AMZN`IBM, n) time = 09:30:00.000 + rand(21600000, n) price = 500.0 + rand(500.0, n)
利用pivot函數生成股價透視矩陣,每列爲一支股票,每行爲一分鐘:
priceMatrix = pivot(avg, price, time.minute(), syms)
each和ratios函數配合使用,對股價矩陣每列進行操做,將股價轉爲收益率:
retMatrix = each(ratios, priceMatrix) - 1
cross和corr函數配合使用,計算每兩支股票收益率的相關性:
corrMatrix = cross(corr, retMatrix, retMatrix)
結果爲:
AMZN FB GOOG IBM MSFT --------- --------- --------- --------- --------- AMZN|1 0.015181 -0.056245 0.005822 0.084104 FB |0.015181 1 -0.028113 0.034159 -0.117279 GOOG|-0.056245 -0.028113 1 -0.039278 -0.025165 IBM |0.005822 0.034159 -0.039278 1 -0.049922 MSFT|0.084104 -0.117279 -0.025165 -0.049922 1
4.3 部分應用(Partial Application)
部分應用指當一個函數的一部分或所有參數給定後生成一個新的函數。在DolphinDB中,函數調用使用圓括號(),部分應用使用{}。3.2節中的例子用到的ratios函數的具體實現就是高階函數eachPre的一個部分應用 eachPre{ratio}。
如下兩行代碼同構:
retMatrix = each(ratios, priceMatrix) - 1 retMatrix = each(eachPre{ratio}, priceMatrix) - 1
部分應用常常用於高階函數。使用高階函數時,一般對某些參數有特定要求,經過部分應用,能夠確保全部參數符合要求。例如,計算一個向量a與一個矩陣m中的每一列的相關性,能夠將函數corr與高階函數each配合使用。可是若直接將向量與矩陣在each中列爲corr的參數,系統將會試圖計算向量的某個元素與矩陣的某個列的相關性,致使產生錯誤。這時,可利用部分應用把函數corr與向量a組成一個新的函數corr{a},再與高階函數each配合使用於矩陣的每一列,以下例所示。咱們也能夠利用for語句來解決這個問題,但代碼冗長且增長耗時。
a = 12 14 18 m = matrix(5 6 7, 1 3 2, 8 7 11)
使用each和部分應用計算向量a與矩陣中的每一列的相關性:
each(corr{a}, m)
使用for語句解決上述問題:
cols = m.columns() c = array(DOUBLE, cols) for(i in 0:cols) c[i] = corr(a, m[i])
部分應用的另外一個妙用是使函數保持狀態。一般咱們但願函數是無狀態的,即函數的輸出結果徹底是由輸入參數決定的。但有時候咱們但願函數是有「狀態」的。譬如說,在流計算中,用戶一般須要給定一個消息處理函數(message handler),接受一條新的信息後返回一個結果。若是咱們但願消息處理函數返回的是迄今爲止全部接收到的數據的平均數,能夠經過部分應用來解決。
def cumavg(mutable stat, newNum){ stat[0] = (stat[0] * stat[1] + newNum)/(stat[1] + 1) stat[1] += 1 return stat[0] } msgHandler = cumavg{0.0 0.0} each(msgHandler, 1 2 3 4 5) [1,1.5,2,2.5,3]
遠程過程調用(Remote Procedure Call)是分佈式系統最經常使用的基礎設施之一。DolphinDB的分佈式文件系統實現,分佈式數據庫實現,分佈式計算框架實現都採用了DolphinDB原創的RPC系統。DolphinDB的腳本語言經過RPC能夠在遠程機器上執行代碼。DolphinDB在使用RPC時有如下特色:
5.1 使用remoteRun執行遠程函數
DolphinDB使用xdb建立一個到遠程節點的鏈接。遠程節點能夠是任何運行DolphinDB的節點,沒必要屬於當前集羣的一部分。建立鏈接以後能夠在遠程節點上執行遠程節點上註冊的函數或本地自定義的函數。
h = xdb("localhost", 8081);
在遠程節點上執行一段腳本:
remoteRun(h, "sum(1 3 5 7)"); 16
上述遠程調用也能夠簡寫成:
h("sum(1 3 5 7)"); 16
在遠程節點上執行一個在遠程節點註冊的函數:
h("sum", 1 3 5 7); 16
在遠程系節點上執行本地的自定義函數:
def mysum(x) : reduce(+, x) h(mysum, 1 3 5 7); 16
在遠程節點(localhost:8081)上建立一個共享表sales:
h("share table(2018.07.02 2018.07.02 2018.07.03 as date, 1 2 3 as qty, 10 15 7 as price) as sales");
若是本地的自定義函數有依賴,所依賴的自定義函數會自動序列化到遠程節點:
defg salesSum(tableName, d): select mysum(price*qty) from objByName(tableName) where date=d h(salesSum, "sales", 2018.07.02); 40
5.2 使用rpc執行遠程函數
DolphinDB使用遠程過程調用功能的另外一個途徑是rpc函數。rpc函數接受遠程節點的名稱,須要執行的函數定義以及須要的參數。rpc只能在同一個集羣內的控制節點及數據節點之間使用,可是不須要建立一個新的鏈接,而是複用已經存在的網絡鏈接。這樣作的好處是能夠節約網絡資源和免去建立新鏈接帶來的延遲。當節點的用戶不少時,這一點很是有意義。rpc函數只能在遠程節點執行一個函數。若是要運行腳本,請把腳本封裝在一個自定義函數內。 下面的例子必須在一個DolphinDB集羣內使用。nodeB是遠程節點的別名,nodeB上已經有共享表sales。
rpc("nodeB", salesSum, "sales",2018.07.02); 40
在使用rpc時,爲增長代碼的可讀性,建議使用部分應用,將函數參數和函數定義寫在一塊兒,造成一個新的零參數的函數定義。
rpc("nodeB", salesSum{"sales", 2018.07.02}); 40
master是控制節點的別名。DolphinDB只能在控制節點上建立用戶:
rpc("master", createUser{"jerry", "123456"});
rpc函數須要的參數也能夠是另一個函數包括內置函數和自定義函數:
rpc("nodeB", reduce{+, 1 2 3 4 5}); 15
5.3 使用其它函數間接執行遠程函數
remoteRun和rpc均可以在一個遠程節點上執行用戶在本地自定義的函數。這是DolphinDB的RPC子系統與其它RPC系統最大的不一樣之處。在其它系統中,一般RPC的客戶端只能被動調用遠程節點已經暴露的註冊函數。在大數據分析領域,數據科學家根據新的研發項目常常會提出新的接口需求。若是等待IT部門發佈新的API接口,一般須要很長的週期,這會嚴重影響研發的效率和週期。若是要在遠程節點執行自定義的函數,自定義的函數目前必須使用DolphinDB的腳原本開發。另外,對數據的安全性也提出了更高的要求,必須仔細規劃和設置用戶的訪問權限。若是限制用戶只能使用註冊的函數,用戶的訪問權限管理能夠十分的簡單,只要拒絕外部用戶訪問一切數據,受權外部用戶訪問註冊的視圖函數就能夠了。
除了直接使用remoteRun和rpc函數外,DolphinDB也提供了不少函數間接的使用遠程過程調用。例如,在分佈式數據庫的線性迴歸就用到了rpc與olsEx。另外,pnodeRun用於在集羣的多個節點上並行運行同一個函數,並將返回的結果合併。這在集羣的管理中十分有用。
每一個數據節點返回最近的10個正在運行或已經完成的批處理做業:
pnodeRun(getRecentJobs{10});
返回節點nodeA和nodeB的最近10個SQL query:
pnodeRun(getCompletedQueries{10}, `nodeA`nodeB);
清除全部數據節點上的緩存:
pnodeRun(clearAllCache);
5.4 分佈式計算
mr用於開發基於MapReduce的分佈式計算;imr於開發基於迭代的MapReduce的分佈式計算。用戶只須要指定分佈式數據源和核心函數,譬如map函數,reduce函數,final函數等。下面咱們演示使用分佈式數據計算中位數和線性迴歸的例子。
n=10000000 x1 = pow(rand(1.0,n), 2) x2 = norm(3.0, 1.0, n) y = 0.5 + 3 * x1 - 0.5*x2 + norm(0.0, 1.0, n) t=table(rand(10, n) as id, y, x1, x2) login(`admin,"123456") db = database("dfs://testdb", VALUE, 0..9) db.createPartitionedTable(t, "sample", "id").append!(t)
利用自定義的map函數myOLSMap,內置的reduce函數(+),自定義的final函數myOLSFinal,以及內置的map-reduce框架函數mr,構建一個在分佈式數據源上運行線性迴歸的函數myOLSEx。
def myOLSMap(table, yColName, xColNames){ x = matrix(take(1.0, table.rows()), table[xColNames]) xt = x.transpose(); return xt.dot(x), xt.dot(table[yColName]) } def myOLSFinal(result){ xtx = result[0] xty = result[1] return xtx.inv().dot(xty)[0] } def myOLSEx(ds, yColName, xColNames){ return mr(ds, myOLSMap{, yColName, xColNames}, +, myOLSFinal) }
使用用戶自定義的分佈式算法和分佈式數據源計算線性迴歸係數:
sample = loadTable("dfs://testdb", "sample") myOLSEx(sqlDS(<select * from sample>), `y, `x1`x2); [0.4991, 3.0001, -0.4996]
使用內置的函數ols和未分的數據計算線性迴歸的係數,獲得相同的結果:
ols(y, [x1,x2],true); [0.4991, 3.0001, -0.4996]
下面這個例子中,咱們構造一個算法,在分佈式數據源上計算一組數據的近似中位數。算法的基本原理是利用bucketCount函數,在每個節點上分別計算一組內的數據個數,而後把各個節點上的數據累加。這樣咱們能夠找到中位數應該落在哪一個區間內。若是這個區間不夠小,進一步細分這個區間,直到小於給定的精度要求。中位數的算法須要屢次迭代,咱們所以使用了迭代計算框架imr。
def medMap(data, range, colName): bucketCount(data[colName], double(range), 1024, true) def medFinal(range, result){ x= result.cumsum() index = x.asof(x[1025]/2.0) ranges = range[1] - range[0] if(index == -1) return (range[0] - ranges*32):range[1] else if(index == 1024) return range[0]:(range[1] + ranges*32) else{ interval = ranges / 1024.0 startValue = range[0] + (index - 1) * interval return startValue : (startValue + interval) } } def medEx(ds, colName, range, precision){ termFunc = def(prev, cur): cur[1] - cur[0] <= precision return imr(ds, range, medMap{,,colName}, +, medFinal, termFunc).avg() }
使用以上近似中位數算法,計算分佈式數據的中位數:
sample = loadTable("dfs://testdb", "sample") medEx(sqlDS(<select y from sample>), `y, 0.0 : 1.0, 0.001); -0.052973
使用內置的med函數計算未分區的數據的中位數:
med(y); -0.052947
元編程指使用程序代碼來建立能夠動態運行的程序代碼。元編程的目的通常是延遲執行代碼或動態建立代碼。
DolphinDB支持使用元編程來動態建立表達式,譬如函數調用的表達式,SQL查詢表達式。不少業務細節沒法在編碼階段肯定。譬如說客戶定製報表,只有運行時,客戶選擇了表格,字段和字段格式,才能夠肯定一個完整的SQL查詢表達式。
延遲執行代碼通常分爲這幾種狀況:
DolphinDB實現元編程的途徑有兩個,一是使用一對尖括號<>來表示須要延後執行的動態代碼,二是使用函數來建立各類表達式。經常使用的用於元編程的函數包括objByName, sqlCol, sqlColAlias, sql, expr, eval, partial, makeCall。
使用<>來生成延後執行的動態表達式:
a = <1 + 2 * 3> a.typestr(); CODE a.eval(); 7
使用函數來生成延後執行的動態表達式:
a = expr(1, +, 2, *, 3) a.typestr(); CODE a.eval(); 7
可以使用元編程來定製報表。用戶的輸入包括數據表,字段名稱和字段相應的格式字符串。下例中,根據輸入的數據表,字段名稱和格式,以及過濾條件,動態生成SQL表達式並執行。
def generateReport(tbl, colNames, colFormat, filter){ colCount = colNames.size() colDefs = array(ANY, colCount) for(i in 0:colCount){ if(colFormat[i] == "") colDefs[i] = sqlCol(colNames[i]) else colDefs[i] = sqlCol(colNames[i], format{,colFormat[i]}) } return sql(colDefs, tbl, filter).eval() }
模擬生成一個100行的數據表:
t = table(1..100 as id, (1..100 + 2018.01.01) as date, rand(100.0, 100) as price, rand(10000, 100) as qty);
輸入過濾條件,字段和格式,定製報表。過濾條件使用了元編程。
generateReport(t, ["id","date","price","qty"], ["000","MM/dd/yyyy", "00.00", "#,###"], < id<5 or id>95 >); id date price qty --- ---------- ----- ----- 001 01/02/2018 50.27 2,886 002 01/03/2018 30.85 1,331 003 01/04/2018 17.89 18 004 01/05/2018 51.00 6,439 096 04/07/2018 57.73 8,339 097 04/08/2018 47.16 2,425 098 04/09/2018 27.90 4,621 099 04/10/2018 31.55 7,644 100 04/11/2018 46.63 8,383
DolphinDB的一些內置函數的參數須要使用元編程。窗口鏈接(window join)中,須要爲右表的窗口數據集指定一個或多個聚合函數以及這些函數運行時須要的參數。因爲問題的描述和執行在兩個不一樣的階段,咱們採用元編程來實現延後執行。
t = table(take(`ibm, 3) as sym, 10:01:01 10:01:04 10:01:07 as time, 100 101 105 as price) q = table(take(`ibm, 8) as sym, 10:01:01+ 0..7 as time, 101 103 103 104 104 107 108 107 as ask, 98 99 102 103 103 104 106 106 as bid) wj(t, q, -2 : 1, < [max(ask), min(bid), avg((bid+ask)*0.5) as avg_mid]>, `time); sym time price max_ask min_bid avg_mid --- -------- ----- ------- ------- ------- ibm 10:01:01 100 103 98 100.25 ibm 10:01:04 101 104 99 102.625 ibm 10:01:07 105 108 103 105.625
DolphinDB中另外一個使用元編程的內置功能是更新內存分區表。固然內存分區表的更新,刪除,排序等功能也能夠經過SQL語句來完成。
建立一個以日期爲分區的內存分區數據庫,並模擬生成trades表:
db = database("", VALUE, 2018.01.02 2018.01.03) date = 2018.01.02 2018.01.02 2018.01.02 2018.01.03 2018.01.03 2018.01.03 t = table(`IBM`MSFT`GOOG`FB`IBM`MSFT as sym, date, 101 103 103 104 104 107 as price, 0 99 102 103 103 104 as qty) trades = db.createPartitionedTable(t, "trades", "date").append!(t);
刪除qty爲0的記錄,並在每一個分區中按交易量進行升序排序:
trades.erase!(<qty=0>).sortBy!(<price*qty>);
增長一個新的字段logPrice:
trades[`logPrice]=<log(price)>;
更新股票IBM的交易數量:
trades[`qty, <sym=`IBM>]=<qty+100>;
DolpinDB是一門爲數據分析而生的編程語言。與其它數據分析語言Matlab,SAS,pandas等不一樣,DolpinDB與分佈式數據庫和分佈式計算緊密集成,天生具有處理海量數據的能力。DolphinDB支持SQL編程,函數化編程和元編程,語言簡潔靈活,表達能力強,大大提升了數據科學家的開發效率。DolphinDB支持向量化計算和分佈式計算,具備極快的運行速度。