一個量化策略在用於實際交易時,處理實時數據的程序一般爲事件驅動。而研發量化策略時,須要使用歷史數據進行回測,這時的程序一般不是事件驅動。所以同一個策略須要編寫兩套代碼,不只耗時並且容易出錯。在 DolphinDB database 中,用戶可將歷史數據按照時間順序以「實時數據」的方式導入流數據表中,這樣就可使用同一套代碼進行回測和實盤交易。html
DolphinDB的流數據處理框架採用發佈-訂閱-消費的模式。數據生產者將實時數據繼續地以流的形式發佈給全部數據訂閱者。訂閱者收到消息之後,可以使用自定義函數或者DolphinDB內置的聚合引擎來處理消息。DolphinDB流數據接口支持多種語言的API,包括C++, C#, Java, 和Python等。用戶可使用這些API來編寫更加複雜的處理邏輯,更好地與實際生產環境相結合。詳情請參考DolphinDB流數據教程。git
本文介紹replay和replayDS函數,而後使用金融數據展現數據回放的過程與應用場景。github
1. 函數介紹
replaysql
replay(inputTables, outputTables, [dateColumn], [timeColumn], [replayRate], [parallelLevel=1])
replay函數的做用是將若干表或數據源同時回放到相應的輸出表中。用戶須要指定輸入的數據表或數據源、輸出表、日期列、時間列、回放速度以及並行度。數據庫
replay函數參數概念以下:服務器
- inputTables: 單個表或包含若干表或數據源(見replayDS介紹)的元組。
- outputTables: 單個表或包含若干個表的元組,這些表一般爲流數據表。輸入表和輸出表的個數一致,且一一對應,每對輸入、輸出表的結構相同。
- dateColumn, timeColumn: string, 表示輸入表的日期和時間列,若不指定則默認第一列爲日期列。若輸入表中時間列同時包含日期和時間,須要將dateColumn和timeColumn設爲同一列。回放時,系統將根據dateColumn和timeColumn的設定,決定回放的最小時間精度。在此時間精度下,同一時刻的數據將在相同批次輸出。好比一張表同時有日期列和時間列,可是replay函數只設置了dateColumn,那麼同一天的全部數據會在一個批次輸出。
- replayRate: 整數, 表示每秒鐘回放的數據條數。因爲回放時同一個時刻數據在同一批次輸出,所以當replayRate小於一個批次的行數時,實際輸出的速率會大於replayRate。
- parallelLevel: 整數, 表示讀取數據的並行度。當源數據大小超過內存大小的時候,須要使用replayDS函數將源數據劃分爲若干個小的數據源,依次從磁盤中讀取數據並回放。指定多個讀取數據的線程數可提高數據讀取速度。
replayDS網絡
replayDS(sqlObj, [dateColumn], [timeColumn], [timeRepartitionSchema])
replayDS函數能夠將輸入的SQL查詢轉化爲數據源,結合replay函數使用。其做用是根據輸入表的分區以及timeRepartitionSchema,將原始的SQL查詢按照時間順序拆分紅若干小的SQL查詢。session
replayDS函數參數概念以下:app
- sqlObj: SQL元代碼,表示回放的數據,如<select * from sourceTable>。
- dateColumn: 字符串, 表示日期列。若不指定,默認第一列爲日期列。replayDS函數默認日期列是數據源的一個分區列,並根據分區信息將原始SQL查詢拆分爲多個查詢。
- timeColumn: 字符串, 表示時間列,配合timeRepartitionSchema使用。
- timeRepartitionSchema: 時間類型向量,如08:00:00 .. 18:00:00。若同時指定了timeColumn, 則對SQL查詢在時間維度上進一步拆分。
單個內存表回放框架
單內存表回放只須要設置輸入表、輸出表、日期列、時間列和回放速度便可。
replay(inputTable, outputTable, `date, `time, 10)
使用data source的單表回放
當單錶行數過多時,能夠配合使用replayDS進行回放。首先使用replayDS生成data source,本例中指定了日期列和timeRepartitionColumn。回放調用與單個內存表回放類似,可是能夠指定回放的並行度。replay內部實現使用了pipeline框架,取數據和輸出分開執行。當輸入爲data source時,多塊數據能夠並行讀取,以免輸出線程等待的狀況。此例中並行度設置爲2,表示有兩個線程同時執行取數據的操做。
inputDS = replayDS(<select * from inputTable>, `date, `time, 08:00:00.000 + (1..10) * 3600000) replay(inputDS, outputTable, `date, `time, 1000, 2)
使用data source的多表回放
replay也支持多張表的同時回放,只須要將多張輸入表以元組的方式傳給replay,而且分別指定輸出表便可。這裏輸出表和輸入表應該一一對應,每一對都必須有相同的表結構。若是指定了日期列或時間列,那麼全部表中都應當有存在相應的列。
ds1 = replayDS(<select * from input1>, `date, `time, 08:00:00.000 + (1..10) * 3600000) ds2 = replayDS(<select * from input2>, `date, `time, 08:00:00.000 + (1..10) * 3600000) ds3 = replayDS(<select * from input3>, `date, `time, 08:00:00.000 + (1..10) * 3600000) replay([ds1, ds2, ds3], [out1, out2, out3], `date, `time, 1000, 2)
取消回放
若是replay函數是經過submitJob調用,可使用getRecentJob獲取jobId,而後用cancelJob取消回放。
getRecentJobs() cancelJob(jobid)
若是是直接調用,可在另一個GUI session中使用getConsoleJobs獲取jobId,而後使用cancelConsoleJob取消回聽任務。
getConsoleJobs() cancelConsoleJob(jobId)
2.如何使用回放數據
回放的數據以流數據形式存在,咱們可使用如下三種方式來訂閱消費這些數據:
- 在DolphinDB中訂閱,使用DolphinDB腳本自定義回調函數來消費流數據。
- 在DolphinDB中訂閱,使用內置的流計算引擎來處理流數據,譬如時間序列聚合引擎、橫截面聚合引擎、異常檢測引擎等。DolphinDB內置的聚合引擎能夠對流數據進行實時聚合計算,使用簡便且性能優異。在3.2中,咱們使用橫截面聚合引擎來處理回放的數據,並計算ETF的內在價值。橫截面聚合引擎的具體用法參見DolphinDB用戶手冊。
- 第三方客戶端經過DolphinDB的流數據API來訂閱和消費數據。
3. 金融示例
回放美國股市一天的level1交易數據,並計算ETF價值
本例中使用美國股市2007年8月17日的level1交易數據,利用replayDS進行數據回放,並經過DolphinDB內置的橫截面聚合引擎計算ETF價值。數據存放在分佈式數據庫dfs://TAQ的quotes表,下面是quotes表的結構以及數據預覽。
//加載數據庫中quotes表的數據,查看錶結構 quotes = database("dfs://TAQ").loadTable("quotes"); quotes.schema().colDefs; name typeString typeInt time SECOND 10 symbol SYMBOL 17 ofrsiz INT 4 ofr DOUBLE 16 mode INT 4 mmid SYMBOL 17 ex CHAR 2 date DATE 6 bidsize INT 4 bid DOUBLE 16 //查看quotes表內前十行的數據 select top 10 * from quotes; symbol date time bid ofr bidsiz ofrsiz mode ex mmid A 2007.08.17 04:15:06 0.01 0 10 0 12 80 A 2007.08.17 06:21:16 1 0 1 0 12 80 A 2007.08.17 06:21:44 0.01 0 10 0 12 80 A 2007.08.17 06:49:02 32.03 0 1 0 12 80 A 2007.08.17 06:49:02 32.03 32.78 1 1 12 80 A 2007.08.17 07:02:01 18.5 0 1 0 12 84 A 2007.08.17 07:02:01 18.5 45.25 1 1 12 84 A 2007.08.17 07:54:55 31.9 45.25 3 1 12 84 A 2007.08.17 08:00:00 31.9 40 3 2 12 84 A 2007.08.17 08:00:00 31.9 35.5 3 2 12 84
(1)對要進行回放的數據進行劃分。因爲一天的數據共有336,305,414條,一次性導入內存再回放會有較長延遲,還有可能致使內存溢出,所以先使用replayDS函數並指定參數timeRepartitionSchema,將數據按照時間戳分爲62個部分。
sch = select name,typeString as type from quotes.schema().colDefs trs = cutPoints(09:30:00.001..18:00:00.001, 60) rds = replayDS(<select * from quotes>, `date, `time, trs);
(2)定義輸出表outQuotes,通常爲流數據表。
share streamTable(100:0, sch.name,sch.type) as outQuotes
(3)定義股票權重字典weights以及聚合函數etfVal,用於計算ETF價值。在本例中,咱們僅計算AAPL、IBM、MSFT、NTES、AMZN、GOOG這幾隻股票的ETF價值。
defg etfVal(weights,sym, price) { return wsum(price, weights[sym]) } weights = dict(STRING, DOUBLE) weights[`AAPL] = 0.1 weights[`IBM] = 0.1 weights[`MSFT] = 0.1 weights[`NTES] = 0.1 weights[`AMZN] = 0.1 weights[`GOOG] = 0.5
(4)建立流聚合引擎,並訂閱數據回放的輸出表outQuotes。訂閱outQuotes表時,咱們指定了發佈表的過濾條件,只有symbol爲AAPL、IBM、MSFT、NTES、AMZN、GOOG的數據纔會發佈到橫截面聚合引擎,減小沒必要要的網絡開銷和數據傳輸。
setStreamTableFilterColumn(outQuotes, `symbol) outputTable = table(1:0, `time`etf, [TIMESTAMP,DOUBLE]) tradesCrossAggregator=createCrossSectionalAggregator("etfvalue", <[etfVal{weights}(symbol, ofr)]>, quotes, outputTable, `symbol, `perBatch) subscribeTable(,"outQuotes","tradesCrossAggregator",-1,append!{tradesCrossAggregator},true,,,,,`AAPL`IBM`MSFT`NTES`AMZN`GOOG)
(5)開始回放,設定每秒回放10萬條數據,聚合引擎則會實時地對回放的數據進行消費。
submitJob("replay_quotes", "replay_quotes_stream", replay, [rds], [`outQuotes], `date, `time,100000,4)
(6)查看不一樣時間點下咱們選擇的股票的ETF價值。
//查看outputTable表內前15行的數據,其中第一列時間爲聚合計算髮生的時間 >select top 15 * from outputTable; time etf 2019.06.04T16:40:18.476 14.749 2019.06.04T16:40:19.476 14.749 2019.06.04T16:40:20.477 14.749 2019.06.04T16:40:21.477 22.059 2019.06.04T16:40:22.477 22.059 2019.06.04T16:40:23.477 34.049 2019.06.04T16:40:24.477 34.049 2019.06.04T16:40:25.477 284.214 2019.06.04T16:40:26.477 284.214 2019.06.04T16:40:27.477 285.68 2019.06.04T16:40:28.477 285.68 2019.06.04T16:40:29.478 285.51 2019.06.04T16:40:30.478 285.51 2019.06.04T16:40:31.478 285.51 2019.06.04T16:40:32.478 285.51
4. 性能測試
咱們在服務器上對DolphinDB的數據回放功能進行了性能測試。服務器配置以下:
主機:DELL PowerEdge R730xd
CPU:Intel Xeon(R) CPU E5-2650 v4(24核 48線程 2.20GHz)
內存:512 GB (32GB × 16, 2666 MHz)
硬盤:17T HDD (1.7T × 10, 讀取速度222 MB/s,寫入速度210 MB/s)
網絡:萬兆以太網
測試腳本以下:
sch = select name,typeString as type from quotes.schema().colDefs trs = cutPoints(09:30:00.001..18:00:00.001,60) rds = replayDS(<select * from quotes>, `date, `time, trs); share streamTable(100:0, sch.name,sch.type) as outQuotes jobid = submitJob("replay_quotes","replay_quotes_stream", replay, [rds], [`outQuotes], `date, `time, , 4)
在不設定回放速率(即以最快的速率回放),而且輸出表沒有任何訂閱時,回放336,305,414條數據耗時僅須要90~110秒。