source負責收集數據,channel負責緩存數據,sink負責消費channel中的數據,具體使用方式這裏不贅述web
生命週期相關代碼在flume-ng-core文件夾下的lifecycle子文件夾內數組
public interface MaterializedConfiguration { 緩存
public void addSourceRunner(String name, SourceRunner sourceRunner); 數據結構
public void addSinkRunner(String name, SinkRunner sinkRunner); 架構
public void addChannel(String name, Channel channel); app
public ImmutableMap<String, SourceRunner> getSourceRunners(); 負載均衡
public ImmutableMap<String, SinkRunner> getSinkRunners(); dom
public ImmutableMap<String, Channel> getChannels(); ide
} 函數
生命週期相關代碼在flume-ng-core文件夾下的lifecycle子文件夾內
flume的全部組件(除了monitor service)都有生命週期的概念,主要做用是用來標記組件目前所屬的狀態。flume組件的生命週期有四個狀態,分別是IDLE,START,STOP,ERROR,意義以下:
IDLE | 組件已經構造完成 |
START | 組件已經啓動 |
STOP | 組件已經中止 |
ERROR | 組件發生了錯誤 |
其中對象自己須要實現LifecycleAware接口【見代碼】,flume對組件初始化完成後會調用LifecycleSupervisor.supervise()將該組件加入監控,LifecycleSupervisor類內部對每個組件會啓動一個定時線程MonitorRunnable,在定時線程中調用組件的start/stop函數對組件進行控制
這裏注意對組件stop函數的調用是在unsupervise裏面進行的,而不是經過定時線程MonitorRunnable控制的
main函數依次作了如下幾件事
1.解析命令行
2.尋找命令行裏的配置文件路徑,解析配置文件,並放在一個File對象裏
3.調用AbstractConfigurationProvider實例,傳入File配置文件構造對象,返回一個MaterializedConfiguration對象,該對象裏存放全部的sink,sinkgroup,source,channel組件
4.調用application.start(),該函數將全部組件加入生命週期管理並註冊啓動monitor服務
5.向JVM註冊一個函數,當進程結束時中止全部組件
這裏要注意第三步其實根據命令行中的no-reload-conf參數【是否關閉重載配置文件】選擇調用AbstractConfigurationProvider的派生類PollingPropertiesFileConfigurationProvider或者PropertiesFileConfigurationProvider。
其餘幾步都很直接,比較複雜的流程在調用AbstractConfigurationProvider,解析配置文件並初始化組件這一步。
調用AbstractConfigurationProvider的目的是初始化全部組件,掛好相互關聯的鉤子並將組件封裝到MaterializedConfiguration對象中返回給上層。
AbstractConfigurationProvider類的功能比較好理解,關鍵在於flume支持配置重載,配置重載則channel組件涉及到是否複用的問題【從新生成channel可能會致使以前數據的丟失問題】。AbstractConfigurationProvider類的getConfiguration()函數主要處理了這一套邏輯
getConfiguration()函數的執行基本流程以下:
1.加載配置文件
2.load channels
3.load sources,這一塊要傳入2中load的channel,由於要掛channel和source之間的鉤子【這一塊其實是將channel封裝到ChannelProcessor中,將ChannelProcessor傳入Source,關於ChannelProcessor的做用見下文Channel結構介紹】
4.load sinks,這一塊要傳入2中load的channel,由於要掛channel和sink之間的鉤子
5.移除沒有sink和source鏈接的channel【若是隻有source/sink這裏不會檢測】
6.將sink,source,channel封裝在MaterializedConfiguration對象中並返回,注意source是封裝在SourceRunner類中,sink是封裝在SinkRunner中。這裏的定義是Runner負責與操做系統打交道,而source/sink專一於維護自身邏輯。SinkRunner中還要注意一點是一個Sink只能在一個SinkGroup裏面。
注意:
1.加載配置文件時調用了FlumeConfiguration類,該類內部有對配置文件進行合法性校驗,校驗的錯誤列表能夠經過getConfigurationErrors()接口獲取
2.每一個組件都有對應的Factor類(如ChannelFactor),經過調用Factor類的create()函數建立對象
3.load_channel函數中channel能夠經過加上@Disposable聲明表示不復用,從新加載配置時生成一個新的channel
4.細讀load_channels,load_sources,load_sinks代碼,能夠發現加載組件時分了兩種狀況,has ComponentConfiguration object和do not hava a ComponentConfiguration object。這一塊能夠不用理會,細讀配置文件邏輯好像所有都是後一種do not hava a ComponentConfiguration object
以上步驟執行完,flume就能夠提供服務了。
組件的結構代碼在flume-ng-core文件夾中
channel模塊的做用是給數據提供中轉的儲藏地,由flume的架構圖能夠看出channel負責給source模塊提供寫數據接口,給sink模塊提供讀數據接口。實際實現中每個source模塊中都有一個ChannelProcessor,經過ChannelProcessor向Channel寫入數據,而sink模塊是直接經過Channel模塊提供的接口讀取。
爲何Source寫入要經過封裝的Processor進行?這裏有兩個緣由:1.source模塊寫入的數據可能在寫入Channel前須要進行過濾處理;2.source模塊可能要將不一樣的數據寫到不一樣的Channel上實現數據分發的效果。ChannelProcessor主要就是爲了解決這兩個問題存在的。如下是ChannelProcessor的結構圖:
上圖中:InterceptorChain模塊主要解決問題1,source寫入的數據能夠通過InterceptorChain模塊中一系列的過濾器依次處理,最終過濾出來的數據進入ChannelSelector模塊,ChannelSelector解決了上面提到的問題2,提供了將不一樣數據分發到不一樣Channel的機制。
Channel中基本的數據單元是Event,每一個Event包含有一個map<String, String>類型的header,每個Interceptor能夠在header裏面添加key,value形式的tag讓後續流程處理,或者按照某些規則將過濾event。
ChannelSelector目前有兩種類型。一種是ReplicatingChannelSelector,消息發往全部的channel,另外一種是MultiplexingChannelSelector,能夠指定發往某些Channel
其中Channel也有兩種類型:一種是required,一種是optional.這二者的區別是required類型若是寫入失敗會throw Exception。optional類型的寫入失敗不處理。
Channel中處理數據的最小單元是Transaction,一個Transaction由多個Event組成。Channel內部實現要保證一個Transaction要麼所有成功,要麼所有不成功。具體實現方法下面細說。
下面是這幾個類之間的關係圖。圖中Channel只畫了MemoryChannel,Interceptor的具體實現類沒有畫。
其中要注意的是:
1.ChannelProcessor的初始化是在初始化Source模塊的時候。
2.每個線程有本身的Transaction,這個具體實現是在BasicChannelSemantics類中利用TreadLocal機制。這麼作的好處是維護transaction相對方便。
3. Transaction的維護方法:MemoryChannel的機制是每個Transaction有本身的存儲空間,分爲putlist和takelist,MemoryChannel有一個公共的queue。
每次take時從queue中取出一個event並放置在本地的takelist內並返回該event,commit的時候清空takelist,rollback時將takelist中的event從新放回queue。
每次put時將event放置在putlist中,commit時將putlist中內容寫入queue中,rollback時將putlist清空。
4.BasicChannelSemantics類主要負責的是維護(thread,transaction)這樣的關係。而因爲Transaction是接口,不能直接給BasicChannelSemantics進行操做。因此這裏又封裝了一個BasicTransactionSemantics類。這兩個類實現相對簡單,這裏不細述。
上面介紹初始化的時候曾經說過source和sink封裝在SourceRunner和SinkRunner中。Source和Sink這兩部分主要各類對接外部系統的模塊比較多,實際結構仍是比較簡單的
source的主要功能是收集外部數據,並經過ChannelProcessor寫入Channel。Source分爲兩種類型,PollableSource和EventDrivenSource,分別對應不一樣的SourceRunner。其中PollableSource是指那些須要定時輪詢的Source,好比定時掃描文件,讀取新增內容。其餘Source都是EventDrivenSource。
1.注意BasicSourceSemantics類型維護了Source的生命週期。
2.PollableSource類型的source具體的功能函數封裝在process()中,PollableSourceRunner負責按期調source.process()將數據寫入Channel.
3.EventDrivenSource類型所有邏輯基本都在Source實現裏面,經過start接口啓動【start接口是LifecycleSupervisor負責調用的,上文有描述】。每一種source根據不一樣的外部系統有不一樣的處理邏輯,這裏不贅述。
sink/sinkgroup的主要做用是消費Channel中的數據,其基本思想是按期輪詢Sink,從Channel中拉取必定量數據作處理。
一個Sink只能對應一個下游,可是一個下游不可靠。因此flume裏面有一個機制將多個sink封裝爲一個SinkGroup,每次消費時經過SinkSelector選擇不一樣Sink實現容錯/負載均衡方案
若是SinkSelector須要對Sink作篩選,那麼它須要知道每個Sink的狀態。每個sink都要本身維護一個status變量,有兩個值【READY, BACKOFF】,定義在interface Sink中,READY的意思是該sink能夠從Channel中取數據,BACKOFF的意義是該sink目前不能從Channel中取數據。這個status就是給SinkSelector作決策使用的。
Sink的結構圖以下:
SinkRunner的邏輯跟PollableSource一致。每隔一段時間調用SinkProcessor的process()函數
SinkProcessor裏面封裝了Sink和SinkSelector。若是是單獨sink,直接調用這個sink的process(),若是是一組sink,經過SinkSelector選擇出一個sink,再調用這個sink的process().
目前flume裏面實現的SinkProcessor有三種,DefaultSinkProcessor/FailoverSinkProcessor/LoadBalancingSinkProcessor
1.DefaultSinkProcessor:提供給單獨Sink使用。直接調用sink的process()函數
2.FailoverSinkProcessor:提供給SinkGroup使用。這個Processor給每一個sink都設置了一個priority【經過配置文件】,其思想是每次都會【儘可能】調用priority最大的sink。
這個Processor將sinks分爲兩組,一組是Queue類型的failedSinks隊列,保存上一次sink.process()調用出錯的sink,而且每個調用出錯的sink都有一個冷卻時間,冷卻時間以內的sink不會調用,另外一組是SortedMap<Integer, Sink> liveSinks,保存正常的sink,其中Integer指該sink的priority
每次選擇sink的流程以下:a.從failedSinks裏面取出冷卻最久的sink,調用process,若是成功則將該sink從新加入liveSinks,若是失敗則刷新這個sink的冷卻時間並將其從新push回failedSinks。而後試圖在failedSinks尋找下一個sink
b.若是failedSinks爲空或者failedSinks內的sink所有不可用,則調用liveSinks裏面權重最大的sink,若是該sink調用失敗則塞入failedSinks
3.LoadBalancingSinkProcessor:基本思想是將Channel中的數據平均分配到該Processor的全部sink裏。
這個Processor下面有兩個Selector,分別是RandomOrderSinkSelector和RoundRobinSinkSelector。前者提供的功能是返回一個隨機排序的sink數組,後者則是返回一個輪詢調度形式的數組,好比這一次返回【1,2,3】,下一次就是【2,3,1】,下下一次就是【3,1,2】。
LoadBalancingSinkProcessor調用用戶配置的Seletor返回一個數組,而後依次調用這個數組中的sink.process(),直到某一次調用成功,則返回,若是所有失敗會返回錯誤。
這個Processor有一個比較重要的配置【backoff】。這個配置決定了某個sink調用失敗後的行爲。若是backoff爲true,則該sink失敗後就會冷卻一段時間。這段時間調用Selector返回的數組不會包含該sink。
Sink主要是實現從Channel衆消費數據的邏輯,這裏不一一描述。要注意的是SinkGroup中一個Sink若是卡住這個SinkGroup會卡住。
在啓動流程章節裏提到,在啓動全部組件後會啓動minotor監控進程。監控主要分爲兩部分,收集數據和上報數據。
收集數據的邏輯是夾雜在具體Channel,Sink,Source等的實現邏輯裏面的。其中sink,source,channel等結構監控數據的數據結構在flume-ng-core文件夾下面的instrumententation裏面。分別是SinkCounter,SourceCounter和ChannelCounter等。若是要本身實現這些組件,須要調用這幾個類的接口上報監控信息。flume-ng-core下面還有一個CounterGroup,這個統計了一些Runner的事件。這個接口應該是老接口。
每個Count都會在MBean裏面註冊。上報的數據存儲在這個MBean對象裏。
flume內置三種上報監控的方式,分別是zabbix,ganglia和http。其中http方式是在flume側起一個server,由外部主動來拉取監控信息。
沒看到flume有一段時間內的統計信息。全部統計信息都是統計整個組件生命週期的。
以上就是Flume源碼剖析的所有內容。Flume自己提供了很是多的組件以外還有很是好的可擴展性。就總體架構而言仍是很是清晰的。除了配置文件結構比較複雜,其餘代碼的可讀性仍是至關高的