Druid:一個用於大數據實時處理的開源分佈式系統

Druid是一個用於大數據實時查詢和分析的高容錯、高性能開源分佈式系統,旨在快速處理大規模的數據,並可以實現快速查詢和分析。尤爲是當發生代碼部署、機器故障以及其餘產品系統遇到宕機等狀況時,Druid仍可以保持100%正常運行。建立Druid的最初意圖主要是爲了解決查詢延遲問題,當時試圖使用Hadoop來實現交互式查詢分析,可是很難知足實時分析的須要。而Druid提供了以交互方式訪問數據的能力,並權衡了查詢的靈活性和性能而採起了特殊的存儲格式。html

Druid功能介於PowerDrillDremel之間,它幾乎實現了Dremel的全部功能,而且從PowerDrill吸取一些有趣的數據格式。Druid容許以相似Dremel和PowerDrill的方式進行單表查詢,同時還增長了一些新特性,如爲局部嵌套數據結構提供列式存儲格式、爲快速過濾作索引、實時攝取和查詢、高容錯的分佈式體系架構等。從官方得知,Druid的具備如下主要特徵:java

  • 爲分析而設計——Druid是爲OLAP工做流的探索性分析而構建,它支持各類過濾、聚合和查詢等類;
  • 快速的交互式查詢——Druid的低延遲數據攝取架構容許事件在它們建立後毫秒內可被查詢到;
  • 高可用性——Druid的數據在系統更新時依然可用,規模的擴大和縮小都不會形成數據丟失;
  • 可擴展——Druid已實現天天可以處理數十億事件和TB級數據。

Druid應用最多的是相似於廣告分析創業公司Metamarkets中的應用場景,如廣告分析、互聯網廣告系統監控以及網絡監控等。當業務中出現如下狀況時,Druid是一個很好的技術方案選擇:node

  • 須要交互式聚合和快速探究大量數據時;
  • 須要實時查詢分析時;
  • 具備大量數據時,如天天數億事件的新增、天天數10T數據的增長;
  • 對數據尤爲是大數據進行實時分析時;
  • 須要一個高可用、高容錯、高性能數據庫時。

 

Historical節點  :對非實時數據進行處理存儲和查詢mysql

Realtime節:實時攝取數據、監聽輸入數據流sql

Coordinator節點:監控Historical節點shell

Broker節點:接收來自外部客戶端的查詢和將查詢轉發到Realtime和Historical節點數據庫

Indexer節點:負責索引服務apache

一個Druid集羣有各類類型的節點(Node)組成,每一個節點均可以很好的處理一些的事情,這些節點包括對非實時數據進行處理存儲和查詢的Historical節點、實時攝取數據、監聽輸入數據流的Realtime節、監控Historical節點的Coordinator節點、接收來自外部客戶端的查詢和將查詢轉發到Realtime和Historical節點的Broker節點、負責索引服務的Indexer節點json

 

 

查詢操做中數據流和各個節點的關係以下圖所示:數組

以下圖是Druid集羣的管理層架構,該圖展現了相關節點和集羣管理所依賴的其餘組件(如負責服務發現的ZooKeeper集羣)的關係:

 

 

 

 

1、Druid簡介
2、Druid架構組成及相關依賴
3、Druid集羣配置
4、Druid集羣啓動
5、Druid查詢
6、後記

1、Druid簡介

Druid是一個爲大型冷數據集上實時探索查詢而設計的開源數據分析和存儲系統,提供極具成本效益而且永遠在線的實時數據攝取和任意數據處理。

主要特性:

  • 爲分析而設計——Druid是爲OLAP工做流的探索性分析而構建。它支持各類filter、aggregator和查詢類型,併爲添加新功能提供了一個框架。用戶已經利用Druid的基礎設施開發了高級K查詢和直方圖功能。
  • 交互式查詢——Druid的低延遲數據攝取架構容許事件在它們建立後毫秒內查詢,由於Druid的查詢延時經過只讀取和掃描有必要的元素被優化。Aggregate和 filter沒有坐等結果。
  • 高可用性——Druid是用來支持須要一直在線的SaaS的實現。你的數據在系統更新時依然可用、可查詢。規模的擴大和縮小不會形成數據丟失。
  • 可伸縮——現有的Druid部署天天處理數十億事件和TB級數據。Druid被設計成PB級別。

