乾貨丨如何使用時序數據庫DolphinDB處理Tushare金融數據

本教程將介紹如何把Tushare的滬深股票2008年到2017年的日線行情數據和每日指標數據導入到 DolphinDB database,並使用DolphinDB進行金融分析。Tushare是金融大數據開放社區,擁有豐富的金融數據,如股票、基金、期貨、數字貨幣等行情數據,爲量化從業人員和金融相關研究人員免費提供金融數據。php

DolphinDB是新一代的時序數據庫,不只能夠做爲分佈式數據倉庫或者內存數據庫來使用,並且自帶豐富的計算工具,能夠做爲研究工具或研究平臺來使用,適用於量化金融、物聯網等領域的海量數據分析。量化金融領域的很多問題,如交易信號研究、策略回測、交易成本分析、股票相關性研究、市場風險控制等,均可以用DolphinDB來解決。python

1. 數據概況

Tushare提供的滬深股票日線行情數據包含如下字段:ios

名稱	        描述ts_code	        股票代碼trade_date	交易日期open	        開盤價high	        最高價low	        最低價close	        收盤價pre_close	昨收價change	        漲跌額pct_change	漲跌幅vol	        成交量(手)amount	        成交額(千元)

每日指標數據包含如下字段:git

名稱	        描述ts_code	        股票代碼trade_date	交易日期close	        收盤價turnover_rate	換手率turnover_rate_f	換手率(自由流通股)volume_ratio	量比pe	        市盈率(總市值/淨利潤)pe_ttm	        市盈率(TTM)pb	        市淨率(總市值/淨資產)ps	        市銷率ps_ttm	        市銷率(TTM)total_share	總股本(萬)float_share	流通股本(萬)free_share	自由流通股本(萬)total_mv	總市值(萬元)cric_mv	        流通市值(萬元)

2. 建立DolphinDB數據庫

2.1 安裝DolphinDBgithub

官網下載DolphinDB安裝包和DolphinDB GUI.sql

DolphinDB單節點部署請參考單節點部署數據庫

DolphinDB單服務器集羣部署請參考單服務器集羣部署編程

DolphinDB多物理服務器部署請參考多服務器集羣部署api

2.2 建立數據庫瀏覽器

咱們可使用database函數建立分區數據庫。

語法:database(directory, [partitionType], [partitionScheme], [locations])

參數

directory:數據庫保存的目錄。DolphinDB有三種類型的數據庫,分別是內存數據庫、磁盤上的數據庫和分佈式文件系統上的數據庫。建立內存數據庫,directory爲空;建立本地數據庫,directory應該是本地文件系統目錄;建立分佈式文件系統上的數據庫,directory應該以「dfs://」開頭。本教程使用分佈式文件系統上的數據庫。

partitionType:分區方式,有6種方式: 順序分區(SEQ),範圍分區(RANGE),哈希分區(HASH),值分區(VALUE),列表分區(LIST),複合分區(COMPO)。

partitionScheme:分區方案。各類分區方式對應的分區方案以下:

93d63e54933b47202b940702c588cc66.jpeg

導入數據前,要作好數據的分區規劃,主要考慮兩個因素:分區字段和分區粒度。

在平常的查詢分析中,按照日期查詢的頻率最高,因此分區字段爲日期trade_date。若是一天一個分區,每一個分區的數據量過少,只有3000多條數據,不到1兆大小,並且分區數量很是多。分佈式系統在執行查詢時,會把查詢語句分紅多個子任務發送到不一樣的分區。這樣的分區方式會致使子任務數量很是多,而每一個任務執行的時間極短,系統在管理任務上耗費的時間反而大於任務自己的執行時間,明顯這樣的分區方式是不合理。這種狀況下,咱們按日期範圍進行分區,每一年的1月1日到次年的1月1日爲一個分區,這樣既能提高查詢的效率,也不會形成分區粒度太小。

現有數據的時間跨度是2008-2017年,可是爲了給將來的數據留出足夠的空間,咱們把時間範圍設置爲2008-2030年。執行如下代碼:

yearRange=date(2008.01M + 12*0..22)

因爲日線行情和每日指標數據的分區方案相同,所以把它們存放在同一個數據庫dfs://tushare的兩個表中,hushen_daily_line用於存放日線行情數據,hushen_daily_indicator用於存放每日指標數據。若是須要使用內存數據庫,建立數據庫時把directory設爲空;若是須要使用磁盤上的數據庫,把directory設置爲磁盤目錄便可。建立數據庫的代碼以下:

