If you want Druid to ingest real-time data through Kafka for OLAP, you need to download an additional Druid component, tranquility-distribution, from: http://static.druid.io/tranquility/releases/tranquility-distribution-0.8.2.tgz
tranquility-distribution offers two ways to ingest streaming data: Server and Kafka. Because of our business environment I did not look into the Server mode; anyone interested can read about it on the Druid website. What follows is a detailed look at the Kafka ingestion mode that I studied.
1. How Druid data is organized
Whichever way you load your data, you need to understand how Druid data is organized, because the configuration file has to describe how the ingested data is stored and what its stored format looks like.
Data in Druid is stored in three parts: timestampSpec, dimensionsSpec, and metricsSpec. timestampSpec is the timestamp column, the primary field used when querying, and the main reason Druid can serve as a time-series database. dimensionsSpec defines the dimension columns, which you can think of as grouping conditions, or the Key in MapReduce. metricsSpec defines the values computed from the data per dimension combination, such as count, sum, max, min, first, last, and so on.
Here is an example.
The raw data looks like this:
{"unit": "milliseconds", "http_method": "GET", "value": 70, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/list", "metricType": "request/latency", "server": "www2.example.com"} {"unit": "milliseconds", "http_method": "GET", "value": 116, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/list", "metricType": "request/latency", "server": "www4.example.com"} {"unit": "milliseconds", "http_method": "GET", "value": 35, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/get/80", "metricType": "request/latency", "server": "www1.example.com"} {"unit": "milliseconds", "http_method": "GET", "value": 44, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/", "metricType": "request/latency", "server": "www5.example.com"} {"unit": "milliseconds", "http_method": "GET", "value": 40, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/list", "metricType": "request/latency", "server": "www1.example.com"} {"unit": "milliseconds", "http_method": "GET", "value": 84, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/get/33", "metricType": "request/latency", "server": "www4.example.com"} {"unit": "milliseconds", "http_method": "GET", "value": 42, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/get/54", "metricType": "request/latency", "server": "www5.example.com"} {"unit": "milliseconds", "http_method": "GET", "value": 115, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/", "metricType": "request/latency", "server": "www5.example.com"} {"unit": "milliseconds", "http_method": "GET", "value": 32, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/", "metricType": "request/latency", "server": "www5.example.com"} {"unit": "milliseconds", "http_method": "GET", "value": 82, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/get/15", "metricType": "request/latency", "server": "www4.example.com"} {"unit": "milliseconds", "http_method": "GET", "value": 123, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/get/22", "metricType": "request/latency", "server": "www3.example.com"} {"unit": "milliseconds", "http_method": "GET", "value": 57, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/", "metricType": "request/latency", "server": "www3.example.com"} {"unit": "milliseconds", "http_method": "GET", "value": 26, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/list", "metricType": "request/latency", "server": "www1.example.com"} {"unit": "milliseconds", "http_method": "GET", "value": 44, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/get/45", "metricType": "request/latency", "server": "www3.example.com"} {"unit": "milliseconds", "http_method": "GET", "value": 132, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/get/42", "metricType": "request/latency", "server": "www5.example.com"} {"unit": "milliseconds", "http_method": "GET", "value": 113, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/get/70", "metricType": "request/latency", "server": "www4.example.com"} {"unit": "milliseconds", "http_method": "GET", "value": 144, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/list", "metricType": "request/latency", "server": "www5.example.com"} {"unit": "milliseconds", "http_method": "GET", "value": 41, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/", "metricType": "request/latency", "server": "www1.example.com"} 
{"unit": "milliseconds", "http_method": "GET", "value": 90, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/get/94", "metricType": "request/latency", "server": "www5.example.com"} {"unit": "milliseconds", "http_method": "GET", "value": 89, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/", "metricType": "request/latency", "server": "www4.example.com"} {"unit": "milliseconds", "http_method": "GET", "value": 114, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/", "metricType": "request/latency", "server": "www3.example.com"} {"unit": "milliseconds", "http_method": "GET", "value": 29, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/", "metricType": "request/latency", "server": "www5.example.com"} {"unit": "milliseconds", "http_method": "GET", "value": 25, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/get/41", "metricType": "request/latency", "server": "www4.example.com"} {"unit": "milliseconds", "http_method": "GET", "value": 88, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/", "metricType": "request/latency", "server": "www2.example.com"} {"unit": "milliseconds", "http_method": "GET", "value": 137, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/list", "metricType": "request/latency", "server": "www4.example.com"}
We process this data with timestamp as the timestampSpec, page and server as the dimensionsSpec, and a count plus the sum of value as the metricsSpec. After processing, the data stored in Druid looks like this:
{"timestamp":"2017-11-28T03:16:20Z", "page":"/list", "server":"www2.example.com", "count":1, "valueSum":70} {"timestamp":"2017-11-28T03:16:20Z", "page":"/list", "server":"www4.example.com", "count":2, "valueSum":253} {"timestamp":"2017-11-28T03:16:20Z", "page":"/get/80", "server":"www1.example.com", "count":1, "valueSum":35} {"timestamp":"2017-11-28T03:16:20Z", "page":"/", "server":"www5.example.com", "count":4, "valueSum":320} {"timestamp":"2017-11-28T03:16:20Z", "page":"/list", "server":"www1.example.com", "count":2, "valueSum":66} {"timestamp":"2017-11-28T03:16:20Z", "page":"/get/33", "server":"www4.example.com", "count":1, "valueSum":84} {"timestamp":"2017-11-28T03:16:20Z", "page":"/get/54", "server":"www5.example.com", "count":1, "valueSum":42} {"timestamp":"2017-11-28T03:16:20Z", "page":"/get/15", "server":"www4.example.com", "count":1, "valueSum":82} {"timestamp":"2017-11-28T03:16:20Z", "page":"/get/22", "server":"www3.example.com", "count":1, "valueSum":123} {"timestamp":"2017-11-28T03:16:20Z", "page":"/", "server":"www3.example.com", "count":2, "valueSum":171} {"timestamp":"2017-11-28T03:16:20Z", "page":"/get/45", "server":"www3.example.com", "count":1, "valueSum":44} {"timestamp":"2017-11-28T03:16:20Z", "page":"/get/42", "server":"www5.example.com", "count":1, "valueSum":132} {"timestamp":"2017-11-28T03:16:20Z", "page":"/get/70", "server":"www4.example.com", "count":1, "valueSum":113} {"timestamp":"2017-11-28T03:16:20Z", "page":"/list", "server":"www5.example.com", "count":1, "valueSum":144} {"timestamp":"2017-11-28T03:16:20Z", "page":"/", "server":"www1.example.com", "count":1, "valueSum":41} {"timestamp":"2017-11-28T03:16:20Z", "page":"/get/94", "server":"www5.example.com", "count":1, "valueSum":90} {"timestamp":"2017-11-28T03:16:20Z", "page":"/", "server":"www4.example.com", "count":1, "valueSum":89} {"timestamp":"2017-11-28T03:16:20Z", "page":"/get/41", "server":"www4.example.com", "count":1, "valueSum":25} {"timestamp":"2017-11-28T03:16:20Z", "page":"/", "server":"www2.example.com", "count":1, "valueSum":88}
2. Loading data through Kafka and performing the processing from the example above
During ingestion we need to configure how the incoming data is processed. The kafka.json configuration looks like this (the -- annotations are explanations only and must be removed from the real file, since JSON does not allow comments):
{ "dataSources" : { "test1" : { --數據源名稱,與下方的dataSource的值要徹底相同 "spec" : { --開始描述 "dataSchema" : { --開始數據表描述 "dataSource" : "test1", --數據源名稱,注意上下對應 "parser" : { "type" : "string", "parseSpec" : { "timestampSpec" : { "column" : "timestamp", "format" : "auto" }, "dimensionsSpec" : { --維度字段都有那些 "dimensions" : ["page", "server"] }, "format" : "json" --格式化工具類型,對應從kafka中加載的數據的格式 } }, "granularitySpec" : { --合併、分卷設置 "type" : "uniform", "segmentGranularity" : "hour", --按照小時進行分卷設置 "queryGranularity" : "none" --默認合併模式,採用按秒進行合併的方式進行 }, "metricsSpec" : [ --合併計算列 { "type" : "count", --count操做 "name" : "count" --列名 }, { "type" : "longSum", --對long型數據的sum操做,固然還有doubleSum "name" : "value_sum", --列名 "fieldName" : "value" --須要進行Sum的屬性名 } ] }, "ioConfig" : { "type" : "realtime" --實時加載 }, "tuningConfig" : { "type" : "realtime", "maxRowsInMemory" : "100000", --內存中最大保存行號 "intermediatePersistPeriod" : "PT10M", --內存提交到磁盤的時間間隔 "windowPeriod" : "PT10M" --時間窗口緩衝時間 } }, "properties" : { "task.partitions" : "1", "task.replicants" : "1", "topicPattern" : "sunz" } } }, "properties" : { "zookeeper.connect" : "localhost", --Druid使用的Zookeeper集羣所在主機 "druid.discovery.curator.path" : "/druid/discovery", "druid.selectors.indexing.serviceName" : "druid/overlord", "commit.periodMillis" : "15000", "consumer.numThreads" : "2", "kafka.zookeeper.connect" : "devhadoop241", --kafka所使用的Zookeeper所在主機 "kafka.group.id" : "tranquility-kafka" --Topic名稱 } }
Starting it is simple; just run:
bin/tranquility kafka -configFile conf/kafka.json
Once tranquility is up, data can be loaded through Kafka. How to build a Kafka producer will not be covered here.
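That said, the Kafka console producer is enough for a quick smoke test. A minimal sketch, assuming a broker is reachable at devhadoop241:9092 (the broker address and port are assumptions; the config above only names the ZooKeeper host) and that KAFKA_HOME points at your Kafka installation:

# send one raw record to the topic configured as topicPattern ("sunz"); broker address is an assumption
echo '{"unit": "milliseconds", "http_method": "GET", "value": 70, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/list", "metricType": "request/latency", "server": "www2.example.com"}' \
  | $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list devhadoop241:9092 --topic sunz

Note that the timestamp in the test record must fall inside the current segment period (plus the windowPeriod discussed below), otherwise the event is dropped.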
3. The four time-related settings in detail: segmentation, rollup, intermediate persistence, and the window period
In the configuration above, four properties are crucial: segmentGranularity, the segment (shard) granularity; queryGranularity, the rollup granularity; intermediatePersistPeriod, the interval at which in-memory data is persisted to disk; and windowPeriod, the window during which late data is still accepted.
segmentGranularity
Druid stores data in segments, and this property sets the segment granularity: by minute, hour, day, week, month, or year. Choose the granularity based on your business requirements and data volume.
queryGranularity
The rollup granularity. It determines the time granularity at which rows are aggregated. The default is none: rows are aggregated on the timestamp column as it is, so rows with identical timestamps are automatically rolled up. It can also be set to minute, hour, and so on, but this granularity must always be smaller than the segment granularity.
intermediatePersistPeriod
How long ingested data stays in memory. Because of rollup, ingested rows are first kept in memory and periodically persisted to disk in one batch. This property has a major impact on performance: set the period too large and memory consumption becomes severe, possibly making the process appear to hang; set it too small and every rollup hits the disk, which is inefficient.
windowPeriod
The window period controls how late data is handled. Suppose segments are created per hour: in realtime mode only data for the current hour can be processed, so if it is now 12:04, only data produced between 12:00 and 13:00 is accepted. If traffic between 11:00 and 12:00 was so heavy that some of that hour's data has not been processed yet, that data would be thrown away. To prevent this kind of loss we set a window period, which buffers the time range being processed. The configuration above uses a 10-minute window, meaning data for the previous hour can still be processed until 12:10.
How to set these four properties
Given the roles described above, the four properties should follow segmentGranularity > queryGranularity >= intermediatePersistPeriod >= windowPeriod to keep processing performance acceptable. You also need to weigh query requirements, data throughput, and hardware resources.
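Finally, once data is flowing and these settings look reasonable, the rolled-up metrics can be read back to confirm that ingestion works. A minimal sketch of a native timeseries query against the Broker, assuming it listens on localhost:8082 (the quickstart default; adjust host, port, and interval to your deployment):

# total event count and summed value per hour from the test1 datasource;
# the stored "count" column is re-aggregated with longSum because rows are already rolled up
curl -X POST -H 'Content-Type: application/json' 'http://localhost:8082/druid/v2/?pretty' -d '{
  "queryType": "timeseries",
  "dataSource": "test1",
  "granularity": "hour",
  "intervals": ["2017-11-28T00:00:00Z/2017-11-29T00:00:00Z"],
  "aggregations": [
    {"type": "longSum", "name": "count", "fieldName": "count"},
    {"type": "longSum", "name": "valueSum", "fieldName": "value_sum"}
  ]
}'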