flume_kafkaChannel_kafkaSink

agent.sources = source
#抽取類型爲目錄
agent.sources.source.type = spooldir
#抽取的文件目錄
agent.sources.source.spoolDir = /root/tmp/flume/data
#添加一個存儲絕對路徑文件名的頭
agent.sources.source.fileHeader = true
#never or immediate 收集文件完成以後是否刪除
agent.sources.source.deletePolicy = immediate
#監視子目錄以查找新文件。
agent.sources.source.recursiveDirectorySearch = false
#批量傳輸到通道的粒度
agent.sources.source.batchSize = 20
#解串器使用的字符集,將輸入文件視爲文本
agent.sources.source.inputCharset = UTF-8
#輸入文件中出現不可解碼的字符FAIL,REPLACE,IGNORE
agent.sources.source.decodeErrorPolicy = REPLACEapache


agent.sources.source.channels = channel
#kafka  通道
agent.channels  =  channel
agent.channels.channel.type = org.apache.flume.channel.kafka.KafkaChannel
agent.channels.channel.kafka.bootstrap.servers = slave1:9092
agent.channels.channel.kafka.topic = channeltopic
agent.channels.channel.kafka.consumer.group.id = flume-consumer
#偏移重置earliest,latest,none
agent.channels.channel.kafka.consumer.auto.offset.reset = none
agent.channels.channel.kafka.pollTimeout = 1000bootstrap

#file  通道
#agent.channels  =  channel
#agent.channels.channel.type  =  file 
#agent.channels.channel.checkpointDir  =  /root/tmp/flume/checkpoint 
#agent.channels.channel.checkpointOnClose = true
#agent.channels.channel.dataDirs  =  /root/tmp/flume/channeldataapp

agent.sinks.sink.channel = channelcode

agent.sinks = sink
agent.sinks.sink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.sink.kafka.topic = test
agent.sinks.sink.kafka.bootstrap.servers = slave1:9092
#一個批處理中處理的消息數
agent.sinks.sink.kafka.flumeBatchSize = 100
#0(從不等待確認),1(僅等待領導者),-1(等待全部副本)
agent.sinks.sink.kafka.producer.acks = -1
agent.sinks.sink.kafka.producer.linger.ms = 1
agent.sinks.sink.kafka.producer.compression.type = snappy
 server

相關文章
相關標籤/搜索
本站公眾號
   歡迎關注本站公眾號,獲取更多信息