乾貨丨如何用時序數據庫處理Tushare金融數據?

本文將介紹如何把Tushare的滬深股票2008年到2017年的日線行情數據和每日指標數據導入到 DolphinDB database,並使用DolphinDB進行金融分析。Tushare是金融大數據開放社區,擁有豐富的金融數據,如股票、基金、期貨、數字貨幣等行情數據,爲量化從業人員和金融相關研究人員免費提供金融數據。php

DolphinDB  是新一代的時序數據庫,不只能夠做爲分佈式數據倉庫或者內存數據庫來使用,並且自帶豐富的計算工具,能夠做爲研究工具或研究平臺來使用,很是適用於量化金融、物聯網等領域的海量數據分析。量化金融領域的很多問題,如交易信號研究、策略回測、交易成本分析、股票相關性研究、市場風險控制等,均可以用DolphinDB來解決。python

1. 數據概況

Tushare提供的滬深股票日線行情數據包含如下字段:ios

名稱	        描述
ts_code	        股票代碼
trade_date	交易日期
open	        開盤價
high	        最高價
low	        最低價
close	        收盤價
pre_close	昨收價
change	        漲跌額
pct_change	漲跌幅
vol	        成交量(手)
amount	        成交額(千元)

每日指標數據包含如下字段:git

名稱	        描述
ts_code	        股票代碼
trade_date	交易日期
close	        收盤價
turnover_rate	換手率
turnover_rate_f	換手率(自由流通股)
volume_ratio	量比
pe	        市盈率(總市值/淨利潤)
pe_ttm	        市盈率(TTM)
pb	        市淨率(總市值/淨資產)
ps	        市銷率
ps_ttm	        市銷率(TTM)
total_share	總股本(萬)
float_share	流通股本(萬)
free_share	自由流通股本(萬)
total_mv	總市值(萬元)
cric_mv	        流通市值(萬元)

2. 建立DolphinDB數據庫

2.1 安裝DolphinDBgithub

官網下載DolphinDB安裝包和DolphinDB GUI.數據庫

DolphinDB單節點部署請參考單節點部署編程

DolphinDB單服務器集羣部署請參考單服務器集羣部署api

DolphinDB多物理服務器部署請參考多服務器集羣部署瀏覽器

2.2 建立數據庫服務器

咱們可使用database函數建立分區數據庫。

語法:database(directory, [partitionType], [partitionScheme], [locations])

參數

directory:數據庫保存的目錄。DolphinDB有三種類型的數據庫,分別是內存數據庫、磁盤上的數據庫和分佈式文件系統上的數據庫。建立內存數據庫,directory爲空;建立本地數據庫,directory應該是本地文件系統目錄;建立分佈式文件系統上的數據庫,directory應該以「dfs://」開頭。本教程使用分佈式文件系統上的數據庫。

partitionType:分區方式,有6種方式: 順序分區(SEQ),範圍分區(RANGE),哈希分區(HASH),值分區(VALUE),列表分區(LIST),複合分區(COMPO)。

partitionScheme:分區方案。各類分區方式對應的分區方案以下:

導入數據前,要作好數據的分區規劃,主要考慮兩個因素:分區字段和分區粒度。

在平常的查詢分析中,按照日期查詢的頻率最高,因此分區字段爲日期trade_date。若是一天一個分區,每一個分區的數據量過少,只有3000多條數據,不到1兆大小,並且分區數量很是多。分佈式系統在執行查詢時,會把查詢語句分紅多個子任務發送到不一樣的分區。這樣的分區方式會致使子任務數量很是多,而每一個任務執行的時間極短,系統在管理任務上耗費的時間反而大於任務自己的執行時間,明顯這樣的分區方式是不合理。這種狀況下,咱們按日期範圍進行分區,每一年的1月1日到次年的1月1日爲一個分區,這樣既能提高查詢的效率,也不會形成分區粒度太小。

現有數據的時間跨度是2008-2017年,可是爲了給將來的數據留出足夠的空間,咱們把時間範圍設置爲2008-2030年。執行如下代碼:

yearRange=date(2008.01M + 12*0..22)

因爲日線行情和每日指標數據的分區方案相同,所以把它們存放在同一個數據庫dfs://tushare的兩個表中,hushen_daily_line用於存放日線行情數據,hushen_daily_indicator用於存放每日指標數據。若是須要使用內存數據庫,建立數據庫時把directory設爲空;若是須要使用磁盤上的數據庫,把directory設置爲磁盤目錄便可。建立數據庫的代碼以下:

