Collection requirement: a specific directory on a server keeps producing new files, and whenever a new file appears it needs to be collected into HDFS.
Based on this requirement, first define the three key elements: the collection source (a spooldir source watching the directory), the sink destination (an HDFS sink), and the channel connecting them (a memory channel).
Configuration file:
# Define the names of the three components
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
# Configure the source component
agent1.sources.source1.type = spooldir
agent1.sources.source1.spoolDir = /home/hadoop/logs/
agent1.sources.source1.fileHeader = false
# Configure the interceptor
agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = host
agent1.sources.source1.interceptors.i1.hostHeader = hostname
# Configure the sink component
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://hdp-node-01:9000/weblog/flume-collection/%y-%m-%d/%H-%M
agent1.sinks.sink1.hdfs.filePrefix = access_log
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
agent1.sinks.sink1.hdfs.batchSize = 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat = Text
agent1.sinks.sink1.hdfs.rollSize = 102400
agent1.sinks.sink1.hdfs.rollCount = 1000000
agent1.sinks.sink1.hdfs.rollInterval = 60
#agent1.sinks.sink1.hdfs.round = true
#agent1.sinks.sink1.hdfs.roundValue = 10
#agent1.sinks.sink1.hdfs.roundUnit = minute
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600
# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
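To run this first agent, the configuration can be started with flume-ng in the same way as the later examples. A minimal sketch, assuming the file above is saved as conf/spooldir-hdfs.conf (the file name is only illustrative):
bin/flume-ng agent -c conf -f conf/spooldir-hdfs.conf -n agent1 -Dflume.root.logger=INFO,console
The -n value must match the agent name (agent1) used in the configuration. Also note that files dropped into /home/hadoop/logs/ must be complete before they land there, because the spooldir source does not allow a file to be modified after it has been picked up.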
Channel parameter explanation:
capacity: the maximum number of events the channel can store
transactionCapacity: the maximum number of events that can be taken from the source or delivered to the sink in a single transaction
keep-alive: the time allowed for adding an event to the channel or removing one from it
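These channel parameters should be sized consistently with the sink's batch settings; a minimal sketch of the relationship, restating the values from the configuration above:
# sink batchSize <= channel transactionCapacity <= channel capacity
agent1.sinks.sink1.hdfs.batchSize = 100
agent1.channels.channel1.transactionCapacity = 600
agent1.channels.channel1.capacity = 500000
If a sink or source batch is larger than transactionCapacity, the transaction cannot be committed, so transactionCapacity must be at least as large as the biggest batch, and capacity must in turn be at least transactionCapacity.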
For the second requirement, a log file that is continuously appended to (here /home/hadoop/logs/access_log) whose new data must be collected into HDFS, again define the three key elements: an exec source running tail -F, an HDFS sink, and a memory channel.
Configuration file:
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
# Describe/configure tail -F source1
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -F /home/hadoop/logs/access_log
agent1.sources.source1.channels = channel1
#configure host for source
agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = host
agent1.sources.source1.interceptors.i1.hostHeader = hostname
# Describe sink1
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://hdp-node-01:9000/weblog/flume-collection/%y-%m-%d/%H-%M
agent1.sinks.sink1.hdfs.filePrefix = access_log
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
agent1.sinks.sink1.hdfs.batchSize = 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat = Text
agent1.sinks.sink1.hdfs.rollSize = 102400
agent1.sinks.sink1.hdfs.rollCount = 1000000
agent1.sinks.sink1.hdfs.rollInterval = 60
agent1.sinks.sink1.hdfs.round = true
agent1.sinks.sink1.hdfs.roundValue = 10
agent1.sinks.sink1.hdfs.roundUnit = minute
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600
# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
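As with the first agent, this configuration can be started with flume-ng; a minimal sketch, assuming it is saved as conf/exec-hdfs.conf (an illustrative name):
bin/flume-ng agent -c conf -f conf/exec-hdfs.conf -n agent1 -Dflume.root.logger=INFO,console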
tail-hdfs.conf: use the tail command to capture data and sink it to HDFS.
Generate test data:
mkdir /home/admin/log
while true
do
  echo 111111 >> /home/admin/log/test.log
  sleep 0.5
done
tail -F test.log feeds the collected data into HDFS; the target directory in HDFS does not need to be created by hand.
Check whether HDFS is in safe mode:
hdfs dfsadmin -report
Startup command:
bin/flume-ng agent -c conf -f conf/tail-hdfs.conf -n a1
Check in the web UI at master:50070; the output directory is /flume/events/16-04-20/.
################################################################
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
# exec means the source runs a command
a1.sources.r1.type = exec
# tail -F follows by file name (survives rotation); tail -f follows the file's inode
a1.sources.r1.command = tail -F /home/admin/log/test.log
a1.sources.r1.channels = c1
# Describe the sink
# Sink destination
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
# Target directory; Flume substitutes the escape sequences
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
# File name prefix
a1.sinks.k1.hdfs.filePrefix = events-
# Switch to a new directory every 10 minutes
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# Wait time before rolling a file (seconds)
a1.sinks.k1.hdfs.rollInterval = 3
# File size limit before rolling (bytes)
a1.sinks.k1.hdfs.rollSize = 500
# Number of events written before rolling a file
a1.sinks.k1.hdfs.rollCount = 20
# Write to HDFS in batches of 5 events
a1.sinks.k1.hdfs.batchSize = 5
# Use local time to format the directory
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# File type of the generated files; the default is SequenceFile, DataStream produces plain text
a1.sinks.k1.hdfs.fileType = DataStream
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
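Besides the web UI on master:50070, the collected files can also be checked from the command line; a short sketch, assuming the date directory 16-04-20 shown above (adjust to the current date):
hdfs dfs -ls /flume/events/16-04-20/
hdfs dfs -cat /flume/events/16-04-20/*/events-*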
Start the receiving service on the first machine:
Receive data from an avro port and sink it to the logger:
bin/flume-ng agent -c conf -f conf/avro-logger.conf -n a1 -Dflume.root.logger=INFO,console
######### Collection configuration file: avro-logger.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
# The avro source acts as the receiving server, bound to this machine
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Send data from the second machine:
Send the contents of the test.log file to port 4141 on the first machine.
Send the data:
$ bin/flume-ng avro-client -H <first-machine-ip> -p 4141 -F /home/admin/log/test.log
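When no -F file is given, avro-client reads events from standard input, which is convenient for a quick connectivity test; a minimal sketch using the same placeholder for the first machine's IP:
echo "hello flume" | bin/flume-ng avro-client -H <first-machine-ip> -p 4141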
Flume supports many more source and sink types; see the official documentation for the detailed manual.