本教程演示瞭如何使用 Druid 的 Kafka indexing 服務從 Kafka 流中加載數據至 Druid。html
在本教程中,咱們假設你已經按照 quickstart 文檔中使用micro-quickstart
單機配置所描述的下載了 Druid,並在本機運行了 Druid。你不須要加載任何數據。shell
Apache Kafka是一種高吞吐量消息總線,可與 Druid 很好地配合使用。在本教程中,咱們將使用 Kafka 2.1.0。在終端運行下面命令下載 Kafka:apache
curl -O https://archive.apache.org/dist/kafka/2.1.0/kafka_2.12-2.1.0.tgz tar -xzf kafka_2.12-2.1.0.tgz cd kafka_2.12-2.1.0
在終端運行下面命令啓動 kafka broker:json
./bin/kafka-server-start.sh config/server.properties
運行下面命令建立名爲wikipedia
的 topic,咱們將向其發送數據:bootstrap
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wikipedia
爲wikipedia
topic 啓動一個 kafka producer,併發送數據。服務器
在 Druid 目錄下,運行下面命令:併發
cd quickstart/tutorial gunzip -c wikiticker-2015-09-12-sampled.json.gz > wikiticker-2015-09-12-sampled.json
在 Kafka 目錄下運行下面命令,將{PATH_TO_DRUID}替換成你的 Kafka 路徑:app
export KAFKA_OPTS="-Dfile.encoding=UTF-8" ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wikipedia < {PATH_TO_DRUID}/quickstart/tutorial/wikiticker-2015-09-12-sampled.json
上面命令會向 kakfa 的wikiapedia topic 發送 events。以後,咱們將使用 Druid 的 Kafka indexing 服務從 Kafka topic 中提取數據。curl
導航至 localhost:8080 並單擊控制檯頂部的Load data
。ide
選擇 Apache Kafka
並單擊 Connect data
.
輸入 bootstrap:localhost:9092
和 topic:wikipedia
。
單擊Preview
並肯定你看到的數據正確。
找到數據後,能夠單擊"Next: Parse data"進入下一步。
data loader 會嘗試自動選擇正確的數據解析器。在本示例中,將選擇json
解析器。你能夠嘗試選擇其餘解析器,看看 Druid 是如何解析數據的。
選擇json
解析器,點擊Next: Parse time
進入下一步,來肯定 timestamp 列。
Druid 須要一個主 timestamp 列(內部將存儲在__time 列)。若是你的數據中沒有 timestamp 列,選擇Constant value
。在咱們的示例中,將選擇time
列,由於它是數據之中惟一能夠做爲主時間列的候選者。
單擊Next: ...
兩次以跳過Transform
和Filter
步驟。
您無需在這些步驟中輸入任何內容,由於應用提取數據的時間變換和過濾器不在本教程範圍內。
在Configure schema
步驟中,你能夠配置哪些維度和指標能夠攝入 Druid。這是數據被攝入 Druid 後呈現的樣子。因爲咱們的數據集比較小,點擊Rollup
開關關閉 rollup 功能。
對 schema 配置滿意後,單擊Next
進入Partition
步驟,以調整數據至 segment 的分區。
在這裏,您能夠調整如何在 Druid 中將數據拆分爲多個段。因爲這是一個很小的數據集,所以在此步驟中無需進行任何調整。
單擊Tune
步驟後,進入發佈步驟。
在Publish
步驟中,咱們能夠指定 Druid 中的數據源名稱。咱們將此數據源命名爲wikipedia
。最後,單擊Next
以查看 spec。
這是你構建的 spec。嘗試隨意返回並在以前的步驟中進行更改,以查看變更將如何更新 spec。一樣,你也能夠直接編輯 spec,並在前面的步驟中看到它。
對 spec 滿意後,點擊Submit
建立攝取任務。
你將進入任務視圖,重點關注新建立的任務。任務視圖設置爲自動刷新,等待任務成功。
當一項任務成功完成時,意味着它創建了一個或多個 segment,這些 segment 將由數據服務器接收。
Datasources
從標題導航到視圖。
等待直到你的數據源(wikipedia
)出現。加載 segment 時可能須要幾秒鐘。
一旦看到綠色(徹底可用)圓圈,就能夠查詢數據源。此時,你能夠轉到Query
視圖以對數據源運行 SQL 查詢。
運行SELECT * FROM "wikipedia"
查詢以查看結果。
在控制檯中,單擊Submit supervisor
打開提交 supervisor 窗口。
粘貼如下 spec 並點擊提交:
{ "type": "kafka", "spec" : { "dataSchema": { "dataSource": "wikipedia", "timestampSpec": { "column": "time", "format": "auto" }, "dimensionsSpec": { "dimensions": [ "channel", "cityName", "comment", "countryIsoCode", "countryName", "isAnonymous", "isMinor", "isNew", "isRobot", "isUnpatrolled", "metroCode", "namespace", "page", "regionIsoCode", "regionName", "user", { "name": "added", "type": "long" }, { "name": "deleted", "type": "long" }, { "name": "delta", "type": "long" } ] }, "metricsSpec" : [], "granularitySpec": { "type": "uniform", "segmentGranularity": "DAY", "queryGranularity": "NONE", "rollup": false } }, "tuningConfig": { "type": "kafka", "reportParseExceptions": false }, "ioConfig": { "topic": "wikipedia", "inputFormat": { "type": "json" }, "replicas": 2, "taskDuration": "PT10M", "completionTimeout": "PT20M", "consumerProperties": { "bootstrap.servers": "localhost:9092" } } } }
這將啓動 supervisor,並分化出 task 監聽數據流入。
爲了直接啓動服務,咱們須要在 Druid 包根目錄下運行下面命令提交一個 supervisor spec 給 Druid overlord:
curl -XPOST -H'Content-Type: application/json' -d @quickstart/tutorial/wikipedia-kafka-supervisor.json http://localhost:8081/druid/indexer/v1/supervisor
若是 supervisor 成功建立,你將獲得一個包含 supervisor ID 的響應。在咱們的示例中,將返回{"id":"wikipedia"}
。
你能夠在控制檯中查看當前 supervisor 和 tasks: http://localhost:8888/unified-console.html#tasks.
當數據發送給 Kafka stream 後,馬上就能夠查詢數據。
本文翻譯自 Druid 官方文檔
請關注咱們。一塊兒學習 Druid 知識。