Collecting Kafka Data with Gobblin

1. Preparing the Gobblin environment variables

Set up the environment variables that Gobblin 0.11.0 needs at runtime. They can be configured in gobblin-env.sh in Gobblin's bin directory, for example:

job.name=GobblinKafkaQuickStart
job.group=GobblinKafka
job.description=Gobblin quick start job for Kafka
job.lock.enabled=false
job.schedule=0 0/3 * * * ?
kafka.brokers=101.236.39.141:9092,101.236.46.114:9092,101.236.46.113:9092
source.class=gobblin.source.extractor.extract.kafka.KafkaSimpleSource
extract.namespace=gobblin.extract.kafka
     
writer.builder.class=gobblin.writer.SimpleDataWriterBuilder
writer.file.path.type=tablename
writer.destination.type=HDFS
writer.output.format=txt
     
data.publisher.type=gobblin.publisher.BaseDataPublisher
     
mr.job.max.mappers=1
     
metrics.reporting.file.enabled=true
metrics.log.dir=${env:GOBBLIN_WORK_DIR}/metrics
metrics.reporting.file.suffix=txt
     
bootstrap.with.offset=earliest

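Here you need to configure the Kafka brokers to pull data from, along with Gobblin's work components such as the source, extractor, writer and publisher. If anything is unclear, see the Gobblin wiki, which is very detailed.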
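Since a .pull file is plain key=value text, a quick check before launching can catch a missing component key. The sketch below is a hypothetical helper (PullFileCheck is not part of Gobblin): it reads the text with java.util.Properties and lists which of the keys used in this article's configuration are absent. The key list is an assumption taken from the config above, not Gobblin's full set of required keys, and ${...} references are not resolved.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical pre-flight check for a Gobblin .pull file (not part of Gobblin). */
public class PullFileCheck {

    // Component keys set in this article's pull files; an assumption, not Gobblin's required set.
    static final String[] COMPONENT_KEYS = {
        "kafka.brokers", "source.class", "extract.namespace",
        "writer.builder.class", "data.publisher.type"
    };

    /** Reads .pull text as plain Java properties (key=value lines, # comments). */
    static Properties parse(String pullText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(pullText));
        } catch (IOException e) {
            // A StringReader does not fail, but Properties.load declares IOException.
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Returns the component keys that are absent or blank, in COMPONENT_KEYS order. */
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : COMPONENT_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        String pull = String.join("\n",
            "job.name=GobblinKafkaQuickStart",
            "kafka.brokers=101.236.39.141:9092",
            "source.class=gobblin.source.extractor.extract.kafka.KafkaSimpleSource",
            "writer.builder.class=gobblin.writer.SimpleDataWriterBuilder");
        Properties props = parse(pull);
        System.out.println("source.class = " + props.getProperty("source.class"));
        System.out.println("missing = " + missingKeys(props));
    }
}
```

With the sample string, main reports missing = [extract.namespace, data.publisher.type]. Note that Properties.load applies backslash escapes, so a line like simple.writer.delimiter=\n is read here as a real newline character.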

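I also added job.schedule so that Gobblin checks all Kafka topics for new data every three minutes; the extraction task then runs on that three-minute schedule. This uses Gobblin's built-in Quartz scheduler.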
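Quartz cron expressions differ from five-field Unix crontab entries: they have six or seven fields starting with seconds, and Quartz expects ? in either the day-of-month or the day-of-week field. A three-minute schedule is therefore written 0 0/3 * * * ?. The stdlib-only sketch below (QuartzCronFields is a hypothetical helper) just labels the fields of such an expression; it does not validate value ranges, which Quartz's own org.quartz.CronExpression does.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that names the fields of a Quartz cron expression. */
public class QuartzCronFields {

    // Quartz field order; the seventh field (year) is optional.
    private static final String[] NAMES = {
        "seconds", "minutes", "hours", "day-of-month", "month", "day-of-week", "year"
    };

    /** Splits on whitespace and maps each field to its name; rejects wrong field counts. */
    static Map<String, String> label(String expression) {
        String[] parts = expression.trim().split("\\s+");
        if (parts.length < 6 || parts.length > 7) {
            throw new IllegalArgumentException(
                "Quartz expects 6 or 7 fields but got " + parts.length + ": " + expression);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < parts.length; i++) {
            fields.put(NAMES[i], parts[i]);
        }
        return fields;
    }

    public static void main(String[] args) {
        // Second 0 of every third minute, i.e. every three minutes.
        System.out.println(label("0 0/3 * * * ?"));
    }
}
```

main prints {seconds=0, minutes=0/3, hours=*, day-of-month=*, month=*, day-of-week=?}. A five-field string such as 0/3 * * * ? is rejected by this sketch, and Quartz will not accept it either.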

Once this is configured, go to the Gobblin root directory and start it with a command like:

bin/gobblin-standalone.sh --conffile $GOBBLIN_JOB_CONFIG_DIR/gobblinStandalone.pull start

My GOBBLIN_JOB_CONFIG_DIR contains several pull files, so I have to name the one to run. If GOBBLIN_JOB_CONFIG_DIR contains only one configuration file, bin/gobblin-standalone.sh start is enough.

The extracted data is finally written to GOBBLIN_WORK_DIR/job-output.

3. Configuring and using Gobblin in MapReduce mode

This time Gobblin uses MapReduce to pull Kafka data into HDFS. Create a gobblin-mr.pull file with the following configuration:

job.name=GobblinKafkaMapreduce
job.group=GobblinKafkaForMapreduce
job.description=Gobblin quick start job for Kafka
job.lock.enabled=false
kafka.brokers=101.236.39.141:9092,101.236.46.114:9092,101.236.46.113:9092

source.class=gobblin.source.extractor.extract.kafka.KafkaSimpleSource
extract.namespace=gobblin.extract.kafka
topic.whitelist=jsonTest

writer.builder.class=gobblin.writer.SimpleDataWriterBuilder
writer.file.path.type=tablename
writer.destination.type=HDFS
writer.output.format=txt
simple.writer.delimiter=\n
data.publisher.type=gobblin.publisher.BaseDataPublisher

mr.job.max.mappers=1

metrics.reporting.file.enabled=true
metrics.log.dir=${env:GOBBLIN_WORK_DIR}/metrics
metrics.reporting.file.suffix=txt

bootstrap.with.offset=earliest

fs.uri=hdfs://101.236.39.141:9000
#fs.uri=hdfs://clusterYL
writer.fs.uri=${fs.uri}
state.store.fs.uri=${fs.uri}

mr.job.root.dir=/gobblin/working
state.store.dir=/gobblin/state-store
task.data.root.dir=/gobblin/task-data
data.publisher.final.dir=/gobblin/job-output

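The configuration above relies on references: writer.fs.uri and state.store.fs.uri point at ${fs.uri}, and metrics.log.dir uses ${env:GOBBLIN_WORK_DIR}. Gobblin resolves these itself when it loads the job; the sketch below only illustrates the substitution under assumed semantics (${key} looks up another property, ${env:NAME} looks up an environment variable, unknown references are left untouched). PlaceholderResolver and the /tmp/gobblin-work value are hypothetical, and this is not Gobblin's actual resolver.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical illustration of ${key} and ${env:NAME} substitution (not Gobblin's resolver). */
public class PlaceholderResolver {

    private static final Pattern REF = Pattern.compile("\\$\\{([^}]+)\\}");

    static String resolve(String value, Properties props, Map<String, String> env) {
        return resolve(value, props, env, 0);
    }

    private static String resolve(String value, Properties props, Map<String, String> env, int depth) {
        // Guard against reference cycles such as a=${a}.
        if (depth > 10) {
            throw new IllegalStateException("placeholder cycle in: " + value);
        }
        Matcher m = REF.matcher(value);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String name = m.group(1);
            String replacement;
            if (name.startsWith("env:")) {
                replacement = env.get(name.substring("env:".length()));
            } else {
                String raw = props.getProperty(name);
                // Referenced values may contain references themselves.
                replacement = raw == null ? null : resolve(raw, props, env, depth + 1);
            }
            // Keep unknown references visible instead of silently dropping them.
            if (replacement == null) {
                replacement = m.group(0);
            }
            m.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("fs.uri", "hdfs://101.236.39.141:9000");
        props.setProperty("writer.fs.uri", "${fs.uri}");
        props.setProperty("metrics.log.dir", "${env:GOBBLIN_WORK_DIR}/metrics");
        // Hypothetical work dir; a real run would pass System.getenv().
        Map<String, String> env = Collections.singletonMap("GOBBLIN_WORK_DIR", "/tmp/gobblin-work");
        for (String key : new String[] {"writer.fs.uri", "metrics.log.dir"}) {
            System.out.println(key + " = " + resolve(props.getProperty(key), props, env));
        }
    }
}
```

This is also why changing fs.uri alone is enough: every key that references ${fs.uri} picks up the new address.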

 

Note the topic.whitelist line in the configuration above: I added a topic filter so that only the topic named jsonTest is pulled.
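A rough sketch of how such a whitelist narrows the topic list, assuming (not confirmed here) that the value is a comma-separated list of regular expressions matched against the whole topic name. TopicWhitelist and the sample topic names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical topic filter; the regex semantics are an assumption about topic.whitelist. */
public class TopicWhitelist {

    /** Compiles a comma-separated whitelist such as "jsonTest" or "json.*, boot". */
    static List<Pattern> compile(String whitelist) {
        List<Pattern> patterns = new ArrayList<>();
        for (String part : whitelist.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                patterns.add(Pattern.compile(trimmed));
            }
        }
        return patterns;
    }

    /** Keeps topics that fully match at least one pattern, preserving input order. */
    static List<String> select(List<String> topics, String whitelist) {
        List<Pattern> patterns = compile(whitelist);
        List<String> kept = new ArrayList<>();
        for (String topic : topics) {
            for (Pattern p : patterns) {
                if (p.matcher(topic).matches()) {
                    kept.add(topic);
                    break;
                }
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> topics = Arrays.asList("jsonTest", "jsonTest2", "boot");
        System.out.println(select(topics, "jsonTest"));
    }
}
```

Under full-match semantics, jsonTest keeps only jsonTest and not jsonTest2; if your Gobblin version uses partial matching instead, anchor the pattern explicitly.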

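The requirement is to partition the topic data in HDFS into directories by day and hour, where the partition is determined by a time field inside each Kafka record.

My records are JSON, with a time field like {…"time":"2016-10-12 00:30:20"…}, so I subclassed Gobblin's TimeBasedWriterPartitioner and overrode its method to partition the HDFS directories by that time field.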
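The core of such a partitioner is turning a record into a timestamp. In Gobblin 0.11 the abstract hook on TimeBasedWriterPartitioner appears to be getRecordTimestamp(record), with the base class building the path from the configured granularity; check this against your Gobblin version and the linked class. The dependency-free sketch below shows only the record-to-hour logic. HourlyPartitionSketch, the regex extraction (a real implementation should use a JSON parser), the Asia/Shanghai zone and the yyyy/MM/dd/HH layout are all assumptions for illustration.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch of mapping a JSON record's "time" field to an hourly directory. */
public class HourlyPartitionSketch {

    // Matches "time":"yyyy-MM-dd HH:mm:ss"; regex keeps the sketch stdlib-only.
    private static final Pattern TIME_FIELD =
        Pattern.compile("\"time\"\\s*:\\s*\"(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2})\"");
    private static final DateTimeFormatter RECORD_FORMAT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private static final DateTimeFormatter HOUR_PATH =
        DateTimeFormatter.ofPattern("yyyy/MM/dd/HH");

    /** Epoch millis of the record's time field, interpreting the local time in the given zone. */
    static long recordTimestamp(String json, ZoneId zone) {
        Matcher m = TIME_FIELD.matcher(json);
        if (!m.find()) {
            throw new IllegalArgumentException("no time field in: " + json);
        }
        return LocalDateTime.parse(m.group(1), RECORD_FORMAT)
            .atZone(zone).toInstant().toEpochMilli();
    }

    /** Hour-level partition path for a timestamp, rendered in the given zone. */
    static String hourlyPath(long epochMillis, ZoneId zone) {
        return HOUR_PATH.format(Instant.ofEpochMilli(epochMillis).atZone(zone));
    }

    public static void main(String[] args) {
        ZoneId zone = ZoneId.of("Asia/Shanghai");
        String record = "{\"id\":1,\"time\":\"2016-10-12 00:30:20\"}";
        long ts = recordTimestamp(record, zone);
        System.out.println(hourlyPath(ts, zone)); // 2016/10/12/00
    }
}
```

Parsing and formatting in the same zone keeps the directory aligned with the wall-clock hour written in the record; mixing zones would shift records into neighbouring hour directories.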

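Pay attention to the following setting:

fs.uri=hdfs://master:8020

Change fs.uri to your own cluster's HDFS address (the configuration above uses hdfs://101.236.39.141:9000).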

I have pushed the subclass that partitions HDFS output by the JSON time field to GitHub; see the following link:

https://github.com/cssdongl/gobblin/blob/master/gobblin-example/src/main/java/gobblin/example/simplejson/TimeBasedJsonWriterPartitioner.java

Add the extended class to the appropriate Gobblin module (I put it in the gobblin-example module) and rebuild. If the build gives you trouble, refer to this article.

The paths at the end of the configuration file above are all HDFS paths; make sure Gobblin has read and write permission on them.

Then start the job:

bin/gobblin-mapreduce.sh --conf $GOBBLIN_JOB_CONFIG_DIR/gobblin-mr.pull

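After a successful run, the following directories appear in HDFS (the jsonTest directory is named after the corresponding topic), as shown below: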

[Figure: GobblinPartion1]

[Figure: GobblinPartion3]

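Note that in MR mode I tried configuring Quartz scheduling several times and it never took effect, so if you need scheduled extraction, use an external tool such as Linux crontab, Oozie or Azkaban.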

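4. Summary of using Gobblin

1> First get familiar with the official Gobblin wiki; it is very detailed.

2> Fork the source code on GitHub and read the source, extractor and partitioner code carefully.

3> When you run into problems, dig into the Gobblin logs and the Hadoop logs.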

References:

http://gobblin.readthedocs.io/en/latest/case-studies/Kafka-HDFS-Ingestion/

http://gobblin.readthedocs.io/en/latest/user-guide/Partitioned-Writers/

http://gobblin.readthedocs.io/en/latest/developer-guide/IDE-setup/

http://gobblin.readthedocs.io/en/latest/user-guide/FAQs/