login("admin","123456")
dbPath="dfs://tushare"yearRange=date(2008.01M + 12*0..22)if(existsDatabase(dbPath)){
	dropDatabase(dbPath)
}
columns1=`ts_code`trade_date`open`high`low`close`pre_close`change`pct_change`vol`amount
type1=`SYMBOL`NANOTIMESTAMP`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE
db=database(dbPath,RANGE,yearRange)
hushen_daily_line=db.createPartitionedTable(table(100000000:0,columns1,type1),`hushen_daily_line,`trade_date)

columns2=`ts_code`trade_date`close`turnover_rate`turnover_rate_f`volume_ratio`pe`pr_ttm`pb`ps`ps_ttm`total_share`float_share`free_share`total_mv`circ_mv
type2=`SYMBOL`NANOTIMESTAMP`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE
hushen_daily_indicator=db.createPartitionedTable(table(100000000:0,columns2,type2),`hushen_daily_indicator,`trade_date)

3. 使用Python API把數據導入到DolphinDB

Tushare提供了兩種經常使用的數據調取方式:

  • 經過Tushare Python包,返回的是python dataframe類型數據。
  • 經過http協議直接獲取,返回的是Json格式數據。

本教程使用了第一種方法調取滬深股票2008年到2017年10年的日線行情數據和每日指標數據。

3.1 下載安裝Python3.X和Tushare

具體教程請參考Tushare官網

3.2 安裝DolphinDB的Python3 API

官網下載Python3 API,把Python3 API的安裝包解壓至任意目錄。在console中進入該目錄,執行如下命令:

python setup.py install

使用如下命令更新Python API:

python setup.py install --force

3.3 數據導入

咱們分別使用Tushare Python包的daily和daily_basic接口調取日線行情和每日指標數據,返回的是Python Dataframe類型數據。注意,須要註冊Tushare帳號才能獲取token。接着,經過Python API,鏈接到IP爲localhost,端口號爲8941的DolphinDB數據節點(這裏的數據節點IP與端口根據本身集羣的狀況進行修改),把Tushare返回的Dataframe數據分別追加到以前建立的DolphinDB DFS Table中。

具體的Python代碼以下:

import datetimeimport tushare as tsimport pandas as pdimport numpy as npimport dolphindb as ddb
pro=ts.pro_api('YOUR_TOKEN')
s=ddb.session()
s.connect("localhost",8941,"admin","123456")
t1=s.loadTable(tableName="hushen_daily_line",dbPath="dfs://tushare")
t2=s.loadTable(tableName="hushen_daily_indicator",dbPath="dfs://tushare")def dateRange(beginDate,endDate):
    dates=[]
    dt=datetime.datetime.strptime(beginDate,"%Y%m%d")
    date=beginDate[:]    while date <= endDate:
        dates.append(date)
        dt=dt + datetime.timedelta(1)
        date=dt.strftime("%Y%m%d")    return datesfor dates in dateRange('20080101','20171231'):
df=pro.daily(trade_date=dates)
df['trade_date']=pd.to_datetime(df['trade_date'])if len(df):
    t1.append(s.table(data=df))
    print(t1.rows)for dates in dateRange('20080101','20171231'):
ds=pro.daily_basic(trade_date=dates)
ds['trade_date']=pd.to_datetime(ds['trade_date'])
ds['volume_ratio']=np.float64(ds['volume_ratio'])if len(ds):
    t2.append(s.table(data=ds))
    print(t2.rows)

數據導入成功後,咱們能夠從DolphinDB GUI右下角的變量瀏覽器中看到兩個表的分區狀況:

aabd97b638fc369e09d5a04ab6bce820.png

查看數據量:

select count(*) from hushen_daily_line5,332,932select count(*) from hushen_daily_indicator5,333,321

至此,咱們已經把滬深股票2008年-2017年的日線行情和每日指標數據所有導入到DolphinDB中。

4. 金融分析

DolphinDB將數據庫、編程語言和分佈式計算融合在一塊兒,不只能夠用做數據倉庫,還能夠用做計算和分析工具。DolphinDB內置了許多通過優化的時間序列函數,特別適用於投資銀行、對衝基金和交易所的定量查詢和分析,能夠用於構建基於歷史數據的策略測試。下面介紹如何使用Tushare的數據進行金融分析。

4.1 計算每隻股票滾動波動率

daily_line= loadTable("dfs://daily_line","hushen_daily_line")t=select ts_code,trade_date,mstd(pct_change/100,21) as mvol from daily_line context by ts_codeselect * from t where trade_date=2008.11.14ts_code	        trade_date	mvol000001.SZ	2008.11.14	0.048551000002.SZ	2008.11.14	0.04565000004.SZ	2008.11.14	0.030721000005.SZ	2008.11.14	0.046655000006.SZ	2008.11.14	0.043092000008.SZ	2008.11.14	0.035764000009.SZ	2008.11.14	0.051113000010.SZ	2008.11.14	0.027254...

計算每隻股票一個月的滾動波動率,僅需一行代碼。DolphinDB自帶金融基因,內置了大量與金融相關的函數,能夠用簡單的代碼計算金融指標。

4.2 找到最相關的股票

使用滬深股票日線行情數據,計算股票的兩兩相關性。首先,生成股票回報矩陣:

retMatrix=exec pct_change/100 as ret from loadTable("dfs://daily_line","hushen_daily_line") pivot by trade_date,ts_code

exec和pivot by是DolphinDB編程語言的特色之一。exec與select的用法相同,可是select語句生成的是表,exec語句生成的是向量。pivot by用於整理維度,與exec一塊兒使用時會生成一個矩陣。

接着,生成股票相關性矩陣:

corrMatrix=cross(corr,retMatrix,retMatrix)

上面使用到的cross是DolphinDB中的高階函數,它以函數和對象做爲輸入內容,把函數應用到每一個對象上。模板函數在複雜的批量計算中很是有用。

而後,找到每隻股票相關性最高的10只股票:

mostCorrelated = select * from table(corrMatrix).rename!(`ts_code`corr_ts_code`corr) context by ts_code having rank(corr,false) between 1:10

