開發大數據應用,不只須要能支撐海量數據的分佈式數據庫,能高效利用多核多節點的分佈式計算框架,更須要一門能與分佈式數據庫和分佈式計算有機融合、高性能易擴展、表達能力強、知足快速開發和建模須要的編程語言。DolphinDB從流行的Python和SQL語言汲取了靈感,設計了大數據處理腳本語言。java
提到數據庫語言,咱們很容易想到標準的SQL語言。不一樣於標準的SQL,DolphinDB編程語言功能齊全,表達能力很是強大,完美支持命令式編程、向量化編程、函數話編程、SQL編程、遠程過程調用編程(RPC)和元編程等多種編程範式。DolphinDB編程語言的語法和表達習慣與Python和SQL很是類似,只要對Python和SQL有必定的瞭解,就能輕鬆掌握。相對而言,掌握內存時序數據庫kdb+的q語言難度要大得多。python
DolphinDB的編程語言可以知足數據科學家快速開發和建模的需求。DolphinDB語言簡潔靈活,表達能力強,大大提升了數據科學家的開發效率。DolphinDB支持向量化計算和分佈式計算,具備極快的運行速度。下面將詳細介紹DolphinDB編程語言的獨特之處。ios
1.命令式編程git
與主流的腳本語言Python、JS等,還有強類型語言C、C++、Java等同樣,DolphinDB也支持命令式編程。命令式編程是指經過執行一條一條的語句,實現最終目標。DolphinDB的命令式編程主要是用做上層模塊的處理和調度。在大數據分析中,因爲須要處理的數據量很是龐大,若是咱們採用命令式編程逐行處理數據,效率會十分低下,性能也會有所降低。所以,咱們推薦在DolphinDB中使用其餘編程方式來批量處理數據。github
//DolphinDB支持對單變量和多變量進行賦值 x = 1 2 3 y = 4 5 y += 2 x, y = y, x //swap the value of x and y x, y =1 2 3, 4 5 // 1到100累加求和 s = 0 for(x in 1:101) s += x print s //數組中的元素求和 s = 0; for(x in 1 3 5 9 15) s += x print s //打印矩陣每一列的均值 m = matrix(1 2 3, 4 5 6, 7 8 9) for(c in m) print c.avg() //計算product表中每個產品的銷售額 t= table(["TV set", "Phone", "PC"] as productId, 1200 600 800 as price, 10 20 7 as qty) for(row in t) print row.productId + ": " + row.price * row.qty
2.向量化編程算法
跟matlab、R等編程語言同樣,DolphinDB也支持向量化編程。前面提到的kdb+數據庫的q語言也是向量處理語言,它在複雜的計算上表現出很好的性能,而且效率很高。DolphinDB的編程語言對不少算法都進行了優化,好比對時間序列數據計算滑動窗口指標,大大提升了向量函數的效率。sql
//兩個長度爲1000萬的向量相加,採用向量化編程比命令式編程的for語句更加簡潔,耗耗時更短。 n = 10000000 a = rand(1.0, n) b = rand(1.0, n) //採用for語句編程,須要12秒 c = array(DOUBLE, n) for(i in 0 : n) c[i] = a[i] + b[i] Time elapsed: 12341.043 ms //採用向量化編程,僅需36毫秒 c = a + b Time elapsed: 36.901 ms
向量化編程一般是把整個向量加載到連續內存中。有時候由於內存碎片,沒有找到連續內存,向量就不可用了。DolphinDB針對這個問題,特地提供了big array數據類型。big array能夠把物理上不連續的內存塊組成邏輯上連續的向量,即便是很是大的向量,也能在DolphinDB中使用,提升了系統的可用性。數據庫
3.函數化編程編程
DolphinDB支持函數化編程的大部分功能,包括純函數、自定義函數、λ函數、高階函數、部分應用和閉包。DolphinDB內置了400多個函數,涵蓋了各類數據類型、數據結構和系統調用。api
DolphinDB的純函數特性減小了函數的反作用。在自定義函數時,DolphinDB不能使用函數體外定義的變量。純函數特性能夠大幅度提升代碼可讀性和軟件質量。
3.1 自定義函數
//定義一個函數返回工做日 def getWorkDays(dates){ return dates[def(x):weekday(x) between 1:5] } getWorkDays(2018.07.01 2018.08.01 2018.09.01 2018.10.01) [2018.08.01, 2018.10.01]
上面的例子定義一個函數getWorkDays,該函數受一組日期,返回並返回在週一和週五之間的日期。函數的實現採用了向量的過濾功能,也就是接受一個布爾型單目函數用於數據的過濾。
3.2 高階函數
下面的一個例子咱們使用三個高階函數pivot、each和cross,乾淨利落的用三行代碼,根據股票日內tick級別的報價數據,計算出兩兩之間的相關性。
//模擬生成10000000萬個數據點(股票代碼,交易時間和價格) n=10000000 syms = rand(`FB`GOOG`MSFT`AMZN`IBM, n) time = 09:30:00.000 + rand(21600000, n) price = 500.0 + rand(500.0, n) //利用pivot函數生成透視表 priceMatrix = pivot(avg, price, time.minute(), syms) //each和ratios函數的配合使用,爲每一個股票(矩陣的列)生成每分鐘的回報序列 retMatrix = each(ratios, priceMatrix) - 1 //cross和corr函數的配合使用,計算股票兩兩之間的相關性 corrMatrix = cross(corr, retMatrix, retMatrix) AMZN FB GOOG IBM MSFT --------- --------- --------- --------- --------- AMZN|1 0.015181 -0.056245 0.005822 0.084104 FB |0.015181 1 -0.028113 0.034159 -0.117279 GOOG|-0.056245 -0.028113 1 -0.039278 -0.025165 IBM |0.005822 0.034159 -0.039278 1 -0.049922 MSFT|0.084104 -0.117279 -0.025165 -0.049922 1
3.3 部分應用
高階函數中的函數參數一般對參數有限制,經過部分應用,能夠確保參數符合要求。例如,給定一個向量 a = 12 14 18,計算與矩陣中的每一列的相關性。由於要計算矩陣的每一列的相關性,固然可使用高階函數each。可是corr函數須要兩個參數,而矩陣只提供其中的一個參數,另外一個參數必須事先給定,因此部分應用能夠解決這個問題。固然咱們也能夠用for語句來解決這個問題,但代碼冗長而低效。
a = 12 14 18 m = matrix(5 6 7, 1 3 2, 8 7 11) //使用each和部分應用計算矩陣中的每一列與給定向量a的相關性 each(corr{a}, m) //使用for語句解決上面的問題 cols = m.columns() c = array(DOUBLE, cols) for(i in 0:cols) c[i] = corr(a, m[i])
部分應用的另外一個做用是使函數保持狀態。例如,在流計算中,用戶一般須要給定一個消息處理函數(message handler),接受一條新的信息,返回一個結果。可是咱們但願消息處理函數返回的是迄今爲止全部數的平均數。這個問題咱們能夠經過部分應用來解決。
def cumavg(mutable stat, newNum){ stat[0] = (stat[0] * stat[1] + newNum)/(stat[1] + 1) stat[1] += 1 return stat[0] } msgHandler = cumavg{0.0 0.0} each(msgHandler, 1 2 3 4 5) [1,1.5,2,2.5,3]
4.SQL編程
DolphinDB的編程語言不只支持標準的SQL,還針對時間序列數據擴展了SQL的功能,如分組計算(context by)、數據透視(pivot by)、窗口函數、asof鏈接和窗口鏈接等,更便於分析時間序列數據。單純的SQL引擎表達能力有限,很難知足更加複雜的數據分析和算法實現,影響開發效率。在DolphinDB中,腳本語言與SQL語言是徹底融合在一塊兒的。
4.1 SQL與編程語言融合
//生成一個員工工資表 emp_wage = table(take(1..10, 100) as id, take(2017.10M + 1..10, 100).sort() as month, take(5000 5500 6000 6500, 100) as wage) //計算給定的一組員工的平均工資。員工列表存儲在一個本地變量empIds中 empIds = 3 4 6 7 9 select avg(wage) from emp_wage where id in empIds group by id id avg_wage -- -------- 3 5500 4 6000 6 6000 7 5500 9 5500 //除計算平均工資外,同時顯示員工的姓名。員工姓名使用一個字典empName來獲取。 empName = dict(1..10, `Alice`Bob`Jerry`Jessica`Mike`Tim`Henry`Anna`Kevin`Jones) select empName[first(id)] as name, avg(wage) from emp_wage where id in empIds group by id id name avg_wage -- ------- -------- 3 Jerry 5500 4 Jessica 6000 6 Tim 6000 7 Henry 5500 9 Kevin 5500
上面的例子,SQL語句的where子句和select子句分別用到了上下文中定義的數組和字典,使得原本須要經過子查詢和多表聯結來解決的問題,經過簡單的hash table解決了。若是SQL涉及到分佈式數據庫,這些上下文變量會自動序列化到須要的節點。這不只讓代碼看上去更簡潔,有更好的可讀性,並且提高了性能。在大數據分析中,不少數據表關聯,即便SQL優化器作了不少優化,也不免帶來性能問題。
4.2 context by——對面板數據的友好支持
DolphinDB提供了相似其餘數據庫系統的window function——context by。可是與window function相比,context by的語法更簡潔,而且沒有那麼多限制,能夠與select或update一塊兒使用。
//按股票代碼進行分組,計算每一個股票天天的回報。假設數據是時間順序排列的。 update trades set ret = ratios(price) - 1.0 context by sym //按日期進行分組,計算天天每一個股票的ret降序排名。 select date, symbol, ret, rank(ret, false) + 1 as rank from trades where isValid(ret) context by date //選擇天天ret排名前10的股票 select date, symbol, ret from trades where isValid(ret) context by date having rank(ret, false) < 10
4.3 asof join和window join——對時序數據的友好支持
t1 = table(09:30m 09:31m 09:33m 09:34m as minute, 29.2 28.9 29.3 30.1 as price) t2 = table(09:30m 09:31m 09:34m 09:36m as minute, 51.2 52.4 51.9 52.8 as price) select * from aj(t1, t2, `minute) minute price t2_minute t2_price ------ ----- --------- -------- 09:30m 29.2 09:30m 51.2 09:31m 28.9 09:31m 52.4 09:33m 29.3 09:31m 52.4 09:34m 30.1 09:34m 51.9
上面的例子中,t2中沒有與09:33m、09:34m對應的記錄,asof join(aj)會分別取t2中在09:33m、09:34m以前最近時間對應的記錄,即取t2中09:31m的記錄。
p = table(1 2 3 as id, 2018.06M 2018.07M 2018.07M as month) s = table(1 2 1 2 1 2 as id, 2018.04M 2018.04M 2018.05M 2018.05M 2018.06M 2018.06M as month, 4500 5000 6000 5000 6000 4500 as wage) select * from wj(p, s, -3:-1,<avg(wage)>,`id`month) id month avg_wage -- -------- ----------- 1 2018.06M 5250 2 2018.07M 4833.333333 3 2018.07M
上面的例子說明了window join(wj)的用法。wj首先取表p第一行記錄,即id=1,month=2018.06M。而後在表s中選擇id=1而且month在(2018.06M-3)到(2018.06M-1),即2018.03M到2018.05M之間的記錄來計算avg(wage)。所以avg_wage=(4500+6000)/2=5250。如此類推。
asof join和window join在金融分析領域有着普遍的應用。一個經典的應用是將交易表和報價表進行關聯,計算個股交易成本。詳情能夠參考使用Window Join快速估計個股交易成本。
4.4 SQL其它擴展
爲了知足大數據分析的要求,DolphinDB對SQL還作了不少擴展。好比,用戶的自定義函數無需編譯、打包或部署,便可在SQL中使用。又好比DolphinDB支持組合字段(Composite Column),能夠將複雜分析函數的多個返回值輸出到數據表的一行。
factor1=3.2 1.2 5.9 6.9 11.1 9.6 1.4 7.3 2.0 0.1 6.1 2.9 6.3 8.4 5.6 factor2=1.7 1.3 4.2 6.8 9.2 1.3 1.4 7.8 7.9 9.9 9.3 4.6 7.8 2.4 8.7 t=table(take(1 2 3, 15).sort() as id, 1..15 as y, factor1, factor2) //在輸出參數的同時,輸出t統計值。使用自定義函數包裝輸出結果 def myols(y,x){ r=ols(y,x,true,2) return r.Coefficient.beta join r.RegressionStat.statistics[0] } select myols(y,[factor1,factor2]) as `alpha`beta1`beta2`R2 from t group by id id alpha beta1 beta2 R2 -- --------- --------- --------- -------- 1 1.063991 -0.258685 0.732795 0.946056 2 6.886877 -0.148325 0.303584 0.992413 3 11.833867 0.272352 -0.065526 0.144837
5.遠程過程調用編程
DolphinDB與其餘系統相比,在遠程過程調用(RPC)上的優點主要體如今兩個方面:第一,在DolphinDB中,不管是自定義函數仍是內置函數,咱們均可以經過遠程過程調用發送到其餘節點上運行,而其餘系統不能遠程調用與自定義函數相關的函數。第二,DolphinDB的遠程過程調用無需編譯或者部署。系統會自動把相關函數定義和所需數據序列化到遠程節點。數據科學家或數據分析師在編寫與遠程過程調用相關的函數時,不須要工程師配合編譯和部署,能夠直接在線使用,極大地提升了開發和分析效率。
下面的例子是使用remoteRun執行遠程函數:
h = xdb("localhost", 8081) //在遠程節點上執行一段腳本 remoteRun(h, "sum(1 3 5 7)") 16 //上述遠程調用也能夠簡寫成 h("sum(1 3 5 7)") 16 //在遠程節點上執行一個在遠程節點註冊的函數 h("sum", 1 3 5 7) 16 //在遠程系節點上執行本地的自定義函數 def mysum(x) : reduce(+, x) h(mysum, 1 3 5 7) 16 //在遠程節點(localhost:8081)上建立一個共享表sales h("share table(2018.07.02 2018.07.02 2018.07.03 as date, 1 2 3 as qty, 10 15 7 as price) as sales") //若是本地的自定義函數有依賴,依賴的自定義函數也會序列化到遠程節點 defg salesSum(tableName, d): select mysum(price*qty) from objByName(tableName) where date=d h(salesSum, "sales", 2018.07.02) 40
DolphinDB還提供了與分佈式計算相關的函數。mr和imr分別用於開發基於map-reduce和迭代的map-reduce分佈式算法。用戶只須要指定分佈式數據源和定製的核心函數,譬如map函數,reduce函數,final函數等。下面咱們先建立一個分佈式表,添加一些模擬數據,而後演示開發計算中位數和線性迴歸的例子。
//模擬生成分佈式表sample,用id分區 //y = 0.5 + 3x1 -0.5x2 n=10000000 x1 = pow(rand(1.0,n), 2) x2 = norm(3.0:1.0, n) y = 0.5 + 3 * x1 - 0.5*x2 + norm(0.0:1.0, n) t=table(rand(10, n) as id, y, x1, x2) login(`admin,"123456") db = database("dfs://testdb", VALUE, 0..9) db.createPartitionedTable(t, "sample", "id").append!(t)
利用自定義的map函數myOLSMap,內置的reudce函數加函數(+),自定義的final函數myOLSFinal,以及內置的map-reduce框架函數mr,快速構建了一個在分佈式數據源上運行線性迴歸的函數myOLSEx。
def myOLSMap(table, yColName, xColNames){ x = matrix(take(1.0, table.rows()), table[xColNames]) xt = x.transpose(); return xt.dot(x), xt.dot(table[yColName]) } def myOLSFinal(result){ xtx = result[0] xty = result[1] return xtx.inv().dot(xty)[0] } def myOLSEx(ds, yColName, xColNames){ return mr(ds, myOLSMap{, yColName, xColNames}, +, myOLSFinal) } //使用本身開發的分佈式算法和分佈式數據源計算線性迴歸係數 sample = loadTable("dfs://testdb", "sample") myOLSEx(sqlDS(<select * from sample>), `y, `x1`x2) [0.4991, 3.0001, -0.4996] //使用內置的函數ols和未分的數據計算線性迴歸的係數,獲得相同的結果 ols(y, [x1,x2],true) [0.4991, 3.0001, -0.4996]
下面這個例子,咱們構造一個算法,在分佈式數據源上計算一組數據的近似中位數。算法的基本原理是利用bucketCount函數,在每個節點上分別計算一組bucket內的數據個數,而後把各個節點上的數據累加。這樣咱們能夠找到中位數應該落在哪一個區間內。若是這個區間不夠小,進一步細分這個區間,直到小於給定的精度要求。中位數的算法須要屢次迭代,咱們所以使用了迭代計算框架imr。
def medMap(data, range, colName): bucketCount(data[colName], double(range), 1024, true) def medFinal(range, result){ x= result.cumsum() index = x.asof(x[1025]/2.0) ranges = range[1] - range[0] if(index == -1) return (range[0] - ranges*32):range[1] else if(index == 1024) return range[0]:(range[1] + ranges*32) else{ interval = ranges / 1024.0 startValue = range[0] + (index - 1) * interval return startValue : (startValue + interval) } } def medEx(ds, colName, range, precision){ termFunc = def(prev, cur): cur[1] - cur[0] <= precision return imr(ds, range, medMap{,,colName}, +, medFinal, termFunc).avg() } //使用本身開發的近似中位數算法,計算分佈式數據的中位數。 sample = loadTable("dfs://testdb", "sample") medEx(sqlDS(<select y from sample>), `y, 0.0 : 1.0, 0.001) -0.052973 //使用內置的med函數計算未分區的數據的中位數。 med(y) -0.052947
6.元編程
DolphinDB支持使用元編程來動態建立表達式,如函數調用的表達式和SQL查詢表達式。元編程的一個典型應用是定製報表。用戶只須要輸入數據表、字段名稱和字段格式就能生成報表。具體實現以下:
//根據輸入的數據表,字段名稱和格式,以及過濾條件,動態生成SQL表達式並執行 def generateReport(tbl, colNames, colFormat, filter){ colCount = colNames.size() colDefs = array(ANY, colCount) for(i in 0:colCount){ if(colFormat[i] == "") colDefs[i] = sqlCol(colNames[i]) else colDefs[i] = sqlCol(colNames[i], format{,colFormat[i]}) } return sql(colDefs, tbl, filter).eval() } //模擬生成一個100行的數據表 t = table(1..100 as id, (1..100 + 2018.01.01) as date, rand(100.0, 100) as price, rand(10000, 100) as qty) //輸入過濾條件,字段和格式,定製報表。過濾條件使用了元編程。 generateReport(t, ["id","date","price","qty"], ["000","MM/dd/yyyy", "00.00", "#,###"], < id<5 or id>95 >) id date price qty --- ---------- ----- ----- 001 01/02/2018 50.27 2,886 002 01/03/2018 30.85 1,331 003 01/04/2018 17.89 18 004 01/05/2018 51.00 6,439 096 04/07/2018 57.73 8,339 097 04/08/2018 47.16 2,425 098 04/09/2018 27.90 4,621 099 04/10/2018 31.55 7,644 100 04/11/2018 46.63 8,383
DolphinDB編程語言爲數據分析而生,天生具有處理海量數據的能力,功能強大,簡單易用。若是想要了解更多關於DolphinDB腳本,能夠參考DolphinDB腳本語言的混合範式編程。
此外,還提供了多種編程API,如R、Python、Java、C#等,可以方便地與已有的應用集成。
Java API:dolphindb/api-java
Python 3 API:dolphindb/api-python3
Python 2.7 API:dolphindb/api-python
C# API:dolphindb/api-csharp
歡迎訪問官網下載 DolphinDB database 試用版