實時流處理通常是將業務系統產生的數據進行實時收集,交由流處理框架進行數據清洗,統計,入庫,並能夠經過可視化的方式對統計結果進行實時的展現。傳統的面向靜態數據表的計算引擎沒法勝任流數據領域的分析和計算任務。在金融交易、物聯網、互聯網/移動互聯網等應用場景中,複雜的業務需求對大數據處理的實時性提出了更高的要求。對於這一類高實時性需求的場景,須要一個快速、高效、靈活可控的流式大數據處理平臺來支撐。算法
DolphinDB內置的流數據框架支持流數據發佈、訂閱、流數據預處理、實時內存計算、複雜指標的滾動窗口計算等,是一個運行高效,使用便捷的流數據處理框架。數據庫
與其它流數據系統相比,DolphinDB database 流數據處理系統的優勢在於:緩存
- 吞吐量大,低延遲
- 與時序數據庫及數據倉庫集成,一站式解決方案
- 自然具有流表對偶性,支持SQL語句數據注入和查詢分析
本教程包含如下內容:服務器
- DolphinDB流數據框架及概念
- 使用DolphinDB流數據
- 使用Java API來訂閱DolphinDB流數據
- 監控流數據運行狀態
- 流數據性能調優
- 與開源系統Grafana結合使用
1. DolphinDB流數據框架及概念
流數據框架對流數據的管理和應用是基於發佈-訂閱-消費的模式,經過流數據表來發布數據,數據節點或者第三方的應用能夠經過DolphinDB腳本或者 API來訂閱消費流數據。網絡
上圖展現了DolphinDB的流數據處理框架。把實時數據注入到發佈節點流數據表後,發佈的數據能夠同時供多方訂閱消費:併發
- 可由數據倉庫訂閱並保存,做爲分析系統與報表系統的數據源。
- 能夠由聚合引擎訂閱,進行聚合計算,並將聚合結果輸出到流數據表。聚合結果既能夠由Grafana等流數據展現平臺展現,也能夠做爲數據源再次發佈出去,供二次訂閱作事件處理。
- 也可由API訂閱,例如第三方的Java應用程序能夠經過Java API訂閱流數據,應用到業務系統中。
1.1 實時流數據表app
DolphinDB實時流數據表能夠做爲發佈和訂閱流數據的載體,發佈一條消息等價於往流數據表插入一條記錄,同時它也能夠經過SQL來進行查詢和分析。框架
1.2 發佈和訂閱異步
DolphinDB的流數據框架使用了經典的訂閱發佈模式。每當有新的流數據寫入時,發佈方會通知全部的訂閱方去處理新的流數據。數據節點經過subscribeTable函數來訂閱發佈的流數據。函數
1.3 實時聚合引擎
實時聚合引擎指的是專門用於處理流數據實時計算和分析的模塊。DolphinDB提供createStreamAggregator函數用於持續地對流數據作實時聚合計算,而且將計算結果持續輸出到指定的數據表中,具體如何使用聚合引擎能夠參考流數據聚合引擎教程。
2. 使用DolphinDB流數據
要開啓DolphinDB支持流數據功能的模塊,須要對DolphinDB數據節點增長配置項。
對於發佈節點須要的配置項:
maxPubConnections:發佈信息節點可以鏈接的最大節點數。若是maxPubConnections>0,節點能夠做爲信息發佈節點。默認值爲0。 persisitenceDir:共享的流數據表保存的路徑。若是須要保存流數據表,必須指定該參數。 persistenceWorkerNum:負責以異步模式保存流數據表的工做線程數。默認值爲0。 maxPersistenceQueueDepth:異步保存流數據表時,容許的最大消息隊列深度。 maxMsgNumPerBlock:當服務器發佈或組合消息時,消息塊中的最大記錄數。 maxPubQueueDepthPerSite:發佈節點可允許的最大消息隊列深度。
對於訂閱節點須要的配置項:
subPort:訂閱線程監聽的端口號。當節點做爲訂閱節點時,該參數必須指定。默認值爲0。 subExecutors:訂閱節點中消息處理線程的數量。默認值爲0,表示解析消息線程也處理消息。 maxSubConnections:服務器可以接收的最大的訂閱鏈接數。默認值是64。 subExecutorPooling: 表示執行流計算的線程是否處於pooling模式的布爾值。默認值是false。 maxSubQueueDepth:訂閱節點可允許的最大消息隊列深度。
2.1 流數據發佈
定義一個streamTable,向其寫入數據即意味着發佈流數據,因爲流數據表須要被不一樣的會話訪問,因此要使用share,將流數據表進行共享。下面的例子中,定義並共享流數據表pubTable,向pubTable表寫入數據即意味着發佈數據:
share streamTable(10000:0,`timestamp`temperature, [TIMESTAMP,DOUBLE]) as pubTable
2.2 流數據訂閱
訂閱數據經過subscribeTable函數來實現。
語法以下:
subscribeTable([server], tableName, [actionName], [offset=-1], handler, [msgAsTable=false], [batchSize=0], [throttle=1], [hash=-1], [reconnect=false], [filter], [persistOffset=false])
參數說明:
- 只有tableName和handler兩個參數是必需的。其餘全部參數都是可選參數。
- server:字符串,表示服務器的別名或流數據所在的xdb鏈接服務器。若是它沒有指定,或者爲空字符串,表示服務器是本地實例。
實際狀況中,發佈者與訂閱者的關係有三種可能。下面的例子解釋這三種狀況的server參數如何構造:
- 發佈者與訂閱者是同一節點。
subscribeTable(, "pubTable", "actionName", 0, subTable , true)
發佈者與訂閱者是同一集羣內的不一樣節點。此處發佈節點別名爲「NODE2」。
subscribeTable("NODE2", "pubTable", "actionName", 0, subTable , true)
發佈者與訂閱者不在同一個集羣內。此處發佈者節點爲 (host="192.168.1.13",port=8891)。
pubNodeHandler=xdb("192.168.1.13",8891) subscribeTable(pubNodeHandler, "pubTable", "actionName", 0, subTable , true)
- tableName:被訂閱的數據表名。
share streamTable(10000:0,`ts`temp, [TIMESTAMP,DOUBLE]) as subTable subscribeTable(, "pubTable", "actionName", 0, subTable , true)
- actionName:流數據能夠針對各類場景分別訂閱消費。同一份流數據,可用於實時聚合運算,同時亦可將其存儲到數據倉庫供第三方應用作批處理。subscribeTable函數提供了actionName參數以區分同一個流數據表被訂閱用於不一樣場景的狀況。
share streamTable(10000:0,`ts`temp, [TIMESTAMP,DOUBLE]) as subTable topic1 = subscribeTable(, "pubTable", "actionName_realtimeAnalytics", 0, subTable , true) topic2 = subscribeTable(, "pubTable", "actionName_saveToDataWarehouse", 0, subTable , true)
subscribeTable函數的返回值是訂閱主題,它是訂閱表所在節點的別名,流數據表名稱和訂閱任務名稱(若是指定了actionName)的組合,使用下劃線分隔。若是訂閱主題已經存在,函數將會拋出異常。當前節點別名爲:NODE1,上述例子返回的兩個topic內容以下:
topic1:
NODE1/pubTable/actionName_realtimeAnalytics
topic2:
NODE1/pubTable/actionName_saveToDataWarehouse
- offset:訂閱任務開始後的第一條消息所在的位置。消息是流數據表中的行。若是沒有指定offset,或爲-1,或超過了流數據表的記錄行數,訂閱將會從流數據表的當前行開始。若是offset=-2,系統會自動獲取持久化到磁盤上的offset,並從該位置開始訂閱。offset與流數據表建立時的第一行對應。若是某些行由於內存限制被刪除,在決定訂閱開始的位置時,這些行仍然考慮在內。 下面的示例說明offset的做用,向pubTable寫入100行數據,創建三個訂閱,分別從102,-1,50行開始訂閱:
share streamTable(10000:0,`timestamp`temperature, [TIMESTAMP,DOUBLE]) as pubTable share streamTable(10000:0,`ts`temp, [TIMESTAMP,DOUBLE]) as subTable1 share streamTable(10000:0,`ts`temp, [TIMESTAMP,DOUBLE]) as subTable2 share streamTable(10000:0,`ts`temp, [TIMESTAMP,DOUBLE]) as subTable3 vtimestamp = 1..100 vtemp = norm(2,0.4,100) tableInsert(pubTable,vtimestamp,vtemp) topic1 = subscribeTable(, "pubTable", "actionName1", 102,subTable1 , true) topic1 = subscribeTable(, "pubTable", "actionName2", -1, subTable2 , true) topic1 = subscribeTable(, "pubTable", "actionName3", 50,subTable3 , true)//50
從結果看到,subTable1,subTable2都沒有數據,subTable3有50條數據,說明只有當offset在從0到數據集記錄數之間才能正常起做用,不然訂閱會從當前行開始,只有當新數據進入發佈表時才能訂閱到數據。
- handler:一元函數或表。它用於處理訂閱數據。若它是函數,其惟一的參數是訂閱到的數據。訂閱數據能夠是一個數據表或元組,訂閱數據表的每一個列是元組的一個元素。咱們常常須要把訂閱數據插入到數據表。爲了方便使用,handler也能夠是一個數據表,而且訂閱數據能夠直接插入到該表中。 下面的示例展現handler的兩種用途,subTable1直接把訂閱數據寫入目標table,subTable2經過自定義函數myHandler將數據進行過濾後寫入。
def myhandler(msg){ t = select * from msg where temperature>0.2 if(size(t)>0) subTable2.append!(t) } share streamTable(10000:0,`timestamp`temperature, [TIMESTAMP,DOUBLE]) as pubTable share streamTable(10000:0,`ts`temp, [TIMESTAMP,DOUBLE]) as subTable1 share streamTable(10000:0,`ts`temp, [TIMESTAMP,DOUBLE]) as subTable2 topic1 = subscribeTable(, "pubTable", "actionName1", -1, subTable1 , true) topic1 = subscribeTable(, "pubTable", "actionName2", -1, myhandler , true) vtimestamp = 1..10 vtemp = 2.0 2.2 2.3 2.4 2.5 2.6 2.7 0.13 0.23 2.9 tableInsert(pubTable,vtimestamp,vtemp)
從結果能夠看到pubTable寫入10條數據,subTable1是所有接收了,而subTable2通過myhandler過濾掉了0.13,收到9條數據。
- msgAsTable:表示訂閱的數據是否爲表的布爾值。默認值是false,表示訂閱數據是由列組成的元組。 訂閱數據格式的不一樣經過下面的示例展現:
def myhandler1(table){ subTable1.append!(table) } def myhandler2(tuple){ tableInsert(subTable2,tuple[0],tuple[1]) } share streamTable(10000:0,`timestamp`temperature, [TIMESTAMP,DOUBLE]) as pubTable share streamTable(10000:0,`ts`temp, [TIMESTAMP,DOUBLE]) as subTable1 share streamTable(10000:0,`ts`temp, [TIMESTAMP,DOUBLE]) as subTable2 //msgAsTable = true topic1 = subscribeTable(, "pubTable", "actionName1", -1, myhandler1 , true) //msgAsTable = false topic2 = subscribeTable(, "pubTable", "actionName2", -1, myhandler2 , false) vtimestamp = 1..10 vtemp = 2.0 2.2 2.3 2.4 2.5 2.6 2.7 0.13 0.23 2.9 tableInsert(pubTable,vtimestamp,vtemp)
- batchSize:一個整數,表示批處理的消息的行數。若是它是正數,直到消息的數量達到batchSize時,handler纔會處理進來的消息。若是它沒有指定或者是非正數,消息一進來,handler就會處理消息。 下面示例展現當batchSize設置爲11時,先向pubTable寫入10條數據,觀察訂閱表,而後再寫入1條數據,再觀察數據。
//batchSize share streamTable(10000:0,`timestamp`temperature, [TIMESTAMP,DOUBLE]) as pubTable share streamTable(10000:0,`ts`temp, [TIMESTAMP,DOUBLE]) as subTable1 topic1 = subscribeTable(, "pubTable", "actionName1", -1, subTable1 , true, 11) vtimestamp = 1..10 vtemp = 2.0 2.2 2.3 2.4 2.5 2.6 2.7 0.13 0.23 2.9 tableInsert(pubTable,vtimestamp,vtemp) print size(subTable1)//0 insert into pubTable values(11,3.1) print size(subTable1)//11
從結果能夠看到,當發佈數據累計到11條時,數據才進入到subTable1。
- throttle:一個整數,表示handler處理進來的消息以前等待的時間,以秒爲單位。默認值爲1。若是沒有指定batchSize,throttle將不會起做用。
batchSize是用來作數據緩衝使用,有時候流數據的寫入頻率很是高,當消費能力跟不上數據進入的速度時,須要進行流量控制,否者訂閱端緩衝區很快會堆積數據並耗光全部的內存。 throttle設定一個時間,根據訂閱端的消費速度定時放一批數據進來,保障訂閱端的緩衝區數據量穩定。
- hash:一個非負整數,指定某個訂閱線程處理進來的消息。若是沒有指定該參數,系統會自動分配一個線程。若是須要使用一個線程來處理多個訂閱任務的消息,把訂閱任務的hash設置爲相同的值。當須要在兩個或多個訂閱的處理過程當中保持消息數據的同步,能夠將多個訂閱的hash值設置成相同,這樣就能使用同一個線程來同步處理多個數據源,不會出現數據處理有前後致使結果偏差。
- reconnect是一個布爾值。默認值爲false,表示若是網絡異常等問題致使訂閱中斷,訂閱端不會自動從新訂閱,若是設置爲true,訂閱端會在網絡正常時,自動從中斷位置從新訂閱。若是發佈端崩潰致使訂閱中斷,那麼訂閱端會在發佈端重啓後不斷嘗試從新訂閱。若發佈端對流數據表啓用了持久化,那麼發佈端重啓後會首先讀取硬盤上的數據,直到發佈端讀取到訂閱中斷位置的數據,訂閱端才能成功從新訂閱。若發佈端沒有對流數據表啓用持久化,那麼將從新訂閱失敗。若是訂閱端崩潰致使訂閱中斷,即便設置了reconnect=true,訂閱端重啓後也沒法自動從新訂閱。
- filter是一個向量。該參數須要配合
setStreamTableFilterColumn
函數一塊兒使用。使用setStreamTableFilterColumn
指定流數據表的過濾列,流數據表過濾列在filter中的數據纔會發佈到訂閱端,不在filter中的數據不會發布。filter不支持過濾BOOL類型數據。 - persistOffset是一個布爾值,表示是否持久化保存本次訂閱已經處理的數據的偏移量,默認值爲false。持久化保存的偏移量用於重訂閱,可經過
getTopicProcessedOffset
函數獲取。
例如,訂閱時把persistOffset設置爲true。
share streamTable(1000:0, `time`sym`qty, [TIMESTAMP, SYMBOL, INT]) as trades trades_slave = streamTable(1000:0, `time`sym`qty, [TIMESTAMP, SYMBOL, INT]) topic=subscribeTable(, "trades", "trades_slave", 0, append!{trades_slave}, true,0,1,-1,false,,true) def writeData(n){ timev = 2018.10.08T01:01:01.001 + timestamp(1..n) symv =take(`A`B, n) qtyv = take(1, n) insert into trades values(timev, symv, qtyv) } writeData(6); select * from trades_slave time sym qty ----------------------- --- --- 2018.10.08T01:01:01.002 A 1 2018.10.08T01:01:01.003 B 1 2018.10.08T01:01:01.004 A 1 2018.10.08T01:01:01.005 B 1 2018.10.08T01:01:01.006 A 1 2018.10.08T01:01:01.007 B 1 getTopicProcessedOffset(topic); 5
2.3 斷線重連
DolphinDB的流數據訂閱提供了自動重連的功能,當reconnect設爲true時,訂閱端會記錄流數據的offset,鏈接中斷時訂閱端會從offset開始從新訂閱。可是,自動重連機制具備必定的侷限性。若是訂閱端崩潰或者發佈端沒有對流數據持久化,訂閱端沒法自動重連。所以,若是要啓用自動重連,發佈端必須對流數據持久化。啓用持久化請參考2.6節。
2.4 發佈端數據過濾
發佈端能夠過濾數據,只發布符合條件的數據。使用setStreamTableFilterColumn
指定流數據表的過濾列,流數據表過濾列在filter中的數據纔會發佈到訂閱端,不在filter中的數據不會發布。目前僅支持對一個列進行過濾。例如,發佈端上的流數據表trades只發布symbol爲IBM或GOOG的數據:
share streamTable(10000:0,`time`symbol`price, [TIMESTAMP,SYMBOL,INT]) as trades setStreamTableFilterColumn(trades, `symbol) trades_slave=table(10000:0,`time`symbol`price, [TIMESTAMP,SYMBOL,INT]) filter=symbol(`IBM`GOOG) subscribeTable(, `trades,`trades_slave,,append!{trades_slave},true,,,,,filter)
2.5 取消訂閱
每一次訂閱都由一個訂閱主題topic做爲惟一標識。若是訂閱時topic已存在,那麼會訂閱失敗。這時須要經過unsubscribeTable函數取消訂閱才能再次訂閱。取消訂閱示例以下:
//unsubscribe a local table unsubscribeTable(,"pubTable","actionName1") //unsubscribe a remote table unsubscribeTable("NODE_1","pubTable","actionName1")
若要刪除共享的流數據表,可使用undef函數。
undef("pubStreamTable",SHARED)
2.6 數據持久化
默認狀況下,流計算的表把全部數據保存在內存中。隨着流數據持續寫入,系統可能會出現內存不足的狀況。爲了不這個問題,咱們能夠設置流數據持久化到磁盤。若是流數據表的行數達到設定的界限值,前面一半的記錄行會從內存轉移到磁盤。持久化的數據支持重訂閱,當訂閱指定數據下標時,下標的計算包含持久化的數據。流數據持久化另外一個重要的功能是流數據的備份和回覆,當節點出現異常重啓時,持久化的數據會在重啓時自動載入到流數據表。
要啓動數據持久化,首先要在節點的配置文件中添加持久化路徑:
persisitenceDir = /data/streamCache
在腳本中使用enableTablePersistence函數設置針對某一個流數據表啓用持久化。 下面的示例針對pubTable表啓用持久化,其中asyn=true,compress=true, cacheSize=1000000,即當流數據表達到100萬行數據時啓用持久化,採用異步方式壓縮保存。
對於持久化是否啓用異步,須要對持久化數據一致性和性能之間做權衡,當流數據的一致性要求極高時,可使用同步方式,這樣只有保證持久化作完,數據纔會進入發佈隊列;若對實時性要求極高,不但願磁盤IO影響到流數據的實時性,那麼能夠啓用異步方式,只有啓用異步方式時,持久化工做線程數persistenceWorkerNum配置項纔會起做用,當有多個publisher表須要持久化,增長persistenceWorkerNum能夠提高異步保存的效率。
enableTablePersistence(pubTable,true, true, 1000000)
當不須要保存在磁盤上的流數據時,經過clearTablePersistence函數能夠刪除持久化數據。
clearTablePersistence(pubTable)
當整個流數據寫入結束時,可使用disableTablePersistence命令關閉持久化。
disableTablePersistence(pubTable)
使用getPersistenceMeta函數獲取流數據表的持久化細節狀況:
getPersistenceMeta(pubTable)
輸出的結果是一個字典。
//內存中保留數據記錄數 sizeInMemory->0 //啓用異步持久化 asynWrite->true //流數據表總記錄數 totalSize->0 //啓用壓縮存儲 compress->true //當前內存中數據相對總記錄數的偏移量,在持久化運行過程當中遵循公式 memoryOffset = totalSize - sizeInMemory memoryOffset->0 //已經持久化到磁盤的數據記錄數 sizeOnDisk->0 //持久化路徑 persistenceDir->/hdd/persistencePath/pubTable //hashValue是對本表作持久化的工做線程標識,當配置項persistenceWorkerNum>1時,hashValue可能不爲0 hashValue->0
3. 使用Java API來訂閱DolphinDB流數據
當流數據進入DolphinDB併發布以後,數據的消費者多是DolphinDB自己的聚合引擎,也多是第三方的消息隊列或者第三方程序。因此DolphinDB提供了Streaming API供第三方程序來訂閱流數據。當有新數據進入時,這些經過API的訂閱者可以及時的接收到通知,這使得DolphinDB的流數據框架可與第三方的應用作深刻的整合。目前DolphinDB提供Java流數據API,後續會逐步支持C++、C#等流數據API。
Java API處理數據的方式有兩種:輪詢方式(Polling)和事件方式(EventHandler)。
輪詢方式示例代碼(Java):
PollingClient client = new PollingClient(subscribePort); TopicPoller poller1 = client.subscribe(serverIP, serverPort, tableName, offset); while (true) { ArrayList<IMessage> msgs = poller1.poll(1000); if (msgs.size() > 0) { BasicInt value = msgs.get(0).getEntity(2); //取數據中第一行第二個字段 } }
當每次流數據表有新數據發佈時,poller1會拉取到新數據,不然會阻塞在poller1.poll方法這裏等待。
事件方式示例代碼:
Java API使用預先設定的MessageHandler獲取和處理新數據。 首先須要調用者先定義數據處理器Handler,Handler須要實現com.xxdb.streaming.client.MessageHandler接口。
Handler實現示例以下:
public class MyHandler implements MessageHandler { public void doEvent(IMessage msg) { BasicInt qty = msg.getValue(2); //..處理數據 } }
在啓動訂閱時,把handler實例做爲參數傳入訂閱函數。
ThreadedClient client = new ThreadedClient(subscribePort); client.subscribe(serverIP, serverPort, tableName, new MyHandler(), offsetInt);
當每次流數據表有新數據發佈時,API會調用MyHandler方法,並將新數據經過msg參數傳入。
4. 監控流數據運行狀態
當流數據經過訂閱方式進行數據的實時處理,全部的計算都在後臺進行,用戶沒法直觀的看到運行的狀況。DolphinDB提供getStreamingStat函數,能夠全方位監控流數據狀態。
getStreamingStat函數返回的是一個tuple結構,其中包含了pubConns, subConns, persistWorker, subWorkers四個表。
4.1 pubConns
pubConns表是流數據發佈者狀態監控。每一個發佈者線程的最大隊列深度默認是1000萬。
列名稱 說明 client 發佈端信息,記錄發佈端IP和端口 queueDepthLimit 發佈端數據緩衝區隊列最大限制 queueDepth 發佈端數據緩衝區隊列深度 tables 發佈的流數據表,多表經過,號分隔
查看錶內容:
getStreamingStat().pubConns client queueDepthLimit queueDepth tables 192.168.1.61:8086 10,000,000 0 st1,st
pubConns表會列出當前全部的publisher節點,發佈隊列狀況,以及發佈的流數據表名稱。
4.2 subConns
subConns表是流數據訂閱者連接狀態監控。
列名稱 說明 publisher 發佈端信息,記錄發佈端IP和端口 cumMsgCount 累計訂閱消息數 cumMsgLatency 累計消息延遲時間(毫秒) lastMsgLatency 最後一次訂閱數據延遲時間(毫秒) lastUpdate 最後一次數據更新時間
查看錶內容:
getStreamingStat().subConns publisher cumMsgCount cumMsgLatency lastMsgLatency lastUpdate local8081 199,980 19,799 10,990 2018.11.21T07:19:59.767945044
這張表列出全部非本地訂閱方的連接狀態和消息統計信息。
4.3 persistWorkers
persistWorkers 表是持久化工做線程監控。每一個持久化工做線程的最大隊列深度默認是1000萬。
列名稱 說明 workerId worker編號 queueDepthLimit 隊列深度限制 queueDepth 隊列深度 tables 持久化表
只有當持久化啓用時,才能經過getStreamingStat獲取這張表,這裏記錄了全部持久化的表信息,這張表的記錄數等於persistenceWorkerNum配置數。好比持久化兩張數據表,能夠經過getStreamingStat().persistWorkers查看。
當persistenceWorkerNum=1時,結果爲:
getStreamingStat().persistWorkers workerId queueDepthLimit queueDepth tables 0 10,000,000 0 st1,st
當persistenceWorkerNum=3時,結果爲:
getStreamingStat().persistWorkers workerId queueDepthLimit queueDepth tables 0 10,000,000 0 st 1 10,000,000 0 st1 2 10,000,000 0
從結果能夠看出,persistenceWorkerNum爲持久化數據表提供並行化能力。
4.4 subWorkers
subWorkers表是流數據訂閱者工做線程監控,這張表每條記錄表明一個訂閱工做線程。每一個訂閱者線程的最大隊列深度默認是1000萬。
列名稱 說明 workerId worker編號 queueDepthLimit 訂閱端數據緩衝區隊列最大限制 queueDepth 訂閱端數據緩衝區隊列深度 processedMsgCount 已處理消息數量 failedMsgCount 處理失敗消息數量 lastErrMsg 上次失敗的消息 topics 已訂閱主題
配置項subExecutors, subExecutorPooling這兩個配置項的對流數據處理的影響,在這張表上能夠獲得充分的展示。在訂閱兩張流數據表st、st1時,能夠經過getStreamingStat().subWorkers查看。
當subExecutorPooling=false, subExecutors=1時,結果以下:
getStreamingStat().subWorkers workerId queueDepthLimit queueDepth processedMsgCount failedMsgCount lastErrMsg topics 0 10,000,000 0 0 0 local8081/st/st_act,local8081/st1/st1_act
此時,全部表的訂閱消息共用一個線程隊列。
當subExecutorPooling=false, subExecutors=2時,結果以下:
getStreamingStat().subWorkers workerId queueDepthLimit queueDepth processedMsgCount failedMsgCount lastErrMsg topics 0 10,000,000 0 0 0 local8081/st/st_act 0 10,000,000 0 0 0 local8081/st/st_act
此時,各個表訂閱消息分配到兩個線程隊列獨立處理。
當subExecutorPooling=true, subExecutors=2時,結果以下:
getStreamingStat().subWorkers workerId queueDepthLimit queueDepth processedMsgCount failedMsgCount lastErrMsg topics 0 10,000,000 0 0 0 local8081/st/st_act,local8081/st1/st1_act 0 10,000,000 0 0 0 local8081/st/st_act,local8081/st1/st1_act
此時,各個表的訂閱消息共享由兩個線程組成的線程池。
當有流數據進入時,能夠經過這個表觀察到已處理數據量等信息:
getStreamingStat().subWorkers workerId queueDepthLimit queueDepth processedMsgCount failedMsgCount lastErrMsg topics 0 10,000,000 0 100,621 0 local8081/st/st_act,local8081/st1/st1_act 0 10,000,000 0 99,359 0 local8081/st/st_act,local8081/st1/st1_act
5. 流數據性能調優
當數據流量極大而系統來不及處理時,系統監控中會看到訂閱端subWorkers表的queueDepth數值極高,此時系統會按照從訂閱端隊列-發佈端隊列-數據注入端逐級反饋數據壓力。當訂閱端隊列深度達到上限時開始阻止發佈端數據進入,此時發佈端的隊列開始累積,當發佈端的隊列深度達到上限時,系統會阻止流數據注入端寫入數據。這時能夠經過如下幾種方式來調整,使得系統對流數據的處理性能達到最優化。
- 調整訂閱參數中的batchSize和throttle參數,來組織數據的批處理和控制接收數據的流量,平衡發佈端和訂閱端的緩存,讓數據處理速度和流數據輸入速度達到一個動態的平衡。batchSize能夠設定等待流數據積累到必定量時才進行消費,能夠充分發揮數據批量處理的性能優點,可是這樣會帶來必定程度的內存佔用;而當batchSize比較大的時候,特殊狀況下會發生數據量一直沒有達到batchSize而長期滯留在緩衝區的狀況,throttle參數值是一個時間間隔,它的做用是即便batchSize未知足,也能將緩衝區的數據消費掉。
- 能夠經過調整subExecutors配置參數,增長訂閱端計算的並行度,來加快訂閱端隊列的消費速度。系統默認採用哈希算法爲每個訂閱分配一個executor。 在訂閱處理過程當中,若是須要確保兩個訂閱用同一個executor來處理,能夠在訂閱函數subscribeTable中指定參數hash的值。兩個訂閱使用相同的hash值,來指定用同一個線程來處理這兩個訂閱數據流,這樣能夠保證這兩個流數據表的時序同步。當有多個executor存在時,若是不一樣訂閱的數據流頻率不均或者處理複雜度差別很大,容易致使低負載的executor資源閒置。經過設置subExecutorPooling=true,可讓全部executor做爲一個共享線程池,共同處理全部訂閱的消息。在這種共享池模式下,全部訂閱的消息進入同一個隊列,多個executor從隊列中讀取消息並行處理。共享線程池處理流數據的一個反作用是不能保證消息按到達的時間順序處理。當實際場景對消息處理的時間順序有嚴格要求時,不能開啓此設置。
- 若流數據表(stream table)啓用同步持久化,那麼磁盤的IO可能會成爲瓶頸。一種處理方法是參考2.4採用異步方式持久化數據,同時設置一個合理的持久化隊列(maxPersistenceQueueDepth,默認值10000000條消息)。固然也能夠經過更換硬件,提供更高寫入性能的存儲設備好比SSD硬盤來提升寫入性能。持久化路徑經過參數persistence來設置。
- 若是數據發佈端(publisher)成爲系統的瓶頸,譬如訂閱的客戶端太多可能致使發佈瓶頸,能夠採用幾種處理辦法。首先能夠經過多級級聯下降每個發佈節點的訂閱數量,對延遲不敏感的應用能夠訂閱二級甚至三級的發佈節點。其次能夠調整部分參數來平衡延遲和吞吐量兩個指標。參數maxMsgNumBlock設置批量發送消息時批的大小,默認值是1024。通常狀況下,批的值較大,吞吐量能提高,但網絡延遲會增長。
- 若輸入流數據的流量波動較大,高峯期致使消費隊列積壓至隊列峯值(默認1000萬),那麼能夠修改配置項maxPubQueueDepthPerSite和maxSubQueueDepth來增長髮布端和訂閱端的最大隊列深度,提升系統抵抗數據流大幅波動的能力。隊列深度增長時,內存消耗會增長,要正確估算內存的使用量,合理配置內存。
6. 流數據的展現
流數據可視化按功能能夠分爲兩種可視化類型:
- 一種是實時值監控,用滑動窗口固定一個時間區間,把流數據聚合爲一個值,並定時刷新,一般用於指標的監控和預警;
- 另外一種是趨勢監控,把新產生的數據附加到原有的數據上並以可視化圖表的方式漸進更新,一般用於作數據全局分析。