查找與000001.SZ相關性最高的10只股票:

select * from mostCorrelated where ts_code="000001.SZ" order by cor descts_code	        corr_ts_code	corr000001.SZ	601166.SH	0.859000001.SZ	600000.SH	0.8406000001.SZ	002920.SZ	0.8175000001.SZ	600015.SH	0.8153000001.SZ	600036.SH	0.8129000001.SZ	600016.SH	0.8022000001.SZ	002142.SZ	0.7956000001.SZ	601169.SH	0.7882000001.SZ	601009.SH	0.7778000001.SZ	601328.SH	0.7736

上面兩個示例都比較簡單,下面咱們進行復雜的計算。

4.3 構建World Quant Alpha #001和#98

WorldQuant LLC發表的論文101 Formulaic Alphas中給出了101個Alpha因子公式。不少我的和機構嘗試用不一樣的語言來實現這101個Alpha因子。本文中,咱們例舉了較爲簡單的Alpha #001和較爲複雜的Alpha #098兩個因子的實現。

Alpha#001公式:rank(Ts_ArgMax(SignedPower((returns<0?stddev(returns,20):close), 2), 5))-0.5

Alpha #001的詳細解讀能夠參考【史上最詳細】WorldQuant Alpha 101因子系列#001研究

Alpha#98公式:(rank(decay_linear(correlation(vwap, sum(adv5,26.4719), 4.58418), 7.18088))- rank(decay_linear(Ts_Rank(Ts_ArgMin(correlation(rank(open), rank(adv15), 20.8187), 8.62571), 6.95668) ,8.07206)))

這兩個因子在計算時候既用到了cross sectional的信息,也用到了大量時間序列的計算。也即在計算某個股票某一天的因子時,既要用到該股票的歷史數據,也要用到當天全部股票的信息,因此計算量很大。

構建這兩個因子,須要包含如下字段:股票代碼、日期、成交量、成交量的加權平均價格、開盤價和收盤價。其中,成交量的加權平均價格能夠經過收盤價和成交量計算得出。所以,日線行情的數據能夠用於構建這兩個因子。

構建因子的代碼以下:

def alpha1(stock){
	t= select trade_date,ts_code,mimax(pow(iif(ratios(close) < 1.0, mstd(ratios(close) - 1, 20),close), 2.0), 5) as maxIndex from stock context by ts_code	return select trade_date,ts_code,rank(maxIndex) - 0.5 as A1 from t context by trade_date
}def alpha98(stock){
	t = select ts_code,trade_date, wavg(close,vol) as vwap, open, mavg(vol, 5) as adv5, mavg(vol,15) as adv15 from stock context by ts_code
	update t set rank_open = rank(open), rank_adv15 = rank(adv15) context by trade_date
	update t set decay7 = mavg(mcorr(vwap, msum(adv5, 26), 5), 1..7), decay8 = mavg(mrank(9 - mimin(mcorr(rank_open, rank_adv15, 21), 9), true, 7), 1..8) context by ts_code	return select ts_code,trade_date, rank(decay7)-rank(decay8) as A98 from t context by trade_date
}

