flume1.4.0源碼結構剖析

flume基本思想:


       source負責收集數據,channel負責緩存數據,sink負責消費channel中的數據,具體使用方式這裏不贅述web

生命週期管理:

生命週期相關代碼在flume-ng-core文件夾下的lifecycle子文件夾內數組

  1. public interface MaterializedConfiguration {  緩存

  2.   

  3.   public void addSourceRunner(String name, SourceRunner sourceRunner);  數據結構

  4.   

  5.   public void addSinkRunner(String name, SinkRunner sinkRunner);  架構

  6.   

  7.   public void addChannel(String name, Channel channel);  app

  8.   

  9.   public ImmutableMap<String, SourceRunner> getSourceRunners();  負載均衡

  10.   

  11.   public ImmutableMap<String, SinkRunner> getSinkRunners();  dom

  12.   

  13.   public ImmutableMap<String, Channel> getChannels();  ide

  14.   

  15. }  函數


生命週期相關代碼在flume-ng-core文件夾下的lifecycle子文件夾內

flume的全部組件(除了monitor service)都有生命週期的概念,主要做用是用來標記組件目前所屬的狀態。flume組件的生命週期有四個狀態,分別是IDLE,START,STOP,ERROR,意義以下:

IDLE 組件已經構造完成
START 組件已經啓動
STOP 組件已經中止
ERROR 組件發生了錯誤


       其中對象自己須要實現LifecycleAware接口【見代碼】,flume對組件初始化完成後會調用LifecycleSupervisor.supervise()將該組件加入監控,LifecycleSupervisor類內部對每個組件會啓動一個定時線程MonitorRunnable,在定時線程中調用組件的start/stop函數對組件進行控制

       這裏注意對組件stop函數的調用是在unsupervise裏面進行的,而不是經過定時線程MonitorRunnable控制的

flume 啓動流程:

      main函數依次作了如下幾件事

      1.解析命令行

      2.尋找命令行裏的配置文件路徑,解析配置文件,並放在一個File對象裏

      3.調用AbstractConfigurationProvider實例,傳入File配置文件構造對象,返回一個MaterializedConfiguration對象,該對象裏存放全部的sink,sinkgroup,source,channel組件

      4.調用application.start(),該函數將全部組件加入生命週期管理並註冊啓動monitor服務

      5.向JVM註冊一個函數,當進程結束時中止全部組件

      這裏要注意第三步其實根據命令行中的no-reload-conf參數【是否關閉重載配置文件】選擇調用AbstractConfigurationProvider的派生類PollingPropertiesFileConfigurationProvider或者PropertiesFileConfigurationProvider。

       其餘幾步都很直接,比較複雜的流程在調用AbstractConfigurationProvider,解析配置文件並初始化組件這一步。

       調用AbstractConfigurationProvider的目的是初始化全部組件,掛好相互關聯的鉤子並將組件封裝到MaterializedConfiguration對象中返回給上層。

       AbstractConfigurationProvider類的功能比較好理解,關鍵在於flume支持配置重載,配置重載則channel組件涉及到是否複用的問題【從新生成channel可能會致使以前數據的丟失問題】。AbstractConfigurationProvider類的getConfiguration()函數主要處理了這一套邏輯

       getConfiguration()函數的執行基本流程以下:

      1.加載配置文件

      2.load channels

      3.load sources,這一塊要傳入2中load的channel,由於要掛channel和source之間的鉤子【這一塊其實是將channel封裝到ChannelProcessor中,將ChannelProcessor傳入Source,關於ChannelProcessor的做用見下文Channel結構介紹】

      4.load sinks,這一塊要傳入2中load的channel,由於要掛channel和sink之間的鉤子

      5.移除沒有sink和source鏈接的channel【若是隻有source/sink這裏不會檢測】

      6.將sink,source,channel封裝在MaterializedConfiguration對象中並返回,注意source是封裝在SourceRunner類中,sink是封裝在SinkRunner中。這裏的定義是Runner負責與操做系統打交道,而source/sink專一於維護自身邏輯。SinkRunner中還要注意一點是一個Sink只能在一個SinkGroup裏面。


注意:

      1.加載配置文件時調用了FlumeConfiguration類,該類內部有對配置文件進行合法性校驗,校驗的錯誤列表能夠經過getConfigurationErrors()接口獲取

      2.每一個組件都有對應的Factor類(如ChannelFactor),經過調用Factor類的create()函數建立對象

      3.load_channel函數中channel能夠經過加上@Disposable聲明表示不復用,從新加載配置時生成一個新的channel

      4.細讀load_channels,load_sources,load_sinks代碼,能夠發現加載組件時分了兩種狀況,has ComponentConfiguration object和do not hava a ComponentConfiguration object。這一塊能夠不用理會,細讀配置文件邏輯好像所有都是後一種do not hava a ComponentConfiguration object


      以上步驟執行完,flume就能夠提供服務了。