就係統而言,Druid功能位於PowerDrill和Dremel之間。它實現幾乎全部Dremel提供的工具(Dremel處理任意嵌套數據結構,而Druid只容許一個基於數組的嵌套級別)而且從PowerDrill吸取一些有趣的數據格式和壓縮方法。

Druid對於須要實時單1、海量數據流攝取產品很是適合。特別是若是你面向無停機操做時,若是你對查詢查詢的靈活性和原始數據訪問要求,高於對速度和無停機操做,Druid可能不是正確的解決方案。在談到查詢速度時候,頗有必要澄清「快速」的意思是:Druid是徹底有可能在6TB的數據集上實現秒級查詢。

2、Druid架構組成及其餘依賴

2.1 Overlord Node (Indexing Service)

Overlord會造成一個加載批處理和實時數據到系統中的集羣,同時會對存儲在系統中的數據變動(也稱爲索引服務)作出響應。另外,還包含了Middle Manager和Peons,一個Peon負責執行單個task,而Middle Manager負責管理這些Peons。

2.2 Coordinator Node

監控Historical節點組,以確保數據可用、可複製,而且在通常的「最佳」配置。它們經過從MySQL讀取數據段的元數據信息,來決定哪些數據段應該在集羣中被加載,使用Zookeeper來肯定哪一個Historical節點存在,而且建立Zookeeper條目告訴Historical節點加載和刪除新數據段。

2.3 Historical Node

是對「historical」數據(非實時)進行處理存儲和查詢的地方。Historical節點響應從Broker節點發來的查詢,並將結果返回給broker節點。它們在Zookeeper的管理下提供服務,並使用Zookeeper監視信號加載或刪除新數據段。

2.4 Broker Node

接收來自外部客戶端的查詢,並將這些查詢轉發到Realtime和Historical節點。當Broker節點收到結果,它們將合併這些結果並將它們返回給調用者。因爲了解拓撲,Broker節點使用Zookeeper來肯定哪些Realtime和Historical節點的存在。

2.5 Real-time Node

實時攝取數據,它們負責監聽輸入數據流並讓其在內部的Druid系統當即獲取,Realtime節點一樣只響應broker節點的查詢請求,返回查詢結果到broker節點。舊數據會被從Realtime節點轉存至Historical節點。

2.6 ZooKeeper

爲集羣服務發現和維持當前的數據拓撲而服務;

2.7 MySQL

用來維持系統服務所需的數據段的元數據;

2.8 Deep Storage

保存「冷數據」,可使用HDFS。

 

3、Druid集羣配置

3.1 環境信息

我這裏有兩臺機器,node1有32G內存,上面部署了Histotical Node和Coordinator Node;node2有72G內存,上面部署了其餘四個服務。

3.2 通用配置(Common Configuration)

##建立MySQL數據庫

CREATE DATABASE `druid` DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci;
grant all on druid.* to druid@’%’ identified by ‘druid1234′ WITH GRANT OPTION;
flush privileges;

##配置文件

cd $DRUID_HOME/config/_common
vi common.runtime.properties(全部節點)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
##使用Mysql存儲元數據
druid.extensions.coordinates=[ "io.druid.extensions:druid-examples" , "io.druid.extensions:druid-kafka-eight" , "io.druid.extensions:mysql-metadata-storage" ]
 
##zookeeper
druid.zk.service.host=zkNode1:2181,zkNode2:2181,zkNode3:2181
 
##Mysql配置
druid.metadata.storage. type =mysql
druid.metadata.storage.connector.connectURI=jdbc:mysql: //node1 :3306 /druid
druid.metadata.storage.connector.user=druid
druid.metadata.storage.connector.password=diurd1234
 
##配置deep storage到HDFS
druid.storage. type =hdfs
druid.storage.storageDirectory=hdfs: //cdh5/tmp/druid/storage
 
##配置查詢緩存,暫用本地,可配置memcached
druid.cache. type = local
druid.cache.sizeInBytes=10737418240
 
##配置監控
druid.monitoring.monitors=[ "com.metamx.metrics.JvmMonitor" ]
 
##配置Indexing service的名字
druid.selectors.indexing.serviceName=druid /overlord
 
##
druid.emitter=logging

3.3 Overlord Node(Indexing Service)

在運行Overlord Node節點上:

cd $DRUID_HOME/config/overlord
vi runtime.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
druid.host=node2
druid.port=8090
druid.service=druid /overlord
 
