DolphinDB和Druid都是分佈式的分析型時序數據庫。儘管前者使用c++開發,後者使用java開發,二者在架構、功能、應用場景等方面有不少共同點。本報告在SQL查詢、數據導入、磁盤佔用空間等方面對二者進行性能的對比測試。java
測試數據集使用約300GB的美國股票市場交易與報價數據。經過測試咱們發現:node
DolphinDB是一款分析型的分佈式時序數據庫,由C++編寫,內置流數據處理引擎,並行計算引擎和分佈式計算的功能。DolphinDB database 內置分佈式文件系統,支持集羣水平和垂直擴展。提供類SQL和Python的腳本語言,不只能夠用SQL進行對數據進行操做,也能夠完成更爲複雜的內存計算。提供其它經常使用編程語言的API,方便與已有應用程序集成。DolphinDB能對萬億級數據快速處理,在金融領域中的歷史數據分析建模與實時流數據處理,以及物聯網領域中的海量傳感器數據處理與實時分析等場景中均有很是出色的表現。mysql
Druid是一個由Java語言實現的OLAP數據倉庫,適用於萬億級別數據量上的低延時查詢和插入以及實時流數據分析。Druid採用分佈式、SN架構和列式存儲、倒排索引、位圖索引等關鍵技術,具備高可用性和高擴展性的特色。同時,Druid提供了多種語言接口,支持部分SQL。c++
2.1 硬件配置web
本次測試的硬件配置以下:算法
設備:DELL OptiPlex 7060sql
CPU:Inter(R) Core™ i7-8700 CPU @ 3.20GHz,6核心12線程數據庫
內存:32GB編程
硬盤:256GB SSD,1.8TB希捷ST2000DM008-2FR102機械硬盤緩存
操做系統:Ubuntu 16.04 x64
2.2 環境配置
本次的測試環境爲單服務器下的多節點集羣。設置DolphinDB的數據節點的個數爲4個,單個數據節點最大可用內存設置爲4GB。設置Druid的節點個數爲5個,分別爲overload,broker,historical,coordinator和middleManager。Druid默認對查詢結果進行緩存,影響測試時經過屢次查詢求平均值這個方法的正確性,故關閉query cache的功能。爲不影響Druid的寫入性能測試, 關閉了Druid的roll up功能。其餘配置均服從默認配置。
原始csv文件存儲在HDD上。數據庫存儲在SSD上。
本次測試採用了2007年8月美國股票市場level1的TAQ數據集。TAQ數據集按日分爲23個csv文件,單個文件大小在7.8G到19.1G不等,整個數據集大小約290G,共有6,561,693,704條數據。
測試數據集TAQ在DolphinDB和Druid中各個字段的數據類型以下所示:
在Druid中,DATE字段指定爲timestamp列。其它字段均用做dimension字段。
在DolphinDB中,採用股票代碼+日期組合分區,其中按照股票代碼範圍分爲128個分區,按照日期分爲23個分區。
Druid僅支持時間範圍分區,所以咱們把DATE列指定爲timestamp類型,以日爲單位,共劃分爲23個分區。
咱們從數據庫查詢性能、I/O性能以及磁盤佔用空間三方面對DolphinDB和Druid進行了對比測試。
5.1 數據庫查詢性能
DolphinDB腳本語言支持SQL語法,同時針對時序數據進行了功能擴展。Druid提供了基於Json數據格式的語言進行查詢,同時也提供了dsql來進行SQL查詢。本次測試使用Druid自帶的dsql。
咱們對TAQ數據集進行了若干種經常使用的SQL查詢。爲了減小偶然因素對結果的影響,本次查詢性能測試對每種查詢操做均進行了10次,對總時間取平均值,時間以毫秒爲單位。測試DolphinDB時,咱們使用了timer語句來評估SQL語句在服務端的執行時間。因爲Druid中沒有提供輸出查詢時間的工具或函數,採用了客戶端命令行工具dsql打印的執行時間。Druid返回的執行時間相比DolphinDB,多了查詢結果的傳輸和顯示時間。因爲查詢返回的數據量都很小,dsql與Druid服務器又在同一個節點上,影響的時間在1ms左右。1ms左右的時間不影響咱們的測試結論,所以沒有作特殊處理。
7個查詢的SQL表示以下表所示。
測試結果以下表所示。
從結果能夠看出,對於幾乎全部查詢,DolphinDB的性能都優於Druid,速度大約是Druid的3到30倍。
因爲Druid只容許根據時間戳進行segment的劃分,而DolphinDB容許從多個維度上對數據進行劃分,在TAQ分區時用了時間和股票代碼兩個維度,所以在查詢中須要根據股票代碼過濾或分組的測試(如第一、三、六、7項測試)中,DolphinDB的優點更明顯。
5.2 I/O性能測試
咱們測試了DolphinDB和Druid在導入單個文件(7.8G)和多個文件(290.8G)時的性能。公平起見,咱們關閉了Druid的Roll up功能。測試結果以下表所示,時間以秒爲單位。
相同狀況下導入單個文件,Druid的導入時間是DolphinDB的16倍以上,導入多個文件時,因爲DolphinDB支持並行導入,速度相比Druid更快。數據導入腳本見附錄2。
5.3 磁盤佔用空間測試
數據導入到DolphinDB和Druid後,咱們比較了二者的數據壓縮率。測試結果以下表所示。
DolphinDB採用LZ4壓縮算法,對列式儲存的數據進行快速壓縮。DolphinDB的SYMBOL類型在壓縮以前,會使用字典編碼,將字符串轉化成整型。Druid在數據儲存過程當中,對timestamp和metrics採用LZ4算法直接壓縮,對dimensions字段使用字典編碼、位圖索引以及roaring bitmap進行壓縮。使用字典編碼能夠減小字符串存儲的空間,位圖索引可快速地進行按位邏輯操做,位圖索引壓縮進一步節約了儲存空間。
本次測試中,DolphinDB數據庫佔用的磁盤空間比Druid高出約80%。形成這個差別的主要因素是BID和OFR兩個浮點型字段在DolphinDB和Druid上的壓縮比有很大的差別。在DolphinDB上,這個兩個字段的壓縮比是20%,而在Druid上高達5%。緣由是測試數據集是一個歷史數據集,數據已經按照日期和股票兩個字段排序。一個股票在短期內的報價變化很小,unique的報價個數很是有限,Druid使用位圖壓縮的效果很是好。
雖然Druid數據庫的壓縮比更高,靜態的磁盤空間佔用較小,可是Druid運行時會產生segment cache目錄,總的磁盤空間佔用達到65 GB。而DolphinDB運行時不須要額外的空間,總的磁盤空間反而比Druid略小。
DolphinDB對於Druid的性能優點來自於多個方面,包括(1)存儲機制和分區機制上的差異,(2)開發語言(c++ vs java)上的差異,(3)內存管理上的差異,以及(4)算法(如排序和哈希)實現上的差異。
在分區上,Druid只支持時間類型的範圍分區,相對於支持值分區、範圍分區、散列分區和列表分區且每張表可根據多個字段進行組合分區的DolphinDB而言缺少靈活性。DolphinDB的分區粒度更細,不易出現數據或查詢集中到某個節點的狀況,在查詢時DolphinDB所須要掃描的數據塊也更少,響應時間更短,性能也更加出色。
除去性能,DolphinDB在功能上比Druid也更爲完善。在SQL的支持方面,DolphinDB支持很是強大的window function機制,對SQL join的支持也更爲全面。對時序數據特有的sliding function,asof join, window join,DolphinDB都有很好的支持。DolphinDB集數據庫、編程語言和分佈式計算於一體,除了常規的數據庫查詢功能,DolphinDB也支持更爲複雜的內存計算,分佈式計算以及流計算。
DolphinDB和Druid在運行方式上也略有區別。在Druid崩潰後或是將segment-cache清空後重啓時,須要花大量的時間從新加載數據,將每個segment解壓到segment-cache中再進行查詢,效率較低,cache也會佔用較大的空間,所以Druid在從新啓動時須要等待較長的時間,而且要求更大的空間。
附錄1. 環境配置
(1) DolphinDB配置
controller.cfg
localSite=localhost:9919:ctl9919 localExecutors=3 maxConnections=128 maxMemSize=4 webWorkerNum=4 workerNum=4 dfsReplicationFactor=1 dfsReplicaReliabilityLevel=0 enableDFS=1 enableHTTPS=0
cluster.nodes
localSite,mode localhost:9910:agent,agent localhost:9921:DFS_NODE1,datanode localhost:9922:DFS_NODE2,datanode localhost:9923:DFS_NODE3,datanode localhost:9924:DFS_NODE4,datanode
cluster.cfg
maxConnection=128 workerNum=8 localExecutors=7 webWorkerNum=2 maxMemSize=4
agent.cfg
workerNum=3 localExecutors=2 maxMemSize=4 localSite=localhost:9910:agent controllerSite=localhost:9919:ctl9919
(2) Druid配置
_common
# Zookeeper druid.zk.service.host=zk.host.ip druid.zk.paths.base=/druid # Metadata storage druid.metadata.storage.type=mysql druid.metadata.storage.connector.connectURI=jdbc:mysql://db.example.com:3306/druid # Deep storage druid.storage.type=local druid.storage.storageDirectory=var/druid/segments # Indexing service logs druid.indexer.logs.type=file druid.indexer.logs.directory=var/druid/indexing-logs
broker:
Xms24g Xmx24g XX:MaxDirectMemorySize=4096m # HTTP server threads druid.broker.http.numConnections=5 druid.server.http.numThreads=25 # Processing threads and buffers druid.processing.buffer.sizeBytes=2147483648 druid.processing.numThreads=7 # Query cache druid.broker.cache.useCache=false druid.broker.cache.populateCache=false coordinator: Xms3g Xmx3g historical: Xms8g Xmx8g # HTTP server threads druid.server.http.numThreads=25 # Processing threads and buffers druid.processing.buffer.sizeBytes=2147483648 druid.processing.numThreads=7 # Segment storage druid.segmentCache.locations=[{"path":"var/druid/segment-cache","maxSize":0}] druid.server.maxSize=130000000000 druid.historical.cache.useCache=false druid.historical.cache.populateCache=false middleManager: Xms64m Xmx64m # Number of tasks per middleManager druid.worker.capacity=3 # HTTP server threads druid.server.http.numThreads=25 # Processing threads and buffers on Peons druid.indexer.fork.property.druid.processing.buffer.sizeBytes=4147483648 druid.indexer.fork.property.druid.processing.numThreads=2
overload:
Xms3g Xmx3g
附錄2. 數據導入腳本
DolphinDB database 腳本:
if (existsDatabase("dfs://TAQ")) dropDatabase("dfs://TAQ") db = database("/Druid/table", SEQ, 4) t=loadTextEx(db, 'table', ,"/data/data/TAQ/TAQ20070801.csv") t=select count(*) as ct from t group by symbol buckets = cutPoints(exec symbol from t, 128) buckets[size(buckets)-1]=`ZZZZZ t1=table(buckets as bucket) t1.saveText("/data/data/TAQ/buckets.txt") db1 = database("", VALUE, 2007.08.01..2007.09.01) partition = loadText("/data/data/buckets.txt") partitions = exec * from partition db2 = database("", RANGE, partitions) db = database("dfs://TAQ", HIER, [db1, db2]) db.createPartitionedTable(table(100:0, `symbol`date`time`bid`ofr`bidsiz`ofrsiz`mode`ex`mmid, [SYMBOL, DATE, SECOND, DOUBLE, DOUBLE, INT, INT, INT, CHAR, SYMBOL]), `quotes, `date`symbol) def loadJob() { filenames = exec filename from files('/data/data/TAQ') db = database("dfs://TAQ") filedir = '/data/data/TAQ' for(fname in filenames){ jobId = fname.strReplace(".csv", "") jobName = jobId submitJob(jobId,jobName, loadTextEx{db, "quotes", `date`symbol,filedir+'/'+fname}) } } loadJob() select * from getRecentJobs() TAQ = loadTable("dfs://TAQ","quotes");
Druid腳本:
{ "type" : "index", "spec" : { "dataSchema" : { "dataSource" : "TAQ", "parser" : { "type" : "string", "parseSpec" : { "format" : "csv", "dimensionsSpec" : { "dimensions" : [ "TIME", "SYMBOL", {"name":"BID", "type" : "double"}, {"name":"OFR", "type" : "double"}, {"name":"BIDSIZ", "type" : "int"}, {"name":"OFRSIZ", "type" : "int"}, "MODE", "EX", "MMID" ] }, "timestampSpec": { "column": "DATE", "format": "yyyyMMdd" }, "columns" : ["SYMBOL", "DATE", "TIME", "BID", "OFR", "BIDSIZ", "OFRSIZ", "MODE", "EX", "MMID"] } }, "metricsSpec" : [], "granularitySpec" : { "type" : "uniform", "segmentGranularity" : "day", "queryGranularity" : "none", "intervals" : ["2007-08-01/2007-09-01"], "rollup" : false } }, "ioConfig" : { "type" : "index", "firehose" : { "type" : "local", "baseDir" : "/data/data/", "filter" : "TAQ.csv" }, "appendToExisting" : false }, "tuningConfig" : { "type" : "index", "targetPartitionSize" : 5000000, "maxRowsInMemory" : 25000, "forceExtendableShardSpecs" : true } } }