乾貨丨解析Orca對DolphinDB分佈式表的操做

DolphinDB是一個分佈式時序數據庫,而且內置了豐富的計算和分析功能。它能夠將TB級的海量數據存儲在多臺物理機器上,充分利用CPU,對海量數據進行高性能分析計算。經過Orca,咱們能夠在python環境中使用與pandas語法相同的腳本對DolphinDB分佈式數據庫中的數據進行復雜高效的計算。本教程主要介紹Orca對DolphinDB分佈式表的操做。python

本示例使用的是DolphinDB單機模式。首先,建立本教程的示例數據庫dfs://orca_stock 。建立數據庫的DolphinDB腳本以下所示:git

login("admin","123456")
if(existsDatabase("dfs://orca_stock")){
	dropDatabase("dfs://orca_stock")
}
dates=2019.01.01..2019.01.31
syms="A"+string(1..30)
sym_range=cutPoints(syms,3)
db1=database("",VALUE,dates)
db2=database("",RANGE,sym_range)
db=database("dfs://orca_stock",COMPO,[db1,db2])
n=10000000
datetimes=2019.01.01T00:00:00..2019.01.31T23:59:59
t=table(rand(datetimes,n) as trade_time,rand(syms,n) as sym,rand(1000,n) as qty,rand(500.0,n) as price)
trades=db.createPartitionedTable(t,`trades,`trade_time`sym).append!(t)

n=200000
datetimes=2019.01.01T00:00:00..2019.01.02T23:59:59
syms="A"+string(1..30)
t2=table(rand(datetimes,n) as trade_time,rand(syms,n) as sym,rand(500.0,n) as bid,rand(500.0,n) as offer)
quotes=db.createPartitionedTable(t2,`quotes,`trade_time`sym).append!(t2)

