大數據學習——採集文件到HDFS

採集需求:好比業務系統使用log4j生成的日誌,日誌內容不斷增長,須要把追加到日誌文件中的數據實時採集到hdfsspa

 

根據需求,首先定義如下3大要素日誌

l  採集源,即source——監控文件內容更新 :  exec  ‘tail -F file’code

l  下沉目標,即sink——HDFS文件系統  :  hdfs sinkorm

l  Source和sink之間的傳遞通道——channel,可用file channel 也能夠用 內存channelblog

 

vi exec-hdfs-sink.confip

agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1內存

# Describe/configure tail -F source1
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -F /root/logs/access_log
agent1.sources.source1.channels = channel1it

#configure host for source
agent1.sources.source1.interceptors = i1 i2
agent1.sources.source1.interceptors.i1.type = host
agent1.sources.source1.interceptors.i1.hostHeader = hostname
#agent1.sources.source1.interceptors.i1.useIP=true 表示使用ip地址或者主機名console

agent1.sources.source1.interceptors.i1.useIP=false
agent1.sources.source1.interceptors.i2.type = timestampclass

# Describe sink1

agent1.sinks.sink1.type = hdfs

#a1.sinks.k1.channel = c1

agent1.sinks.sink1.hdfs.path=hdfs://mini1:9000/file/%{hostname}/%y-%m-%d/%H-%M

agent1.sinks.sink1.hdfs.filePrefix = access_log

agent1.sinks.sink1.hdfs.batchSize= 100

agent1.sinks.sink1.hdfs.fileType = DataStream

agent1.sinks.sink1.hdfs.writeFormat =Text

agent1.sinks.sink1.hdfs.rollSize = 10240

agent1.sinks.sink1.hdfs.rollCount = 1000

模擬數據

mkdir logs
cd logs
while true; do date >>access_log ;sleep 0.5s; done 

 

 啓動

bin/flume-ng agent -c conf -f conf/exec-hdfs-sink.conf -n agent1 -Dflume.root.logger=INFO,console

查看結果

 

相關文章
相關標籤/搜索