# Only required if you are autoscaling middle managers
druid.indexer.autoscale.doAutoscale= true
druid.indexer.autoscale.strategy=ec2
druid.indexer.autoscale.workerIdleTimeout=PT90m
druid.indexer.autoscale.terminatePeriod=PT5M
druid.indexer.autoscale.workerVersion=0
 
# Upload all task logs to deep storage
druid.indexer.logs. type =hdfs
druid.indexer.logs.directory=hdfs: //cdh5/tmp/druid/indexlog
 
# Run in remote mode
druid.indexer.runner. type =remote
druid.indexer.runner.minWorkerVersion=0
 
# Store all task state in the metadata storage
druid.indexer.storage. type =metadata

3.4 MiddleManager Node

在運行MiddleManager Node節點上:
cd $DRUID_HOME/config/middleManager
vi runtime.properties

1
2
3
4
5
6
7
8
9
10
druid.host=node2
druid.port=8091
druid.service=druid /middlemanager
 
druid.indexer.logs. type =hdfs
druid.indexer.logs.directory=hdfs: //cdh5/tmp/druid/indexlog
 
# Resources for peons
druid.indexer.runner.javaOpts=-server -Xmx2g -XX:+UseG1GC -XX:MaxGCPauseMillis=100 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
druid.indexer.task.baseTaskDir= /tmp/persistent/task/

3.5 Coordinator Node

在運行Coordinator Node節點上:
cd $DRUID_HOME/config/coordinator
vi runtime.properties

1
2
3
4
5
druid.host=node1
druid.port=8081
druid.service=coordinator
 
druid.coordinator.startDelay=PT5M

3.6 Historical Node

在運行Historical Node節點上:
cd $DRUID_HOME/config/historical
vi runtime.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
druid.host=node1
druid.port=8082
druid.service=druid /historical
 
druid.historical.cache.useCache= true
druid.historical.cache.populateCache= true
 
druid.processing.buffer.sizeBytes=1073741824
druid.processing.numThreads=9
 
druid.server.http.numThreads=9
druid.server.maxSize=300000000000
 
druid.segmentCache.locations=[{ "path" : " /tmp/druid/indexCache" , "maxSize" : 300000000000}]
 
druid.monitoring.monitors=[ "io.druid.server.metrics.HistoricalMetricsMonitor" , "com.metamx.metrics.JvmMonitor" ]

3.7 Broker Node

在運行Broker Node節點上:
cd $DRUID_HOME/config/broker
vi runtime.properties

1
2
3
4
5
6
7
8
9
10
11
druid.host=node2
druid.port=8092
druid.service=druid /broker
 
druid.broker.http.numConnections=20
druid.broker.http.readTimeout=PT5M
 
druid.processing.buffer.sizeBytes=2147483647
druid.processing.numThreads=11
 
druid.server.http.numThreads=20

3.8 Real-time Node

在運行Real-time Node節點上:
cd $DRUID_HOME/config/realtime
vi runtime.properties

1
2
3
4
5
6
7
8
9
10
11
druid.host=node2
druid.port=8093
druid.service=druid /realtime
 
druid.processing.buffer.sizeBytes=1073741824
druid.processing.numThreads=5
 
# Override emitter to print logs about events ingested, rejected, etc
druid.emitter=logging
 
druid.monitoring.monitors=[ "io.druid.segment.realtime.RealtimeMetricsMonitor" , "com.metamx.metrics.JvmMonitor" ]

4、Druid集羣啓動

首次啓動時候,能夠遵循下面的啓動順序。

4.1 Broker Node

cd $DRUID_HOME/
cp run_druid_server.sh run_broker.sh
vi run_broker.sh

替換如下內容:

1
2
3
4
5
6
7
SERVER_TYPE=broker
 
# start process
JAVA_ARGS= "${JAVA_ARGS} -Xmx10g -Xms5g -XX:NewSize=2g -XX:MaxNewSize=2g -XX:MaxDirectMemorySize=24g -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
JAVA_ARGS= "${JAVA_ARGS} -Duser.timezone=GMT+8 -Dfile.encoding=UTF-8"
JAVA_ARGS= "${JAVA_ARGS} -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Djava.io.tmpdir=/tmp/druid"
JAVA_ARGS= "${JAVA_ARGS} -Dcom.sun.management.jmxremote.port=17071 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Ddruid.extensions.localRepository=${MAVEN_DIR}"

執行./run_broker.sh啓動Broker Node:

