For a single-node Druid installation, see: http://www.javashuo.com/article/p-cxpcxjat-kd.html
Real-time ingestion from Kafka into Druid
Download, install, and start Kafka ($KAFKA_HOME is assumed to point at the kafka symlink created below, here ~/kafka):
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.2.1/kafka_2.11-2.2.1.tgz
tar -zxvf kafka_2.11-2.2.1.tgz
ln -s kafka_2.11-2.2.1 kafka
$KAFKA_HOME/bin/kafka-server-start.sh ~/kafka/config/server.properties 1>/dev/null 2>&1 &
Create the topic wikipedia (run from the Kafka installation directory):
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wikipedia
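To confirm that the topic was actually created, the topic listing can be checked for an exact match. The sketch below assumes the same ZooKeeper address as above; topic_in_list is a hypothetical helper name, not part of Kafka.

```shell
# Hypothetical helper: succeeds if the topic name given as $1 appears
# as a whole line in a topic listing read from stdin.
topic_in_list() {
  grep -qxF -- "$1"
}

# Usage (run from the Kafka installation directory, broker and ZooKeeper running):
#   ./bin/kafka-topics.sh --list --zookeeper localhost:2181 | topic_in_list wikipedia \
#     && echo "topic wikipedia exists"
```

Matching the whole line (-x) with a fixed string (-F) avoids a false positive from a topic whose name merely contains "wikipedia".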
Decompress wikiticker-2015-09-12-sampled.json.gz. This step prepares the input file for the Kafka topic.
cd $DRUID_HOME/quickstart/tutorial
gunzip -k wikiticker-2015-09-12-sampled.json.gz
After this step, wikiticker-2015-09-12-sampled.json is generated in the $DRUID_HOME/quickstart/tutorial directory; the -k flag keeps the original .gz file in place.
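A quick sanity check on the decompressed file can catch a failed or partial extraction before anything is sent to Kafka. The following is a minimal sketch, assuming the file holds one JSON object per line; check_sample is a hypothetical helper, and the test on the first line is only a crude shape check, not a JSON parse.

```shell
# Hypothetical helper: verify the file is non-empty and that its first line
# looks like a JSON object, then print the number of lines (records).
check_sample() {
  file="$1"
  if [ ! -s "$file" ]; then
    echo "missing or empty: $file" >&2
    return 1
  fi
  case "$(head -n 1 "$file")" in
    '{'*'}') ;;                                   # starts with { and ends with }
    *) echo "first line does not look like a JSON object" >&2; return 1 ;;
  esac
  wc -l < "$file" | tr -d ' '
}

# Usage:
#   check_sample "$DRUID_HOME/quickstart/tutorial/wikiticker-2015-09-12-sampled.json"
```

The printed line count is useful later as the number of messages to expect in the topic.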
The configuration file from the figure above (a Kafka supervisor spec) is as follows; bootstrap.servers sets the Kafka broker address.
{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "wikipedia",
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "timestampSpec": {
          "column": "time",
          "format": "auto"
        },
        "dimensionsSpec": {
          "dimensions": [
            "channel",
            "cityName",
            "comment",
            "countryIsoCode",
            "countryName",
            "isAnonymous",
            "isMinor",
            "isNew",
            "isRobot",
            "isUnpatrolled",
            "metroCode",
            "namespace",
            "page",
            "regionIsoCode",
            "regionName",
            "user",
            { "name": "added", "type": "long" },
            { "name": "deleted", "type": "long" },
            { "name": "delta", "type": "long" }
          ]
        }
      }
    },
    "metricsSpec": [],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "DAY",
      "queryGranularity": "NONE",
      "rollup": false
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "reportParseExceptions": false
  },
  "ioConfig": {
    "topic": "wikipedia",
    "replicas": 2,
    "taskDuration": "PT10M",
    "completionTimeout": "PT20M",
    "consumerProperties": {
      "bootstrap.servers": "localhost:9092"
    }
  }
}
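Ingestion only starts once this spec has been submitted to Druid. Besides the web console, one way to do that is to POST the spec to the Overlord's supervisor endpoint. The sketch below makes several assumptions: the spec has been saved to a file, curl is installed, and the Overlord address matches your deployment (8090 is the Overlord's default port, while some single-server setups run a combined Coordinator-Overlord on 8081). submit_supervisor is a hypothetical helper name.

```shell
# Hypothetical helper: POST a supervisor spec file to the Druid Overlord.
#   $1 = path to the spec JSON file
#   $2 = Overlord base URL (assumed default; adjust to your deployment)
submit_supervisor() {
  spec="$1"
  overlord="${2:-http://localhost:8090}"
  if [ ! -f "$spec" ]; then
    echo "spec file not found: $spec" >&2
    return 1
  fi
  curl -sS -X POST -H 'Content-Type: application/json' \
       -d @"$spec" "$overlord/druid/indexer/v1/supervisor"
}

# Usage (the file name is an example, not taken from the text above):
#   submit_supervisor ./wikipedia-kafka-supervisor.json http://localhost:8081
```

If the submission succeeds, the response should contain the supervisor ID, which is normally the dataSource name.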
Next, use the Kafka console producer script to write the contents of wikiticker-2015-09-12-sampled.json to the wikipedia topic (run from the Kafka installation directory):
export KAFKA_OPTS="-Dfile.encoding=UTF-8"
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wikipedia < {PATH_TO_DRUID}/quickstart/tutorial/wikiticker-2015-09-12-sampled.json
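To check that the producer wrote everything, the end offsets of the topic can be compared with the number of lines in the input file. This is a rough sketch: it assumes the topic was empty beforehand and that each line became exactly one message. It relies on the GetOffsetShell tool shipped with Kafka, whose output has the form topic:partition:offset; sum_end_offsets is a hypothetical helper.

```shell
# Hypothetical helper: sum the offset column of "topic:partition:offset" lines
# read from stdin (prints 0 for empty input).
sum_end_offsets() {
  awk -F: '{ total += $3 } END { print total + 0 }'
}

# Usage (run from the Kafka installation directory):
#   ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
#       --broker-list localhost:9092 --topic wikipedia --time -1 | sum_end_offsets
#   wc -l < {PATH_TO_DRUID}/quickstart/tutorial/wikiticker-2015-09-12-sampled.json
```

If the two numbers match, every record in the sample file reached the topic.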