在AbstractConfigurationProvider類中loadSources方法會將全部的source進行封裝成SourceRunner放到了Map<String, SourceRunner> sourceRunnerMap之中。相關代碼以下:html
1 Map<String, String> selectorConfig = context.getSubProperties( 2 BasicConfigurationConstants.CONFIG_SOURCE_CHANNELSELECTOR_PREFIX); 3 4 ChannelSelector selector = ChannelSelectorFactory.create( 5 sourceChannels, selectorConfig); 6 7 ChannelProcessor channelProcessor = new ChannelProcessor(selector); 8 Configurables.configure(channelProcessor, context); 9 source.setChannelProcessor(channelProcessor); 10 sourceRunnerMap.put(sourceName, 11 SourceRunner.forSource(source));
每一個source都有selector。上述代碼會獲取配置文件中關於source的selector配置信息;而後構造ChannelSelector對象selector;並封裝selector對象成ChannelProcessor對象channelProcessor;執行channelProcessor.configure方法進行配置;設置soure的channelprocessor,最後封裝爲sourceRunner和source名稱一塊兒放入sourceRunnerMap中。 數據結構
1、ChannelSelector selector = ChannelSelectorFactory.create(sourceChannels, selectorConfig)會根據配置文件中指定的類型實例化一個ChannelSelector(共兩種ReplicatingChannelSelector複製和MultiplexingChannelSelector複用)若是沒有指定類型默認是ReplicatingChannelSelector,也就是配置文件中不用配置selector會將每一個event複製發送到多個channel;selector.setChannels(channels);對此slector進行配置configure(context)。這兩中selector都實現了三個方法getRequiredChannels(Event event)、getOptionalChannels(Event event) 以及configure(Context context)。其實Event要發送到的channel有兩種組成:RequiredChannels和OptionalChannels,對應兩個方法。app
(1)ReplicatingChannelSelector的configure(context)方法會得到經過"optional"在配置文件中指定的可選發送的channels(能夠多個,經過空格分割);獲取requiredChannels是此source對應的channel中能夠活動的channel列表;而後獲取全部channel的名字及其與channel的映射channelNameMap;而後將可選的channel加入optionalChannels並從requiredChannels去掉有對應的channel,在這裏並無檢查可選channel的合法性以及能夠配置此source指定的channel以外的channel,requiredChannels和optionalChannels不能有交集,有交集的話會從requiredChannels中刪除相交的channel,因此若是配置文件中optional指定的channel列表和source指定的列表相同getOptionalChannels方法有可能會返回所有可活動channel列表使得數據重複,因此建議optional指定的channel最好是source指定以外的其餘channel(好比是其餘source的channel)。getOptionalChannels方法就是直接返回optionalChannels列表,getRequiredChannels方法返回requiredChannels列表,若是requiredChannels爲null,則返回所有的能夠活動的channel列表。ide
(2)MultiplexingChannelSelector的configure(context)先獲取要匹配的event的header的headerName,只能選擇一個headerName;得到默認發送到的channel列表defaultChannels,能夠指定多個默認channel;得到mapping的各個子值,及對應的channel名稱mapConfig;用來存儲header不一樣的值及其對應的要發送到的channel列表(每一個map能夠發送到多個channel中,每一個channel也能夠同時對應多個mapping),存入channelMapping(這個數據結構是用來存儲mapping值及對應的channel列表的);optionalChannels是配置的可選值及其要發送到的channel列表的映射關係,channelMapping中已經出現的channel不容許再次在optionalChannels出現(防止數據重複),若是channelMapping沒有這個值對應的channel列表(表示可能會使用默認的channel列表)則使過濾與默認channel列表的交集,optionalChannels存儲的是對應header的各個值及其等於該值的event要發送到的可選擇的channel列表。getOptionalChannels(Event event)方法返回的是optionalChannels中該event的指定header對應的可選擇的channel列表。getRequiredChannels(Event event)方法返回的是channelMapping中該event的指定header對應的channel列表,若是爲null(表示因爲該event的headers沒有匹配的channel就發送到默認的channel中)就返回默認發送列表defaultChannels。須要說明的是選擇器配置文件中的"default"、"mapping."、"optional."這三個是同等級的,沒有匹配後二者的值時纔會選擇發送到default對應的channel列表,後二者的值都是event的header中對應配置文件中指定的"header"的各類值。當調用getRequiredChannels(Event event)和getOptionalChannels(Event event)方法時都會對這個event的相應header查找對應要發送到的channel列表。優化
2、 ChannelProcessor channelProcessor = new ChannelProcessor(selector)這個是封裝選擇器構造channelprocessor。其構造方法會賦值selector並構造一個InterceptorChain對象interceptorChain。ChannelProcessor類負責管理選擇器selector和攔截器interceptor。ui
3、執行channelProcessor.configure(Context)進行必要的配置,該方法會調用channelProcessor.configureInterceptors(context)對攔截器們進行獲取和配置,configureInterceptors方法會先從配置文件中獲取interceptor的組件名字interceptorNames[](能夠多個),而後獲取全部的「interceptors.」的配置信息interceptorContexts,而後遍歷全部interceptorNames從配置文件中獲取屬於這個interceptor的配置信息及類型(type),根據類型構建相應的interceptor並進行配置configure,加入interceptors列表(用來存放實例化的interceptor);最後將列表傳遞給interceptorChain。關於更多interceptor的信息能夠看這篇Flume-NG源碼閱讀之Interceptor(原創) 。 spa
4、source.setChannelProcessor(channelProcessor)賦值。各個source經過getChannelProcessor()方法獲取processor調用其processEventBatch(events)或者processEvent(event)來將event送到channel中。線程
5、sourceRunnerMap.put(sourceName,SourceRunner.forSource(source))將source封裝成SourceRunner放入sourceRunnerMap。SourceRunner.forSource會根據這個source所實現的接口封裝成不一樣的Runner,有兩種接口PollableSource和EventDrivenSource,前者是有本身線程來驅動的須要實現process方法,後者是沒有單獨的線程來驅動的沒有process方法。code
1 public static SourceRunner forSource(Source source) { 2 SourceRunner runner = null; 3 4 if (source instanceof PollableSource) { 5 runner = new PollableSourceRunner(); 6 ((PollableSourceRunner) runner).setSource((PollableSource) source); 7 } else if (source instanceof EventDrivenSource) { 8 runner = new EventDrivenSourceRunner(); 9 ((EventDrivenSourceRunner) runner).setSource((EventDrivenSource) source); 10 } else { 11 throw new IllegalArgumentException("No known runner type for source " 12 + source); 13 } 14 15 return runner; 16 }
(1)PollableSourceRunner的start()方法會獲取source的ChannelProcessor,而後執行其initialize()方法,該方法會調用interceptorChain.initialize()方法對攔截器們進行初始化(遍歷全部攔截器而後執行攔截器的initialize()方法);而後執行source.start()啓動source;再啓動一個線程PollingRunner,它的run方法會始終執行source.process()並根據返回的狀態值作一些統計工做。htm
(2)EventDrivenSourceRunner的start()方法會獲取source的ChannelProcessor,而後執行其initialize()方法,該方法會調用interceptorChain.initialize()方法對攔截器們進行初始化(遍歷全部攔截器而後執行攔截器的initialize()方法);而後執行source.start()啓動source。
這樣就完成了sourceRunnerMap的組裝。當在Application中的startAllComponents方法中經過materializedConfiguration.getSourceRunners()獲取全部的SourceRunner並放入supervisor.supervise中去執行,會調用到SourceRunner.start()方法,即上面剛講到的內容。這樣source就啓動了。而後當將封裝的Events或者Event發送到channel時,須要使用對應的方法ChannelProcessor.processEventBatch(List<Event> events)或者ChannelProcessor.processEvent(Event event)就能夠將數據從source傳輸到channel中,這兩個方法都會在開始調用interceptorChain.intercept(events)或者interceptorChain.intercept(event)對event增長headers(若是有多個interceptor會遍歷interceptors處理每一個event)。ChannelProcessor都是經過在source中直接調用getChannelProcessor()(在全部的source的父類AbstractSource中實現的)得到。看一看processEventBatch(List<Event> events)代碼:
1 public void processEventBatch(List<Event> events) { 2 Preconditions.checkNotNull(events, "Event list must not be null"); 3 4 events = interceptorChain.intercept(events); 5 6 Map<Channel, List<Event>> reqChannelQueue = //須要發送到的每一個channel及其要發送到這個channel的event列表 7 new LinkedHashMap<Channel, List<Event>>(); 8 9 Map<Channel, List<Event>> optChannelQueue = //可選的每一個channel及其要發送到這個channel的event列表 10 new LinkedHashMap<Channel, List<Event>>(); 11 12 for (Event event : events) { 13 List<Channel> reqChannels = selector.getRequiredChannels(event); //獲取須要發送到的全部channel 14 15 for (Channel ch : reqChannels) { 16 List<Event> eventQueue = reqChannelQueue.get(ch); 17 if (eventQueue == null) { 18 eventQueue = new ArrayList<Event>(); 19 reqChannelQueue.put(ch, eventQueue); 20 } 21 eventQueue.add(event); //將event放入對應channel的event列表 22 } 23 24 List<Channel> optChannels = selector.getOptionalChannels(event); //獲取可選的要發送到的全部channel 25 26 for (Channel ch: optChannels) { 27 List<Event> eventQueue = optChannelQueue.get(ch); 28 if (eventQueue == null) { 29 eventQueue = new ArrayList<Event>(); 30 optChannelQueue.put(ch, eventQueue); 31 } 32 33 eventQueue.add(event); //將event放入對應channel的event列表 34 } 35 } 36 37 // Process required channels 38 for (Channel reqChannel : reqChannelQueue.keySet()) { 39 Transaction tx = reqChannel.getTransaction(); //建立事務 40 Preconditions.checkNotNull(tx, "Transaction object must not be null"); 41 try { 42 tx.begin(); 43 44 List<Event> batch = reqChannelQueue.get(reqChannel); 45 46 for (Event event : batch) { //發送到須要發送到的channel 47 reqChannel.put(event); 48 } 49 50 tx.commit(); 51 } catch (Throwable t) { 52 tx.rollback(); //事務回滾 53 if (t instanceof Error) { 54 LOG.error("Error while writing to required channel: " + 55 reqChannel, t); 56 throw (Error) t; 57 } else { 58 throw new ChannelException("Unable to put batch on required " + 59 "channel: " + reqChannel, t); 60 } 61 } finally { 62 if (tx != null) { 63 tx.close(); 64 } 65 } 66 }
上述代碼不復雜,會得到全部須要發送到的channel和全部可選的channel,而後針對每一個channel,將全部event放入一個列表與該channel組成映射;而後會遍歷兩種channel列表中的每一個channel將它對應的全部event發送到對應的channel中。這個方法寫的不夠友好,還能夠再優化,由於方法的參數自己就是一個列表能夠省去一層for循環,直接將reqChannelQueue.put(ch, eventQueue)和optChannelQueue.put(ch, eventQueue)中的eventQueue改成傳遞過來的參數List<Event> events就能夠達到優化的目的。
processEvent(Event event)方法就更簡單了,將這個event發送到這兩種channel列表中每一個channel就能夠。
在發送到channel的過程當中咱們也發現都會有事務的建立(getTransaction())、開始(tx.begin())、提交(tx.commit())、回滾(tx.rollback())、關閉(tx.close())等操做,這是必須的。在sink中這些操做須要顯示的去調用,而在source端則封裝在processEvent和processEventBatch方法中,不須要顯示的調用了,但不是不調用。
至此,sourceRunner的配置、初始化、執行就講解完畢了。在配置文件中看到的interceptor和selector都是在這裏進行配置及執行的。經過了解上述,咱們自定義source組件是否是更容易了。呵呵
後續還有精彩內容!敬請期待哈!