Druid:經過 Kafka 加載流數據

**## 開始html

本教程演示瞭如何使用 Druid 的 Kafka indexing 服務從 Kafka 流中加載數據至 Druid。shell

在本教程中,咱們假設你已經按照 quickstart 文檔中使用micro-quickstart單機配置所描述的下載了 Druid,並在本機運行了 Druid。你不須要加載任何數據。apache

下載並啓動 Kafka

Apache Kafka是一種高吞吐量消息總線,可與 Druid 很好地配合使用。在本教程中,咱們將使用 Kafka 2.1.0。在終端運行下面命令下載 Kafka:json

curl -O https://archive.apache.org/dist/kafka/2.1.0/kafka_2.12-2.1.0.tgz
tar -xzf kafka_2.12-2.1.0.tgz
cd kafka_2.12-2.1.0

在終端運行下面命令啓動 kafka broker:bootstrap

./bin/kafka-server-start.sh config/server.properties

運行下面命令建立名爲wikipedia的 topic,咱們將向其發送數據:服務器

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wikipedia

向 Kafka 加載數據

wikipedia topic 啓動一個 kafka producer,併發送數據。併發

在 Druid 目錄下,運行下面命令:app

cd quickstart/tutorial
gunzip -c wikiticker-2015-09-12-sampled.json.gz > wikiticker-2015-09-12-sampled.json

在 Kafka 目錄下運行下面命令,將{PATH_TO_DRUID}替換成你的 Kafka 路徑:curl

export KAFKA_OPTS="-Dfile.encoding=UTF-8"
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wikipedia < {PATH_TO_DRUID}/quickstart/tutorial/wikiticker-2015-09-12-sampled.json

上面命令會向 kakfa 的wikiapedia topic 發送 events。以後,咱們將使用 Druid 的 Kafka indexing 服務從 Kafka topic 中提取數據。學習

經過 data loader 加載數據

導航至 localhost:8080 並單擊控制檯頂部的Load data

選擇 Apache Kafka 並單擊 Connect data.

輸入 bootstrap:localhost:9092和 topic:wikipedia

單擊Preview並肯定你看到的數據正確。

找到數據後,能夠單擊"Next: Parse data"進入下一步。

data loader 會嘗試自動選擇正確的數據解析器。在本示例中,將選擇json解析器。你能夠嘗試選擇其餘解析器,看看 Druid 是如何解析數據的。

選擇json解析器,點擊Next: Parse time進入下一步,來肯定 timestamp 列。

Druid 須要一個主 timestamp 列(內部將存儲在__time 列)。若是你的數據中沒有 timestamp 列,選擇Constant value。在咱們的示例中,將選擇time列,由於它是數據之中惟一能夠做爲主時間列的候選者。

單擊Next: ...兩次以跳過TransformFilter步驟。

您無需在這些步驟中輸入任何內容,由於應用提取數據的時間變換和過濾器不在本教程範圍內。

Configure schema步驟中,你能夠配置哪些維度和指標能夠攝入 Druid。這是數據被攝入 Druid 後呈現的樣子。因爲咱們的數據集比較小,點擊Rollup開關關閉 rollup 功能。

對 schema 配置滿意後,單擊Next進入Partition步驟,以調整數據至 segment 的分區。

在這裏,您能夠調整如何在 Druid 中將數據拆分爲多個段。因爲這是一個很小的數據集,所以在此步驟中無需進行任何調整。

單擊Tune步驟後,進入發佈步驟。

Publish步驟中,咱們能夠指定 Druid 中的數據源名稱。咱們將此數據源命名爲wikipedia。最後,單擊Next以查看 spec。

這是你構建的 spec。嘗試隨意返回並在以前的步驟中進行更改,以查看變更將如何更新 spec。一樣,你也能夠直接編輯 spec,並在前面的步驟中看到它。

對 spec 滿意後,點擊Submit建立攝取任務。

你將進入任務視圖,重點關注新建立的任務。任務視圖設置爲自動刷新,等待任務成功。

當一項任務成功完成時,意味着它創建了一個或多個 segment,這些 segment 將由數據服務器接收。

Datasources從標題導航到視圖。

等待直到你的數據源(wikipedia)出現。加載 segment 時可能須要幾秒鐘。

一旦看到綠色(徹底可用)圓圈,就能夠查詢數據源。此時,你能夠轉到Query視圖以對數據源運行 SQL 查詢。

運行SELECT * FROM "wikipedia"查詢以查看結果。

經過控制檯提交 supervisor

在控制檯中,單擊Submit supervisor打開提交 supervisor 窗口。

粘貼如下 spec 並點擊提交:

{
  "type": "kafka",
  "spec" : {
    "dataSchema": {
      "dataSource": "wikipedia",
      "timestampSpec": {
        "column": "time",
        "format": "auto"
      },
      "dimensionsSpec": {
        "dimensions": [
          "channel",
          "cityName",
          "comment",
          "countryIsoCode",
          "countryName",
          "isAnonymous",
          "isMinor",
          "isNew",
          "isRobot",
          "isUnpatrolled",
          "metroCode",
          "namespace",
          "page",
          "regionIsoCode",
          "regionName",
          "user",
          { "name": "added", "type": "long" },
          { "name": "deleted", "type": "long" },
          { "name": "delta", "type": "long" }
        ]
      },
      "metricsSpec" : [],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "DAY",
        "queryGranularity": "NONE",
        "rollup": false
      }
    },
    "tuningConfig": {
      "type": "kafka",
      "reportParseExceptions": false
    },
    "ioConfig": {
      "topic": "wikipedia",
      "inputFormat": {
        "type": "json"
      },
      "replicas": 2,
      "taskDuration": "PT10M",
      "completionTimeout": "PT20M",
      "consumerProperties": {
        "bootstrap.servers": "localhost:9092"
      }
    }
  }
}

這將啓動 supervisor,並分化出 task 監聽數據流入。

直接提交 supervisor

爲了直接啓動服務,咱們須要在 Druid 包根目錄下運行下面命令提交一個 supervisor spec 給 Druid overlord:

curl -XPOST -H'Content-Type: application/json' -d @quickstart/tutorial/wikipedia-kafka-supervisor.json http://localhost:8081/druid/indexer/v1/supervisor

若是 supervisor 成功建立,你將獲得一個包含 supervisor ID 的響應。在咱們的示例中,將返回{"id":"wikipedia"}

你能夠在控制檯中查看當前 supervisor 和 tasks: http://localhost:8888/unified-console.html#tasks.

查詢數據

當數據發送給 Kafka stream 後,馬上就能夠查詢數據。

本文翻譯自 Druid 官方文檔

請關注咱們。一塊兒學習 Druid 知識。

碼哥字節 **

相關文章
相關標籤/搜索