文章做者:foochanehtml
原文連接:foochane.cn/article/201…java
Flume日誌採集框架 安裝和部署 Flume運行機制 採集靜態文件到hdfs 採集動態日誌文件到hdfs 兩個agent級聯git
在一個完整的離線大數據處理系統中,除了hdfs+mapreduce+hive組成分析系統的核心以外,還須要數據採集、結果數據導出、任務調度等不可或缺的輔助系統,而這些輔助工具在hadoop生態體系中都有便捷的開源框架,如圖所示:github
Flume
是一個分佈式、可靠、和高可用的海量日誌採集、聚合和傳輸的系統。Flume
能夠採集文件,socket
數據包、文件、文件夾、kafka
等各類形式源數據,又能夠將採集到的數據(下沉sink
)輸出到HDFS
、hbase
、hive
、kafka
等衆多外部存儲系統中。shell
對於通常的採集需求,經過對flume的簡單配置便可實現。apache
Flume
針對特殊場景也具有良好的自定義擴展能力,所以,flume
能夠適用於大部分的平常數據採集場景。緩存
Flume
分佈式系統中最核心的角色是agent
,flume
採集系統就是由一個個agent
所鏈接起來造成,每個agent
至關於一個數據傳遞員,內部有三個組件:bash
Source
:採集組件,用於跟數據源對接,以獲取數據Sink
:下沉組件,用於往下一級agent
傳遞數據或者往最終存儲系統傳遞數據Channel
:傳輸通道組件,用於從source
將數據傳遞到sink
單個agent採集數據:服務器
多級agent之間串聯:app
1 下載安裝包apache-flume-1.9.0-bin.tar.gz
解壓
2 在conf
文件夾下的flume-env.sh
添加JAVA_HOME
export JAVA_HOME=/usr/local/bigdata/java/jdk1.8.0_211
複製代碼
3 根據採集的需求,添加採集方案配置文件,文件名能夠任意取
具體能夠看後面的示例
4 啓動flume
測試環境下:
$ bin/flume/-ng agent -c conf/ -f ./dir-hdfs.conf -n agent1 -Dflume.root.logger=INFO,console
複製代碼
命令說明:
-c
:指定flume自帶的配置文件目錄,不用本身修改-f
:指定本身的配置文件,這裏問當前文件夾下的dir-hdfs.conf
-n
:指定本身配置文件中使用那個agent
,對應的配置文件中定義的名字。-Dflume.root.logger
:把日誌打印在控制檯,類型爲INFO
,這個只用於測試,後面將打印到日誌文件中生產中,啓動flume,應該把flume啓動在後臺:
nohup bin/flume-ng agent -c ./conf -f ./dir-hdfs.conf -n agent1 1>/dev/null 2>&1 &
複製代碼
某服務器的某特定目錄下,會不斷產生新的文件,每當有新文件出現,就須要把文件採集到HDFS中去
在安裝目錄下添加文件dir-hdfs.conf
,而後添加配置信息。
先獲取agent
,命名爲agent1
,後面的配置都跟在agent1
後面,也能夠改成其餘值,如agt1
,同一個配置文件中能夠有多個配置配置方案,啓動agent
的時候獲取對應的名字就能夠。
根據需求,首先定義如下3大要素
即source
——監控文件目錄 : spooldir
spooldir
有以下特性:
COMPLETED
(可修改)即sink——HDFS
文件系統 : hdfs sink
即channel
——可用file channel
也能夠用內存channel
#定義三大組件的名稱
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
# 配置source組件
agent1.sources.source1.type = spooldir
agent1.sources.source1.spoolDir = /root/log/
agent1.sources.source1.fileSuffix=.FINISHED
#文件每行的長度,注意這裏若是事情文件每行超過這個長度會自動切斷,會致使數據丟失
agent1.sources.source1.deserializer.maxLineLength=5120
# 配置sink組件
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path =hdfs://Master:9000/access_log/%y-%m-%d/%H-%M
agent1.sinks.sink1.hdfs.filePrefix = app_log
agent1.sinks.sink1.hdfs.fileSuffix = .log
agent1.sinks.sink1.hdfs.batchSize= 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat =Text
# roll:滾動切換:控制寫文件的切換規則
## 按文件體積(字節)來切
agent1.sinks.sink1.hdfs.rollSize = 512000
## 按event條數切
agent1.sinks.sink1.hdfs.rollCount = 1000000
## 按時間間隔切換文件
agent1.sinks.sink1.hdfs.rollInterval = 60
## 控制生成目錄的規則
agent1.sinks.sink1.hdfs.round = true
agent1.sinks.sink1.hdfs.roundValue = 10
agent1.sinks.sink1.hdfs.roundUnit = minute
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
# channel組件配置
agent1.channels.channel1.type = memory
## event條數
agent1.channels.channel1.capacity = 500000
##flume事務控制所須要的緩存容量600條event
agent1.channels.channel1.transactionCapacity = 600
# 綁定source、channel和sink之間的鏈接
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
複製代碼
Channel
參數解釋:
capacity
:默認該通道中最大的能夠存儲的event
數量trasactionCapacity
:每次最大能夠從source
中拿到或者送到sink
中的event
數量keep-alive
:event
添加到通道中或者移出的容許時間$ bin/flume/-ng agent -c conf/ -f dir-hdfs.conf -n agent1 -Dflume.root.logger=INFO,console
複製代碼
好比業務系統使用log4j生成的日誌,日誌內容不斷增長,須要把追加到日誌文件中的數據實時採集到hdfs
配置文件名稱:tail-hdfs.conf
根據需求,首先定義如下3大要素:
source
——監控文件內容更新 : exec
tail -F file
sink——HDFS
文件系統 : hdfs sinkSource
和sink
之間的傳遞通道——channel
,可用file channel
也能夠用 內存channel
配置文件內容:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/app_weichat_login.log
# Describe the sink
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path =hdfs://Master:9000/app_weichat_login_log/%y-%m-%d/%H-%M
agent1.sinks.sink1.hdfs.filePrefix = weichat_log
agent1.sinks.sink1.hdfs.fileSuffix = .dat
agent1.sinks.sink1.hdfs.batchSize= 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat =Text
agent1.sinks.sink1.hdfs.rollSize = 100
agent1.sinks.sink1.hdfs.rollCount = 1000000
agent1.sinks.sink1.hdfs.rollInterval = 60
agent1.sinks.sink1.hdfs.round = true
agent1.sinks.sink1.hdfs.roundValue = 1
agent1.sinks.sink1.hdfs.roundUnit = minute
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
複製代碼
啓動命令:
bin/flume-ng agent -c conf -f conf/tail-hdfs.conf -n a1
複製代碼
從tail命令獲取數據發送到avro端口 另外一個節點可配置一個avro源來中繼數據,發送外部存儲
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/log/access.log
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hdp-05
a1.sinks.k1.port = 4141
a1.sinks.k1.batch-size = 2
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
複製代碼
從avro端口接收數據,下沉到hdfs
採集配置文件,avro-hdfs.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
##source中的avro組件是一個接收者服務
a1.sources.r1.type = avro
a1.sources.r1.bind = hdp-05
a1.sources.r1.port = 4141
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/taildata/%y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = tail-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 24
a1.sinks.k1.hdfs.roundUnit = hour
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 50
a1.sinks.k1.hdfs.batchSize = 10
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#生成的文件類型,默認是Sequencefile,可用DataStream,則爲普通文本
a1.sinks.k1.hdfs.fileType = DataStream
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
複製代碼