4.2 Historical Node

cd $DRUID_HOME/
cp run_druid_server.sh run_historical.sh

vi run_historical.sh

替換如下內容:

1
2
3
4
5
6
7
SERVER_TYPE=historical
 
# start process
JAVA_ARGS= "${JAVA_ARGS} -Xmx10g -Xms10g -XX:NewSize=2g -XX:MaxNewSize=2g -XX:MaxDirectMemorySize=16g -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
JAVA_ARGS= "${JAVA_ARGS} -Duser.timezone=GMT+8 -Dfile.encoding=UTF-8"
JAVA_ARGS= "${JAVA_ARGS} -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Djava.io.tmpdir=/tmp/druid"
JAVA_ARGS= "${JAVA_ARGS} -Ddruid.extensions.localRepository=${MAVEN_DIR}"

執行命令./run_historical.sh啓動Historical Node:

4.3 Coordinator Node

cd $DRUID_HOME/
cp run_druid_server.sh run_coordinator.sh
vi run_coordinator.sh

替換如下內容:

1
2
3
4
5
6
7
SERVER_TYPE=coordinator
 
# start process
JAVA_ARGS= "${JAVA_ARGS} -Xmx10g -Xms10g -XX:NewSize=512m -XX:MaxNewSize=512m -XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
JAVA_ARGS= "${JAVA_ARGS} -Duser.timezone=GMT+8 -Dfile.encoding=UTF-8"
JAVA_ARGS= "${JAVA_ARGS} -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Djava.io.tmpdir=/tmp/druid"
JAVA_ARGS= "${JAVA_ARGS} -Ddruid.extensions.localRepository=${MAVEN_DIR}"

執行命令./run_coordinator.sh啓動Coordinator Node.

4.4 Middle Manager

cd $DRUID_HOME/
cp run_druid_server.sh run_middleManager.sh
vi run_middleManager.sh

替換如下內容:

1
2
3
4
5
SERVER_TYPE=middleManager
# start process
JAVA_ARGS= "${JAVA_ARGS} -Xmx64m -Xms64m -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Duser.timezone=GMT+8 -Dfile.encoding=UTF-8"
JAVA_ARGS="${JAVA_ARGS} -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Djava.io.tmpdir= /tmp/druid  -Ddruid.extensions.localR
epository=${MAVEN_DIR}"

執行命令./run_middleManager.sh啓動MiddleManager Node。

4.5 Overlord Node

cd $DRUID_HOME/
cp run_druid_server.sh run_overlord.sh
vi run_overlord.sh

替換如下內容:

1
2
3
4
5
6
SERVER_TYPE=overlord
# start process
JAVA_ARGS= "${JAVA_ARGS} -Xmx4g -Xms4g -XX:NewSize=256m -XX:MaxNewSize=256m -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
JAVA_ARGS= "${JAVA_ARGS} -Duser.timezone=GMT+8 -Dfile.encoding=UTF-8"
JAVA_ARGS= "${JAVA_ARGS} -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Djava.io.tmpdir=/tmp/druid"
JAVA_ARGS= "${JAVA_ARGS} -Ddruid.extensions.localRepository=${MAVEN_DIR}"

執行命令./run_overlord.sh啓動Overlord Node:

4.6 Real-time Node

cd $DRUID_HOME/
cp run_druid_server.sh run_realtime.sh
vi run_realtime.sh
替換如下內容:

1
2
3
4
5
6
7
8
9
10
11
SERVER_TYPE=realtime
 
# start process
JAVA_ARGS="${JAVA_ARGS} -Xmx13g -Xms13g -XX:NewSize=2g -XX:MaxNewSize=2g -XX:MaxDirectMemorySize=9g -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -
XX:+PrintGCTimeStamps -XX:+HeapDumpOnOutOfMemoryError"
JAVA_ARGS= "${JAVA_ARGS} -Duser.timezone=GMT+8 -Dfile.encoding=UTF-8"
JAVA_ARGS= "${JAVA_ARGS} -Ddruid.realtime.specFile=/home/liuxiaowen/druid-0.8.1/examples/wikipedia/wikipedia_realtime.spec"
JAVA_ARGS= "${JAVA_ARGS} -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Djava.io.tmpdir=/tmp/druid"
JAVA_ARGS="${JAVA_ARGS} -Dcom.sun.management.jmxremote.port=17072 -Dcom.sun.management.jmxremote.authenticate= false  -Dcom.sun.management.jmxremot
e.ssl= false "
JAVA_ARGS= "${JAVA_ARGS} -Ddruid.extensions.localRepository=${MAVEN_DIR}"

