DolphinDB是一個分佈式時序數據庫,而且內置了豐富的計算和分析功能。它能夠將TB級的海量數據存儲在多臺物理機器上,充分利用CPU,對海量數據進行高性能分析計算。經過Orca,咱們能夠在python環境中使用與pandas語法相同的腳本對DolphinDB分佈式數據庫中的數據進行復雜高效的計算。本教程主要介紹Orca對DolphinDB分佈式表的操做。python
本示例使用的是DolphinDB單機模式。首先,建立本教程的示例數據庫dfs://orca_stock 。建立數據庫的DolphinDB腳本以下所示:git
login("admin","123456") if(existsDatabase("dfs://orca_stock")){ dropDatabase("dfs://orca_stock") } dates=2019.01.01..2019.01.31 syms="A"+string(1..30) sym_range=cutPoints(syms,3) db1=database("",VALUE,dates) db2=database("",RANGE,sym_range) db=database("dfs://orca_stock",COMPO,[db1,db2]) n=10000000 datetimes=2019.01.01T00:00:00..2019.01.31T23:59:59 t=table(rand(datetimes,n) as trade_time,rand(syms,n) as sym,rand(1000,n) as qty,rand(500.0,n) as price) trades=db.createPartitionedTable(t,`trades,`trade_time`sym).append!(t) n=200000 datetimes=2019.01.01T00:00:00..2019.01.02T23:59:59 syms="A"+string(1..30) t2=table(rand(datetimes,n) as trade_time,rand(syms,n) as sym,rand(500.0,n) as bid,rand(500.0,n) as offer) quotes=db.createPartitionedTable(t2,`quotes,`trade_time`sym).append!(t2) syms="A"+string(1..30) t3=table(syms as sym,rand(0 1,30) as type) infos=db.createTable(t3,`infos).append!(t3)
注意:須要在DolphinDB database客戶端或經過DolphinDB Python API建立分佈式表,不能直接在Orca建立分佈式表。
在Orca中經過connect
函數鏈接到DolphinDB服務器:github
>>> import dolphindb.orca as orca >>> orca.connect("localhost",8848,"admin","123456")
用戶須要根據實際狀況修改IP地址和端口號。數據庫
1 讀取分佈式表服務器
Orca經過read_table
函數讀取分佈式表,返回的結果是Orca DataFrame。例如:讀取示例數據庫dfs://orca_stock 中的表trades:app
>>> trades = orca.read_table('dfs://orca_stock','trades') >>> type(trades) orca.core.frame.DataFrame
查看trades的列名:分佈式
>>> trades.columns Index(['trade_time', 'sym', 'qty', 'price'], dtype='object')
查看trades各列的數據類型:ide
>>> trades.dtypes trade_time datetime64[s] sym object qty int32 price float64 dtype: object
查看trades的行數:函數
>>> len(trades) 10000000
DolphinDB分佈式表對應的Orca DataFrame只存儲元數據,包括表名、數據的列名等信息。因爲分佈式表不是連續存儲,各個分區之間沒有嚴格的順序關係,所以分佈式表對應的DataFrame沒有RangeIndex的概念。若是須要設置index,能夠使用set_index
函數。例如,把trades中的trade_time設置爲index:性能
>>> trades.set_index('trade_time')
若是要將index列轉換爲數據列,能夠用reset_index
函數。
>>> trades.reset_index()
2 查詢和計算
Orca採用惰性求值,某些計算不會當即在服務端計算,而是轉換爲一箇中間表達式,直到真正須要時才發生計算。若是用戶須要當即觸發計算,能夠調用compute
函數。
注意,示例數據庫dfs://orca_stock 中的數據是隨機生成的,所以用戶的運行結果會與本章中的結果有所差別。
2.1 取前n條記錄
head
函數能夠查詢前n條記錄,默認取前5條。例如,取trades的前5條記錄:
>>> trades.head() trade_time sym qty price 0 2019-01-01 18:04:33 A16 855 482.526769 1 2019-01-01 13:57:38 A12 244 61.675293 2 2019-01-01 23:58:15 A10 36 297.623295 3 2019-01-01 23:02:43 A16 426 109.041012 4 2019-01-01 04:33:53 A1 472 75.778951
2.2 排序
sort_values
方法能夠根據某列排序。例如,trades按照price降序排序,取前5條記錄:
>>> trades.sort_values(by='price', ascending=False).head() trade_time sym qty price 0 2019-01-03 12:56:09 A22 861 499.999998 1 2019-01-18 17:25:21 A19 95 499.999963 2 2019-01-30 02:18:48 A30 114 499.999949 3 2019-01-23 08:31:56 A3 926 499.999926 4 2019-01-20 03:36:53 A3 719 499.999892
按照多列排序:
>>> trades.sort_values(by=['qty','trade_time'], ascending=False).head() trade_time sym qty price 0 2019-01-31 23:58:50 A24 999 359.887697 1 2019-01-31 23:57:26 A3 999 420.156175 2 2019-01-31 23:56:34 A2 999 455.228435 3 2019-01-31 23:52:58 A6 999 210.819227 4 2019-01-31 23:45:17 A14 999 310.813216
2.3 按照條件查詢
Orca支持按照單個或多個條件多慮查詢。例如,
查詢trades中2019年1月2日的數據:
>>> tmp = trades[trades.trade_time.dt.date == "2019.01.01"] >>> tmp.head() trade_time sym qty price 0 2019-01-01 00:32:21 A2 139 383.971293 1 2019-01-01 21:19:09 A2 263 100.932553 2 2019-01-01 18:50:48 A2 890 335.614454 3 2019-01-01 23:29:16 A2 858 469.223992 4 2019-01-01 09:58:51 A2 883 235.753424
查詢trades中2019年1月30日,股票代碼爲A2的數據:
>>> tmp = trades[(trades.trade_time.dt.date == '2019.01.30') & (trades.sym == 'A2')] >>> tmp.head() trade_time sym qty price 0 2019-01-30 04:41:56 A2 880 428.552654 1 2019-01-30 14:13:53 A2 512 488.826978 2 2019-01-30 14:31:28 A2 536 478.578219 3 2019-01-30 04:09:41 A2 709 255.435903 4 2019-01-30 13:18:50 A2 355 404.782260
2.4 groupby分組查詢
groupby
函數用於分組聚合。如下函數均可以用於groupby對象:
count
:返回非NULL元素的個數sum
:求和mean
:均值min
:最小值max
:最大值mode
:衆數abs
:絕對值prod
:乘積std
:標準差var
:方差sem
:平均值的標準偏差skew
:傾斜度kurtosis
:峯度cumsum
:累積求和cumprod
:累積乘積cummax
:累積最大值cummin
:累積最小值計算trades中天天的記錄數:
>>> trades.groupby(trades.trade_time.dt.date)['sym'].count() trade_time 2019-01-01 322573 2019-01-02 322662 2019-01-03 323116 2019-01-04 322436 2019-01-05 322156 2019-01-06 324191 2019-01-07 321879 2019-01-08 323319 2019-01-09 322262 2019-01-10 322585 2019-01-11 322986 2019-01-12 322839 2019-01-13 322302 2019-01-14 322032 2019-01-15 322409 2019-01-16 321810 2019-01-17 321566 2019-01-18 323651 2019-01-19 323463 2019-01-20 322675 2019-01-21 322845 2019-01-22 322931 2019-01-23 322598 2019-01-24 322404 2019-01-25 322454 2019-01-26 321760 2019-01-27 321955 2019-01-28 322013 2019-01-29 322745 2019-01-30 322193 2019-01-31 323190 dtype: int64
計算trades中天天每隻股票的記錄數:
>>> trades.groupby([trades.trade_time.dt.date,'sym'])['price'].count() trade_time sym 2019-01-01 A1 10638 A10 10747 A11 10709 A12 10715 A13 10914 ... 2019-01-31 A5 10717 A6 10934 A7 10963 A8 10907 A9 10815 Length: 930, dtype: int64
Orca支持經過agg一次應用多個聚合函數。和pandas不一樣,Orca在agg中使用字符串來表示要調用的聚合函數。例如,對計算trades中天天價格的最大值、最小值和均值:
>>> trades.groupby(trades.trade_time.dt.date)['price'].agg(["min","max","avg"]) price min max avg trade_time 2019-01-01 0.003263 499.999073 249.913612 2019-01-02 0.000468 499.999533 249.956874 2019-01-03 0.000054 499.999998 249.927257 2019-01-04 0.000252 499.999762 249.982737 2019-01-05 0.001907 499.999704 250.097487 2019-01-06 0.000318 499.999824 249.991605 2019-01-07 0.003196 499.999548 249.560505 2019-01-08 0.000216 499.996703 250.024405 2019-01-09 0.002635 499.998985 249.966446 2019-01-10 0.000725 499.996717 249.663324 2019-01-11 0.003140 499.998267 250.243786 2019-01-12 0.000105 499.998453 250.077061 2019-01-13 0.004297 499.999139 250.097489 2019-01-14 0.003510 499.999452 249.775830 2019-01-15 0.002501 499.999638 250.021218 2019-01-16 0.000451 499.998059 250.044059 2019-01-17 0.002359 499.998462 249.808932 2019-01-18 0.000104 499.999963 249.918651 2019-01-19 0.000999 499.998000 249.899495 2019-01-20 0.000489 499.999892 249.606668 2019-01-21 0.000729 499.999774 249.839876 2019-01-22 0.000834 499.999331 249.632037 2019-01-23 0.001982 499.999926 249.955031 2019-01-24 0.000323 499.993956 249.557851 2019-01-25 0.000978 499.999716 249.722053 2019-01-26 0.002582 499.998753 249.897519 2019-01-27 0.000547 499.999809 250.404666 2019-01-28 0.002729 499.998545 249.622289 2019-01-29 0.000487 499.999598 249.950167 2019-01-30 0.000811 499.999949 250.182493 2019-01-31 0.000801 499.999292 249.317517
Orca groupby支持過濾功能。和pandas不一樣,Orca中的過濾條件用字符串形式的表達式來表示,而不是lambda函數。
例如,返回trades中天天每隻股票均價大於200,而且記錄數大於11000的記錄:
>>> trades.groupby([trades.trade_time.dt.date,'sym'])['price'].filter("avg(price) > 200 and count(price) > 11000") 0 499.171179 1 375.553059 2 119.240890 3 370.198534 4 5.876941 ... 88416 37.872317 88417 373.259785 88418 435.154484 88419 436.163806 88420 428.455914 Length: 88421, dtype: float64
2.5 resample重採樣
Orca支持resample
函數,能夠對常規時間序列數據從新採樣和頻率轉換。目前,resample函數的參數以下:
Orca支持的DateOffset以下:
'B':BDay or BusinessDay 'WOM':WeekOfMonth 'LWOM':LastWeekOfMonth 'M':MonthEnd 'MS':MonthBegin 'BM':BMonthEnd or BusinessMonthEnd 'BMS':BMonthBegin or BusinessMonthBegin 'SM':SemiMonthEnd 'SMS':SemiMonthBegin 'Q':QuarterEnd 'QS':QuarterBegin 'BQ':BQuarterEnd 'BQS':BQuarterBegin 'REQ':FY5253Quarter 'A':YearEnd 'AS' or 'BYS':YearBegin 'BA':BYearEnd 'BAS':BYearBegin 'RE':FY5253 'D':Day 'H':Hour 'T' or 'min':Minute 'S':Second 'L' or 'ms':Milli 'U' or 'us':Micro 'N':Nano
例如,對trades中的數據從新採樣,每3分鐘計算一次:
>>> trades.resample('3T', on='trade_time')['qty'].sum() trade_time 2019-01-01 00:00:00 321063 2019-01-01 00:03:00 354917 2019-01-01 00:06:00 329419 2019-01-01 00:09:00 340880 2019-01-01 00:12:00 356612 ... 2019-01-31 23:45:00 322829 2019-01-31 23:48:00 344753 2019-01-31 23:51:00 330959 2019-01-31 23:54:00 336712 2019-01-31 23:57:00 328730 Length: 14880, dtype: int64
若是trades設置了trade_time爲index,也能夠用如下方法從新採樣:
>>> trades.resample('3T', level='trade_time')['qty'].sum()
若是要用dateoffset函數生成的對象來表示dateoffset,須要先導入pandas的dateoffset。按3分鐘從新採樣也能夠使用如下寫法:
>>> from pandas.tseries.offsets import * >>> ofst = Minute(n=3) >>> trades.resample(ofst,on='trade_time')['qty'].sum()
2.6 rolling移動窗口
Orca提供了rolling
函數,能夠在移動窗口中作計算。目前,rolling
函數的參數以下:
如下函數可用於orca.DataFrame.rolling對象:
count
:返回非NULL元素的個數sum
:求和min
:最小值max
:最大值std
:標準差var
:方差corr
:相關性covar
:協方差skew
:傾斜度kurtosis
:峯度對於分佈式表對應的DataFrame,在滑動窗口中計算時,是以分區爲單位單獨計算的,所以每一個分區的計算結果的前window-1個值爲空。例如,trades中2019.01.01和2019.01.02的數據在長度爲3的滑動窗口中求price的和:
>>> tmp = trades[(trades.trade_time.dt.date == '2019.01.01') | (trades.trade_time.dt.date == '2019.01.02')] >>> re = tmp.rolling(window=3)['price'].sum() 0 NaN 1 NaN 2 792.386603 3 601.826312 4 444.858366 ... 646057 1281.099161 646058 1287.816045 646059 963.262163 646060 865.797011 646061 719.050068 Name: price, Length: 646062, dtype: float64
2.7 數據鏈接
Orca提供了鏈接DataFrame的功能。分佈式表對應的DataFrame,既能夠鏈接普通內存表對應的DataFrame,也能夠鏈接分佈式表對應的DataFrame。兩個分佈式表對應的DataFrame鏈接時必須同時知足如下條件:
Orca提供了merge
和join
函數。
merge
函數支持如下參數:
join
函數是merge
函數的特例,它的參數及含義與merge
基本相同,只是join
默認爲左外鏈接,即how='left'。
例如,對trades和quotes進行內鏈接:
>>> quotes = orca.read_table('dfs://orca_stock','quotes') >>> trades.merge(right=quotes, left_on=['trade_time','sym'], right_on=['trade_time','sym'], how='inner') trade_time sym qty price bid offer 0 2019-01-01 02:36:34 A15 273 186.144261 317.458480 155.361661 1 2019-01-01 05:37:59 A13 185 420.397500 248.447426 115.722893 2 2019-01-01 00:59:43 A10 751 89.801687 193.925714 144.345473 3 2019-01-01 21:58:36 A16 175 251.753495 116.810807 439.178207 4 2019-01-01 10:53:54 A16 532 71.733640 240.927647 388.718680 ... ... ... ... ... ... ... 25035 2019-01-02 03:59:51 A3 220 50.004418 107.905522 167.375994 25036 2019-01-02 17:54:01 A3 202 195.189216 134.463906 142.443428 25037 2019-01-02 16:57:50 A9 627 68.661644 440.421876 110.801070 25038 2019-01-02 10:27:43 A28 414 487.337282 169.081363 261.171073 25039 2019-01-02 17:02:51 A3 661 243.960836 92.999404 26.747609 [25040 rows x 6 columns]
使用join
函數對trades和quotes進行左外鏈接:
>>> trades.set_index(['trade_time','sym'], inplace=True) >>> quotes.set_index(['trade_time','sym'], inplace=True) >>> trades.join(quotes) qty price bid offer trade_time sym 2019-01-01 18:04:25 A14 435 378.595626 NaN NaN 2019-01-01 20:38:47 A13 701 275.039372 NaN NaN 2019-01-01 02:43:03 A16 787 138.751605 NaN NaN 2019-01-01 20:32:42 A14 989 188.035335 NaN NaN 2019-01-01 16:59:16 A13 847 118.071427 NaN NaN ... ... ... ... ... 2019-01-31 17:21:27 A30 3 49.855063 NaN NaN 2019-01-31 13:49:01 A6 273 245.966115 NaN NaN 2019-01-31 16:42:29 A7 548 197.814548 NaN NaN 2019-01-31 03:42:11 A5 563 263.999224 NaN NaN 2019-01-31 20:48:57 A9 809 318.420522 NaN NaN [10000481 rows x 4 columns]
3 把dataframe追加到dfs表
Orca提供了append
函數,能夠將Orca DataFrame追加到dfs表中。
append
函數具備如下參數:
例如,往dataframe追加到trades對應的分佈式表:
>>> import pandas as pd >>> odf=orca.DataFrame({'trade_time':pd.date_range('20190101 12:30',periods=5,freq='T'), 'sym':['A1','A2','A3','A4','A5'], 'qty':[100,200,300,400,500], 'price':[100.5,263.1,254.9,215.1,245.6]}) >>> trades.append(odf,inplace=True) >>> len(trades) 10000005
Orca擴展了append函數,支持inplace參數,即容許就地添加數據。若是inplace爲False,表現和pandas相同。分佈式表中的內容會複製到內存中,此時trades對應的只是一個內存表,odf中的內容只追加到內存表,沒有真正地追加到dfs表。
4 小結
對於分佈式表,目前Orca還具備一些功能上的限制,例如分區表對應的DataFrame沒有RangeIndex的概念、一些函數不支持在分佈式表上使用以及修改表中數據的限制等。具體請參考Orca快速入門指導。