本文中全部與flume相關術語都採用斜體英文表示,這些術語的含義以下所示。web
flume 一個可靠的,分佈式的,用於採集,聚合,傳輸海量日誌數據的系統。數據結構
Web Server 一個產生 Events 的系統。架構
Agent flume 系統中的一個節點,它主要包含三個部件:Source, Channel, Sink。併發
Event 事件,在 flume-agent 內部傳輸的數據結構。一個 Event 由 Map<String, String>Headers 和 byte[] body 組成,其中 Headers 保存了 Event 的屬性,body 保存了 Event 的內容。dom
Source Agent Source 用來接收 WebServer 產生的 Events,以及其餘 flume-agent 中的 Sink產生的 Events。分佈式
Channel Source 將 Events 放在 Channel 中保存,Channel 主要有兩種,是 MemoryChannel 和FileChannel,分別將 Events 存放在內存中和文件中。oop
Sink Sink 用來消費 Channel 內保存的 Events,而後將 Events 發送出去。spa
Sinkgroups 將多個 Sink 組合在一塊兒,造成 Sinkgroups。設計
HDFS Hadoop分佈式文件系統,它用來存儲日誌數據,也就是 Sinks 發送出來的 Events。日誌
以下圖1所示,單個 Agent 主要包括三個部件:Source, Channel, Sink。
圖1 單Agent的數據流模型
整個數據流以下:
Web Server 產生 Events,並將 Events 發送到 Source 中。
Source 接收 Events,並將 Events 發送到 Channel 中。
Channel 存儲 Events。
Sink 消費Channel 中存儲的 Events,並將 Events 發送到 HDFS。
HDFS 磁盤存儲 Events。
如圖2所示,兩個 Agent 組成的數據流傳輸模型。
圖2: 兩個 Agent 串行傳輸數據流模型
整個數據流以下:
Agent foo Agent foo 中的 Source 接收外部 Events,存儲到 Channel 中,Sink 從 Channel 中獲取 Events,再將 Events 傳輸到 Agent bar 的 Source 中。
Agent bar Agent bar 中的 Source 接收 Agent foo 的 Sink 發送的 Events,存儲到 barChannel,再由 bar Sink消費。
整個數據流只作了一件事,就是傳輸數據。
如圖3所示,Agent1,Agent2,Agent3負責從不一樣的 Web Server 中接收 Events,並將 Events 發送到 Agent4,Agent4 再將 Events 發送到 HDFS。
圖3:收集數據流模型
整個數據流以下:
Agent1 接收 Events,並將 Events 傳輸到Agent4。
Agent2 接收 Events,並將 Events 傳輸到Agent4。
Agent3 接收 Events,並將 Events 傳輸到Agent4。
Agent4 接收 Agent1,Agent2,Agent3 的 Events,而後將 Events 存儲到 HDFS。
整個數據流完成的功能:不一樣的 Agent 收集不一樣的 Web Server 產生的日誌數據,並將全部的日誌數據存儲到一個目的地 HDFS。
一個 Agent 中能夠由一個 Source ,多個 Channels ,多個 Sinks 組成多路數據流,其多路數據流模型以下圖4所示。
一個 Source 接收外部 Events,並將 Events 發送到三路 Channel 中去,而後不一樣的 Sink 消費不一樣的 Channel 內的 Events ,再將 Events 進行不一樣的處理。
Source 如何將 Events 發送到不一樣的 Channel 中?這裏 flume 採用了兩種不一樣的策略,是replicating 和 multiplexing 。
其中 replicating 是 Source 將每一個 Event 都發送到 Channel 中,這樣就將 Events 複製成 3 份發到不一樣的地方去。
其中 multiplexing 是 Source 根據一些映射關係,將不一樣種類的 Event 發送到不一樣的 Channel中去,即將全部 Events 分紅3份,分別發送到三個 Channels。
圖4 多路數據流模型
整個數據流以下:
Agent foo Source 將接收到的 Events 發送到 Channel1--Sink1--HDFS,Channel2--Sink2--JMS,Channel3--Sink3--Agent bar
Agent bar Source 接收 Agent foo 中 Sink3 發送的 Events,而後發送到 Channel4--Sink
如今考慮這樣兩個問題,一是 某 Sink 負責消費某 Channel 中的 Events,那麼若是該 Sink 掛掉以後, 該 Channel 則會堵死。
二是 某 Sink 負責消費某 Channel 中的 Events,那麼若是該 Sink 速度慢,或者該 Sink 的消費能力達不到 Source 的接收能力呢? 大量的 Events 會在 Channel 中堆積,形成堵塞。
爲了解決這兩種狀況,flume 中存在一種數據流模型,將多個 Sinks 綁定在一塊兒,造成Sinkgroups,它們共同負責消費某個 Channel 內的 Events。
可是在某個時刻,只有一個 Sink 消費 Channel 內的 Events,因而有兩種策略保證從Sinkgroups 中選擇出一個 Sink 來消費 Channel 中的 Events。
這兩種策略分別是:failover 和 load_balance。其中 failover 機制,會將全部 Sinks 標識一個優先級,一個以優先級爲序的 Map 保存着 活着的 Sink,一個隊列保存着 失敗的 Sink。
每次都會選擇優先級最高的活着的 Sink 來消費 Channel 的 Events。每過一段時間就對失敗隊列中的 Sinks 進行檢測,若是變活以後,就將其插進 活着的 Sink Map。
另外一種 load_balance機制,在這種機制下,還有兩種不一樣的策略,分別是 round_robin 和random。則 round_robin 就是不斷地輪詢 Sinkgroups 內的 Sinks,已保證均衡。
random 則是從 Sinkgroups 中的 Sinks 隨機選擇一個。
該數據流模型以下圖5所示。
圖5 Sinkgroups數據流模型
整個數據流以下:
Source 負責接收 Events,並將其發送到 Channel 中。
Channel 負責存儲 Events。
Sinkgroups 負責消費 Channel 中的 Events,並將 Events 發送到 HDFS 存儲。
以下圖6所示,在單個 Agent 中,能夠由多個 Sources,Channels,Sinks 組成多條徹底不相交的數據流。
圖6
整個數據流以下:
Source1 Channel1 Sink1 HDFS1 組成數據流1
Source2 Channel2 Sink2 HDFS2 組成數據流2
數據流1和數據流2徹底不相關。
從上面介紹的6種不一樣的數據流模型中,咱們能夠得知,模型1和模型2至關於程序設計中的順序執行。
模型3中 Agent1,Agent2,Agent3 收集 Events 處於並行狀態,向 Agent4 發送 Events 處於併發狀態。
模型4 至關於程序設計中的 if--else,選擇模型。
模型5中 三個 Sinks 也至關於處於併發狀態。
模型6 至關於程序設計中的 並行模型。
則經過這6種不一樣的數據流模型,咱們能夠將它們進行不一樣的組合,造成各類各樣的數據流模型,以應付咱們的需求。
不一樣的數據流模型,具備不一樣的功能,可是這些數據流模型是靠哪些組件,哪些策略來構成的。本節將分析不一樣的數據流模型在 Agent 內部是如何實現的。
以下圖6所示,這是 Agent 內部一個比較完整的架構圖,它不單單包含了 Source, Channel,Sink,還包含了 SinkRunner, Interceptor, ChannelSelector, Transaction,
SinkRunner, SinkProcessor, SinkSelector。下面咱們將詳細介紹每一個部件在 Agent 內部所承擔的責任。
圖6 Agent 內部組件架構圖
從途中能夠看出,將整個數據流分紅兩階段,分別是第一階段:Source --> Channel, 第二階段: Channel --> Sink。
下面就從這兩個階段來詳細介紹各個組件在數據流過程當中承擔的責任。
圖6既是數據流圖,也是對象結構圖。從圖中能夠看出,一個 SourceRunner 對象包含一個 Source對象,一個 Source 對象包含一個 ChannelProcessor對象,
一個 ChannelProcessor 對象包含 多個 Interceptor 對象和一個 ChannelSelector 對象。
首先 SourceRunner 負責啓動 Source, 則 Source 監控是否有 Events 發送過來,若是有,則接收 Events。
其次 Events 被 ChannelProcessor 中的 Interceptor 進行過濾,Interceptor 的功能有三種,分別是 丟棄 Event,修改 Event 再返回,直接返回 Event(不作任何操做)。
舉例:Interceptor 有兩種比較好理解的,分別是 Timestamp Interceptor 和 host Interceptor,Timestamp Interceptor 會對每一個 Event 的添加屬性時間戳, host Interceptor會爲
每一個 Event 添加屬性 host 。
而後,ChannelSelector 的主要功能是完成上面的多路數據流模型,分別有兩種,replicating 和multiplexing。也就是說 ChannelSelector 爲每一個 Event 選擇它所要發送到的 Channel。
在 replicating 模式下,每一個 Event 都被髮送到 多個 Channel 中;在 multiplexing 模式下,不一樣的 Events 會被髮送到不一樣的 Channel 中。
最後,Source 與每一個 Channel 經過 Transaction 創建鏈接,將 Events 發送到 Channel 中去。
從圖中能夠看出,一個 SinkRunner 對象包含一個 SinkProcessor 對象,一個 SinkProcessor 對象包含多個 Sinks 和/或 一個 SinkSelector。
首先 SinkRunner 啓動一個 SinkProcessor 對象,SinkProcessor 有三種,分別是DefaultSinkProcessor,FailoverSinkProcessor,LoadBalancingSinkProcessor。
看到這裏你是否是有點印象了,對,這就是咱們上面提到的 Sinkgroups 數據流模型。若是單個Sink 的話,則使用 DefaultSinkProcessor,它負責啓動 Sink;
若是多個 Sinks 組成一組的話,則能夠設置 SinkProcessor 爲 failover 或 loadBalance。
其中 FailoverSinkProcessor 會將各個 Sink 設置優先級。保存了一個 SortedMap<Integer, Sink> liveSinks,活着的 Sinks,一個 Queue<FailedSink> failedSinks,保存死的 Sinks。
每次會從 liveSinks 中選擇一個優先級最高的 Sink 來消費 Events。若是某個 Sink 掛掉,則將其放在 failedSinks裏,而且每次都嘗試 failedSinks中的第一個 Sink,若是它能變活,則將其
轉到 liveSinks 中。其中 LoadBalancingSinkProcessor 裏有一個對象 SinkSelector,該SinkSelector 有兩種,分別是 round_robin 和 random。這裏你又有印象啦。則 SinkSelector 就是在 Sinkgroups 中
選擇某個 Sink 來消費 Events。 round_robin 是輪詢 Sinkgroups 中的全部 Sinks, random 是從 Sinkgroups 中隨機選擇 某個 Sink。
其次,SinkProcessor 會選擇某個 Sink,啓動 Sink。
最後,Sink 與 Channel 經過 Transaction 創建鏈接。消費 Channel 內的 Events。