1. Preparing Gobblin's Environment Variables
You first need to set up the environment variables that Gobblin 0.11.0 expects at runtime, which can be done in gobblin-env.sh under Gobblin's bin directory. With that in place, a standalone job is described by a .pull configuration file, for example:
job.name=GobblinKafkaQuickStart
job.group=GobblinKafka
job.description=Gobblin quick start job for Kafka
job.lock.enabled=false
job.schedule=0 0/3 * * * ?
kafka.brokers=101.236.39.141:9092,101.236.46.114:9092,101.236.46.113:9092
source.class=gobblin.source.extractor.extract.kafka.KafkaSimpleSource
extract.namespace=gobblin.extract.kafka
writer.builder.class=gobblin.writer.SimpleDataWriterBuilder
writer.file.path.type=tablename
writer.destination.type=HDFS
writer.output.format=txt
data.publisher.type=gobblin.publisher.BaseDataPublisher
mr.job.max.mappers=1
metrics.reporting.file.enabled=true
metrics.log.dir=${env:GOBBLIN_WORK_DIR}/metrics
metrics.reporting.file.suffix=txt
bootstrap.with.offset=earliest
This sets the Kafka brokers to pull data from, along with Gobblin's pipeline components: source, extractor, writer, publisher, and so on. If any of these are unclear, the Gobblin wiki covers them in detail.
I also added a job.schedule entry so that Gobblin checks all Kafka topics for new data every three minutes, and the extraction job then fires on that three-minute schedule. This uses the Quartz scheduler that ships with Gobblin.
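Quartz cron strings are easy to get wrong: unlike classic five-field cron, Quartz expects six or seven fields, starting with a seconds field. As a quick sanity check, a throwaway snippet like the one below (hypothetical class name, using Quartz's own CronExpression) can validate the expression before it goes into the .pull file:

import java.util.Date;
import org.quartz.CronExpression;

public class ScheduleCheck {
    public static void main(String[] args) throws Exception {
        // Quartz fields: sec min hour day-of-month month day-of-week
        // "0 0/3 * * * ?" fires at second 0 of every third minute
        CronExpression every3Min = new CronExpression("0 0/3 * * * ?");
        System.out.println(every3Min.getNextValidTimeAfter(new Date()));
    }
}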
OK, with the configuration in place, go to the Gobblin root directory and launch with a command like:
bin/gobblin-standalone.sh --conffile $GOBBLIN_JOB_CONFIG_DIR/gobblinStandalone.pull start
My GOBBLIN_JOB_CONFIG_DIR holds several .pull files, so the file has to be named explicitly; if there is only one configuration file under GOBBLIN_JOB_CONFIG_DIR, a plain bin/gobblin-standalone.sh start is enough.
The extracted data ends up under GOBBLIN_WORK_DIR/job-output.
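A quick way to confirm the job ran is simply to list that directory, e.g. ls $GOBBLIN_WORK_DIR/job-output (in standalone mode with no fs.uri set, the writer goes through the default filesystem, which on a stock setup is the local disk).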
3. Configuring and Using Gobblin in MapReduce Mode
This time Gobblin will use MapReduce to pull the Kafka data into HDFS. Create a new gobblin-mr.pull file configured as follows:
job.name=GobblinKafkaMapreduce
job.group=GobblinKafkaForMapreduce
job.description=Gobblin quick start job for Kafka
job.lock.enabled=false
kafka.brokers=101.236.39.141:9092,101.236.46.114:9092,101.236.46.113:9092
source.class=gobblin.source.extractor.extract.kafka.KafkaSimpleSource
extract.namespace=gobblin.extract.kafka
topic.whitelist=jsonTest
writer.builder.class=gobblin.writer.SimpleDataWriterBuilder
writer.file.path.type=tablename
writer.destination.type=HDFS
writer.output.format=txt
simple.writer.delimiter=\n
data.publisher.type=gobblin.publisher.BaseDataPublisher
mr.job.max.mappers=1
metrics.reporting.file.enabled=true
metrics.log.dir=${env:GOBBLIN_WORK_DIR}/metrics
metrics.reporting.file.suffix=txt
bootstrap.with.offset=earliest
fs.uri=hdfs://101.236.39.141:9000
#fs.uri=hdfs://clusterYL
writer.fs.uri=${fs.uri}
state.store.fs.uri=${fs.uri}
mr.job.root.dir=/gobblin/working
state.store.dir=/gobblin/state-store
task.data.root.dir=/gobblin/task-data
data.publisher.final.dir=/gobblin/job-output
Note the topic.whitelist line in the configuration above: I added it to filter ingestion down to the one topic I care about, jsonTest.
The requirement here is to partition the ingested topic data into per-day, per-hour directories, with the exact partition directory derived from a time field inside each Kafka record.
My records are JSON with a time field of the form {…"time":"2016-10-12 00:30:20"…}, so I subclass Gobblin's TimeBasedWriterPartitioner and override its method to partition the HDFS directories by that time field.
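As a rough illustration, a minimal sketch of such a subclass could look like the following (it assumes Gobblin 0.11.0's gobblin.writer.partitioner.TimeBasedWriterPartitioner and its getRecordTimestamp hook; the class name, JSON field handling, and fallback behavior are simplified for illustration, and the full version is the one linked on GitHub):

package gobblin.example;

import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

import gobblin.configuration.State;
import gobblin.writer.partitioner.TimeBasedWriterPartitioner;

// Partitions byte[] Kafka records by the "time" field of their JSON payload,
// e.g. {..."time":"2016-10-12 00:30:20"...}
public class JsonTimeBasedWriterPartitioner extends TimeBasedWriterPartitioner<byte[]> {

  public JsonTimeBasedWriterPartitioner(State state, int numBranches, int branchId) {
    super(state, numBranches, branchId);
  }

  @Override
  public long getRecordTimestamp(byte[] record) {
    try {
      JsonObject json = new JsonParser()
          .parse(new String(record, StandardCharsets.UTF_8)).getAsJsonObject();
      // SimpleDateFormat is not thread-safe, so build one per call
      return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
          .parse(json.get("time").getAsString()).getTime();
    } catch (Exception e) {
      // fall back to ingestion time if a record has no parsable "time" field
      return System.currentTimeMillis();
    }
  }
}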
One setting deserves special attention:
fs.uri=hdfs://master:8020
Change it to your own cluster's HDFS address.
The subclass that partitions the HDFS directories by the JSON time field is on my GitHub; see the link below.
Add the extended class to the appropriate Gobblin module (I put it into the gobblin-example module) and rebuild; if the build gives you trouble, see the IDE-setup article in the references below.
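For reference, wiring the partitioner into the pull file generally comes down to a couple of writer.partition.* keys (key names per the Partitioned Writers doc linked in the references; the class name below matches the illustrative sketch above, so substitute your own):

writer.partitioner.class=gobblin.example.JsonTimeBasedWriterPartitioner
writer.partition.pattern=yyyy/MM/dd/HH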
All the paths at the end of the configuration file above are HDFS paths; make sure Gobblin has read and write permission on them.
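For example, the following commands would create them and hand them over to the Gobblin user (the gobblin:gobblin owner here is a placeholder; use whatever account actually runs the job):

hdfs dfs -mkdir -p /gobblin/working /gobblin/state-store /gobblin/task-data /gobblin/job-output
hdfs dfs -chown -R gobblin:gobblin /gobblin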
Then launch with:
bin/gobblin-mapreduce.sh --conf $GOBBLIN_JOB_CONFIG_DIR/gobblin-mr.pull
After the job runs successfully, the output directories appear on HDFS, with a jsonTest directory created from the name of the matching topic.
Note that I tried several times to configure Quartz scheduling in MR mode and it never took effect, so if you need the extraction to run on a schedule, use an external tool: Linux crontab, Oozie, and Azkaban all work.
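For instance, a crontab entry along these lines would kick the MR job off at the top of every hour (the /opt/gobblin install path and config dir are placeholders):

GOBBLIN_JOB_CONFIG_DIR=/opt/gobblin/job-conf
0 * * * * cd /opt/gobblin && bin/gobblin-mapreduce.sh --conf $GOBBLIN_JOB_CONFIG_DIR/gobblin-mr.pull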
4. Summary of Using Gobblin
1> Start with the official Gobblin wiki; it is very thorough.
2> Fork the source on GitHub and read through the source, extractor, and partitioner code carefully.
3> When problems come up, dig into Gobblin's logs and Hadoop's logs.
References:
http://gobblin.readthedocs.io/en/latest/case-studies/Kafka-HDFS-Ingestion/
http://gobblin.readthedocs.io/en/latest/user-guide/Partitioned-Writers/
http://gobblin.readthedocs.io/en/latest/developer-guide/IDE-setup/