login("admin","123456")
dbPath="dfs://tushare"
yearRange=date(2008.01M + 12*0..22)
if(existsDatabase(dbPath)){
	dropDatabase(dbPath)
}
columns1=`ts_code`trade_date`open`high`low`close`pre_close`change`pct_change`vol`amount type1=`SYMBOL`NANOTIMESTAMP`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE
db=database(dbPath,RANGE,yearRange)
hushen_daily_line=db.createPartitionedTable(table(100000000:0,columns1,type1),`hushen_daily_line,`trade_date)

columns2=`ts_code`trade_date`close`turnover_rate`turnover_rate_f`volume_ratio`pe`pr_ttm`pb`ps`ps_ttm`total_share`float_share`free_share`total_mv`circ_mv
type2=`SYMBOL`NANOTIMESTAMP`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE
hushen_daily_indicator=db.createPartitionedTable(table(100000000:0,columns2,type2),`hushen_daily_indicator,`trade_date)

3. 使用Python API把數據導入到DolphinDB

Tushare提供了兩種經常使用的數據調取方式:

  • 經過Tushare Python包,返回的是python dataframe類型數據。
  • 經過http協議直接獲取,返回的是Json格式數據。

本教程使用了第一種方法調取滬深股票2008年到2017年10年的日線行情數據和每日指標數據。

3.1 下載安裝Python3.X和Tushare

具體教程請參考Tushare官網

3.2 安裝DolphinDB的Python3 API

官網下載Python3 API,把Python3 API的安裝包解壓至任意目錄。在console中進入該目錄,執行如下命令:

python setup.py install

使用如下命令更新Python API:

python setup.py install --force

3.3 數據導入

咱們分別使用Tushare Python包的daily和daily_basic接口調取日線行情和每日指標數據,返回的是Python Dataframe類型數據。注意,須要註冊Tushare帳號才能獲取token。接着,經過Python API,鏈接到IP爲localhost,端口號爲8941的DolphinDB數據節點(這裏的數據節點IP與端口根據本身集羣的狀況進行修改),把Tushare返回的Dataframe數據分別追加到以前建立的DolphinDB DFS Table中。

具體的Python代碼以下:

import datetime
import tushare as ts
import pandas as pd
import numpy as np
import dolphindb as ddb
pro=ts.pro_api('YOUR_TOKEN')
s=ddb.session()
s.connect("localhost",8941,"admin","123456")
t1=s.loadTable(tableName="hushen_daily_line",dbPath="dfs://tushare")
t2=s.loadTable(tableName="hushen_daily_indicator",dbPath="dfs://tushare")
def dateRange(beginDate,endDate):
    dates=[]
    dt=datetime.datetime.strptime(beginDate,"%Y%m%d")
    date=beginDate[:]
    while date <= endDate:
        dates.append(date)
        dt=dt + datetime.timedelta(1)
        date=dt.strftime("%Y%m%d")
    return dates

for dates in dateRange('20080101','20171231'):
df=pro.daily(trade_date=dates)
df['trade_date']=pd.to_datetime(df['trade_date'])
if len(df):
    t1.append(s.table(data=df))
    print(t1.rows)

for dates in dateRange('20080101','20171231'):
ds=pro.daily_basic(trade_date=dates)
ds['trade_date']=pd.to_datetime(ds['trade_date'])
ds['volume_ratio']=np.float64(ds['volume_ratio'])
if len(ds):
    t2.append(s.table(data=ds))
    print(t2.rows)

數據導入成功後,咱們能夠從DolphinDB GUI右下角的變量瀏覽器中看到兩個表的分區狀況:

查看數據量:

select count(*) from hushen_daily_line
5,332,932

select count(*) from hushen_daily_indicator
5,333,321

至此,咱們已經把滬深股票2008年-2017年的日線行情和每日指標數據所有導入到DolphinDB中。

4. 金融分析

DolphinDB將數據庫、編程語言和分佈式計算融合在一塊兒,不只能夠用做數據倉庫,還能夠用做計算和分析工具。DolphinDB內置了許多通過優化的時間序列函數,特別適用於投資銀行、對衝基金和交易所的定量查詢和分析,能夠用於構建基於歷史數據的策略測試。下面介紹如何使用Tushare的數據進行金融分析。

4.1 計算每隻股票滾動波動率

daily_line= loadTable("dfs://daily_line","hushen_daily_line")
t=select ts_code,trade_date,mstd(pct_change/100,21) as mvol from daily_line context by ts_code
select * from t where trade_date=2008.11.14
ts_code	        trade_date	mvol
000001.SZ	2008.11.14	0.048551
000002.SZ	2008.11.14	0.04565
000004.SZ	2008.11.14	0.030721
000005.SZ	2008.11.14	0.046655
000006.SZ	2008.11.14	0.043092
000008.SZ	2008.11.14	0.035764
000009.SZ	2008.11.14	0.051113
000010.SZ	2008.11.14	0.027254
...

計算每隻股票一個月的滾動波動率,僅需一行代碼。DolphinDB自帶金融基因,內置了大量與金融相關的函數,能夠用簡單的代碼計算金融指標。

4.2 找到最相關的股票

使用滬深股票日線行情數據,計算股票的兩兩相關性。首先,生成股票回報矩陣:

retMatrix=exec pct_change/100 as ret from loadTable("dfs://daily_line","hushen_daily_line") pivot by trade_date,ts_code

exec和pivot by是DolphinDB編程語言的特色之一。exec與select的用法相同,可是select語句生成的是表,exec語句生成的是向量。pivot by用於整理維度,與exec一塊兒使用時會生成一個矩陣。

接着,生成股票相關性矩陣:

corrMatrix=cross(corr,retMatrix,retMatrix)

上面使用到的cross是DolphinDB中的高階函數,它以函數和對象做爲輸入內容,把函數應用到每一個對象上。模板函數在複雜的批量計算中很是有用。

而後,找到每隻股票相關性最高的10只股票:

mostCorrelated = select * from table(corrMatrix).rename!(`ts_code`corr_ts_code`corr) context by ts_code having rank(corr,false) between 1:10