構建Alpha #001僅用了2行核心代碼,Alpha #98僅用了4行核心代碼,而且全部核心代碼都是用SQL實現,可讀性很是好。SQL中最關鍵的功能是context by子句實現的分組計算功能。context by是DolphinDB對標準SQL的擴展。與group by每一個組產生一行記錄不一樣,context by會輸出跟輸入相同行數的記錄,因此咱們能夠方便的進行多個函數嵌套。cross sectional計算時,咱們用trade_date分組。時間序列計算時,咱們用ts_code分組。與傳統的分析語言Matlab、SAS不一樣,DolphinDB腳本語言與分佈式數據庫和分佈式計算緊密集成,表達能力強,高性能易擴展,可以知足快速開發和建模的須要。

查看結果:

select * from alpha1(stock1) where trade_date=2017.07.06trade_date	ts_code	    A12017.07.06	000001.SZ	252.52017.07.06	000002.SZ	1,103.52017.07.06	000004.SZ	252.52017.07.06	000005.SZ	252.52017.07.06	000006.SZ	1,103.52017.07.06	000008.SZ	1,972.52017.07.06	000009.SZ	1,103.52017.07.06	000010.SZ	1,103.52017.07.06	000011.SZ	1,103.5...select * from alpha98(stock1) where trade_date=2017.07.19ts_code	trade_date	A98000001.SZ	2017.07.19	(1,073)000002.SZ	2017.07.19	(1,270)000004.SZ	2017.07.19	(1,805)000005.SZ	2017.07.19	224000006.SZ	2017.07.19	791000007.SZ	2017.07.19	(609)000008.SZ	2017.07.19	444000009.SZ	2017.07.19	(1,411)000010.SZ	2017.07.19	(1,333)...

使用單線程計算,Alpha #001耗時僅4秒,複雜的Alpha #98耗時僅5秒,性能極佳。

4.4 動量交易策略

動量策略是投資界最流行的策略之一。通俗地講,動量策略就是「追漲殺跌」,買漲得厲害的,賣跌得厲害的。下面將介紹如何在DolphinDB中測試動量交易策略。

最經常使用的動量因素是過去一年扣除最近一個月的收益率。動量策略一般是一個月調整一次,而且持有期也是一個月。本教程中,天天調整1/21的投資組合,並持有新的投資組合21天。

要測試動量交易策略,須要包含如下字段的數據:股票代碼、日期、每股價格(收盤價格)、流通市值、股票日收益和每日交易量。

顯然,只有日線行情的數據是不夠的,咱們須要鏈接hushen_daily_line和hushen_daily_indicator兩個表。

經過equal join,從兩個表中選擇須要的字段:

daily_line=loadTable(「dfs://daily_line」,」hushen_daily_line」)
daily_indicator=loadTable(「dfs://daily_indicator」,」hushen_daily_indicator」)
s=select ts_code,trade_date,close,change,pre_close,vol,amount,turnover_rate,total_share,float_share,free_share,total_mv,circ_mv from ej(daily_line,daily_indicator,`ts_code`trade_date)

(1)對數據進行清洗和過濾,爲每隻股票構建過去一年扣除最近一個月收益率的動量信號。

def loadPriceData(inData){
	stocks = select ts_code, trade_date, close,change/pre_close as ret, circ_mv from inData where weekday(trade_date) between 1:5, isValid(close), isValid(vol) order by ts_code, trade_date
	stocks = select ts_code, trade_date,close,ret,circ_mv, cumprod(1+ret) as cumretIndex from stocks context by ts_code	return select ts_code, trade_date, close, ret, circ_mv, move(cumretIndex,21)\move(cumretIndex,252)-1 as signal from stocks context by ts_code 
}
priceData = loadPriceData(s)

(2)生成投資組合

選擇知足如下條件的流通股:動量信號完好失、當天的交易量爲正、市值超過1億元以及每股價格超過5元。

def genTradables(indata){	return select trade_date, ts_code, circ_mv, signal from indata where close>5, circ_mv>10000, vol>0, isValid(signal) order by trade_date
}
tradables = genTradables(priceData)

根據天天的動量信號,產生10組流通股票。只保留兩個最極端的羣體(贏家和輸家)。假設在21天內,天天老是多頭1元和空頭1元,因此咱們天天在贏家組多頭1/21,在輸家組天天空頭1/21。在每組中,咱們可使用等權重或值權重,來計算投資組合造成日期上每一個股票的權重。

//WtScheme=1表示等權重;WtScheme=2表示值權重def formPortfolio(startDate, endDate, tradables, holdingDays, groups, WtScheme){
	ports = select date(trade_date) as trade_date, ts_code, circ_mv, rank(signal,,groups) as rank, count(ts_code) as symCount, 0.0 as wt from tradables where date(trade_date) between startDate:endDate context by trade_date having count(ts_code)>=100
	if (WtScheme==1){
		update ports set wt = -1.0\count(ts_code)\holdingDays where rank=0 context by trade_date
		update ports set wt = 1.0\count(ts_code)\holdingDays where rank=groups-1 context by trade_date
	}	else if (WtScheme==2){
		update ports set wt = -circ_mv\sum(circ_mv)\holdingDays where rank=0 context by trade_date
		update ports set wt = circ_mv\sum(circ_mv)\holdingDays where rank=groups-1 context by trade_date
	}	return select ts_code, trade_date as tranche, wt from ports where wt != 0 order by ts_code, trade_date
}
startDate=2008.01.01endDate=2018.01.01 holdingDays=21groups=10ports = formPortfolio(startDate, endDate, tradables, holdingDays, groups, 2)
dailyRtn = select date(trade_date) as trade_date, ts_code, ret as dailyRet from priceData where date(trade_date) between startDate:endDate

(3)計算投資組合中每隻股票接下來21天的利潤或損失。在投資組合造成後的21天關停投資組合。

def calcStockPnL(ports, dailyRtn, holdingDays, endDate, lastDays){
	ages = table(1..holdingDays as age)
	dates = sort distinct ports.tranche
        dictDateIndex = dict(dates, 1..dates.size())
        dictIndexDate = dict(1..dates.size(), dates)
	pos = select dictIndexDate[dictDateIndex[tranche]+age] as date, ts_code, tranche, age, take(0.0,size age) as ret, wt as expr, take(0.0,size age) as pnl from cj(ports,ages) where isValid(dictIndexDate[dictDateIndex[tranche]+age]), dictIndexDate[dictDateIndex[tranche]+age]<=min(lastDays[ts_code], endDate)

	update pos set ret = dailyRet from ej(pos, dailyRtn,`date`ts_code,`trade_date`ts_code)
	update pos set expr = expr*cumprod(1+ret) from pos context by ts_code, tranche
	update pos set pnl = expr*ret/(1+ret)	return pos
}
lastDaysTable = select max(date(trade_date)) as date from priceData group by ts_code
lastDays = dict(lastDaysTable.ts_code, lastDaysTable.date)
stockPnL = calcStockPnL(ports, dailyRtn, holdingDays, endDate, lastDays)

(4)計算投資組合的利潤或損失,並繪製動量策略累計回報走勢圖。

portPnL = select sum(pnl) as pnl from stockPnL group by date order by dateplot(cumsum(portPnL.pnl) as cumulativeReturn,portPnL.date, "Cumulative Returns of the Momentum Strategy")

下面是滬深股票2008年到2017年的回測結果。回測時,天天產生一個新的tranche,持有21天。使用單線程計算,耗時僅6秒。

afdd0dd911a691a53adade2622ea5607.jpeg

若是使用Pandas來處理金融數據,對內存的要求較高,內存使用峯值通常是數據的3-4倍,隨着數據的積累,pandas的內存佔用問題會愈來愈明顯。在性能上,pandas在多線程處理方面比較弱,不能充分利用多核CPU的計算能力,而且pandas不能根據業務字段對數據進行分區,也不支持列式存儲,查詢數據時必須全表掃描,效率不高。

5. 總結

做爲數據庫,DolphinDB支持單表PB級存儲和靈活的分區方式;做爲研究平臺,DolphinDB不只功能豐富,支持快速的數據清洗、高效的數據導入、交互式分析、庫內分析,流計算框架和離線計算支持生產環境代碼重用,並且性能極佳,即便面對龐大數據集,仍能夠輕鬆實現秒級毫秒級的低延時交互分析。另外,DolphinDB對用戶十分友好,提供了豐富的編程接口,如Python、C++、Java、C#、R等編程API和Excel的add-in插件、ODBC、JDBC插件,還提供了功能強大的集成開發工具,支持圖形化數據顯示,讓實驗結果更加直觀易於理解。

相關文章
相關標籤/搜索