[Repost] Flume NG: Architecture Design Essentials and Configuration Practice

Flume NG is a distributed, reliable, and available system for efficiently collecting, aggregating, and moving large volumes of log data from many different sources into a centralized data store. Going from the original Flume OG to today's Flume NG involved a ground-up architectural rewrite, and the NG version is completely incompatible with the old OG version. After the redesign, Flume NG feels more like a lightweight tool: very simple, easy to adapt to all kinds of log-collection setups, with support for failover and load balancing.

Architecture Design Essentials

Flume's architecture revolves around the following core concepts:

  • Event: a unit of data, carrying an optional set of message headers
  • Flow: an abstraction of the movement of Events from a point of origin to their destination
  • Client: operates at the point of origin, wrapping data into Events and sending them to a Flume Agent
  • Agent: an independent Flume process containing the components Source, Channel, and Sink
  • Source: consumes the Events delivered to it
  • Channel: a transient store for Events in transit, buffering the Events handed over by the Source
  • Sink: reads and removes Events from the Channel and passes them to the next Agent in the Flow pipeline (if there is one)

The Flume NG architecture is shown below:
[Figure: flume-ng-architecture]
An external system produces log data; the Source component of a Flume Agent turns each record (such as a log line) into an Event and hands it to an intermediate Channel, from which a Sink picks it up. An HDFS Sink, for example, can write the data directly to an HDFS cluster.
The most basic Flow is configured in the following format:

# list the sources, sinks and channels for the agent
<Agent>.sources = <Source1> <Source2>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>

# set channel for source
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2> ...
<Agent>.sources.<Source2>.channels = <Channel1> <Channel2> ...

# set channel for sink
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>

The names in angle brackets can be changed to suit your actual needs or business. In detail: <Agent> is the name of the Agent; every Agent must have one. <Source1> and <Source2> name the Agent's Source components, which consume the Events delivered to them. <Channel1> and <Channel2> name the Agent's Channel components. <Sink1> and <Sink2> name the Agent's Sink components, which consume (remove) Events from a Channel.
In the configuration above, the first group declares the Sources, Sinks, and Channels; each may list one or more names. The second group declares which Channel(s) a Source puts its data into; a Source may write to one or more Channels, and a single Source writing to multiple Channels is effectively replication. The third group declares which Channel a Sink takes its data from; a Sink can only take data from exactly one Channel.
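To make the template concrete, here is a minimal sketch of a single Agent with one source fanning out to two channels, each drained by its own sink (the names a1, r1, c1, c2, k1, and k2 are illustrative, not from the original):

# declare components for an agent named a1
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# r1 puts every event into both channels (replication)
a1.sources.r1.channels = c1 c2

# each sink drains exactly one channel
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2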
Below, following the official documentation, are several Flow pipeline topologies and the application scenarios each is suited for:

  • Multiple Agents chained in sequence

[Figure: flume-multiseq-agents]
Multiple Agents can be chained in sequence, so that data collected at the original source travels through the chain into the final storage system. This is the simplest topology. In general, you should limit the number of Agents chained this way: the path the data travels grows longer, and unless you also plan for failover, a failure at any hop disrupts collection across the entire Flow.
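A minimal sketch of how two Agents are commonly chained: the upstream Agent's Avro Sink points at the downstream Agent's Avro Source (the hostname agent2-host and port 4545 are illustrative assumptions):

# on the upstream agent: ship events to the next hop over Avro RPC
agent1.sinks.avro-sink1.type = avro
agent1.sinks.avro-sink1.hostname = agent2-host
agent1.sinks.avro-sink1.port = 4545

# on the downstream agent: receive events from the previous hop
agent2.sources.avro-source1.type = avro
agent2.sources.avro-source1.bind = 0.0.0.0
agent2.sources.avro-source1.port = 4545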

  • Data from multiple Agents converging on a single Agent

[Figure: flume-join-agent]
This scenario comes up often. Say you want to collect user-behavior logs from a website that runs as a load-balanced cluster for availability: every node produces its own logs, so you can give each node its own Agent to collect them, and then have all of those Agents converge on a single Agent that writes the data into the storage system, such as HDFS.
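The fan-in topology uses the same building blocks: every web-node Agent points its Avro Sink at one collector Agent. A minimal sketch (collector-host and port 4545 are illustrative):

# on every web node: an identical sink pointing at the collector
web-agent.sinks.avro-sink1.type = avro
web-agent.sinks.avro-sink1.hostname = collector-host
web-agent.sinks.avro-sink1.port = 4545

# on the collector: a single Avro source receives from all web nodes
collector.sources.avro-source1.type = avro
collector.sources.avro-source1.bind = 0.0.0.0
collector.sources.avro-source1.port = 4545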

  • Multiplexing Agents

[Figure: flume-multiplexing-agent]
This pattern comes in two flavors: one for replication (Replicating) and one for splitting a stream (Multiplexing). In the replicating mode, the data from the front-end source is copied into multiple channels, each of which receives an identical stream. The configuration format looks like this:

# List the sources, sinks and channels for the agent
<Agent>.sources = <Source1>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>

# set list of channels for source (separated by space)
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2>

# set channel for sinks
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>

<Agent>.sources.<Source1>.selector.type = replicating

The configuration above sets the selector's type to replicating; nothing else is specified, so the replicating behavior applies: Source1 puts each Event into both Channel1 and Channel2, the two channels hold identical data, and the data then flows on to Sink1 and Sink2 respectively.
In the multiplexing mode, the selector routes data to a channel based on the value of a header. The configuration format looks like this:

# Mapping for multiplexing selector
<Agent>.sources.<Source1>.selector.type = multiplexing
<Agent>.sources.<Source1>.selector.header = <someHeader>
<Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1>
<Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2>
<Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2>
#...

<Agent>.sources.<Source1>.selector.default = <Channel2>

Here the selector's type is multiplexing, the header to inspect is configured, and several mapping entries keyed by header value are given: if the header's value is Value1 or Value2, the Event is routed from Source1 to Channel1; if it is Value2 or Value3, it is routed to Channel2 (Value2 matches both mappings, so those Events go to both channels). Events whose header matches no mapping go to the default channel, Channel2.
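As a concrete sketch, suppose each Event carries a logType header (the header name, values, and channel names here are illustrative): error records go to one channel and everything else defaults to another:

agent1.sources.r1.selector.type = multiplexing
agent1.sources.r1.selector.header = logType
agent1.sources.r1.selector.mapping.error = error-channel
agent1.sources.r1.selector.mapping.access = access-channel
agent1.sources.r1.selector.default = access-channel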

  • Implementing load balancing

[Figure: flume-load-balance-agents]
The Load balancing Sink Processor provides load balancing across sinks. In the figure above, Agent1 acts as a routing node: it spreads the Events buffered in its Channel evenly across multiple Sink components, each of which connects to a separate downstream Agent. An example configuration:

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.selector.maxTimeOut = 10000
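The sink group above only references sinks k1, k2, and k3; they still have to be defined on the agent. A minimal sketch, assuming each one is an Avro sink pointed at its own downstream Agent (channel c1 and hosts collector1 through collector3 are illustrative):

a1.sinks = k1 k2 k3
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = collector1
a1.sinks.k1.port = 4545
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c1
a1.sinks.k2.hostname = collector2
a1.sinks.k2.port = 4545
a1.sinks.k3.type = avro
a1.sinks.k3.channel = c1
a1.sinks.k3.hostname = collector3
a1.sinks.k3.port = 4545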
  • Implementing failover

The Failover Sink Processor provides failover. The flow resembles load balancing, but the internal mechanics are completely different: the Failover Sink Processor maintains a priority-ordered list of Sinks, guaranteeing that as long as at least one Sink is available, Events will be processed. Events always go to the highest-priority Sink that is currently live; a Sink that fails to deliver is moved out of the live pool and sidelined for a penalty period that grows with consecutive failures (capped by maxpenalty), while a Sink that successfully processes an Event is restored to the pool. An example configuration:

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 7
a1.sinkgroups.g1.processor.priority.k3 = 6
a1.sinkgroups.g1.processor.maxpenalty = 20000

Basic Features

Let's look at what Flume NG supports out of the box (the latest release at the time of writing is 1.5.0.1); knowing the feature set makes it easier to choose the right approach for an application. Describing Flume NG's features really means looking at which technologies and protocols each of the Agent's three components, Source, Channel, and Sink, can speak. Rather than walking through the detailed configuration of every protocol, the lists below summarize each component:

  • Flume Source
Avro Source: accepts the Avro protocol (Avro RPC, in fact); built in
Thrift Source: accepts the Thrift protocol; built in
Exec Source: produces data from the standard output of a Unix command
JMS Source: reads data from a JMS system (messages or topics); tested against ActiveMQ
Spooling Directory Source: watches a designated directory for newly arriving data
Twitter 1% firehose Source: continuously downloads Twitter data through the API; experimental
Netcat Source: listens on a port and turns each line of text passing through it into an Event
Sequence Generator Source: a sequence generator that produces sequential data
Syslog Sources: read syslog data and produce Events; both UDP and TCP are supported
HTTP Source: accepts data via HTTP POST or GET; JSON and BLOB representations are supported
Legacy Sources: compatibility with Sources from the old Flume OG (0.9.x)
  • Flume Channel
Memory Channel: Events are stored in memory
JDBC Channel: Events are stored in a persistent database; Derby is currently the built-in backing store
File Channel: Events are stored in files on disk
Spillable Memory Channel: Events are held in memory and on disk; when the in-memory queue fills up, Events are persisted to disk files (currently experimental; not recommended for production use)
Pseudo Transaction Channel: for testing purposes only
Custom Channel: a custom Channel implementation of your own
  • Flume Sink
HDFS Sink: writes data to HDFS
Logger Sink: writes data to the log
Avro Sink: converts data into Avro Events and sends them to the configured RPC port
Thrift Sink: converts data into Thrift Events and sends them to the configured RPC port
IRC Sink: replays data over IRC
File Roll Sink: stores data on the local filesystem
Null Sink: discards all data it receives
HBase Sink: writes data to the HBase database
Morphline Solr Sink: sends data to a Solr search server (or cluster)
ElasticSearch Sink: sends data to an ElasticSearch search server (or cluster)
Kite Dataset Sink: writes data to a Kite Dataset; experimental
Custom Sink: a custom Sink implementation of your own

There are also Channel Selectors, Sink Processors, Event Serializers, Interceptors, and other components; see the user guide on the official site for details.
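As one quick illustration of those extra components, here is a minimal sketch that attaches the built-in timestamp interceptor to a source; it stamps each Event with a timestamp header, which an HDFS sink can then use for time-based escape sequences in its path (the agent and source names are illustrative):

agent1.sources.avro-source1.interceptors = ts
agent1.sources.avro-source1.interceptors.ts.type = timestamp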

Application Practice

Installing Flume NG is very simple. We use the latest 1.5.0.1 release; run the following commands:

cd /usr/local
wget http://archive.apache.org/dist/flume/1.5.0.1/apache-flume-1.5.0.1-bin.tar.gz
tar xvzf apache-flume-1.5.0.1-bin.tar.gz
cd apache-flume-1.5.0.1-bin

If the flow will touch a Hadoop cluster, make sure the Hadoop-related environment variables are configured correctly and the cluster is up. Below, a few practical configuration examples show how Flume is used. For simplicity, all of the examples use a Memory channel.
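One caveat about that choice: a Memory channel is fast but loses any buffered Events if the Agent process dies. Where durability matters, a File channel can be swapped in; a minimal sketch (the directory paths are illustrative):

agent1.channels.ch1.type = file
agent1.channels.ch1.checkpointDir = /home/shirdrn/flume/checkpoint
agent1.channels.ch1.dataDirs = /home/shirdrn/flume/data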

  • Avro Source+Memory Channel+Logger Sink

This uses the example that ships with apache-flume-1.5.0.1: an Avro Source receives data from an external client and a Logger serves as the sink. That is, data sent via Avro RPC calls is buffered in the channel and then printed out by the Logger.
Configure the Agent by editing conf/flume-conf.properties as follows:

# Define a memory channel called ch1 on agent1
agent1.channels.ch1.type = memory

# Define an Avro source called avro-source1 on agent1 and tell it
# to bind to 0.0.0.0:41414. Connect it to channel ch1.
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind = 0.0.0.0
agent1.sources.avro-source1.port = 41414

# Define a logger sink that simply logs all events it receives
# and connect it to the other end of the same channel.
agent1.sinks.log-sink1.channel = ch1
agent1.sinks.log-sink1.type = logger

# Finally, now that we've defined all of our components, tell
# agent1 which ones we want to activate.
agent1.channels = ch1
agent1.channels.ch1.capacity = 1000
agent1.sources = avro-source1
agent1.sinks = log-sink1

First, start the Agent process:

bin/flume-ng agent -c ./conf/ -f conf/flume-conf.properties -Dflume.root.logger=DEBUG,console -n agent1

Then start an Avro client to send data:

bin/flume-ng avro-client -c ./conf/ -H 0.0.0.0 -p 41414 -F /usr/local/programs/logs/sync.log -Dflume.root.logger=DEBUG,console
  • Avro Source+Memory Channel+HDFS Sink

Configure the Agent by editing conf/flume-conf-hdfs.properties as follows:

# Define a source, channel, sink
agent1.sources = avro-source1
agent1.channels = ch1
agent1.sinks = hdfs-sink1

# Configure channel
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 1000000
agent1.channels.ch1.transactionCapacity = 500000

# Define an Avro source called avro-source1 on agent1 and tell it
# to bind to 0.0.0.0:41414. Connect it to channel ch1.
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind = 0.0.0.0
agent1.sources.avro-source1.port = 41414

# Define an HDFS sink that writes all events it receives to HDFS
# and connect it to the other end of the same channel.
agent1.sinks.hdfs-sink1.channel = ch1
agent1.sinks.hdfs-sink1.type = hdfs
agent1.sinks.hdfs-sink1.hdfs.path = hdfs://h1:8020/data/flume/
agent1.sinks.hdfs-sink1.hdfs.filePrefix = sync_file
agent1.sinks.hdfs-sink1.hdfs.fileSuffix = .log
agent1.sinks.hdfs-sink1.hdfs.rollSize = 1048576
agent1.sinks.hdfs-sink1.hdfs.rollInterval = 0
agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
agent1.sinks.hdfs-sink1.hdfs.batchSize = 1500
agent1.sinks.hdfs-sink1.hdfs.round = true
agent1.sinks.hdfs-sink1.hdfs.roundUnit = minute
agent1.sinks.hdfs-sink1.hdfs.threadsPoolSize = 25
agent1.sinks.hdfs-sink1.hdfs.useLocalTimeStamp = true
agent1.sinks.hdfs-sink1.hdfs.minBlockReplicas = 1
agent1.sinks.hdfs-sink1.hdfs.fileType = SequenceFile
agent1.sinks.hdfs-sink1.hdfs.writeFormat = Text

First, start the Agent:

bin/flume-ng agent -c ./conf/ -f conf/flume-conf-hdfs.properties -Dflume.root.logger=INFO,console -n agent1

Then start an Avro client to send data:

bin/flume-ng avro-client -c ./conf/ -H 0.0.0.0 -p 41414 -F /usr/local/programs/logs/sync.log -Dflume.root.logger=DEBUG,console

You can then inspect the data synced to HDFS:

hdfs dfs -ls /data/flume

Sample output:

-rw-r--r--   3 shirdrn supergroup    1377617 2014-09-16 14:35 /data/flume/sync_file.1410849320761.log
-rw-r--r--   3 shirdrn supergroup    1378137 2014-09-16 14:35 /data/flume/sync_file.1410849320762.log
-rw-r--r--   3 shirdrn supergroup     259148 2014-09-16 14:35 /data/flume/sync_file.1410849320763.log
  • Spooling Directory Source+Memory Channel+HDFS Sink

Configure the Agent by editing flume-conf-spool.properties as follows:

# Define source, channel, sink
agent1.sources = spool-source1
agent1.channels = ch1
agent1.sinks = hdfs-sink1

# Configure channel
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 1000000
agent1.channels.ch1.transactionCapacity = 500000

# Define and configure a Spooling Directory source
agent1.sources.spool-source1.channels = ch1
agent1.sources.spool-source1.type = spooldir
agent1.sources.spool-source1.spoolDir = /home/shirdrn/data/
agent1.sources.spool-source1.ignorePattern = event(_\d{4}\-\d{2}\-\d{2}_\d{2}_\d{2})?\.log(\.COMPLETED)?
agent1.sources.spool-source1.batchSize = 50
agent1.sources.spool-source1.inputCharset = UTF-8

# Define and configure an HDFS sink
agent1.sinks.hdfs-sink1.channel = ch1
agent1.sinks.hdfs-sink1.type = hdfs
agent1.sinks.hdfs-sink1.hdfs.path = hdfs://h1:8020/data/flume/
agent1.sinks.hdfs-sink1.hdfs.filePrefix = event_%y-%m-%d_%H_%M_%S
agent1.sinks.hdfs-sink1.hdfs.fileSuffix = .log
agent1.sinks.hdfs-sink1.hdfs.rollSize = 1048576
agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
agent1.sinks.hdfs-sink1.hdfs.rollInterval = 0
agent1.sinks.hdfs-sink1.hdfs.batchSize = 1500
agent1.sinks.hdfs-sink1.hdfs.round = true
agent1.sinks.hdfs-sink1.hdfs.roundUnit = minute
agent1.sinks.hdfs-sink1.hdfs.threadsPoolSize = 25
agent1.sinks.hdfs-sink1.hdfs.useLocalTimeStamp = true
agent1.sinks.hdfs-sink1.hdfs.minBlockReplicas = 1
agent1.sinks.hdfs-sink1.hdfs.fileType = SequenceFile
agent1.sinks.hdfs-sink1.hdfs.writeFormat = Text

Start the Agent process with the following command:

bin/flume-ng agent -c ./conf/ -f conf/flume-conf-spool.properties -Dflume.root.logger=INFO,console -n agent1
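With the Agent running, data is ingested by dropping files into the spool directory. Files must appear there whole, so write them elsewhere first and then move them in (the spooldir source renames each fully ingested file with a .COMPLETED suffix). A sketch with illustrative file names:

# stage the file outside the spool directory, then move it in atomically
cp /tmp/staging/app.log /home/shirdrn/data/.app.log.tmp
mv /home/shirdrn/data/.app.log.tmp /home/shirdrn/data/app.log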

You can check the data synced over to HDFS:

hdfs dfs -ls /data/flume

Sample output:

-rw-r--r--   3 shirdrn supergroup    1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355094.log
-rw-r--r--   3 shirdrn supergroup    1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355095.log
-rw-r--r--   3 shirdrn supergroup    1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355096.log
-rw-r--r--   3 shirdrn supergroup    1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355097.log
-rw-r--r--   3 shirdrn supergroup       1530 2014-09-17 10:53 /data/flume/event_14-09-17_10_52_00.1410922355098.log
-rw-r--r--   3 shirdrn supergroup    1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380386.log
-rw-r--r--   3 shirdrn supergroup    1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380387.log
-rw-r--r--   3 shirdrn supergroup    1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380388.log
-rw-r--r--   3 shirdrn supergroup    1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380389.log
-rw-r--r--   3 shirdrn supergroup    1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380390.log
  • Exec Source+Memory Channel+File Roll Sink

Configure the Agent by editing flume-conf-file.properties as follows:

# Define source, channel, sink
agent1.sources = tail-source1
agent1.channels = ch1
agent1.sinks = file-sink1

# Configure channel
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 1000000
agent1.channels.ch1.transactionCapacity = 500000

# Define and configure an Exec source
agent1.sources.tail-source1.channels = ch1
agent1.sources.tail-source1.type = exec
agent1.sources.tail-source1.command = tail -F /home/shirdrn/data/event.log
agent1.sources.tail-source1.shell = /bin/sh -c
agent1.sources.tail-source1.batchSize = 50

# Define and configure a File Roll sink
# and connect it to the other end of the same channel.
agent1.sinks.file-sink1.channel = ch1
agent1.sinks.file-sink1.type = file_roll
agent1.sinks.file-sink1.batchSize = 100
agent1.sinks.file-sink1.sink.serializer = TEXT
agent1.sinks.file-sink1.sink.directory = /home/shirdrn/sink_data

Start the Agent process with the following command:

bin/flume-ng agent -c ./conf/ -f conf/flume-conf-file.properties -Dflume.root.logger=INFO,console -n agent1
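To exercise the flow, append lines to the file the Exec source is tailing and watch them land under the sink directory; a throwaway generator sketch (purely illustrative):

# append one line per second to the tailed file
while true; do
  echo "event at $(date +%s)" >> /home/shirdrn/data/event.log
  sleep 1
done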

You can then inspect the File Roll Sink's local directory, /home/shirdrn/sink_data; sample contents:

-rw-rw-r-- 1 shirdrn shirdrn 13944825 Sep 17 11:36 1410924990039-1
-rw-rw-r-- 1 shirdrn shirdrn 11288870 Sep 17 11:37 1410924990039-2
-rw-rw-r-- 1 shirdrn shirdrn        0 Sep 17 11:37 1410924990039-3
-rw-rw-r-- 1 shirdrn shirdrn 20517500 Sep 17 11:38 1410924990039-4
-rw-rw-r-- 1 shirdrn shirdrn 16343250 Sep 17 11:38 1410924990039-5

For more Flume NG configuration options and explanations, consult the official user guide; it is very thorough.