查找與000001.SZ相關性最高的10只股票:

select * from mostCorrelated where ts_code="000001.SZ" order by cor desc
ts_code	        corr_ts_code	corr
000001.SZ	601166.SH	0.859
000001.SZ	600000.SH	0.8406
000001.SZ	002920.SZ	0.8175
000001.SZ	600015.SH	0.8153
000001.SZ	600036.SH	0.8129
000001.SZ	600016.SH	0.8022
000001.SZ	002142.SZ	0.7956
000001.SZ	601169.SH	0.7882
000001.SZ	601009.SH	0.7778
000001.SZ	601328.SH	0.7736

上面兩個示例都比較簡單,下面咱們進行復雜的計算。

4.3 構建World Quant Alpha #001和#98

WorldQuant LLC發表的論文101 Formulaic Alphas中給出了101個Alpha因子公式。不少我的和機構嘗試用不一樣的語言來實現這101個Alpha因子。本文中,咱們例舉了較爲簡單的Alpha #001和較爲複雜的Alpha #098兩個因子的實現。

Alpha#001公式:rank(Ts_ArgMax(SignedPower((returns<0?stddev(returns,20):close), 2), 5))-0.5

Alpha #001的詳細解讀能夠參考【史上最詳細】WorldQuant Alpha 101因子系列#001研究

Alpha#98公式:(rank(decay_linear(correlation(vwap, sum(adv5,26.4719), 4.58418), 7.18088))- rank(decay_linear(Ts_Rank(Ts_ArgMin(correlation(rank(open), rank(adv15), 20.8187), 8.62571), 6.95668) ,8.07206)))

這兩個因子在計算時候既用到了cross sectional的信息,也用到了大量時間序列的計算。也即在計算某個股票某一天的因子時,既要用到該股票的歷史數據,也要用到當天全部股票的信息,因此計算量很大。

構建這兩個因子,須要包含如下字段:股票代碼、日期、成交量、成交量的加權平均價格、開盤價和收盤價。其中,成交量的加權平均價格能夠經過收盤價和成交量計算得出。所以,日線行情的數據能夠用於構建這兩個因子。

構建因子的代碼以下:

def alpha1(stock){
	t= select trade_date,ts_code,mimax(pow(iif(ratios(close) < 1.0, mstd(ratios(close) - 1, 20),close), 2.0), 5) as maxIndex from stock context by ts_code
	return select trade_date,ts_code,rank(maxIndex) - 0.5 as A1 from t context by trade_date
}

def alpha98(stock){
	t = select ts_code,trade_date, wavg(close,vol) as vwap, open, mavg(vol, 5) as adv5, mavg(vol,15) as adv15 from stock context by ts_code
	update t set rank_open = rank(open), rank_adv15 = rank(adv15) context by trade_date
	update t set decay7 = mavg(mcorr(vwap, msum(adv5, 26), 5), 1..7), decay8 = mavg(mrank(9 - mimin(mcorr(rank_open, rank_adv15, 21), 9), true, 7), 1..8) context by ts_code
	return select ts_code,trade_date, rank(decay7)-rank(decay8) as A98 from t context by trade_date
}

