DolphinDB丨金融高頻因子流批一體計算神器

前言

量化金融的研究和實盤中,愈來愈多的機構須要根據高頻的行情數據(L1/L2以及逐筆委託數據)來計算量價因子。這些因子一般是有狀態的:不只與當前的多個指標有關,並且與多個指標的歷史狀態相關。以國內的股票市場爲例,每3秒收到一個快照,每一個股票天天獲得4800個快照,計算因子時可能會用到以前若干個快照的數據,甚至以前若干天的數據。若研發環境系統(例如Python)與生產環境系統(例如C++)不一樣,要維護兩套代碼,對用戶是很是沉重的負擔。python

今天的推文爲你們介紹如何使用DolphinDB發佈的響應式狀態引擎(Reactive State Engine)高效開發與計算帶有狀態的高頻因子,實現流批統一計算。狀態引擎接受在歷史數據批量處理(研發階段)中編寫的表達式或函數做爲輸入,避免了在生產環境中重寫代碼的高額成本,以及維護研發和生產兩套代碼的負擔。狀態引擎確保流式計算的結果與批量計算徹底一致,只要在歷史數據的批量計算中驗證正確,便可保證流數據的實時計算正確,這極大下降了實時計算調試的成本。react

一、金融高頻因子計算ios

咱們經過一個具體的例子來引入金融高頻因子計算的問題。下面這個因子表達式是用DolphinDB的腳本語言寫的。它使用了自定義函數sum_diff和內置函數ema (exponential moving average)。sum_diff是一個無狀態函數,ema是一個有狀態的函數,依賴歷史數據。更爲棘手的是,以下面的計算分解圖所示,計算須要使用ema函數的多重嵌套。算法

def sum_diff(x, y){
    return (x-y)/(x+y)}ema(1000 * sum_diff(ema(price, 20), ema(price, 40)),10) -  ema(1000 * sum_diff(ema(price, 20), ema(price, 40)), 20)

6dd525c1d4134352b36912c992fc0992.jpeg

面對此類場景,咱們須要解決如下幾個問題:數據庫

  • 投研階段可否使用歷史數據快速爲每隻股票計算100~1000個相似的因子?
  • 實盤階段可否在每一個行情tick數據到來時爲每隻股票計算100~1000個相似的因子?
  • 批處理和流計算的代碼實現是否高效?批和流可否統一代碼?正確性校驗是否便捷?

二、現有解決方案的優缺點

python pandas/numpy目前是研究階段最經常使用的高頻因子解決方案。pandas對面板數據處理有很是成熟的解決方案,並且內置了大部分高頻因子計算須要用到的算子,能夠快速實現這些高頻因子。但性能是pandas實現的一個短板,尤爲是算子須要用自定義函數實現的時候,速度較慢。一個解決辦法是經過啓動多個python進程來並行計算。python pandas的實現是針對歷史數據的,面對生產環境中的流式數據,若是不修改代碼,只能採用相似apache spark的處理方法,把數據緩存起來,劃分紅一個個數據窗口來計算。所以,性能的問題在生產環境中會更突出。apache

爲解決上述方案在生產環境中的性能問題,不少機構會用C++從新實現研究(歷史數據)代碼。這樣作,須要維護兩套代碼,開發成本(時間和人力)會極大增長。此外,還要耗費大量精力確保兩套系統的結果徹底一致。編程

相似Flink批流統一的解決方案應運而生。Flink支持SQL和窗口函數,高頻因子用到的常見算子在Flink中已經內置實現。所以,簡單的因子用Flink實現會很是高效,運行性能也會很是好。但Flink最大的問題是沒法實現複雜的高頻因子計算。如前一章中提到的例子,須要多個窗口函數的嵌套,沒法直接用Flink實現。這也正是DolphinDB開發響應式狀態引擎的動機所在。緩存

三、響應式狀態引擎(Reactive State Engine)