flume各組件的內部結構:

組件的結構代碼在flume-ng-core文件夾中

channel:

       channel模塊的做用是給數據提供中轉的儲藏地,由flume的架構圖能夠看出channel負責給source模塊提供寫數據接口,給sink模塊提供讀數據接口。實際實現中每個source模塊中都有一個ChannelProcessor,經過ChannelProcessor向Channel寫入數據,而sink模塊是直接經過Channel模塊提供的接口讀取。

      爲何Source寫入要經過封裝的Processor進行?這裏有兩個緣由:1.source模塊寫入的數據可能在寫入Channel前須要進行過濾處理;2.source模塊可能要將不一樣的數據寫到不一樣的Channel上實現數據分發的效果。ChannelProcessor主要就是爲了解決這兩個問題存在的。如下是ChannelProcessor的結構圖:


       上圖中:InterceptorChain模塊主要解決問題1,source寫入的數據能夠通過InterceptorChain模塊中一系列的過濾器依次處理,最終過濾出來的數據進入ChannelSelector模塊,ChannelSelector解決了上面提到的問題2,提供了將不一樣數據分發到不一樣Channel的機制。

        Channel中基本的數據單元是Event,每一個Event包含有一個map<String, String>類型的header,每個Interceptor能夠在header裏面添加key,value形式的tag讓後續流程處理,或者按照某些規則將過濾event。

        ChannelSelector目前有兩種類型。一種是ReplicatingChannelSelector,消息發往全部的channel,另外一種是MultiplexingChannelSelector,能夠指定發往某些Channel

        其中Channel也有兩種類型:一種是required,一種是optional.這二者的區別是required類型若是寫入失敗會throw Exception。optional類型的寫入失敗不處理。

       Channel中處理數據的最小單元是Transaction,一個Transaction由多個Event組成。Channel內部實現要保證一個Transaction要麼所有成功,要麼所有不成功。具體實現方法下面細說。


       下面是這幾個類之間的關係圖。圖中Channel只畫了MemoryChannel,Interceptor的具體實現類沒有畫。


       其中要注意的是:

       1.ChannelProcessor的初始化是在初始化Source模塊的時候。

       2.每個線程有本身的Transaction,這個具體實現是在BasicChannelSemantics類中利用TreadLocal機制。這麼作的好處是維護transaction相對方便。

       3. Transaction的維護方法:MemoryChannel的機制是每個Transaction有本身的存儲空間,分爲putlist和takelist,MemoryChannel有一個公共的queue。

       每次take時從queue中取出一個event並放置在本地的takelist內並返回該event,commit的時候清空takelist,rollback時將takelist中的event從新放回queue。

       每次put時將event放置在putlist中,commit時將putlist中內容寫入queue中,rollback時將putlist清空。

      4.BasicChannelSemantics類主要負責的是維護(thread,transaction)這樣的關係。而因爲Transaction是接口,不能直接給BasicChannelSemantics進行操做。因此這裏又封裝了一個BasicTransactionSemantics類。這兩個類實現相對簡單,這裏不細述。


上面介紹初始化的時候曾經說過source和sink封裝在SourceRunner和SinkRunner中。Source和Sink這兩部分主要各類對接外部系統的模塊比較多,實際結構仍是比較簡單的

source:

       source的主要功能是收集外部數據,並經過ChannelProcessor寫入Channel。Source分爲兩種類型,PollableSource和EventDrivenSource,分別對應不一樣的SourceRunner。其中PollableSource是指那些須要定時輪詢的Source,好比定時掃描文件,讀取新增內容。其餘Source都是EventDrivenSource。


       1.注意BasicSourceSemantics類型維護了Source的生命週期。

       2.PollableSource類型的source具體的功能函數封裝在process()中,PollableSourceRunner負責按期調source.process()將數據寫入Channel.

       3.EventDrivenSource類型所有邏輯基本都在Source實現裏面,經過start接口啓動【start接口是LifecycleSupervisor負責調用的,上文有描述】。每一種source根據不一樣的外部系統有不一樣的處理邏輯,這裏不贅述。 

