一個量化策略在用於實際交易時,處理實時數據的程序一般爲事件驅動。而研發量化策略時,須要使用歷史數據進行回測,這時的程序一般不是事件驅動。所以同一個策略須要編寫兩套代碼,不只耗時並且容易出錯。在 DolphinDB database 中,用戶可將歷史數據按照時間順序以「實時數據」的方式導入流數據表中,這樣就可使用同一套代碼進行回測和實盤交易。sql
DolphinDB的流數據處理框架採用發佈-訂閱-消費的模式。數據生產者將實時數據繼續地以流的形式發佈給全部數據訂閱者。訂閱者收到消息之後,可以使用自定義函數或者DolphinDB內置的聚合引擎來處理消息。DolphinDB流數據接口支持多種語言的API,包括C++, C#, Java, 和Python等。用戶可使用這些API來編寫更加複雜的處理邏輯,更好地與實際生產環境相結合。有興趣的朋友能夠搜索「DolphinDB流數據教程」瞭解更多內容。數據庫
本文介紹replay和replayDS函數,而後使用金融數據展現數據回放的過程與應用場景。服務器
replay網絡
replay(inputTables, outputTables, [dateColumn], [timeColumn], [replayRate], [parallelLevel=1])
replay函數的做用是將若干表或數據源同時回放到相應的輸出表中。用戶須要指定輸入的數據表或數據源、輸出表、日期列、時間列、回放速度以及並行度。session
replay函數參數概念以下:app
replayDS框架
replayDS(sqlObj, [dateColumn], [timeColumn], [timeRepartitionSchema])
replayDS函數能夠將輸入的SQL查詢轉化爲數據源,結合replay函數使用。其做用是根據輸入表的分區以及timeRepartitionSchema,將原始的SQL查詢按照時間順序拆分紅若干小的SQL查詢。分佈式
replayDS函數參數概念以下:ide
單個內存表回放函數
單內存表回放只須要設置輸入表、輸出表、日期列、時間列和回放速度便可。
replay(inputTable, outputTable, `date, `time, 10)
使用data source的單表回放
當單錶行數過多時,能夠配合使用replayDS進行回放。首先使用replayDS生成data source,本例中指定了日期列和timeRepartitionColumn。回放調用與單個內存表回放類似,可是能夠指定回放的並行度。replay內部實現使用了pipeline框架,取數據和輸出分開執行。當輸入爲data source時,多塊數據能夠並行讀取,以免輸出線程等待的狀況。此例中並行度設置爲2,表示有兩個線程同時執行取數據的操做。
inputDS = replayDS(<select * from inputTable>, `date, `time, 08:00:00.000 + (1..10) * 3600000) replay(inputDS, outputTable, `date, `time, 1000, 2)
使用data source的多表回放
replay也支持多張表的同時回放,只須要將多張輸入表以元組的方式傳給replay,而且分別指定輸出表便可。這裏輸出表和輸入表應該一一對應,每一對都必須有相同的表結構。若是指定了日期列或時間列,那麼全部表中都應當有存在相應的列。
ds1 = replayDS(<select * from input1>, `date, `time, 08:00:00.000 + (1..10) * 3600000) ds2 = replayDS(<select * from input2>, `date, `time, 08:00:00.000 + (1..10) * 3600000) ds3 = replayDS(<select * from input3>, `date, `time, 08:00:00.000 + (1..10) * 3600000) replay([ds1, ds2, ds3], [out1, out2, out3], `date, `time, 1000, 2)
取消回放
若是replay函數是經過submitJob調用,可使用getRecentJob獲取jobId,而後用cancelJob取消回放。
getRecentJobs() cancelJob(jobid)
若是是直接調用,可在另一個GUI session中使用getConsoleJobs獲取jobId,而後使用cancelConsoleJob取消回聽任務。
getConsoleJobs() cancelConsoleJob(jobId)
回放的數據以流數據形式存在,咱們可使用如下三種方式來訂閱消費這些數據:
回放美國股市一天的level1交易數據,並計算ETF價值
本例中使用美國股市2007年8月17日的level1交易數據,利用replayDS進行數據回放,並經過DolphinDB內置的橫截面聚合引擎計算ETF價值。數據存放在分佈式數據庫dfs://TAQ的quotes表,下面是quotes表的結構以及數據預覽。
//加載數據庫中quotes表的數據,查看錶結構 quotes = database("dfs://TAQ").loadTable("quotes"); quotes.schema().colDefs; name typeString typeInt time SECOND 10 symbol SYMBOL 17 ofrsiz INT 4 ofr DOUBLE 16 mode INT 4 mmid SYMBOL 17 ex CHAR 2 date DATE 6 bidsize INT 4 bid DOUBLE 16 //查看quotes表內前十行的數據 select top 10 * from quotes; symbol date time bid ofr bidsiz ofrsiz mode ex mmid A 2007.08.17 04:15:06 0.01 0 10 0 12 80 A 2007.08.17 06:21:16 1 0 1 0 12 80 A 2007.08.17 06:21:44 0.01 0 10 0 12 80 A 2007.08.17 06:49:02 32.03 0 1 0 12 80 A 2007.08.17 06:49:02 32.03 32.78 1 1 12 80 A 2007.08.17 07:02:01 18.5 0 1 0 12 84 A 2007.08.17 07:02:01 18.5 45.25 1 1 12 84 A 2007.08.17 07:54:55 31.9 45.25 3 1 12 84 A 2007.08.17 08:00:00 31.9 40 3 2 12 84 A 2007.08.17 08:00:00 31.9 35.5 3 2 12 84
(1)對要進行回放的數據進行劃分。因爲一天的數據共有336,305,414條,一次性導入內存再回放會有較長延遲,還有可能致使內存溢出,所以先使用replayDS函數並指定參數timeRepartitionSchema,將數據按照時間戳分爲62個部分。
sch = select name,typeString as type from quotes.schema().colDefs trs = cutPoints(09:30:00.001..18:00:00.001, 60) rds = replayDS(<select * from quotes>, `date, `time, trs);
(2)定義輸出表outQuotes,通常爲流數據表。
share streamTable(100:0, sch.name,sch.type) as outQuotes
(3)定義股票權重字典weights以及聚合函數etfVal,用於計算ETF價值。在本例中,咱們僅計算AAPL、IBM、MSFT、NTES、AMZN、GOOG這幾隻股票的ETF價值。
defg etfVal(weights,sym, price) { return wsum(price, weights[sym]) } weights = dict(STRING, DOUBLE) weights[`AAPL] = 0.1 weights[`IBM] = 0.1 weights[`MSFT] = 0.1 weights[`NTES] = 0.1 weights[`AMZN] = 0.1 weights[`GOOG] = 0.5
(4)建立流聚合引擎,並訂閱數據回放的輸出表outQuotes。訂閱outQuotes表時,咱們指定了發佈表的過濾條件,只有symbol爲AAPL、IBM、MSFT、NTES、AMZN、GOOG的數據纔會發佈到橫截面聚合引擎,減小沒必要要的網絡開銷和數據傳輸。
setStreamTableFilterColumn(outQuotes, `symbol) outputTable = table(1:0, `time`etf, [TIMESTAMP,DOUBLE]) tradesCrossAggregator=createCrossSectionalAggregator("etfvalue", <[etfVal{weights}(symbol, ofr)]>, quotes, outputTable, `symbol, `perBatch) subscribeTable(,"outQuotes","tradesCrossAggregator",-1,append!{tradesCrossAggregator},true,,,,,`AAPL`IBM`MSFT`NTES`AMZN`GOOG)
(5)開始回放,設定每秒回放10萬條數據,聚合引擎則會實時地對回放的數據進行消費。
submitJob("replay_quotes", "replay_quotes_stream", replay, [rds], [`outQuotes], `date, `time,100000,4)
(6)查看不一樣時間點下咱們選擇的股票的ETF價值。
//查看outputTable表內前15行的數據,其中第一列時間爲聚合計算髮生的時間 >select top 15 * from outputTable; time etf 2019.06.04T16:40:18.476 14.749 2019.06.04T16:40:19.476 14.749 2019.06.04T16:40:20.477 14.749 2019.06.04T16:40:21.477 22.059 2019.06.04T16:40:22.477 22.059 2019.06.04T16:40:23.477 34.049 2019.06.04T16:40:24.477 34.049 2019.06.04T16:40:25.477 284.214 2019.06.04T16:40:26.477 284.214 2019.06.04T16:40:27.477 285.68 2019.06.04T16:40:28.477 285.68 2019.06.04T16:40:29.478 285.51 2019.06.04T16:40:30.478 285.51 2019.06.04T16:40:31.478 285.51 2019.06.04T16:40:32.478 285.51
咱們在服務器上對DolphinDB的數據回放功能進行了性能測試。服務器配置以下:
主機:DELL PowerEdge R730xd
CPU:Intel Xeon(R) CPU E5-2650 v4(24核 48線程 2.20GHz)
內存:512 GB (32GB × 16, 2666 MHz)
硬盤:17T HDD (1.7T × 10, 讀取速度222 MB/s,寫入速度210 MB/s)
網絡:萬兆以太網
測試腳本以下:
sch = select name,typeString as type from quotes.schema().colDefs trs = cutPoints(09:30:00.001..18:00:00.001,60) rds = replayDS(<select * from quotes>, `date, `time, trs); share streamTable(100:0, sch.name,sch.type) as outQuotes jobid = submitJob("replay_quotes","replay_quotes_stream", replay, [rds], [`outQuotes], `date, `time, , 4)
在不設定回放速率(即以最快的速率回放),而且輸出表沒有任何訂閱時,回放336,305,414條數據耗時僅須要90~110秒。