響應式狀態引擎其實是一個計算黑盒,在歷史數據上已經驗證的DolphinDB因子代碼(表達式或函數)以及實時行情數據做爲輸入,輸出實時因子值。因爲在靜態的歷史數據集上開發和驗證高頻因子遠比在流數據上開發更爲簡單,響應式狀態引擎顯著下降了流式高頻因子的開發成本和難度。安全

def sum_diff(x, y){
 return (x-y)/(x+y)}factor1 = <ema(1000 * sum_diff(ema(price, 20), ema(price, 40)),10) -  ema(1000 * sum_diff(ema(price, 20), ema(price, 40)), 20)>share streamTable(1:0, `sym`price, [STRING,DOUBLE]) as tickStreamresult = table(1000:0, `sym`factor1, [STRING,DOUBLE])rse = createReactiveStateEngine(name="reactiveDemo", metrics =factor1, dummyTable=tickStream, outputTable=result, keyColumn="sym")subscribeTable(tableName=`tickStream, actionName="factors", handler=tableInsert{rse})

以上代碼在DolphinDB中實現前述因子的流式計算。factor1是前述因子在歷史數據上的實現,不作任何改變,直接傳遞給響應式狀態引擎rse,便可實現流式計算。經過訂閱函數subscribeTable,咱們將流數據表tickStream與狀態引擎rse進行關聯。任何實時數據的注入,都將觸發狀態引擎的計算,輸出因子值到結果表result。如下代碼產生100條隨機數據,並注入到流數據表。結果與經過SQL語句計算的結果徹底相同。服務器

data = table(take("000001.SH", 100) as sym, rand(10.0, 100) as price)tickStream.append!(data)factor1Hist = select sym, ema(1000 * sum_diff(ema(price, 20), ema(price, 40)),10) -  ema(1000 * sum_diff(ema(price, 20), ema(price, 40)), 20) as factor1 from data context by symassert each(eqObj, result.values(), factor1Hist.values())

3.1 工做原理

如圖1所示,一個有狀態的高頻因子計算過程實際上能夠分解成有一個有向無環圖(DAG)。圖中的節點有3種:

一、數據源,如price。

二、有狀態的算子,如a, b, d, e。

三、無狀態的算子,如c和result。

從數據源節點開始,按照既定的路徑,層層推動,獲得最後的因子輸出。這很是相似excel中的單元格鏈式計算。當一個單元格的數據發生變化時,相關聯的單元格依次發生變化。響應式狀態引擎的名稱也是從這一點引伸出來的。

無狀態的算子比較簡單,使用DolphinDB已有的腳本引擎,就能夠表示和計算。所以,問題轉化爲兩點:

一、如何解析獲得一個優化的DAG。

二、如何優化每一個有狀態的算子的計算。

3.2 解析和優化

DolphinDB的腳本語言是支持向量化和函數化的多範式編程語言。經過函數的調用關係,不可貴到計算步驟的DAG。在解析的時候,由於輸入消息的schema是已知的,咱們能夠快速推斷出每個節點的輸入數據類型和輸出數據類型。輸入參數類型肯定,函數名稱肯定,每一個狀態算子的具體實例就能夠建立出來。

每個算子(有狀態和無狀態)在DolphinDB中均可以轉化爲一個惟一的字符串序列。據此,咱們能夠刪除重複的算子,提升計算效率。

3.3 內置的狀態函數

狀態算子計算時須要用到歷史狀態。所以,若是每一次觸發計算,都必須準備歷史數據,全量計算,不但耗費內存,並且耗費CPU時間。狀態函數的優化,也就是增量方式的流式實現很是關鍵。下列狀態函在DolphinDB的響應式狀態引擎均獲得了優化實現。目前,狀態引擎不容許使用未經優化的狀態函數。

  • 累計窗口函數:cumavg, cumsum, cumprod, cumcount, cummin, cummax, cumvar, cumvarp, cumstd, cumstdp, cumcorr, cumcovar, cumbeta, cumwsum, cumwavg
  • 滑動窗口函數:ema, mavg, msum, mcount, mprod, mvar, mvarp, mstd, mstdp, mskew, mkurtosis, mmin, mmax, mimin, mimax, mmed, mpercentile, mrank, mcorr, mcovar, mbeta, mwsum, mwavg, mslr
  • 序列相關函數:deltas, ratios, ffill, move, prev, iterate, ewmMean, ewmVar, ewmStd, ewmCovar, ewmCorr

上述函數除了mslr返回兩個值之外,其他函數均只有一個返回值。在後續的版本中,DolphinDB將容許用戶用插件來開發本身的狀態函數,註冊後便可在狀態引擎中使用。

3.4 自定義狀態函數

響應式狀態引擎中可以使用自定義狀態函數。須要注意如下幾點:

函數定義前,使用 @state 表示函數是自定義的狀態函數。 自定義狀態函數中只能使用賦值語句和return語句。return語句必須是最後一個語句,可返回多個值。 使用iif函數表示if...else的邏輯。 若是僅容許使用一個表達式來表示一個因子,會帶來不少侷限性。首先,在某些狀況下,僅使用表達式,沒法實現一個完整的因子。下面的例子返回線性迴歸的alpha,beta和residual。

@statedef slr(y, x){
    a, b = mslr(y, x, 12)
    residual = mavg(y, 12) - b * mavg(x, 12) - a
    return a, b, residual}

其次,不少因子可能會使用共同的中間結果,定義多個因子時,代碼會更簡潔。自定義函數能夠同時返回多個結果。下面的函數multiFactors定義了5個因子。

@statedef multiFactors(lowPrice, highPrice, volumeTrade, closePrice, buy_active, sell_active, tradePrice, askPrice1, bidPrice1, askPrice10, agg_vol, agg_amt){
    a = ema(askPrice10, 30)
    term0 = ema((lowPrice - a) / (ema(highPrice, 30) - a), 50)
    term1 = mrank((highPrice - a) / (ema(highPrice, 5) - a), true,  15)
    term2 = mcorr(askPrice10, volumeTrade, 10) * mrank(mstd(closePrice, 20, 20), true, 10)
    buy_vol_ma = mavg(buy_active, 6)
    sell_vol_ma = mavg(sell_active, 6)
    zero_free_vol = iif(agg_vol==0, 1, agg_vol)
    stl_prc = ffill(agg_amt \ zero_free_vol \ 20).nullFill(tradePrice)
    buy_prop = stl_prc
 
    spd = askPrice1 - bidPrice1
    spd_ma = round(mavg(iif(spd < 0, 0, spd), 6), 5)
    term3 = buy_prop * spd_ma
    term4 = iif(spd_ma == 0, 0, buy_prop / spd_ma)
    return term0, term1, term2, term3, term4}

最後,某些表達式冗長,缺少可讀性。第一節中的因子表達式改成下面的自定義狀態函數factor1後,計算邏輯簡潔明瞭。

@statedef factor1(price) {
    a = ema(price, 20)
    b = ema(price, 40)
    c = 1000 * sum_diff(a, b)
    return  ema(c, 10) - ema(c, 20)}

3.5 輸出結果過濾

狀態引擎會對輸入的每一條消息作出計算響應,產生一條記錄做爲結果,計算的結果在默認狀況下都會輸出到結果表,也就是說輸入n個消息,輸出n條記錄。若是但願僅輸出一部分結果,能夠啓用過濾條件,只有知足條件的結果纔會輸出。

下面的例子檢查股票價格是否有變化,只有價格變化的記錄纔會輸出。

share streamTable(1:0, `sym`price, [STRING,DOUBLE]) as tickStreamresult = table(1000:0, `sym`price, [STRING,DOUBLE])rse = createReactiveStateEngine(name="reactiveFilter", metrics =[<price>], dummyTable=tickStream, outputTable=result, keyColumn="sym", filter=<prev(price) != price>)subscribeTable(tableName=`tickStream, actionName="filter", handler=tableInsert{rse})