sink&sinkgroup:

        sink/sinkgroup的主要做用是消費Channel中的數據,其基本思想是按期輪詢Sink,從Channel中拉取必定量數據作處理。

        一個Sink只能對應一個下游,可是一個下游不可靠。因此flume裏面有一個機制將多個sink封裝爲一個SinkGroup,每次消費時經過SinkSelector選擇不一樣Sink實現容錯/負載均衡方案

        若是SinkSelector須要對Sink作篩選,那麼它須要知道每個Sink的狀態。每個sink都要本身維護一個status變量,有兩個值【READY, BACKOFF】,定義在interface Sink中,READY的意思是該sink能夠從Channel中取數據,BACKOFF的意義是該sink目前不能從Channel中取數據。這個status就是給SinkSelector作決策使用的。

Sink的結構圖以下:


       SinkRunner的邏輯跟PollableSource一致。每隔一段時間調用SinkProcessor的process()函數

       SinkProcessor裏面封裝了Sink和SinkSelector。若是是單獨sink,直接調用這個sink的process(),若是是一組sink,經過SinkSelector選擇出一個sink,再調用這個sink的process().

      目前flume裏面實現的SinkProcessor有三種,DefaultSinkProcessor/FailoverSinkProcessor/LoadBalancingSinkProcessor

       1.DefaultSinkProcessor:提供給單獨Sink使用。直接調用sink的process()函數


       2.FailoverSinkProcessor:提供給SinkGroup使用。這個Processor給每一個sink都設置了一個priority【經過配置文件】,其思想是每次都會【儘可能】調用priority最大的sink。
            這個Processor將sinks分爲兩組,一組是Queue類型的failedSinks隊列,保存上一次sink.process()調用出錯的sink,而且每個調用出錯的sink都有一個冷卻時間,冷卻時間以內的sink不會調用,另外一組是SortedMap<Integer, Sink> liveSinks,保存正常的sink,其中Integer指該sink的priority

        每次選擇sink的流程以下:a.從failedSinks裏面取出冷卻最久的sink,調用process,若是成功則將該sink從新加入liveSinks,若是失敗則刷新這個sink的冷卻時間並將其從新push回failedSinks。而後試圖在failedSinks尋找下一個sink

        b.若是failedSinks爲空或者failedSinks內的sink所有不可用,則調用liveSinks裏面權重最大的sink,若是該sink調用失敗則塞入failedSinks


        3.LoadBalancingSinkProcessor:基本思想是將Channel中的數據平均分配到該Processor的全部sink裏。

        這個Processor下面有兩個Selector,分別是RandomOrderSinkSelector和RoundRobinSinkSelector。前者提供的功能是返回一個隨機排序的sink數組,後者則是返回一個輪詢調度形式的數組,好比這一次返回【1,2,3】,下一次就是【2,3,1】,下下一次就是【3,1,2】。

        LoadBalancingSinkProcessor調用用戶配置的Seletor返回一個數組,而後依次調用這個數組中的sink.process(),直到某一次調用成功,則返回,若是所有失敗會返回錯誤。

        這個Processor有一個比較重要的配置【backoff】。這個配置決定了某個sink調用失敗後的行爲。若是backoff爲true,則該sink失敗後就會冷卻一段時間。這段時間調用Selector返回的數組不會包含該sink。

        Sink主要是實現從Channel衆消費數據的邏輯,這裏不一一描述。要注意的是SinkGroup中一個Sink若是卡住這個SinkGroup會卡住。

監控模塊:

        在啓動流程章節裏提到,在啓動全部組件後會啓動minotor監控進程。監控主要分爲兩部分,收集數據和上報數據。

       收集數據的邏輯是夾雜在具體Channel,Sink,Source等的實現邏輯裏面的。其中sink,source,channel等結構監控數據的數據結構在flume-ng-core文件夾下面的instrumententation裏面。分別是SinkCounter,SourceCounter和ChannelCounter等。若是要本身實現這些組件,須要調用這幾個類的接口上報監控信息。flume-ng-core下面還有一個CounterGroup,這個統計了一些Runner的事件。這個接口應該是老接口。

       每個Count都會在MBean裏面註冊。上報的數據存儲在這個MBean對象裏。

       flume內置三種上報監控的方式,分別是zabbix,ganglia和http。其中http方式是在flume側起一個server,由外部主動來拉取監控信息。

       沒看到flume有一段時間內的統計信息。全部統計信息都是統計整個組件生命週期的。


       以上就是Flume源碼剖析的所有內容。Flume自己提供了很是多的組件以外還有很是好的可擴展性。就總體架構而言仍是很是清晰的。除了配置文件結構比較複雜,其餘代碼的可讀性仍是至關高的

相關文章
相關標籤/搜索