原文連接:https://blog.csdn.net/wangpei1949/article/details/80472928node
flume中有三種可監控文件或目錄的source、分別是Exec Source、Spooling Directory Source和Taildir Source。git
Taildir Source是1.7版本的新特性,綜合了Spooling Directory Source和Exec Source的優勢。
github
使用場景:數據庫
Exec Source
Exec Source可經過tail -f命令去tail住一個文件,而後實時同步日誌到sink。但存在的問題是,當agent進程掛掉重啓後,會有重複消費的問題。能夠經過增長UUID來解決,或經過改進ExecSource來解決。
Spooling Directory Source
Spooling Directory Source可監聽一個目錄,同步目錄中的新文件到sink,被同步完的文件可被當即刪除或被打上標記。適合用於同步新文件,但不適合對實時追加日誌的文件進行監聽並同步。若是須要實時監聽追加內容的文件,可對SpoolDirectorySource進行改進。
Taildir Source
Taildir Source可實時監控一批文件,並記錄每一個文件最新消費位置,agent進程重啓後不會有重複消費的問題。
使用時建議用1.8.0版本的flume,1.8.0版本中解決了Taildir Source一個可能會丟數據的bug。apache
還有github上讀取數據庫的source以及遞歸遍歷文件子目錄的文件夾
Taildir Source
agent配置
# source的名字
agent.sources = s1
# channels的名字
agent.channels = c1
# sink的名字
agent.sinks = r1json
# 指定source使用的channel
agent.sources.s1.channels = c1
# 指定sink使用的channel
agent.sinks.r1.channel = c1bootstrap
######## source相關配置 ########
# source類型
agent.sources.s1.type = TAILDIR
# 元數據位置
agent.sources.s1.positionFile = /Users/wangpei/tempData/flume/taildir_position.json
# 監控的目錄
agent.sources.s1.filegroups = f1
agent.sources.s1.filegroups.f1=/Users/wangpei/tempData/flume/data/.*log
agent.sources.s1.fileHeader = true數組
######## channel相關配置 ########
# channel類型
agent.channels.c1.type = file
# 數據存放路徑
agent.channels.c1.dataDirs = /Users/wangpei/tempData/flume/filechannle/dataDirs
# 檢查點路徑
agent.channels.c1.checkpointDir = /Users/wangpei/tempData/flume/filechannle/checkpointDir
# channel中最多緩存多少
agent.channels.c1.capacity = 1000
# channel一次最多吐給sink多少
agent.channels.c1.transactionCapacity = 100緩存
######## sink相關配置 ########
# sink類型
agent.sinks.r1.type = org.apache.flume.sink.kafka.KafkaSink
# brokers地址
agent.sinks.r1.kafka.bootstrap.servers = localhost:9092
# topic
agent.sinks.r1.kafka.topic = testTopic3
# 壓縮
agent.sinks.r1.kafka.producer.compression.type = snappy
記錄每一個文件消費位置的元數據
#配置
agent.sources.s1.positionFile = /Users/wangpei/tempData/flume/taildir_position.json
#內容
[
{
"inode":6028358,
"pos":144,
"file":"/Users/wangpei/tempData/flume/data/test.log"
},
{
"inode":6028612,
"pos":20,
"file":"/Users/wangpei/tempData/flume/data/test_a.log"
}
] app
能夠看到,在taildir_position.json文件中,經過json數組的方式,記錄了每一個文件最新的消費位置,每消費一次便去更新這個文件。