3.6 快照機制

爲了知足生產環境業務持續性的須要,DolphinDB內置的流式計算引擎包括響應式狀態引擎均支持快照(snapshot)輸出。

響應式狀態引擎的快照包括已處理的最後一條消息的ID以及引擎當前的狀態。當系統出現異常,從新初始化狀態引擎時,可恢復到最後一個快照的狀態,而且從已處理的消息的下一條開始訂閱。

def sum_diff(x, y){
 return (x-y)/(x+y)}factor1 = <ema(1000 * sum_diff(ema(price, 20), ema(price, 40)),10) -  ema(1000 * sum_diff(ema(price, 20), ema(price, 40)), 20)>share streamTable(1:0, `sym`price, [STRING,DOUBLE]) as tickStreamresult = table(1000:0, `sym`factor1, [STRING,DOUBLE])rse = createReactiveStateEngine(name="reactiveDemo", metrics =factor1, dummyTable=tickStream, outputTable=result, keyColumn="sym", snapshotDir= "/home/data/snapshot", snapshotIntervalInMsgCount=400000)msgId = getSnapshotMsgId(rse)if(msgId >= 0) msgId += 1subscribeTable(tableName=`tickStream, actionName="factors", offset=msgId, handler=appendMsg{rse}, handlerNeedMsgId=true)

響應式狀態引擎要啓用快照機制,建立時須要指定兩個額外的參數snapshotDir和snapshotIntervalInMsgCount。snapshotDir用於指定存儲快照的目錄。snapshotIntervalInMsgCount指定處理多少條消息後產生一個快照。引擎初始化時,系統會檢查快照目錄下是否存在一個以引擎名稱命名,後綴爲snapshot的文件。以上面的代碼爲例,若是存在文件/home/data/snapshot/reactiveDemo.snapshot,加載這個快照。函數getSnapshotMsgId能夠獲取最近一個快照對應的msgId。若是不存在快照,返回-1。

狀態引擎要啓用快照機制,調用subscribeTable函數也需相應的修改:

  • 首先必須指定消息的offset。
  • 其次,handler必須使用appendMsg函數。appendMsg函數接受兩個參數,msgBody和msgId。
  • 再次,參數handlerNeedMsgId必須指定爲true。

3.7 並行處理

當須要處理大量消息時,可在DolphinDB消息訂閱函數subscribeTable中指定可選參數filter與hash,讓多個訂閱客戶端並行處理消息。

  • 參數filter用於指定消息過濾邏輯。目前支持三種過濾方式,分別爲值過濾,範圍過濾和哈希過濾。
  • 參數hash能夠指定一個哈希值,肯定這個訂閱由哪一個線程來執行。例如,配置參數subExecutors爲4,用戶指定了哈希值5,那麼該訂閱的計算任務將由第二個線程來執行。

下面是響應式狀態引擎並行計算因子的例子。假設配置參數subExecutors=4,建立4個狀態引擎,每一個狀態引擎根據流表的股票代碼的哈希值來訂閱不一樣股票的數據,而且指定不一樣的訂閱線程來處理,最終將結果輸出到同一個輸出表中。

def sum_diff(x, y){
 return (x-y)/(x+y)}factor1 = <ema(1000 * sum_diff(ema(price, 20), ema(price, 40)),10) -  ema(1000 * sum_diff(ema(price, 20), ema(price, 40)), 20)>share streamTable(1:0, `sym`price, [STRING,DOUBLE]) as tickStreamsetStreamTableFilterColumn(tickStream, `sym)share streamTable(1000:0, `sym`factor1, [STRING,DOUBLE]) as resultStreamfor(i in 0..3){
 rse = createReactiveStateEngine(name="reactiveDemo"+string(i), metrics =factor1, dummyTable=tickStream, outputTable=resultStream, keyColumn="sym")
 subscribeTable(tableName=`tickStream, actionName="sub"+string(i), handler=tableInsert{rse}, msgAsTable = true, hash = i, filter = (4,i))}n=2000000tmp = table(take("A"+string(1..4000), n) as sym, rand(10.0, n) as price)tickStream.append!(tmp)
須要注意的是,若是多個狀態引擎是同一個輸出表,該輸出表必須是一個共享表。沒有共享的表不是線程安全的,並行寫入可能會致使系統崩潰。

四、流批統一解決方案

金融高頻因子的流批統一處理在DolphinDB中有兩種實現方法。

第一種方法:使用函數或表達式實現金融高頻因子,代入不一樣的計算引擎進行歷史數據或流數據的計算。代入SQL引擎,能夠實現對歷史數據的計算;代入響應式狀態引擎,能夠實現對流數據的計算。這在第3章的序言部分已經舉例說明。在這種模式下用DolphinDB腳本語言表示的表達式或函數其實是對因子語義的一種描述,而不是具體的實現。因子計算的具體實現交由相應的計算引擎來完成,從而實現不一樣場景下的最佳性能。

第二種方法:歷史數據經過回放,轉變成流數據,而後使用流數據計算引擎來完成計算。咱們仍然以教程開始部分的因子爲例,惟一的區別是流數據表tickStream的數據源來自於歷史數據庫的replay。使用這種方法計算曆史數據的因子值,效率會略遜與基於SQL的批量計算。

def sum_diff(x, y){
 return (x-y)/(x+y)}factor1 = <ema(1000 * sum_diff(ema(price, 20), ema(price, 40)),10) -  ema(1000 * sum_diff(ema(price, 20), ema(price, 40)), 20)>share streamTable(1:0, `sym`date`time`price, [STRING,DATE,TIME,DOUBLE]) as tickStreamresult = table(1000:0, `sym`factor1, [STRING,DOUBLE])rse = createReactiveStateEngine(name="reactiveDemo", metrics =factor1, dummyTable=tickStream, outputTable=result, keyColumn="sym")subscribeTable(tableName=`tickStream, actionName="factors", handler=tableInsert{rse})//從歷史數據庫dfs://TAQ的trades表中加載一天的數據,回放到流表tickStream中inputDS = replayDS(<select sym, date, time, price from loadTable("dfs://TAQ", "trades") where date=2021.03.08>, `date, `time, 08:00:00.000 + (1..10) * 3600000)replay(inputDS, tickStream, `date, `time, 1000, true, 2)

五、性能測試

咱們測試了響應式狀態引擎計算因子的性能。測試使用模擬數據,並使用warmupStreamEngine函數模擬狀態引擎已經處理部分數據的狀況。測試共包括20個不一樣複雜度度的因子,其中兩個自定義狀態函數分別返回3個和5個因子。爲方便測試,計算僅使用單線程處理。

@statedef slr(y, x){
    a, b = mslr(y, x, 12)
    residual = mavg(y, 12) - b * mavg(x, 12) - a
    return a, b, residual}@statedef multiFactors(lowPrice, highPrice, volumeTrade, closePrice, buy_active, sell_active, tradePrice, askPrice1, bidPrice1, askPrice10, agg_vol, agg_amt){
    a = ema(askPrice10, 30)
    term0 = ema((lowPrice - a) / (ema(highPrice, 30) - a), 50)
    term1 = mrank((highPrice - a) / (ema(highPrice, 5) - a), true,  15)
    term2 = mcorr(askPrice10, volumeTrade, 10) * mrank(mstd(closePrice, 20, 20), true, 10)
    buy_vol_ma = mavg(buy_active, 6)
    sell_vol_ma = mavg(sell_active, 6)
    zero_free_vol = iif(agg_vol==0, 1, agg_vol)
    stl_prc = ffill(agg_amt \ zero_free_vol \ 20).nullFill(tradePrice)
    buy_prop = stl_prc
 
    spd = askPrice1 - bidPrice1
    spd_ma = round(mavg(iif(spd < 0, 0, spd), 6), 5)
    term3 = buy_prop * spd_ma
    term4 = iif(spd_ma == 0, 0, buy_prop / spd_ma)
    return term0, term1, term2, term3, term4}metrics = array(ANY, 14)metrics[0] = <ema(1000 * sum_diff(ema(close, 20), ema(close, 40)),10) -  ema(1000 * sum_diff(ema(close, 20), ema(close, 40)), 20)>metrics[1] = <mslr(high, volume, 8)[1]>metrics[2] = <mcorr(low, high, 11)>metrics[3] = <mstdp(low, 15)>metrics[4] = <mbeta(high, value, 63)>metrics[5] = <mcovar(low, value, 71)>metrics[6] = <(close/mavg(close, 1..6)-1)*100>metrics[7] = <mmin(high, 15)>metrics[8] = <mavg(((high+low)/2+(mavg(high, 2)+mavg(low, 2))/2)*(high-low)/volume, 7, 2)>metrics[9] = <mslr(mavg(close, 14), volume, 63)[1]>metrics[10] = <mcorr(mavg(open, 25), volume, 71)>metrics[11] = <mbeta(high, mstdp(close, 8), 77)>metrics[12] = <slr(close, volume)>metrics[13] = <multiFactors(low, high, volume, close, numTrade, numTrade, close, value, close, open, volume, numTrade)>dummy = streamTable(10000:0, `symbol`market`date`time`quote_type`preclose`open`high`low`close`numTrade`volume`value`position`recvtime,[SYMBOL,SHORT,DATE,TIME,SHORT,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,LONG,DOUBLE,LONG,TIMESTAMP])def prepareData(tickNum, batch){
    total = tickNum*batch
    data=table(total:total, `symbol`market`date`time`quote_type`preclose`open`high`low`close`numTrade`volume`value`position`recvtime,[SYMBOL,SHORT,DATE,TIME,SHORT,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,LONG,DOUBLE,LONG,TIMESTAMP])
    data[`market]=rand(10, total)
    data[`date]=take(date(now()), total)
    data[`time]=take(time(now()), total)
    data[`symbol]=take("A"+string(1..tickNum), total)
    data[`open]=rand(100.0, total)
    data[`high]=rand(100.0, total)
    data[`low]=rand(100.0, total)
    data[`close]=rand(100.0, total)
    data[`numTrade]=rand(100, total)
    data[`volume]=rand(100, total)
    data[`value]=rand(100.0, total)
    data[`recvtime]=take(now(), total)
    return data}dropStreamEngine("demo1")dropStreamEngine("demo2")dropStreamEngine("demo3")dropStreamEngine("demo4")//4000個股票,20個因子hisData = prepareData(4000, 100)realData = prepareData(4000, 1)colNames = ["symbol"].append!("factor"+string(0..19))colTypes = [SYMBOL].append!(take(DOUBLE, 20))resultTable = streamTable(10000:0, colNames, colTypes)engine1 = createReactiveStateEngine(name="demo1", metrics=metrics, dummyTable=dummy, outputTable=resultTable, keyColumn="symbol")warmupStreamEngine(engine1, hisData)timer(10) engine1.append!(realData)dropAggregator("demo1")//1個股票,20個因子hisData = prepareData(1, 100)realData = prepareData(1, 1)colNames = ["symbol"].append!("factor"+string(0..19))colTypes = [SYMBOL].append!(take(DOUBLE, 20))resultTable = streamTable(10000:0, colNames, colTypes)engine2 = createReactiveStateEngine(name="demo2", metrics=metrics, dummyTable=dummy, outputTable=resultTable, keyColumn="symbol")warmupStreamEngine(engine2, hisData)timer(10) engine2.append!(realData)dropAggregator("demo2")//4000個股票,1個因子hisData = prepareData(4000, 100)realData = prepareData(4000, 1)metrics3 = metrics[0]colNames = ["symbol", "factor0"]colTypes = [SYMBOL, DOUBLE]resultTable = streamTable(10000:0, colNames, colTypes)engine3 = createReactiveStateEngine(name="demo3", metrics=metrics3, dummyTable=dummy, outputTable=resultTable, keyColumn="symbol")warmupStreamEngine(engine3, hisData)timer(10) engine3.append!(realData)//200個股票,20個因子hisData = prepareData(200, 100)realData = prepareData(200, 1)colNames = ["symbol"].append!("factor"+string(0..19))colTypes = [SYMBOL].append!(take(DOUBLE, 20))resultTable = streamTable(10000:0, colNames, colTypes)engine4 = createReactiveStateEngine(name="demo4", metrics=metrics, dummyTable=dummy, outputTable=resultTable, keyColumn="symbol")warmupStreamEngine(engine4, hisData)timer(10) engine4.append!(realData)

