首先全部核心組件都會實現org.apache.flume.lifecycle.LifecycleAware接口:javascript
public interface LifecycleAware { public void start(); public void stop(); public LifecycleState getLifecycleState(); }
start方法在整個Flume啓動時或者初始化組件時都會調用start方法進行組件初始化,Flume組件出現異常中止時會調用stop,getLifecycleState返回組件的生命週期狀態,有IDLE, START, STOP, ERROR四個狀態。php
若是開發的組件須要配置,如設置一些屬性;能夠實現org.apache.flume.conf.Configurable接口:java
public interface Configurable { public void configure(Context context); }
Flume在啓動組件以前會調用configure來初始化組件一些配置。web
Source用於採集日誌數據,有兩種實現方式:輪訓拉取和事件驅動機制;Source接口以下:算法
public interface Source extends LifecycleAware, NamedComponent { public void setChannelProcessor(ChannelProcessor channelProcessor); public ChannelProcessor getChannelProcessor(); }
Source接口首先繼承了LifecycleAware接口,而後只提供了ChannelProcessor的setter和getter接口,也就是說它的的全部邏輯的實現應該在LifecycleAware接口的start和stop中實現;ChannelProcessor以前介紹過用來進行日誌流的過濾和Channel的選擇及調度。sql
而 Source 是經過 SourceFactory 工廠建立,默認提供了 DefaultSourceFactory ,其首先經過 Enum 類型 org.apache.flume.conf.source.SourceType 查找默認實現,如exec ,則找到 org.apache.flume.source.ExecSource 實現,若是找不到直接Class.forName(className) 建立。 apache
Source 提供了兩種機制: PollableSource (輪訓拉取)和 EventDrivenSource (事件驅動):負載均衡
PollableSource 默認提供了以下實現:dom
好比 JMSSource 實現使用 javax.jms.MessageConsumer.receive(pollTimeout) 主動去拉取消息。異步
EventDrivenSource 默認提供了以下實現:
好比 NetcatSource 、 HttpSource 就是事件驅動,即被動等待;好比 HttpSource 就是內部啓動了一個內嵌的 Jetty 啓動了一個 Servlet 容器,經過 FlumeHTTPServlet 去接收消息。
Flume 提供了 SourceRunner 用來啓動 Source 的流轉:
從本組件也能夠看出: 1 、首先要初始化 ChannelProcessor ,其實現時初始化過濾器鏈; 2 、接着啓動 Source 並更改本組件的狀態。
public class PollableSourceRunner extends SourceRunner { @Override public void start() { PollableSource source = (PollableSource) getSource(); ChannelProcessor cp = source.getChannelProcessor(); cp.initialize(); source.start(); runner = new PollingRunner(); runner.source = source; runner.counterGroup = counterGroup; runner.shouldStop = shouldStop; runnerThread = new Thread(runner); runnerThread.setName(getClass().getSimpleName() + "-" + source.getClass().getSimpleName() + "-" + source.getName()); runnerThread.start(); lifecycleState = LifecycleState.START; } }
而 PollingRunner 首先初始化組件,可是又啓動了一個線程 PollingRunner ,其做用就是輪訓拉取數據:
@Override public void run() { while (!shouldStop.get()) { //若是沒有中止,則一直在死循環運行 counterGroup.incrementAndGet("runner.polls"); try { //調用PollableSource的process方法進行輪訓拉取,而後判斷是否遇到了失敗補償 if (source.process().equals(PollableSource.Status.BACKOFF)) {/ counterGroup.incrementAndGet("runner.backoffs"); //失敗補償時暫停線程處理,等待超時時間以後重試 Thread.sleep(Math.min( counterGroup.incrementAndGet("runner.backoffs.consecutive") * source.getBackOffSleepIncrement(), source.getMaxBackOffSleepInterval())); } else { counterGroup.set("runner.backoffs.consecutive", 0L); } } catch (InterruptedException e) { } } } } }
Flume 在啓動時會判斷 Source 是 PollableSource 仍是 EventDrivenSource 來選擇使用 PollableSourceRunner 仍是 EventDrivenSourceRunner 。
好比 HttpSource 實現,其經過 FlumeHTTPServlet 接收消息而後:
List<Event> events = Collections.emptyList(); //create empty list //首先從請求中獲取Event events = handler.getEvents(request); //而後交給ChannelProcessor進行處理 getChannelProcessor().processEventBatch(events);
到此基本的 Source 流程就介紹完了,其做用就是監聽日誌,採集,而後交給ChannelProcessor 進行處理。
Channel 用於鏈接 Source 和 Sink , Source 生產日誌發送到 Channel , Sink 從Channel 消費日誌;也就是說經過 Channel 實現了 Source 和 Sink 的解耦,能夠實現多對多的關聯,和 Source 、 Sink 的異步化。
以前 Source 採集到日誌後會交給 ChannelProcessor 處理,那麼接下來咱們先從ChannelProcessor 入手,其依賴三個組件:
private final ChannelSelector selector; //Channel選擇器 private final InterceptorChain interceptorChain; //攔截器鏈 private ExecutorService execService; //用於實現可選Channel的ExecutorService,默認是單線程實現
接下來看下其是如何處理 Event 的:
public void processEvent(Event event) { event = interceptorChain.intercept(event); //首先進行攔截器鏈過濾 if (event == null) { return; } List<Event> events = new ArrayList<Event>(1); events.add(event); //經過Channel選擇器獲取必須成功處理的Channel,而後事務中執行 List<Channel> requiredChannels = selector.getRequiredChannels(event); for (Channel reqChannel : requiredChannels) { executeChannelTransaction(reqChannel, events, false); } //經過Channel選擇器獲取可選的Channel,這些Channel失敗是能夠忽略,不影響其餘Channel的處理 List<Channel> optionalChannels = selector.getOptionalChannels(event); for (Channel optChannel : optionalChannels) { execService.submit(new OptionalChannelTransactionRunnable(optChannel, events)); } }
另外內部還提供了批處理實現方法 processEventBatch ;對於內部事務實現的話能夠參考 executeChannelTransaction 方法,總體事務機制相似於 JDBC :
private static void executeChannelTransaction(Channel channel, List<Event> batch, boolean isOptional) { //一、獲取Channel上的事務 Transaction tx = channel.getTransaction(); Preconditions.checkNotNull(tx, "Transaction object must not be null"); try { //二、開啓事務 tx.begin(); //三、在Channel上執行批量put操做 for (Event event : batch) { channel.put(event); } //四、成功後提交事務 tx.commit(); } catch (Throwable t) { //五、異常後回滾事務 tx.rollback(); if (t instanceof Error) { LOG.error("Error while writing to channel: " + channel, t); throw (Error) t; } else if(!isOptional) {//若是是可選的Channel,異常忽略 throw new ChannelException("Unable to put batch on required " + "channel: " + channel, t); } } finally { //最後關閉事務 tx.close(); } }
Interceptor 用於過濾 Event ,即傳入一個 Event 而後進行過濾加工,而後返回一個新的 Event ,接口以下:
public interface Interceptor { public void initialize(); public Event intercept(Event event); public List<Event> intercept(List<Event> events); public void close(); }
能夠看到其提供了 initialize 和 close 方法用於啓動和關閉; intercept 方法用於過濾或加工 Event 。好比 HostInterceptor 攔截器用於獲取本機 IP 而後默認添加到 Event 的字段爲 host 的 Header 中。
接下來就是 ChannelSelector 選擇器了,其經過以下方式建立:
//獲取ChannelSelector配置,好比agent.sources.s1.selector.type = replicatingChannelSelectorConfiguration selectorConfig = config.getSelectorConfiguration();//使用Source關聯的Channel建立,好比agent.sources.s1.channels = c1 c2ChannelSelector selector = ChannelSelectorFactory.create(sourceChannels, selectorConfig);
ChannelSelector 默認提供了兩種實現:複製和多路複用:
默認實現是複製選擇器 ReplicatingChannelSelector ,即把接收到的消息複製到每個 Channel ;多路複用選擇器 MultiplexingChannelSelector 會根據 Event Header 中的參數進行選擇,以此來選擇使用哪一個 Channel 。
而 Channel 是 Event 中轉的地方, Source 發佈 Event 到 Channel , Sink 消費Channel 的 Event ; Channel 接口提供了以下接口用來實現 Event 流轉:
public interface Channel extends LifecycleAware, NamedComponent { public void put(Event event) throws ChannelException; public Event take() throws ChannelException; public Transaction getTransaction(); }
put 用於發佈 Event , take 用於消費 Event , getTransaction 用於事務支持。默認提供了以下 Channel 的實現:
對於 Channel 的實現咱們後續單獨章節介紹。
Sink 從 Channel 消費 Event ,而後進行轉移到收集 / 聚合層或存儲層。 Sink 接口以下所示:
public interface Sink extends LifecycleAware, NamedComponent { public void setChannel(Channel channel); public Channel getChannel(); public Status process() throws EventDeliveryException; public static enum Status { READY, BACKOFF } }
相似於 Source ,其首先繼承了 LifecycleAware ,而後提供了 Channel 的getter/setter 方法,並提供了 process 方法進行消費,此方法會返回消費的狀態,READY 或 BACKOFF 。
Sink 也是經過 SinkFactory 工廠來建立,其也提供了 DefaultSinkFactory 默認工廠,好比傳入 hdfs ,會先查找 Enum org.apache.flume.conf.sink.SinkType ,而後找到相應的默認處理類 org.apache.flume.sink.hdfs.HDFSEventSink ,若是沒找到默認處理類,直接經過 Class.forName(className) 進行反射建立。
咱們知道 Sink 還提供了分組功能,用於把多個 Sink 聚合爲一組進行使用,內部提供了 SinkGroup 用來完成這個事情。此時問題來了,如何去調度多個 Sink ,其內部使用了 SinkProcessor 來完成這個事情,默認提供了故障轉移和負載均衡兩個策略。
首先 SinkGroup 就是聚合多個 Sink 爲一組,而後將多個 Sink 傳給SinkProcessorFactory 進行建立 SinkProcessor ,而策略是根據配置文件中配置的如agent.sinkgroups.g1.processor.type = load_balance 來選擇的。
SinkProcessor 提供了以下實現:
DefaultSinkProcessor :默認實現,用於單個 Sink 的場景使用。
FailoverSinkProcessor :故障轉移實現:
public Status process() throws EventDeliveryException { Long now = System.currentTimeMillis(); //一、首先檢查失敗隊列的頭部的Sink是否已通過了失敗補償等待時間了 while(!failedSinks.isEmpty() && failedSinks.peek().getRefresh() < now) { //二、若是可使用了,則從失敗Sink隊列獲取隊列第一個Sink FailedSink cur = failedSinks.poll(); Status s; try { s = cur.getSink().process(); //三、使用此Sink進行處理 if (s == Status.READY) { //四、若是處理成功 liveSinks.put(cur.getPriority(), cur.getSink()); //4.一、放回存活Sink隊列 activeSink = liveSinks.get(liveSinks.lastKey()); } else { failedSinks.add(cur); //4.二、若是此時不是READY,即BACKOFF期間,再次放回失敗隊列 } return s; } catch (Exception e) { cur.incFails(); //五、若是遇到異常了,則增長失敗次數,並放回失敗隊列 failedSinks.add(cur); } } Status ret = null; while(activeSink != null) { //六、此時失敗隊列中沒有Sink能處理了,那麼須要使用存活Sink隊列進行處理 try { ret = activeSink.process(); return ret; } catch (Exception e) { //七、處理失敗進行轉移到失敗隊列 activeSink = moveActiveToDeadAndGetNext(); } } throw new EventDeliveryException("All sinks failed to process, " + "nothing left to failover to"); }
失敗隊列是一個優先級隊列,使用 refresh 屬性排序,而 refresh 是經過以下機制計算的:
refresh = System.currentTimeMillis() + Math.min(maxPenalty, (1 << sequentialFailures) * FAILURE_PENALTY);
其中 maxPenalty 是最大等待時間,默認 30s ,而 (1 << sequentialFailures) * FAILURE_PENALTY) 用於實現指數級等待時間遞增, FAILURE_PENALTY 是 1s 。
LoadBalanceSinkProcessor :用於實現 Sink 的負載均衡,其經過 SinkSelector 進行實現,相似於 ChannelSelector 。 LoadBalanceSinkProcessor 在啓動時會根據配置,如 agent.sinkgroups.g1.processor.selector = random 進行選擇,默認提供了兩種選擇器:
LoadBalanceSinkProcessor 使用以下機制進行負載均衡:
public Status process() throws EventDeliveryException { Status status = null; //一、使用選擇器建立相應的迭代器,也就是用來選擇Sink的迭代器 Iterator<Sink> sinkIterator = selector.createSinkIterator(); while (sinkIterator.hasNext()) { Sink sink = sinkIterator.next(); try { //二、選擇器迭代Sink進行處理,若是成功直接break掉此次處理,這次負載均衡就算完成了 status = sink.process(); break; } catch (Exception ex) { //三、失敗後會通知選擇器,採起相應的失敗退避補償算法進行處理 selector.informSinkFailed(sink); LOGGER.warn("Sink failed to consume event. " + "Attempting next sink if available.", ex); } } if (status == null) { throw new EventDeliveryException("All configured sinks have failed"); } return status; }
如上的核心就是怎麼建立迭代器,如何進行失敗退避補償處理,首先咱們看下RoundRobinSinkSelector 實現,其內部是經過通用的 RoundRobinOrderSelector 選擇器實現:
public Iterator<T> createIterator() { //一、獲取存活的Sink索引, List<Integer> activeIndices = getIndexList(); int size = activeIndices.size(); //二、若是上次記錄的下一個存活Sink的位置超過了size,那麼從隊列頭從新開始計數 if (nextHead >= size) { nextHead = 0; } //三、獲取本次使用的起始位置 int begin = nextHead++; if (nextHead == activeIndices.size()) { nextHead = 0; } //四、從該位置開始迭代,其實現相似於環形隊列,好比整個隊列是5,起始位置是3,則按照 三、四、0、一、2的順序進行輪訓,實現了輪訓算法 int[] indexOrder = new int[size]; for (int i = 0; i < size; i++) { indexOrder[i] = activeIndices.get((begin + i) % size); } //indexOrder是迭代順序,getObjects返回相關的Sinks; return new SpecificOrderIterator<T>(indexOrder, getObjects()); }
getIndexList 實現以下:
protected List<Integer> getIndexList() { long now = System.currentTimeMillis(); List<Integer> indexList = new ArrayList<Integer>(); int i = 0; for (T obj : stateMap.keySet()) { if (!isShouldBackOff() || stateMap.get(obj).restoreTime < now) { indexList.add(i); } i++; } return indexList; }
isShouldBackOff() 表示是否開啓退避算法支持,若是不開啓,則認爲每一個 Sink 都是存活的,每次都會重試,經過 agent.sinkgroups.g1.processor.backoff = true 配置開啓,默認 false ; restoreTime 和以前介紹的 refresh 同樣,是退避補償等待時間,算法相似,就很少介紹了。
那麼何時調用 Sink 進行消費呢?其相似於 SourceRunner , Sink 提供了SinkRunner 進行輪訓拉取處理, SinkRunner 會輪訓調度 SinkProcessor 消費 Channel的消息,而後調用 Sink 進行轉移。 SinkProcessor 以前介紹過,其負責消息複製 / 路由。
SinkRunner 實現以下:
public void start() { SinkProcessor policy = getPolicy(); policy.start(); runner = new PollingRunner(); runner.policy = policy; runner.counterGroup = counterGroup; runner.shouldStop = new AtomicBoolean(); runnerThread = new Thread(runner); runnerThread.setName("SinkRunner-PollingRunner-" + policy.getClass().getSimpleName()); runnerThread.start(); lifecycleState = LifecycleState.START;}
即獲取 SinkProcessor 而後啓動它,接着啓動輪訓線程去處理。 PollingRunner 線程負責輪訓消息,核心實現以下:
public void run() { while (!shouldStop.get()) { //若是沒有中止 try { if (policy.process().equals(Sink.Status.BACKOFF)) {//若是處理失敗了,進行退避補償處理 counterGroup.incrementAndGet("runner.backoffs"); Thread.sleep(Math.min( counterGroup.incrementAndGet("runner.backoffs.consecutive") * backoffSleepIncrement, maxBackoffSleep)); //暫停退避補償設定的超時時間 } else { counterGroup.set("runner.backoffs.consecutive", 0L); } } catch (Exception e) { try { Thread.sleep(maxBackoffSleep); //若是遇到異常則等待最大退避時間 } catch (InterruptedException ex) { Thread.currentThread().interrupt(); } } } }
總體實現相似於 PollableSourceRunner 實現,總體處理都是交給 SinkProcessor 完成的。 SinkProcessor 會輪訓 Sink 的 process 方法進行處理;此處以 LoggerSink 爲例:
@Overridepublic Status process() throws EventDeliveryException { Status result = Status.READY; Channel channel = getChannel(); //一、獲取事務 Transaction transaction = channel.getTransaction(); Event event = null; try { //二、開啓事務 transaction.begin(); //三、從Channel獲取Event event = channel.take(); if (event != null) { if (logger.isInfoEnabled()) { logger.info("Event: " + EventHelper.dumpEvent(event, maxBytesToLog)); } } else {//四、若是Channel中沒有Event,則默認進入故障補償機制,即防止死循環形成CPU負載高 result = Status.BACKOFF; } //五、成功後提交事務 transaction.commit(); } catch (Exception ex) { //六、失敗後回滾事務 transaction.rollback(); throw new EventDeliveryException("Failed to log event: " + event, ex); } finally { //七、關閉事務 transaction.close(); } return result; }
Sink 中一些實現是支持批處理的,好比 RollingFileSink :
//一、開啓事務//二、批處理for (int i = 0; i < batchSize; i++) { event = channel.take(); if (event != null) { sinkCounter.incrementEventDrainAttemptCount(); eventAttemptCounter++; serializer.write(event); } }//三、提交/回滾事務、關閉事務
定義一個批處理大小而後在事務中執行批處理。