構建Alpha #001僅用了2行核心代碼,Alpha #98僅用了4行核心代碼,而且全部核心代碼都是用SQL實現,可讀性很是好。SQL中最關鍵的功能是context by子句實現的分組計算功能。context by是DolphinDB對標準SQL的擴展。與group by每一個組產生一行記錄不一樣,context by會輸出跟輸入相同行數的記錄,因此咱們能夠方便的進行多個函數嵌套。cross sectional計算時,咱們用trade_date分組。時間序列計算時,咱們用ts_code分組。與傳統的分析語言Matlab、SAS不一樣,DolphinDB腳本語言與分佈式數據庫和分佈式計算緊密集成,表達能力強,高性能易擴展,可以知足快速開發和建模的須要。

查看結果:

select * from alpha1(stock1) where trade_date=2017.07.06
trade_date	ts_code	    A1
2017.07.06	000001.SZ	252.5
2017.07.06	000002.SZ	1,103.5
2017.07.06	000004.SZ	252.5
2017.07.06	000005.SZ	252.5
2017.07.06	000006.SZ	1,103.5
2017.07.06	000008.SZ	1,972.5
2017.07.06	000009.SZ	1,103.5
2017.07.06	000010.SZ	1,103.5
2017.07.06	000011.SZ	1,103.5
...

select * from alpha98(stock1) where trade_date=2017.07.19
ts_code	trade_date	A98
000001.SZ	2017.07.19	(1,073)
000002.SZ	2017.07.19	(1,270)
000004.SZ	2017.07.19	(1,805)
000005.SZ	2017.07.19	224
000006.SZ	2017.07.19	791
000007.SZ	2017.07.19	(609)
000008.SZ	2017.07.19	444
000009.SZ	2017.07.19	(1,411)
000010.SZ	2017.07.19	(1,333)
...

使用單線程計算,Alpha #001耗時僅4秒,複雜的Alpha #98耗時僅5秒,性能極佳。

4.4 動量交易策略

動量策略是投資界最流行的策略之一。通俗地講,動量策略就是「追漲殺跌」,買漲得厲害的,賣跌得厲害的。下面將介紹如何在DolphinDB中測試動量交易策略。

最經常使用的動量因素是過去一年扣除最近一個月的收益率。動量策略一般是一個月調整一次,而且持有期也是一個月。本教程中,天天調整1/21的投資組合,並持有新的投資組合21天。

要測試動量交易策略,須要包含如下字段的數據:股票代碼、日期、每股價格(收盤價格)、流通市值、股票日收益和每日交易量。

顯然,只有日線行情的數據是不夠的,咱們須要鏈接hushen_daily_line和hushen_daily_indicator兩個表。

經過equal join,從兩個表中選擇須要的字段:

daily_line=loadTable(「dfs://daily_line」,」hushen_daily_line」)
daily_indicator=loadTable(「dfs://daily_indicator」,」hushen_daily_indicator」)
s=select ts_code,trade_date,close,change,pre_close,vol,amount,turnover_rate,total_share,float_share,free_share,total_mv,circ_mv from ej(daily_line,daily_indicator,`ts_code`trade_date)

(1)對數據進行清洗和過濾,爲每隻股票構建過去一年扣除最近一個月收益率的動量信號。

def loadPriceData(inData){
	stocks = select ts_code, trade_date, close,change/pre_close as ret, circ_mv from inData where weekday(trade_date) between 1:5, isValid(close), isValid(vol) order by ts_code, trade_date stocks = select ts_code, trade_date,close,ret,circ_mv, cumprod(1+ret) as cumretIndex from stocks context by ts_code
	return select ts_code, trade_date, close, ret, circ_mv, move(cumretIndex,21)\move(cumretIndex,252)-1 as signal from stocks context by ts_code 
}
priceData = loadPriceData(s)

(2)生成投資組合

選擇知足如下條件的流通股:動量信號完好失、當天的交易量爲正、市值超過1億元以及每股價格超過5元。

def genTradables(indata){
	return select trade_date, ts_code, circ_mv, signal from indata where close>5, circ_mv>10000, vol>0, isValid(signal) order by trade_date
}
tradables = genTradables(priceData)

根據天天的動量信號,產生10組流通股票。只保留兩個最極端的羣體(贏家和輸家)。假設在21天內,天天老是多頭1元和空頭1元,因此咱們天天在贏家組多頭1/21,在輸家組天天空頭1/21。在每組中,咱們可使用等權重或值權重,來計算投資組合造成日期上每一個股票的權重。

//WtScheme=1表示等權重;WtScheme=2表示值權重
def formPortfolio(startDate, endDate, tradables, holdingDays, groups, WtScheme){
	ports = select date(trade_date) as trade_date, ts_code, circ_mv, rank(signal,,groups) as rank, count(ts_code) as symCount, 0.0 as wt from tradables where date(trade_date) between startDate:endDate context by trade_date having count(ts_code)>=100
	if (WtScheme==1){
		update ports set wt = -1.0\count(ts_code)\holdingDays where rank=0 context by trade_date
		update ports set wt = 1.0\count(ts_code)\holdingDays where rank=groups-1 context by trade_date
	}
	else if (WtScheme==2){
		update ports set wt = -circ_mv\sum(circ_mv)\holdingDays where rank=0 context by trade_date
		update ports set wt = circ_mv\sum(circ_mv)\holdingDays where rank=groups-1 context by trade_date
	}
	return select ts_code, trade_date as tranche, wt from ports where wt != 0 order by ts_code, trade_date
}
startDate=2008.01.01
endDate=2018.01.01 
holdingDays=21
groups=10
ports = formPortfolio(startDate, endDate, tradables, holdingDays, groups, 2)
dailyRtn = select date(trade_date) as trade_date, ts_code, ret as dailyRet from priceData where date(trade_date) between startDate:endDate

(3)計算投資組合中每隻股票接下來21天的利潤或損失。在投資組合造成後的21天關停投資組合。

def calcStockPnL(ports, dailyRtn, holdingDays, endDate, lastDays){
	ages = table(1..holdingDays as age)
	dates = sort distinct ports.tranche
        dictDateIndex = dict(dates, 1..dates.size())
        dictIndexDate = dict(1..dates.size(), dates)
	pos = select dictIndexDate[dictDateIndex[tranche]+age] as date, ts_code, tranche, age, take(0.0,size age) as ret, wt as expr, take(0.0,size age) as pnl from cj(ports,ages) where isValid(dictIndexDate[dictDateIndex[tranche]+age]), dictIndexDate[dictDateIndex[tranche]+age]<=min(lastDays[ts_code], endDate)

	update pos set ret = dailyRet from ej(pos, dailyRtn,`date`ts_code,`trade_date`ts_code) update pos set expr = expr*cumprod(1+ret) from pos context by ts_code, tranche
	update pos set pnl = expr*ret/(1+ret)
	return pos
}
lastDaysTable = select max(date(trade_date)) as date from priceData group by ts_code lastDays = dict(lastDaysTable.ts_code, lastDaysTable.date)
stockPnL = calcStockPnL(ports, dailyRtn, holdingDays, endDate, lastDays)

(4)計算投資組合的利潤或損失,並繪製動量策略累計回報走勢圖。

portPnL = select sum(pnl) as pnl from stockPnL group by date order by date
plot(cumsum(portPnL.pnl) as cumulativeReturn,portPnL.date, "Cumulative Returns of the Momentum Strategy")

下面是滬深股票2008年到2017年的回測結果。回測時,天天產生一個新的tranche,持有21天。使用單線程計算,耗時僅6秒。

若是使用Pandas來處理金融數據,對內存的要求較高,內存使用峯值通常是數據的3-4倍,隨着數據的積累,pandas的內存佔用問題會愈來愈明顯。在性能上,pandas在多線程處理方面比較弱,不能充分利用多核CPU的計算能力,而且pandas不能根據業務字段對數據進行分區,也不支持列式存儲,查詢數據時必須全表掃描,效率不高。

5. 總結

做爲數據庫,DolphinDB支持單表PB級存儲和靈活的分區方式;做爲研究平臺,DolphinDB不只功能豐富,支持快速的數據清洗、高效的數據導入、交互式分析、庫內分析,流計算框架和離線計算支持生產環境代碼重用,並且性能極佳,即便面對龐大數據集,仍能夠輕鬆實現秒級毫秒級的低延時交互分析。另外,DolphinDB對用戶十分友好,提供了豐富的編程接口,如Python、C++、Java、C#、R等編程API和Excel的add-in插件、ODBC、JDBC插件,還提供了功能強大的集成開發工具,支持圖形化數據顯示,讓實驗結果更加直觀易於理解。

相關文章
相關標籤/搜索