Flume NG is a distributed, reliable, and highly available system for efficiently collecting, aggregating, and moving massive volumes of log data from many different sources into a centralized data store. Moving from the original Flume OG to today's Flume NG involved a complete architectural rewrite, and the NG version is not backward compatible with OG. After the redesign, Flume NG feels more like a lightweight tool: it is very simple, adapts easily to all kinds of log-collection setups, and supports failover and load balancing.
Architecture Design Highlights
Flume's architecture revolves around the following core concepts:
- Event: a unit of data, carrying an optional set of message headers
- Flow: an abstraction of the migration of Events from their point of origin to their destination
- Client: operates at the point of origin and sends Events to a Flume Agent
- Agent: an independent Flume process containing the Source, Channel, and Sink components
- Source: consumes the Events delivered to it
- Channel: a transient store that buffers the Events handed over by the Source component
- Sink: reads and removes Events from the Channel and passes them on to the next Agent in the Flow pipeline (if there is one)
The Flume NG architecture is shown in the figure below:

An external system produces logs; the Source component of a Flume Agent receives the events (for example, log lines) and hands them to an intermediate Channel component for temporary buffering, and finally they are delivered to the Sink component. An HDFS Sink, for instance, can write the data straight to an HDFS cluster.
The configuration of a most basic Flow has the following format:
# list the sources, sinks and channels for the agent
<Agent>.sources = <Source1> <Source2>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>

# set channel for source
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2> ...
<Agent>.sources.<Source2>.channels = <Channel1> <Channel2> ...

# set channel for sink
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>
The names inside the angle brackets can be changed to whatever fits your actual needs or business. In more detail:
<Agent> is the name of the Agent; every Agent must have one. <Source1> and <Source2> are the names of the Agent's Source components, which consume the Events delivered to them. <Channel1> and <Channel2> are the names of the Agent's Channel components. <Sink1> and <Sink2> are the names of the Agent's Sink components, which consume (remove) Events from a Channel.
In the configuration above, the first group declares the Sources, Sinks, and Channels, each of which may have one or more entries. The second group configures which Channel(s) a Source puts its data into; a Source may write to one or more Channels, and having one Source write to multiple Channels is effectively replication. The third group configures which Channel a Sink takes its data from; a Sink can only take data from a single Channel.
Below, following the official documentation, we look at several Flow pipelines and the application scenarios each is suited to:

Multiple Agents can be chained in sequence so that data from the original sources is collected hop by hop and finally written to the destination storage system. This is the simplest case. In general, the number of Agents chained this way should be kept under control, because the path the data travels becomes longer; without failover, a failure anywhere along the chain affects every Agent's collection service in the Flow.
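As a minimal sketch of such a chain (the agent names, host name, and port below are illustrative, not taken from the original article), the first Agent forwards events over Avro RPC and the second receives them with an Avro Source:

# agent "foo" on the log-producing machine: tails a file and forwards via Avro RPC
foo.sources = src1
foo.channels = ch1
foo.sinks = avro-sink1
foo.channels.ch1.type = memory
foo.sources.src1.type = exec
foo.sources.src1.command = tail -F /var/log/app.log
foo.sources.src1.channels = ch1
foo.sinks.avro-sink1.type = avro
foo.sinks.avro-sink1.hostname = collector-host
foo.sinks.avro-sink1.port = 41414
foo.sinks.avro-sink1.channel = ch1

# agent "bar" on the collector machine: receives Avro events and writes them to HDFS
bar.sources = avro-source1
bar.channels = ch1
bar.sinks = hdfs-sink1
bar.channels.ch1.type = memory
bar.sources.avro-source1.type = avro
bar.sources.avro-source1.bind = 0.0.0.0
bar.sources.avro-source1.port = 41414
bar.sources.avro-source1.channels = ch1
bar.sinks.hdfs-sink1.type = hdfs
bar.sinks.hdfs-sink1.hdfs.path = /data/flume
bar.sinks.hdfs-sink1.channel = ch1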

This scenario is very common. For example, to collect user-behavior logs from a website that runs as a load-balanced cluster for availability, every node produces its own logs; an Agent can be configured on each node to collect that node's log data independently, and the multiple Agents then converge the data into a single storage system such as HDFS.
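A minimal sketch of this consolidation topology (again, the names, host, and port are illustrative): each web-node Agent uses the same forwarding configuration and points its Avro Sink at one collector Agent, which can be configured exactly like agent "bar" in the previous sketch:

# identical configuration on every web node, e.g. agent "web1"
web1.sources = tail1
web1.channels = ch1
web1.sinks = to-collector
web1.channels.ch1.type = memory
web1.sources.tail1.type = exec
web1.sources.tail1.command = tail -F /var/log/nginx/access.log
web1.sources.tail1.channels = ch1
web1.sinks.to-collector.type = avro
web1.sinks.to-collector.hostname = collector-host
web1.sinks.to-collector.port = 41414
web1.sinks.to-collector.channel = ch1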

This pattern comes in two flavors: Replication and Multiplexing. In the Replication mode, the data from the front-end source is copied into multiple Channels, and every Channel receives an identical copy. The configuration format is as follows:
# List the sources, sinks and channels for the agent
<Agent>.sources = <Source1>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>

# set list of channels for source (separated by space)
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2>

# set channel for sinks
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>

<Agent>.sources.<Source1>.selector.type = replicating
Here the selector type is set to replicating and nothing else is specified, so the Replication mode is used: Source1 stores the data into both Channel1 and Channel2, the two Channels hold identical data, and the data is then delivered to Sink1 and Sink2.
In the Multiplexing mode, the selector decides which Channel each event is delivered to based on the value of a header. The configuration format is as follows:
# Mapping for multiplexing selector
<Agent>.sources.<Source1>.selector.type = multiplexing
<Agent>.sources.<Source1>.selector.header = <someHeader>
<Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1>
<Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2>
<Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2>

<Agent>.sources.<Source1>.selector.default = <Channel2>
Here the selector type is multiplexing, the selector header is configured, and several selector mapping values (possible values of that header) are defined: if the header value is Value1 or Value2, the data is routed from Source1 to Channel1; if the header value is Value2 or Value3, the data is routed from Source1 to Channel2; any other value falls back to the default, Channel2.

The Load balancing Sink Processor provides load balancing. In the figure above, Agent1 acts as a routing node that spreads the Events buffered in its Channel evenly across multiple Sink components, each of which connects to a separate downstream Agent. An example configuration is shown below:
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.selector.maxTimeOut=10000
The Failover Sink Processor provides failover. The overall flow resembles load balancing, but the internal mechanics are completely different: the Failover Sink Processor maintains a prioritized list of Sinks and guarantees that as long as at least one Sink is available, Events will be delivered. A Sink that fails to deliver an Event is moved to a failure pool and assigned a cooldown (penalty) period that grows with consecutive failures, up to the configured maxpenalty. An example configuration is shown below:
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 7
a1.sinkgroups.g1.processor.priority.k3 = 6
a1.sinkgroups.g1.processor.maxpenalty = 20000
Core Features
Let's look at what Flume NG supports (the latest version at the time of writing is 1.5.0.1); knowing its feature set helps us choose the right approach in our applications. Describing Flume NG's features really comes down to which technologies and protocols the three Agent components, Source, Channel, and Sink, support. Rather than going through the detailed configuration for each protocol, the three components are summarized in the tables below:
| Source type | Description |
| --- | --- |
| Avro Source | Supports the Avro protocol (actually Avro RPC); built in |
| Thrift Source | Supports the Thrift protocol; built in |
| Exec Source | Runs a Unix command and produces data from its standard output |
| JMS Source | Reads data from a JMS system (queues/topics); tested against ActiveMQ |
| Spooling Directory Source | Watches a specified directory for new data |
| Twitter 1% firehose Source | Continuously downloads Twitter data through its API; experimental |
| Netcat Source | Listens on a port and turns each line of text flowing through it into an Event |
| Sequence Generator Source | A sequence-generator source that produces sequence data |
| Syslog Sources | Read syslog data and produce Events; both UDP and TCP are supported |
| HTTP Source | Accepts data via HTTP POST or GET; JSON and BLOB representations are supported |
| Legacy Sources | Compatible with Sources from the old Flume OG (0.9.x) |
| Channel type | Description |
| --- | --- |
| Memory Channel | Events are stored in memory |
| JDBC Channel | Events are stored in a persistent database; currently Derby is supported out of the box |
| File Channel | Events are stored in files on disk |
| Spillable Memory Channel | Events are stored in memory and on disk; when the in-memory queue fills up, events spill over to disk files (currently experimental, not recommended for production) |
| Pseudo Transaction Channel | For testing only |
| Custom Channel | A user-provided Channel implementation |
| Sink type | Description |
| --- | --- |
| HDFS Sink | Writes data to HDFS |
| Logger Sink | Writes data to the log file |
| Avro Sink | Data is converted into Avro Events and sent to the configured RPC port |
| Thrift Sink | Data is converted into Thrift Events and sent to the configured RPC port |
| IRC Sink | Replays data on IRC |
| File Roll Sink | Stores data on the local filesystem |
| Null Sink | Discards all data it receives |
| HBase Sink | Writes data to HBase |
| Morphline Solr Sink | Sends data to a Solr search server (or cluster) |
| ElasticSearch Sink | Sends data to an Elasticsearch search server (or cluster) |
| Kite Dataset Sink | Writes data to a Kite Dataset; experimental |
| Custom Sink | A user-provided Sink implementation |
There are also components such as Channel Selectors, Sink Processors, Event Serializers, and Interceptors; see the official user guide for details.
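As a small illustration of Interceptors (a sketch only; the agent and source names are examples, not from the original article), the built-in timestamp and host interceptors can be chained on a source so that every Event carries a timestamp header and the agent's host name:

# add a timestamp header and a hostname header to every event from source r1
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i2.type = host
a1.sources.r1.interceptors.i2.hostHeader = hostname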
Practical Examples
Installing Flume NG is very simple. We use the latest 1.5.0.1 release and run the following commands:
tar xvzf apache-flume-1.5.0.1-bin.tar.gz
cd apache-flume-1.5.0.1-bin
If a Hadoop cluster is involved, make sure the Hadoop-related environment variables are configured correctly and the cluster is available. The concrete configuration examples below show how Flume is used; for simplicity, all of them use a Memory channel.
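For reference, the environment variables typically needed look something like the following (a sketch; the paths are placeholders for your own installation):

export JAVA_HOME=/usr/local/java/jdk1.7.0_67
export HADOOP_HOME=/usr/local/hadoop
export PATH=$JAVA_HOME/bin:$HADOOP_HOME/bin:$PATH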
- Avro Source+Memory Channel+Logger Sink
We use the example that ships with apache-flume-1.5.0.1: an Avro Source receives data from an external client and a Logger acts as the sink. Data sent via Avro RPC calls is buffered in the channel and then printed by the Logger.
Configure the Agent by editing conf/flume-conf.properties as follows:
# Define a memory channel called ch1 on agent1
agent1.channels.ch1.type = memory

# Define an Avro source called avro-source1 on agent1 and tell it
# to bind to 0.0.0.0:41414. Connect it to channel ch1.
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind = 0.0.0.0
agent1.sources.avro-source1.port = 41414

# Define a logger sink that simply logs all events it receives
# and connect it to the other end of the same channel.
agent1.sinks.log-sink1.channel = ch1
agent1.sinks.log-sink1.type = logger

# Finally, now that we've defined all of our components, tell
# agent1 which ones we want to activate.
agent1.channels = ch1
agent1.channels.ch1.capacity = 1000
agent1.sources = avro-source1
agent1.sinks = log-sink1
First, start the Agent process:
bin/flume-ng agent -c ./conf/ -f conf/flume-conf.properties -Dflume.root.logger=DEBUG,console -n agent1
Then start an Avro client and send some data:
bin/flume-ng avro-client -c ./conf/ -H 0.0.0.0 -p 41414 -F /usr/local/programs/logs/sync.log -Dflume.root.logger=DEBUG,console
- Avro Source+Memory Channel+HDFS Sink
Configure the Agent by editing conf/flume-conf-hdfs.properties as follows:
# Define a source, channel, sink
agent1.sources = avro-source1
agent1.channels = ch1
agent1.sinks = hdfs-sink1

# Define a memory channel called ch1 on agent1
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 1000000
agent1.channels.ch1.transactionCapacity = 500000

# Define an Avro source called avro-source1 on agent1 and tell it
# to bind to 0.0.0.0:41414. Connect it to channel ch1.
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind = 0.0.0.0
agent1.sources.avro-source1.port = 41414

# Define an HDFS sink that writes all events it receives to HDFS
# and connect it to the other end of the same channel.
agent1.sinks.hdfs-sink1.channel = ch1
agent1.sinks.hdfs-sink1.type = hdfs
agent1.sinks.hdfs-sink1.hdfs.path = /data/flume
agent1.sinks.hdfs-sink1.hdfs.filePrefix = sync_file
agent1.sinks.hdfs-sink1.hdfs.fileSuffix = .log
agent1.sinks.hdfs-sink1.hdfs.rollSize = 1048576
agent1.sinks.hdfs-sink1.hdfs.rollInterval = 0
agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
agent1.sinks.hdfs-sink1.hdfs.batchSize = 1500
agent1.sinks.hdfs-sink1.hdfs.round = true
agent1.sinks.hdfs-sink1.hdfs.roundUnit = minute
agent1.sinks.hdfs-sink1.hdfs.threadsPoolSize = 25
agent1.sinks.hdfs-sink1.hdfs.useLocalTimeStamp = true
agent1.sinks.hdfs-sink1.hdfs.minBlockReplicas = 1
agent1.sinks.hdfs-sink1.hdfs.fileType = SequenceFile
agent1.sinks.hdfs-sink1.hdfs.writeFormat = Text
First, start the Agent:
bin/flume-ng agent -c ./conf/ -f conf/flume-conf-hdfs.properties -Dflume.root.logger=INFO,console -n agent1
Then start an Avro client and send some data:
bin/flume-ng avro-client -c ./conf/ -H 0.0.0.0 -p 41414 -F /usr/local/programs/logs/sync.log -Dflume.root.logger=DEBUG,console
You can then check the data synced to HDFS:
hdfs dfs -ls /data/flume
The output looks like this:
-rw-r--r-- 3 shirdrn supergroup 1377617 2014-09-16 14:35 /data/flume/sync_file.1410849320761.log
-rw-r--r-- 3 shirdrn supergroup 1378137 2014-09-16 14:35 /data/flume/sync_file.1410849320762.log
-rw-r--r-- 3 shirdrn supergroup  259148 2014-09-16 14:35 /data/flume/sync_file.1410849320763.log
- Spooling Directory Source+Memory Channel+HDFS Sink
Configure the Agent by editing flume-conf-spool.properties as follows:
# Define source, channel, sink
agent1.sources = spool-source1
agent1.channels = ch1
agent1.sinks = hdfs-sink1

# Define a memory channel called ch1 on agent1
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 1000000
agent1.channels.ch1.transactionCapacity = 500000

# Define and configure a Spooling Directory source
agent1.sources.spool-source1.channels = ch1
agent1.sources.spool-source1.type = spooldir
agent1.sources.spool-source1.spoolDir = /home/shirdrn/data/
agent1.sources.spool-source1.ignorePattern = event(_\d{4}\-\d{2}\-\d{2}_\d{2}_\d{2})?\.log(\.COMPLETED)?
agent1.sources.spool-source1.batchSize = 50
agent1.sources.spool-source1.inputCharset = UTF-8

# Define and configure an HDFS sink
agent1.sinks.hdfs-sink1.channel = ch1
agent1.sinks.hdfs-sink1.type = hdfs
agent1.sinks.hdfs-sink1.hdfs.path = /data/flume
agent1.sinks.hdfs-sink1.hdfs.filePrefix = event_%y-%m-%d_%H_%M_%S
agent1.sinks.hdfs-sink1.hdfs.fileSuffix = .log
agent1.sinks.hdfs-sink1.hdfs.rollSize = 1048576
agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
agent1.sinks.hdfs-sink1.hdfs.batchSize = 1500
agent1.sinks.hdfs-sink1.hdfs.round = true
agent1.sinks.hdfs-sink1.hdfs.roundUnit = minute
agent1.sinks.hdfs-sink1.hdfs.threadsPoolSize = 25
agent1.sinks.hdfs-sink1.hdfs.useLocalTimeStamp = true
agent1.sinks.hdfs-sink1.hdfs.minBlockReplicas = 1
agent1.sinks.hdfs-sink1.hdfs.fileType = SequenceFile
agent1.sinks.hdfs-sink1.hdfs.writeFormat = Text
agent1.sinks.hdfs-sink1.hdfs.rollInterval = 0
Start the Agent process with the following command:
bin/flume-ng agent -c ./conf/ -f conf/flume-conf-spool.properties -Dflume.root.logger=INFO,console -n agent1
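Once the agent is running, any file dropped into the spool directory is ingested and then renamed with a .COMPLETED suffix (a small sketch; the file name is just an example):

cp /tmp/app_2014-09-17.log /home/shirdrn/data/
ls /home/shirdrn/data/
# after ingestion the file shows up as app_2014-09-17.log.COMPLETED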
You can check the data synced over to HDFS:
hdfs dfs -ls /data/flume
The output looks like this:
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355094.log
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355095.log
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355096.log
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355097.log
-rw-r--r-- 3 shirdrn supergroup    1530 2014-09-17 10:53 /data/flume/event_14-09-17_10_52_00.1410922355098.log
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380386.log
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380387.log
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380388.log
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380389.log
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380390.log
- Exec Source+Memory Channel+File Roll Sink
Configure the Agent by editing flume-conf-file.properties as follows:
# Define source, channel, sink
agent1.sources = tail-source1
agent1.channels = ch1
agent1.sinks = file-sink1

# Define a memory channel called ch1 on agent1
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 1000000
agent1.channels.ch1.transactionCapacity = 500000

# Define and configure an Exec source
agent1.sources.tail-source1.channels = ch1
agent1.sources.tail-source1.type = exec
agent1.sources.tail-source1.command = tail -F /home/shirdrn/data/event.log
agent1.sources.tail-source1.shell = /bin/sh -c
agent1.sources.tail-source1.batchSize = 50

# Define and configure a File Roll sink
# and connect it to the other end of the same channel.
agent1.sinks.file-sink1.channel = ch1
agent1.sinks.file-sink1.type = file_roll
agent1.sinks.file-sink1.batchSize = 100
agent1.sinks.file-sink1.sink.serializer = TEXT
agent1.sinks.file-sink1.sink.directory = /home/shirdrn/sink_data
Start the Agent process with the following command:
bin/flume-ng agent -c ./conf/ -f conf/flume-conf-file.properties -Dflume.root.logger=INFO,console -n agent1
You can then check the local directory configured for the File Roll Sink, /home/shirdrn/sink_data; a sample listing is shown below:
-rw-rw-r-- 1 shirdrn shirdrn 13944825 Sep 17 11:36 1410924990039-1
-rw-rw-r-- 1 shirdrn shirdrn 11288870 Sep 17 11:37 1410924990039-2
-rw-rw-r-- 1 shirdrn shirdrn        0 Sep 17 11:37 1410924990039-3
-rw-rw-r-- 1 shirdrn shirdrn 20517500 Sep 17 11:38 1410924990039-4
-rw-rw-r-- 1 shirdrn shirdrn 16343250 Sep 17 11:38 1410924990039-5
For more Flume NG configuration options and details, refer to the official user guide, which is very thorough.