##特別須要注意參數:

-Ddruid.realtime.specFile=/home/liuxiaowen/druid-0.8.1/examples/wikipedia/wikipedia_realtime.spec

啓動RealTime Node須要指定一個realtime數據源的配置文件,本文中使用example提供的wikipedia_realtime.spec,啓動後,該數據源從irc.wikimedia.org獲取實時數據。

關於RealTime Node的配置,後續文章將會詳細介紹。

執行命令./run_realtime.sh啓動RealTime Node。

5、Druid查詢

第四部分中啓動RealTime Node時候使用了例子中自帶的配置文件wikipedia_realtime.spec,啓動後,該RealTime Node會從irc.wikimedia.org獲取實時數據,本章將以該數據源爲例,學習幾種最多見的查詢。

5.1 select查詢

首先編輯查詢配置文件select_query.json

1
2
3
4
5
6
7
8
9
10
11
{
    "queryType" : "select" ,
    "dataSource" : "wikipedia" ,
    "dimensions" :[],
    "metrics" :[],
    "granularity" : "all" ,
    "intervals" : [
      "2015-11-01/2015-11-20"
    ],
    "pagingSpec" :{ "pagingIdentifiers" : {}, "threshold" :10}
  }

該配置文件的含義是從數據源」wikipedia」進行select查詢全部列,時間區間爲2015-11-01/2015-11-20,每10條記錄一個分頁。

執行命令查詢:

curl -X POST ‘http://node2:8093/druid/v2/?pretty’ -H ‘content-type: application/json’ -d @select_query.json

瞬間返回結果:

5.2 基於時間序列的查詢Timeseries query

編輯查詢配置文件timeseries.json

1
2
3
4
5
6
7
8
9
10
{
     "queryType" : "timeseries" ,
     "dataSource" : "wikipedia" ,
     "intervals" : [ "2010-01-01/2020-01-01"  ],
     "granularity" : "minute" ,
     "aggregations" : [
         { "type" : "longSum" , "fieldName" : "count" , "name" : "edit_count" },
         { "type" : "doubleSum" , "fieldName" : "added" , "name" : "chars_added" }
     ]
}

該配置文件的含義是:從數據源」 wikipedia」中進行時間序列查詢,區間爲2010-01-01/2020-01-01,按分鐘彙總結果,彙總字段爲count和added;

執行查詢命令:

curl -X POST ‘http://node2:8093/druid/v2/?pretty’ -H ‘content-type: application/json’ -d @timeseries.json

一樣瞬間返回結果:

5.3 TopN查詢

編輯查詢文件topn.json

1
2
3
4
5
6
7
8
9
10
11
12
13
{
   "queryType" : "topN" ,
   "dataSource" : "wikipedia" ,
   "granularity" : "all" ,
   "dimension" : "page" ,
   "metric" : "edit_count" ,
   "threshold"  : 10,
   "aggregations" : [
     { "type" : "longSum" , "fieldName" : "count" , "name" : "edit_count" }
   ],
   "filter" : { "type" : "selector" , "dimension" : "country" , "value" : "United States"  },
   "intervals" : [ "2012-10-01T00:00/2020-01-01T00" ]
}

該文件含義是:從數據源」 wikipedia」進行TopN查詢,其中N=10,維度爲page,指標爲edit_count,也就是,在page維度上將edit_count彙總後取Top 10.

執行查詢命令:

curl -X POST ‘http://node2:8093/druid/v2/?pretty’ -H ‘content-type: application/json’ -d @topn.json

結果爲:

6、後記

Druid目前已經有不少公司用於實時計算和實時OLAP,並且效果很好。雖然它的配置和查詢都比較複雜和繁瑣,但若是是真正基於海量數據的實時OLAP,它的威力仍是很強大的。我將持續學習和分享Druid的相關技術,驗證它在海量數據實時OLAP上的效果,敬請關注個人博客

參考文章:

http://druid.io

http://www.csdn.net/article/2014-10-30/2822381/2

 

 

 

 

 

參考 http://www.infoq.com/cn/news/2015/04/druid-data/

http://druid.io/

http://www.open-open.com/lib/view/open1447852962978.html

相關文章
相關標籤/搜索