咱們統計了10次的總耗時,取平均值做爲單次的耗時。測試使用的服務器CPU爲Intel(R) Xeon(R) Silver 4216 CPU @ 2.10GHz。單線程狀況下,測試結果以下:

8b97e4fb05cdd375af6984db93a8ca33.jpg

六、多個引擎的流水線處理

DolphinDB內置的流計算引擎包括響應式狀態引擎,時間序列聚合引擎,橫截面引擎和異常檢測引擎。這些引擎均實現了數據表(table)的接口,所以多個引擎流水線處理變得異常簡單,只要將後一個引擎做爲前一個引擎的輸出便可。引入流水線處理,能夠解決更爲複雜的因子計算問題。譬如,因子計算常常須要使用面板數據,完成時間序列和橫截面兩個維度的計算,只要把響應式狀態引擎和橫截面兩個引擎串聯處理便可完成。

下面的例子是World Quant 101個Alpha因子中的1號因子公式的流數據實現。rank函數是一個橫截面操做。rank的參數部分用響應式狀態引擎實現。rank函數自己用橫截面引擎實現。橫截面引擎做爲狀態引擎的輸出。

#Alpha 001公式:rank(Ts_ArgMax(SignedPower((returns<0?stddev(returns,20):close), 2), 5))-0.5//建立橫截面引擎,計算每一個股票的rankdummy = table(1:0, `sym`time`maxIndex, [SYMBOL, TIMESTAMP, INDEX])resultTable = streamTable(10000:0, `sym`time`factor1, [SYMBOL, TIMESTAMP, DOUBLE])ccsRank = createCrossSectionalAggregator(name="alpha1CCS", metrics=[<time>, <rank(maxIndex)\count(maxIndex) - 0.5>],  dummyTable=dummy, outputTable=resultTable,  keyColumn=`sym, triggeringPattern='keyCount', triggeringInterval=3000, timeColumn=`time)@statedef wqAlpha1TS(close){
    ret = ratios(close) - 1
    v = iif(ret < 0, mstd(ret, 20), close)
    return mimax(signum(v)*v*v, 5)}//建立響應式狀態引擎,輸出到前面的橫截面引擎ccsRankinput = table(1:0, `sym`time`close, [SYMBOL, TIMESTAMP, DOUBLE])rse = createReactiveStateEngine(name="alpha1", metrics=<wqAlpha1TS(close)>, dummyTable=input, outputTable=ccsRank, keyColumn="sym")