syms="A"+string(1..30)
t3=table(syms as sym,rand(0 1,30) as type)
infos=db.createTable(t3,`infos).append!(t3)
注意:須要在DolphinDB database客戶端或經過DolphinDB Python API建立分佈式表,不能直接在Orca建立分佈式表。

在Orca中經過connect函數鏈接到DolphinDB服務器:github

>>> import dolphindb.orca as orca
>>> orca.connect("localhost",8848,"admin","123456")

用戶須要根據實際狀況修改IP地址和端口號。數據庫

1 讀取分佈式表服務器

Orca經過read_table函數讀取分佈式表,返回的結果是Orca DataFrame。例如:讀取示例數據庫dfs://orca_stock 中的表trades:app

>>> trades = orca.read_table('dfs://orca_stock','trades')
>>> type(trades)
orca.core.frame.DataFrame

查看trades的列名:分佈式

>>> trades.columns
Index(['trade_time', 'sym', 'qty', 'price'], dtype='object')

查看trades各列的數據類型:ide

>>> trades.dtypes
trade_time    datetime64[s]
sym                  object
qty                   int32
price               float64
dtype: object

查看trades的行數:函數

>>> len(trades)
10000000

DolphinDB分佈式表對應的Orca DataFrame只存儲元數據,包括表名、數據的列名等信息。因爲分佈式表不是連續存儲,各個分區之間沒有嚴格的順序關係,所以分佈式表對應的DataFrame沒有RangeIndex的概念。若是須要設置index,能夠使用set_index函數。例如,把trades中的trade_time設置爲index:性能

>>> trades.set_index('trade_time')

若是要將index列轉換爲數據列,能夠用reset_index函數。

>>> trades.reset_index()

2 查詢和計算

Orca採用惰性求值,某些計算不會當即在服務端計算,而是轉換爲一箇中間表達式,直到真正須要時才發生計算。若是用戶須要當即觸發計算,能夠調用compute函數。

注意,示例數據庫dfs://orca_stock 中的數據是隨機生成的,所以用戶的運行結果會與本章中的結果有所差別。

2.1 取前n條記錄

head函數能夠查詢前n條記錄,默認取前5條。例如,取trades的前5條記錄:

>>> trades.head()
           trade_time  sym  qty       price
0 2019-01-01 18:04:33  A16  855  482.526769
1 2019-01-01 13:57:38  A12  244   61.675293
2 2019-01-01 23:58:15  A10   36  297.623295
3 2019-01-01 23:02:43  A16  426  109.041012
4 2019-01-01 04:33:53   A1  472   75.778951

2.2 排序

sort_values方法能夠根據某列排序。例如,trades按照price降序排序,取前5條記錄:

>>> trades.sort_values(by='price', ascending=False).head()
           trade_time  sym  qty       price
0 2019-01-03 12:56:09  A22  861  499.999998
1 2019-01-18 17:25:21  A19   95  499.999963
2 2019-01-30 02:18:48  A30  114  499.999949
3 2019-01-23 08:31:56   A3  926  499.999926
4 2019-01-20 03:36:53   A3  719  499.999892

按照多列排序:

>>> trades.sort_values(by=['qty','trade_time'], ascending=False).head()
           trade_time  sym  qty       price
0 2019-01-31 23:58:50  A24  999  359.887697
1 2019-01-31 23:57:26   A3  999  420.156175
2 2019-01-31 23:56:34   A2  999  455.228435
3 2019-01-31 23:52:58   A6  999  210.819227
4 2019-01-31 23:45:17  A14  999  310.813216

2.3 按照條件查詢

Orca支持按照單個或多個條件多慮查詢。例如,

查詢trades中2019年1月2日的數據:

>>> tmp = trades[trades.trade_time.dt.date == "2019.01.01"]
>>> tmp.head()
           trade_time sym  qty       price
0 2019-01-01 00:32:21  A2  139  383.971293
1 2019-01-01 21:19:09  A2  263  100.932553
2 2019-01-01 18:50:48  A2  890  335.614454
3 2019-01-01 23:29:16  A2  858  469.223992
4 2019-01-01 09:58:51  A2  883  235.753424

查詢trades中2019年1月30日,股票代碼爲A2的數據:

>>> tmp = trades[(trades.trade_time.dt.date == '2019.01.30') & (trades.sym == 'A2')]
>>> tmp.head()
           trade_time sym  qty       price
0 2019-01-30 04:41:56  A2  880  428.552654
1 2019-01-30 14:13:53  A2  512  488.826978
2 2019-01-30 14:31:28  A2  536  478.578219
3 2019-01-30 04:09:41  A2  709  255.435903
4 2019-01-30 13:18:50  A2  355  404.782260

2.4 groupby分組查詢

groupby函數用於分組聚合。如下函數均可以用於groupby對象:

  • count:返回非NULL元素的個數
  • sum:求和
  • mean:均值
  • min:最小值
  • max:最大值
  • mode:衆數
  • abs:絕對值
  • prod:乘積
  • std:標準差
  • var:方差
  • sem:平均值的標準偏差
  • skew:傾斜度
  • kurtosis:峯度
  • cumsum:累積求和
  • cumprod:累積乘積
  • cummax:累積最大值
  • cummin:累積最小值

計算trades中天天的記錄數:

>>> trades.groupby(trades.trade_time.dt.date)['sym'].count()
trade_time
2019-01-01    322573
2019-01-02    322662
2019-01-03    323116
2019-01-04    322436
2019-01-05    322156
2019-01-06    324191
2019-01-07    321879
2019-01-08    323319
2019-01-09    322262
2019-01-10    322585
2019-01-11    322986
2019-01-12    322839
2019-01-13    322302
2019-01-14    322032
2019-01-15    322409
2019-01-16    321810
2019-01-17    321566
2019-01-18    323651
2019-01-19    323463
2019-01-20    322675
2019-01-21    322845
2019-01-22    322931
2019-01-23    322598
2019-01-24    322404
2019-01-25    322454
2019-01-26    321760
2019-01-27    321955
2019-01-28    322013
2019-01-29    322745
2019-01-30    322193
2019-01-31    323190
dtype: int64

計算trades中天天每隻股票的記錄數:

>>> trades.groupby([trades.trade_time.dt.date,'sym'])['price'].count()
trade_time  sym
2019-01-01  A1     10638
            A10    10747
            A11    10709
            A12    10715
            A13    10914
                   ...  
2019-01-31  A5     10717
            A6     10934
            A7     10963
            A8     10907
            A9     10815
Length: 930, dtype: int64

Orca支持經過agg一次應用多個聚合函數。和pandas不一樣,Orca在agg中使用字符串來表示要調用的聚合函數。例如,對計算trades中天天價格的最大值、最小值和均值:

>>> trades.groupby(trades.trade_time.dt.date)['price'].agg(["min","max","avg"])
               price                        
                 min         max         avg
trade_time                                  
2019-01-01  0.003263  499.999073  249.913612
2019-01-02  0.000468  499.999533  249.956874
2019-01-03  0.000054  499.999998  249.927257
2019-01-04  0.000252  499.999762  249.982737
2019-01-05  0.001907  499.999704  250.097487
2019-01-06  0.000318  499.999824  249.991605
2019-01-07  0.003196  499.999548  249.560505
2019-01-08  0.000216  499.996703  250.024405
2019-01-09  0.002635  499.998985  249.966446
2019-01-10  0.000725  499.996717  249.663324
2019-01-11  0.003140  499.998267  250.243786
2019-01-12  0.000105  499.998453  250.077061
2019-01-13  0.004297  499.999139  250.097489
2019-01-14  0.003510  499.999452  249.775830
2019-01-15  0.002501  499.999638  250.021218
2019-01-16  0.000451  499.998059  250.044059
2019-01-17  0.002359  499.998462  249.808932
2019-01-18  0.000104  499.999963  249.918651
2019-01-19  0.000999  499.998000  249.899495
2019-01-20  0.000489  499.999892  249.606668
2019-01-21  0.000729  499.999774  249.839876
2019-01-22  0.000834  499.999331  249.632037
2019-01-23  0.001982  499.999926  249.955031
2019-01-24  0.000323  499.993956  249.557851
2019-01-25  0.000978  499.999716  249.722053
2019-01-26  0.002582  499.998753  249.897519
2019-01-27  0.000547  499.999809  250.404666
2019-01-28  0.002729  499.998545  249.622289
2019-01-29  0.000487  499.999598  249.950167
2019-01-30  0.000811  499.999949  250.182493
2019-01-31  0.000801  499.999292  249.317517

Orca groupby支持過濾功能。和pandas不一樣,Orca中的過濾條件用字符串形式的表達式來表示,而不是lambda函數。

例如,返回trades中天天每隻股票均價大於200,而且記錄數大於11000的記錄:

>>> trades.groupby([trades.trade_time.dt.date,'sym'])['price'].filter("avg(price) > 200 and count(price) > 11000")
0        499.171179
1        375.553059
2        119.240890
3        370.198534
4          5.876941
            ...    
88416     37.872317
88417    373.259785
88418    435.154484
88419    436.163806
88420    428.455914
Length: 88421, dtype: float64

2.5 resample重採樣

Orca支持resample函數,能夠對常規時間序列數據從新採樣和頻率轉換。目前,resample函數的參數以下:

  • rule:DateOffset,能夠是字符串或者是dateoffset對象
  • on:時間列,採用該列進行重採樣
  • level:字符串或整數,對於MultiIndex,採用level指定的列進行重採樣

Orca支持的DateOffset以下:

'B':BDay or BusinessDay
'WOM':WeekOfMonth
'LWOM':LastWeekOfMonth
'M':MonthEnd
'MS':MonthBegin
'BM':BMonthEnd or BusinessMonthEnd
'BMS':BMonthBegin or BusinessMonthBegin
'SM':SemiMonthEnd
'SMS':SemiMonthBegin
'Q':QuarterEnd
'QS':QuarterBegin
'BQ':BQuarterEnd
'BQS':BQuarterBegin
'REQ':FY5253Quarter
'A':YearEnd
'AS' or 'BYS':YearBegin
'BA':BYearEnd
'BAS':BYearBegin
'RE':FY5253
'D':Day
'H':Hour
'T' or 'min':Minute
'S':Second
'L' or 'ms':Milli
'U' or 'us':Micro
'N':Nano

例如,對trades中的數據從新採樣,每3分鐘計算一次:

>>> trades.resample('3T', on='trade_time')['qty'].sum()
trade_time
2019-01-01 00:00:00    321063
2019-01-01 00:03:00    354917
2019-01-01 00:06:00    329419
2019-01-01 00:09:00    340880
2019-01-01 00:12:00    356612
                        ...  
2019-01-31 23:45:00    322829
2019-01-31 23:48:00    344753
2019-01-31 23:51:00    330959
2019-01-31 23:54:00    336712
2019-01-31 23:57:00    328730
Length: 14880, dtype: int64

若是trades設置了trade_time爲index,也能夠用如下方法從新採樣:

>>> trades.resample('3T', level='trade_time')['qty'].sum()

若是要用dateoffset函數生成的對象來表示dateoffset,須要先導入pandas的dateoffset。按3分鐘從新採樣也能夠使用如下寫法:

>>> from pandas.tseries.offsets import *
>>> ofst = Minute(n=3)
>>> trades.resample(ofst,on='trade_time')['qty'].sum()

2.6 rolling移動窗口

Orca提供了rolling函數,能夠在移動窗口中作計算。目前,rolling函數的參數以下:

  • window::整型,表示窗口的長度
  • on:字符串,根據該列來計算窗口

如下函數可用於orca.DataFrame.rolling對象:

  • count:返回非NULL元素的個數
  • sum:求和
  • min:最小值
  • max:最大值
  • std:標準差
  • var:方差
  • corr:相關性
  • covar:協方差
  • skew:傾斜度
  • kurtosis:峯度

對於分佈式表對應的DataFrame,在滑動窗口中計算時,是以分區爲單位單獨計算的,所以每一個分區的計算結果的前window-1個值爲空。例如,trades中2019.01.01和2019.01.02的數據在長度爲3的滑動窗口中求price的和:

>>> tmp = trades[(trades.trade_time.dt.date == '2019.01.01') | (trades.trade_time.dt.date == '2019.01.02')]
>>> re = tmp.rolling(window=3)['price'].sum()
0                 NaN
1                 NaN
2          792.386603
3          601.826312
4          444.858366
             ...     
646057    1281.099161
646058    1287.816045
646059     963.262163
646060     865.797011
646061     719.050068
Name: price, Length: 646062, dtype: float64

2.7 數據鏈接

Orca提供了鏈接DataFrame的功能。分佈式表對應的DataFrame,既能夠鏈接普通內存表對應的DataFrame,也能夠鏈接分佈式表對應的DataFrame。兩個分佈式表對應的DataFrame鏈接時必須同時知足如下條件:

  • 兩個分佈式表在同一個數據庫中
  • 鏈接列必須包含全部分區列

Orca提供了mergejoin函數。

merge函數支持如下參數:

  • right:Orca DataFrame或Series
  • how:字符串,表示鏈接的類型,能夠是left、right、outer和inner,默認值是inner
  • on:字符串,表示鏈接列
  • left_on:字符串,表示左表的鏈接列
  • right_on:字符串,表示右表的鏈接列
  • left_index:左表的索引
  • right_index:右表的索引
  • suffixes:字符串,表示重複列的後綴

join函數是merge函數的特例,它的參數及含義與merge基本相同,只是join默認爲左外鏈接,即how='left'。

例如,對trades和quotes進行內鏈接:

>>> quotes = orca.read_table('dfs://orca_stock','quotes')
>>> trades.merge(right=quotes, left_on=['trade_time','sym'], right_on=['trade_time','sym'], how='inner')
               trade_time  sym  qty       price         bid       offer
0     2019-01-01 02:36:34  A15  273  186.144261  317.458480  155.361661
1     2019-01-01 05:37:59  A13  185  420.397500  248.447426  115.722893
2     2019-01-01 00:59:43  A10  751   89.801687  193.925714  144.345473
3     2019-01-01 21:58:36  A16  175  251.753495  116.810807  439.178207
4     2019-01-01 10:53:54  A16  532   71.733640  240.927647  388.718680
...                   ...  ...  ...         ...         ...         ...
25035 2019-01-02 03:59:51   A3  220   50.004418  107.905522  167.375994
25036 2019-01-02 17:54:01   A3  202  195.189216  134.463906  142.443428
25037 2019-01-02 16:57:50   A9  627   68.661644  440.421876  110.801070
25038 2019-01-02 10:27:43  A28  414  487.337282  169.081363  261.171073
25039 2019-01-02 17:02:51   A3  661  243.960836   92.999404   26.747609

[25040 rows x 6 columns]

使用join函數對trades和quotes進行左外鏈接:

>>> trades.set_index(['trade_time','sym'], inplace=True)
>>> quotes.set_index(['trade_time','sym'], inplace=True)
>>> trades.join(quotes)
                         qty       price  bid  offer
trade_time          sym                             
2019-01-01 18:04:25 A14  435  378.595626  NaN    NaN
2019-01-01 20:38:47 A13  701  275.039372  NaN    NaN
2019-01-01 02:43:03 A16  787  138.751605  NaN    NaN
2019-01-01 20:32:42 A14  989  188.035335  NaN    NaN
2019-01-01 16:59:16 A13  847  118.071427  NaN    NaN
...                      ...         ...  ...    ...
2019-01-31 17:21:27 A30    3   49.855063  NaN    NaN
2019-01-31 13:49:01 A6   273  245.966115  NaN    NaN
2019-01-31 16:42:29 A7   548  197.814548  NaN    NaN
2019-01-31 03:42:11 A5   563  263.999224  NaN    NaN
2019-01-31 20:48:57 A9   809  318.420522  NaN    NaN

[10000481 rows x 4 columns]

3 把dataframe追加到dfs表

Orca提供了append函數,能夠將Orca DataFrame追加到dfs表中。

append函數具備如下參數:

  • other:要追加的DataFrame
  • ignore_index:布爾值,是否忽略索引。默認爲False
  • verify_integrity:布爾值。默認爲False
  • sort:布爾值,表示是否排序。默認爲None
  • inplace:布爾值,表示是否插入到dfs表。默認爲False

例如,往dataframe追加到trades對應的分佈式表:

>>> import pandas as pd
>>> odf=orca.DataFrame({'trade_time':pd.date_range('20190101 12:30',periods=5,freq='T'),
                   'sym':['A1','A2','A3','A4','A5'],
                   'qty':[100,200,300,400,500],
                   'price':[100.5,263.1,254.9,215.1,245.6]})
>>> trades.append(odf,inplace=True)
>>> len(trades)
10000005

Orca擴展了append函數,支持inplace參數,即容許就地添加數據。若是inplace爲False,表現和pandas相同。分佈式表中的內容會複製到內存中,此時trades對應的只是一個內存表,odf中的內容只追加到內存表,沒有真正地追加到dfs表。

4 小結

對於分佈式表,目前Orca還具備一些功能上的限制,例如分區表對應的DataFrame沒有RangeIndex的概念、一些函數不支持在分佈式表上使用以及修改表中數據的限制等。具體請參考Orca快速入門指導

相關文章
相關標籤/搜索