Loading data into Druid through Kafka

    If Druid needs to ingest real-time data through Kafka and run OLAP over it, you need to download an additional Druid component, tranquility-distribution, from: http://static.druid.io/tranquility/releases/tranquility-distribution-0.8.2.tgz

    tranquility-distribution offers two ways to load streaming data: Server and Kafka. Because of our business environment I did not look into the Server mode; anyone interested can read up on it on the Druid website. Below I describe in detail the Kafka-based loading approach that I studied.

1. How Druid data is organized

    Whichever way you load your data, you need to understand how Druid organizes it, because the configuration file must describe how the incoming data is to be stored and what the stored format looks like.

    In Druid a stored row consists of three parts: timestampSpec, dimensionsSpec, and metricsSpec. timestampSpec is the timestamp field, the primary field used at query time and the main reason Druid can serve as a time-series database. dimensionsSpec holds the dimension fields, which you can think of as group-by conditions, or as the key in MapReduce. metricsSpec holds the values produced by aggregating the data along those dimensions, such as count, sum, max, min, first, last, and so on.

    Here is an example.

    The raw data looks like this:

{"unit": "milliseconds", "http_method": "GET", "value": 70, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/list", "metricType": "request/latency", "server": "www2.example.com"}
{"unit": "milliseconds", "http_method": "GET", "value": 116, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/list", "metricType": "request/latency", "server": "www4.example.com"}
{"unit": "milliseconds", "http_method": "GET", "value": 35, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/get/80", "metricType": "request/latency", "server": "www1.example.com"}
{"unit": "milliseconds", "http_method": "GET", "value": 44, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/", "metricType": "request/latency", "server": "www5.example.com"}
{"unit": "milliseconds", "http_method": "GET", "value": 40, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/list", "metricType": "request/latency", "server": "www1.example.com"}
{"unit": "milliseconds", "http_method": "GET", "value": 84, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/get/33", "metricType": "request/latency", "server": "www4.example.com"}
{"unit": "milliseconds", "http_method": "GET", "value": 42, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/get/54", "metricType": "request/latency", "server": "www5.example.com"}
{"unit": "milliseconds", "http_method": "GET", "value": 115, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/", "metricType": "request/latency", "server": "www5.example.com"}
{"unit": "milliseconds", "http_method": "GET", "value": 32, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/", "metricType": "request/latency", "server": "www5.example.com"}
{"unit": "milliseconds", "http_method": "GET", "value": 82, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/get/15", "metricType": "request/latency", "server": "www4.example.com"}
{"unit": "milliseconds", "http_method": "GET", "value": 123, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/get/22", "metricType": "request/latency", "server": "www3.example.com"}
{"unit": "milliseconds", "http_method": "GET", "value": 57, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/", "metricType": "request/latency", "server": "www3.example.com"}
{"unit": "milliseconds", "http_method": "GET", "value": 26, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/list", "metricType": "request/latency", "server": "www1.example.com"}
{"unit": "milliseconds", "http_method": "GET", "value": 44, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/get/45", "metricType": "request/latency", "server": "www3.example.com"}
{"unit": "milliseconds", "http_method": "GET", "value": 132, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/get/42", "metricType": "request/latency", "server": "www5.example.com"}
{"unit": "milliseconds", "http_method": "GET", "value": 113, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/get/70", "metricType": "request/latency", "server": "www4.example.com"}
{"unit": "milliseconds", "http_method": "GET", "value": 144, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/list", "metricType": "request/latency", "server": "www5.example.com"}
{"unit": "milliseconds", "http_method": "GET", "value": 41, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/", "metricType": "request/latency", "server": "www1.example.com"}
{"unit": "milliseconds", "http_method": "GET", "value": 90, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/get/94", "metricType": "request/latency", "server": "www5.example.com"}
{"unit": "milliseconds", "http_method": "GET", "value": 89, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/", "metricType": "request/latency", "server": "www4.example.com"}
{"unit": "milliseconds", "http_method": "GET", "value": 114, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/", "metricType": "request/latency", "server": "www3.example.com"}
{"unit": "milliseconds", "http_method": "GET", "value": 29, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/", "metricType": "request/latency", "server": "www5.example.com"}
{"unit": "milliseconds", "http_method": "GET", "value": 25, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/get/41", "metricType": "request/latency", "server": "www4.example.com"}
{"unit": "milliseconds", "http_method": "GET", "value": 88, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/", "metricType": "request/latency", "server": "www2.example.com"}
{"unit": "milliseconds", "http_method": "GET", "value": 137, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/list", "metricType": "request/latency", "server": "www4.example.com"}

We roll this data up with timestampSpec set to timestamp, dimensionsSpec containing page and server, and metricsSpec producing a count and a sum of value. After roll-up, the data stored in Druid looks like this:

{"timestamp":"2017-11-28T03:16:20Z", "page":"/list", "server":"www2.example.com", "count":1, "valueSum":70}
{"timestamp":"2017-11-28T03:16:20Z", "page":"/list", "server":"www4.example.com", "count":2, "valueSum":253}
{"timestamp":"2017-11-28T03:16:20Z", "page":"/get/80", "server":"www1.example.com", "count":1, "valueSum":35}
{"timestamp":"2017-11-28T03:16:20Z", "page":"/", "server":"www5.example.com", "count":4, "valueSum":320}
{"timestamp":"2017-11-28T03:16:20Z", "page":"/list", "server":"www1.example.com", "count":2, "valueSum":66}
{"timestamp":"2017-11-28T03:16:20Z", "page":"/get/33", "server":"www4.example.com", "count":1, "valueSum":84}
{"timestamp":"2017-11-28T03:16:20Z", "page":"/get/54", "server":"www5.example.com", "count":1, "valueSum":42}
{"timestamp":"2017-11-28T03:16:20Z", "page":"/get/15", "server":"www4.example.com", "count":1, "valueSum":82}
{"timestamp":"2017-11-28T03:16:20Z", "page":"/get/22", "server":"www3.example.com", "count":1, "valueSum":123}
{"timestamp":"2017-11-28T03:16:20Z", "page":"/", "server":"www3.example.com", "count":2, "valueSum":171}
{"timestamp":"2017-11-28T03:16:20Z", "page":"/get/45", "server":"www3.example.com", "count":1, "valueSum":44}
{"timestamp":"2017-11-28T03:16:20Z", "page":"/get/42", "server":"www5.example.com", "count":1, "valueSum":132}
{"timestamp":"2017-11-28T03:16:20Z", "page":"/get/70", "server":"www4.example.com", "count":1, "valueSum":113}
{"timestamp":"2017-11-28T03:16:20Z", "page":"/list", "server":"www5.example.com", "count":1, "valueSum":144}
{"timestamp":"2017-11-28T03:16:20Z", "page":"/", "server":"www1.example.com", "count":1, "valueSum":41}
{"timestamp":"2017-11-28T03:16:20Z", "page":"/get/94", "server":"www5.example.com", "count":1, "valueSum":90}
{"timestamp":"2017-11-28T03:16:20Z", "page":"/", "server":"www4.example.com", "count":1, "valueSum":89}
{"timestamp":"2017-11-28T03:16:20Z", "page":"/get/41", "server":"www4.example.com", "count":1, "valueSum":25}
{"timestamp":"2017-11-28T03:16:20Z", "page":"/", "server":"www2.example.com", "count":1, "valueSum":88}

2. Loading data through Kafka and performing the roll-up from the example above

    During ingestion we need to configure how the incoming data is processed. The concrete kafka.json configuration is shown below, annotated with -- comments for explanation:

{
  "dataSources" : {
    "test1" : { --data source name; must exactly match the dataSource value below
      "spec" : { --start of the spec
        "dataSchema" : { --start of the data schema
          "dataSource" : "test1", --data source name; note it matches the key above
          "parser" : {
            "type" : "string",
            "parseSpec" : {
              "timestampSpec" : {
                "column" : "timestamp",
                "format" : "auto"
              },
              "dimensionsSpec" : { --which fields are dimensions
                "dimensions" : ["page", "server"]
              },
              "format" : "json" --parse format; must match the format of the data read from Kafka
            }
          },
          "granularitySpec" : { --roll-up and segmenting settings
            "type" : "uniform",
            "segmentGranularity" : "hour", --create one segment per hour
            "queryGranularity" : "none" --default roll-up mode: no timestamp truncation, rows with identical timestamps are rolled up
          },
          "metricsSpec" : [ --aggregated (metric) columns
            {
              "type" : "count", --a count aggregation
              "name" : "count" --column name
            },
            {
              "type" : "longSum", --sum over long values; doubleSum also exists
              "name" : "value_sum", --column name
              "fieldName" : "value" --the input field to sum
            }
          ]
        },
        "ioConfig" : {
          "type" : "realtime" --realtime ingestion
        },
        "tuningConfig" : {
          "type" : "realtime",
          "maxRowsInMemory" : "100000",  --maximum number of rows held in memory before persisting
          "intermediatePersistPeriod" : "PT10M", --how often in-memory data is persisted to disk
          "windowPeriod" : "PT10M" --window period for accepting late data
        }
      },
      "properties" : {
        "task.partitions" : "1",
        "task.replicants" : "1",
        "topicPattern" : "sunz"
      }
    }
  },
  "properties" : {
    "zookeeper.connect" : "localhost",  --ZooKeeper used by the Druid cluster
    "druid.discovery.curator.path" : "/druid/discovery",
    "druid.selectors.indexing.serviceName" : "druid/overlord",
    "commit.periodMillis" : "15000",
    "consumer.numThreads" : "2",
    "kafka.zookeeper.connect" : "devhadoop241", --ZooKeeper used by Kafka
    "kafka.group.id" : "tranquility-kafka" --Kafka consumer group id (not the topic name)
  }
}
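
    JSON does not allow comments, so the -- annotations above are for explanation only; the actual conf/kafka.json must not contain them. If you keep an annotated copy around, a quick convenience sketch (assuming the annotated file is saved as kafka.json.annotated) to strip them is:

# drop everything from " --" to the end of each line, leaving valid JSON
sed -E 's/[[:space:]]+--.*$//' kafka.json.annotated > conf/kafka.json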

    Starting it is simple; just run:

bin/tranquility kafka -configFile conf/kafka.json

    Once tranquility is started, you can load data through Kafka. I will not go into how to build a Kafka producer here; a quick console-producer sketch follows below.
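
    As a quick sketch, the console producer that ships with Kafka is enough to push the sample JSON events (this assumes a Kafka broker is reachable at devhadoop241:9092 and that the topic name matches the topicPattern "sunz" from kafka.json; adjust both to your environment):

# create the topic if it does not exist yet (Kafka 0.x style, via ZooKeeper)
bin/kafka-topics.sh --create --zookeeper devhadoop241:2181 --replication-factor 1 --partitions 1 --topic sunz

# send one JSON event per line; tranquility parses it and hands it to Druid
echo '{"unit": "milliseconds", "http_method": "GET", "value": 70, "timestamp": "2017-11-28T03:16:20Z", "http_code": "200", "page": "/list", "metricType": "request/latency", "server": "www2.example.com"}' \
  | bin/kafka-console-producer.sh --broker-list devhadoop241:9092 --topic sunz

    Keep in mind that, because of the windowPeriod described in the next section, only events whose timestamp falls inside the current window are accepted.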

3. The four time-related settings in detail: segmentation, roll-up, intermediate persistence, and the time window

    Four of the properties in the configuration above are critical: segmentGranularity (the segment period), queryGranularity (the roll-up granularity), intermediatePersistPeriod (how long data stays in memory before being persisted to disk), and windowPeriod (the window period, i.e. how long late data is still accepted).

    segmentGranularity

    Druid stores data in segments. This property sets the segment period, for example by minute, hour, day, week, month, or year; choose it according to your business scenario and data volume.

    queryGranularity

    This is the roll-up granularity: the time granularity at which rows are aggregated. The default is none, which means rows are aggregated on the raw timestamp field from the data source, so rows with identical timestamps are automatically rolled up. It can also be set to minute, hour, and so on, but it must always be finer than the segment period.
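
    As a hypothetical variation of the configuration in section 2, daily segments with minute-level roll-up would be declared like this; all events in the same minute that share the same page and server then collapse into a single stored row:

"granularitySpec" : {
  "type" : "uniform",
  "segmentGranularity" : "day",
  "queryGranularity" : "minute"
}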

    intermediatePersistPeriod

    This is how long ingested data stays in memory. Because roll-up needs the data at hand, rows are buffered in memory and periodically persisted to disk in one batch. This property has a major impact on performance: if the period is too long, memory consumption becomes severe and the process can appear to hang; if it is too short, every roll-up hits the disk and throughput suffers.

    windowPeriod

    The window period controls how late data may still be loaded. For example, with hourly segments, realtime mode only handles data for the current hour: if it is now 12:04, only events produced between 12:00 and 13:00 are processed. If traffic between 11:00 and 12:00 was so heavy that data from that hour has not all been processed yet, that data would be dropped. To avoid this kind of loss we set a time window that buffers the accepted time range; with the 10-minute window in the configuration above, data from the previous hour can still be processed until 12:10.

    How to set the four properties

    From what these settings do, you can see that the four properties should be configured so that segmentGranularity > queryGranularity >= intermediatePersistPeriod >= windowPeriod in order to maintain processing performance, while also weighing your query requirements, data throughput, and hardware resources.
