採集需求:某服務器的某特定目錄下,會不斷產生新的文件,每當有新文件出現,就須要把文件採集到HDFS中去web
根據需求,首先定義如下3大要素服務器
l 採集源,即source——監控文件目錄 : spooldirspa
l 下沉目標,即sink——HDFS文件系統 : hdfs sink3d
l source和sink之間的傳遞通道——channel,可用file channel 也能夠用內存memory channel日誌
配置文件編寫:code
vi spooldir-hdfs-sink.conform
#定義三大組件的名稱blog
agent1.sources = source1內存
agent1.sinks = sink1ci
agent1.channels = channel1
# 配置source組件
agent1.sources.source1.type = spooldir
agent1.sources.source1.spoolDir = /root/data/
agent1.sources.source1.fileHeader = false
#配置攔截器
agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = timestamp
# 配置sink組件
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path =/weblog/flume-collection/%y-%m-%d/%H-%M
agent1.sinks.sink1.hdfs.filePrefix = access_log
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
agent1.sinks.sink1.hdfs.batchSize= 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat =Text
#滾動生成的文件按大小生成
agent1.sinks.sink1.hdfs.rollSize = 102400
#滾動生成的文件按行數生成
agent1.sinks.sink1.hdfs.rollCount = 1000000
#滾動生成的文件按時間生成
agent1.sinks.sink1.hdfs.rollInterval = 60
#開啓滾動生成目錄
agent1.sinks.sink1.hdfs.round = true
#以10爲一梯度滾動生成
agent1.sinks.sink1.hdfs.roundValue = 10
#單位爲分鐘
agent1.sinks.sink1.hdfs.roundUnit = minute
# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600
agent1.channels.channel1.keep-alive = 120
# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
添加數據
aaa.txt
13601249301 100 200 300 400 500 600 700
13601249302 100 200 300 400 500 600 700
13601249303 100 200 300 400 500 600 700
13601249304 100 200 300 400 500 600 700
13601249305 100 200 300 400 500 600 700
執行命令
bin/flume-ng agent -c conf -f conf/spooldir-hdfs-sink.conf -n agent1 -Dflume.root.logger=INFO,console
flume的source採用spoodir時! 目錄下面不容許存放同名的文件,不然報錯!
Channel參數解釋:
capacity:默認該通道中最大的能夠存儲的event數量
trasactionCapacity:每次最大能夠從source中拿到或者送到sink中的event數量
keep-alive:event添加到通道中或者移出的容許時間
其餘組件:Interceptor(攔截器)
用於Source的一組Interceptor,按照預設的順序在必要地方裝飾和過濾events。
內建的Interceptors容許增長event的headers好比:時間戳、主機名、靜態標記等等
定製的interceptors能夠經過內省event payload(讀取原始日誌),實現本身的業務邏輯(很強大)