在上面這個例子中,咱們仍是須要人工來區分哪一部分是橫截面操做,哪一部分是時間序列操做。在後續的版本中,DolphinDB將以行函數(rowRank,rowSum等)表示橫截面操做的語義,其它向量函數表示時間序列操做,從而系統可以自動識別一個因子中的橫截面操做和時間序列操做,進一步自動構建引擎流水線。

流水線處理和多個流表的級聯處理有很大的區別。二者能夠完成相同的任務,可是效率上有很大的區別。後者涉及多個流數據表與屢次訂閱。前者實際上只有一次訂閱,全部的計算均在一個線程中依次順序完成,於是有更好的性能。

七、展望

響應式狀態引擎內置了大量經常使用的狀態算子,支持自定義狀態函數,也可與其餘流式計算引擎以流水線的方式任意組合,方便開發人員快速實現複雜的金融高頻因子。

內置的狀態算子所有使用C++開發實現,算法上通過了大量的優化,以增量方式實現狀態算子的流式計算,於是在單個線程上的計算達到了很是好的性能。對於規模較大的任務,能夠經過訂閱過濾的方式,拆分紅多個子訂閱,由多個節點以及每一個節點的多個CPU並行完成訂閱計算。後續的版本將完善計算子做業的建立、管理和監控功能,從手動轉變爲自動。

相關文章
相關標籤/搜索