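If you have not yet read the earlier articles in this Flume-ng source code analysis series on the startup flow, the Channel component, and the Sink component, you can follow the links below:
Flume-ng Source Code Analysis: Startup Flow
Flume-ng Source Code Analysis: Channel Component
Flume-ng Source Code Analysis: Sink Component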
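The previous three articles gave us a first look at Flume's startup flow, the Channel component, and the Sink component. Next, let's look at the Source component, the last of the agent's three major components.
As the component where messages enter the agent, a Source is worth examining for two things: how it hands events to the channel, and what characterizes it.
As usual, we start with the code: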
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Source extends LifecycleAware, NamedComponent {

  public void setChannelProcessor(ChannelProcessor channelProcessor);

  public ChannelProcessor getChannelProcessor();

}
The interface declares just two methods to implement, getChannelProcessor and setChannelProcessor, so we can already guess that a source delivers events to the channel through a ChannelProcessor.
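Before going further, a word on source types. Flume divides sources into two categories according to how their data arrives: sources such as HTTP, netcat, and exec are event-driven (EventDrivenSource), while sources such as Kafka and JMS poll for data (PollableSource).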
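To make the difference between the two categories concrete, here is a minimal sketch. It uses hypothetical stand-in classes (PushSource, PullSource, and a plain Consumer in place of the channel side), none of which are Flume classes; only the READY/BACKOFF status names follow PollableSource.Status.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical stand-ins for illustration only; these are not Flume classes.
class SourceStyles {

    // Event-driven style: an outside caller (for example a network handler)
    // invokes onData, and the source forwards the event immediately.
    static class PushSource {
        private final Consumer<String> sink;

        PushSource(Consumer<String> sink) {
            this.sink = sink;
        }

        void onData(String body) {
            sink.accept(body);
        }
    }

    enum Status { READY, BACKOFF }

    // Polling style: a runner thread calls process() in a loop, and the
    // source reports BACKOFF when there was nothing to pull.
    static class PullSource {
        private final Queue<String> upstream;
        private final Consumer<String> sink;

        PullSource(Queue<String> upstream, Consumer<String> sink) {
            this.upstream = upstream;
            this.sink = sink;
        }

        Status process() {
            String next = upstream.poll();
            if (next == null) {
                return Status.BACKOFF;
            }
            sink.accept(next);
            return Status.READY;
        }
    }

    // Runs both styles against the same in-memory list and returns it.
    static List<String> demo() {
        List<String> delivered = new ArrayList<>();

        PushSource push = new PushSource(delivered::add);
        push.onData("pushed-1");

        Queue<String> upstream = new ArrayDeque<>();
        upstream.add("pulled-1");
        PullSource pull = new PullSource(upstream, delivered::add);
        pull.process(); // READY: one event delivered
        pull.process(); // BACKOFF: upstream is empty

        return delivered;
    }
}
```

The point of the contrast is who drives the work: the push-style source does nothing until it is called from outside, whereas the pull-style source needs something to call process() repeatedly, which is the job the runner takes on.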
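As we saw in the startup flow article, Application first starts a SourceRunner, and the SourceRunner in turn starts the source. Because there are two types of source, there are also two types of SourceRunner: EventDrivenSourceRunner and PollableSourceRunner. Let's look at the start() method of each: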
EventDrivenSourceRunner
public class EventDrivenSourceRunner extends SourceRunner {
  …
  @Override
  public void start() {
    Source source = getSource();
    ChannelProcessor cp = source.getChannelProcessor();
    cp.initialize();
    source.start();
    lifecycleState = LifecycleState.START;
  }
  …
}
PollableSourceRunner
public class PollableSourceRunner extends SourceRunner {
  …
  @Override
  public void start() {
    PollableSource source = (PollableSource) getSource();
    ChannelProcessor cp = source.getChannelProcessor();
    cp.initialize();
    source.start();

    runner = new PollingRunner();

    runner.source = source;
    runner.counterGroup = counterGroup;
    runner.shouldStop = shouldStop;

    runnerThread = new Thread(runner);
    runnerThread.setName(getClass().getSimpleName() + "-" +
        source.getClass().getSimpleName() + "-" + source.getName());
    runnerThread.start();

    lifecycleState = LifecycleState.START;
  }
  …
  public static class PollingRunner implements Runnable {

    private PollableSource source;
    private AtomicBoolean shouldStop;
    private CounterGroup counterGroup;

    @Override
    public void run() {
      logger.debug("Polling runner starting. Source:{}", source);

      while (!shouldStop.get()) {
        counterGroup.incrementAndGet("runner.polls");

        try {
          if (source.process().equals(PollableSource.Status.BACKOFF)) {
            counterGroup.incrementAndGet("runner.backoffs");

            Thread.sleep(Math.min(
                counterGroup.incrementAndGet("runner.backoffs.consecutive")
                * source.getBackOffSleepIncrement(),
                source.getMaxBackOffSleepInterval()));
          } else {
            counterGroup.set("runner.backoffs.consecutive", 0L);
          }
        } catch (InterruptedException e) {
          logger.info("Source runner interrupted. Exiting");
          counterGroup.incrementAndGet("runner.interruptions");
        } catch (EventDeliveryException e) {
          logger.error("Unable to deliver event. Exception follows.", e);
          counterGroup.incrementAndGet("runner.deliveryErrors");
        } catch (Exception e) {
          counterGroup.incrementAndGet("runner.errors");
          logger.error("Unhandled exception, logging and sleeping for " +
              source.getMaxBackOffSleepInterval() + "ms", e);
          try {
            Thread.sleep(source.getMaxBackOffSleepInterval());
          } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
          }
        }
      }

      logger.debug("Polling runner exiting. Metrics:{}", counterGroup);
    }
  }
}
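The sleep expression inside PollingRunner.run() is easy to misread, so here is a small sketch that isolates the arithmetic. The class BackoffSketch and its method names are hypothetical, and the increment and cap used in the usage note are assumed values chosen for illustration, not values taken from the code above.

```java
// Illustrative sketch only; the class and method names are hypothetical.
class BackoffSketch {

    // Mirrors the expression in PollingRunner.run():
    // min(consecutiveBackoffs * increment, maxInterval).
    static long sleepMillis(long consecutiveBackoffs, long incrementMillis, long maxMillis) {
        return Math.min(consecutiveBackoffs * incrementMillis, maxMillis);
    }

    // Sleep times for the first n consecutive BACKOFF results.
    static long[] schedule(int n, long incrementMillis, long maxMillis) {
        long[] result = new long[n];
        for (int i = 0; i < n; i++) {
            // The counter is incremented before it is used, so the first backoff uses 1.
            result[i] = sleepMillis(i + 1, incrementMillis, maxMillis);
        }
        return result;
    }
}
```

With an assumed increment of 1000 ms and an assumed cap of 5000 ms, BackoffSketch.schedule(7, 1000L, 5000L) yields 1000, 2000, 3000, 4000, 5000, 5000, 5000. In other words, the wait grows linearly while the source keeps returning BACKOFF, stops growing at the cap, and starts over once a poll succeeds and the consecutive counter is reset to 0.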
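Both PollableSourceRunner and EventDrivenSourceRunner call start() on the source they wrap. Both also fetch a ChannelProcessor from the source first, which raises a question: where does that ChannelProcessor come from? To answer it we go back to AbstractConfigurationProvider and look at loadSources(), where we find this code: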
ChannelSelectorConfiguration selectorConfig = config.getSelectorConfiguration();

ChannelSelector selector = ChannelSelectorFactory.create(sourceChannels, selectorConfig);

ChannelProcessor channelProcessor = new ChannelProcessor(selector);
Configurables.configure(channelProcessor, config);

source.setChannelProcessor(channelProcessor);
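At this point we have a basic picture of how a Source is started. Next, taking AvroSource as an example, let's see where a source calls the ChannelProcessor's methods for inserting events.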
public class AvroSource extends AbstractSource implements EventDrivenSource,
    Configurable, AvroSourceProtocol {
  …
  @Override
  public Status append(AvroFlumeEvent avroEvent) {
    if (logger.isDebugEnabled()) {
      if (LogPrivacyUtil.allowLogRawData()) {
        logger.debug("Avro source {}: Received avro event: {}", getName(), avroEvent);
      } else {
        logger.debug("Avro source {}: Received avro event", getName());
      }
    }

    sourceCounter.incrementAppendReceivedCount();
    sourceCounter.incrementEventReceivedCount();

    Event event = EventBuilder.withBody(avroEvent.getBody().array(),
        toStringMap(avroEvent.getHeaders()));

    try {
      getChannelProcessor().processEvent(event);
    } catch (ChannelException ex) {
      logger.warn("Avro source " + getName() + ": Unable to process event. " +
          "Exception follows.", ex);
      return Status.FAILED;
    }

    sourceCounter.incrementAppendAcceptedCount();
    sourceCounter.incrementEventAcceptedCount();

    return Status.OK;
  }

  @Override
  public Status appendBatch(List<AvroFlumeEvent> events) {
    logger.debug("Avro source {}: Received avro event batch of {} events.",
        getName(), events.size());

    sourceCounter.incrementAppendBatchReceivedCount();
    sourceCounter.addToEventReceivedCount(events.size());

    List<Event> batch = new ArrayList<Event>();

    for (AvroFlumeEvent avroEvent : events) {
      Event event = EventBuilder.withBody(avroEvent.getBody().array(),
          toStringMap(avroEvent.getHeaders()));

      batch.add(event);
    }

    try {
      getChannelProcessor().processEventBatch(batch);
    } catch (Throwable t) {
      logger.error("Avro source " + getName() + ": Unable to process event " +
          "batch. Exception follows.", t);
      if (t instanceof Error) {
        throw (Error) t;
      }
      return Status.FAILED;
    }

    sourceCounter.incrementAppendBatchAcceptedCount();
    sourceCounter.addToEventAcceptedCount(events.size());

    return Status.OK;
  }
  …
}
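In the append method we can see the call getChannelProcessor().processEvent(event);, and appendBatch makes the corresponding call to processEventBatch(batch). So each Source, according to its own trigger or pull mechanism, calls the ChannelProcessor at the appropriate moment to insert events.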
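The chain from selector to processor to source can be pictured with a small model. Everything in the sketch below (MiniChannel, MiniSelector, MiniProcessor, MiniSource) is a hypothetical stand-in rather than a Flume class. It assumes a selector that sends every event to every channel, and it leaves out the interceptors and channel transactions that the real ChannelProcessor also handles.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for illustration only; these are not Flume classes.
class ProcessorSketch {

    // A channel reduced to an in-memory list of event bodies.
    static class MiniChannel {
        final String name;
        final List<String> events = new ArrayList<>();

        MiniChannel(String name) {
            this.name = name;
        }
    }

    // A replicating-style selector: every event goes to every channel.
    static class MiniSelector {
        private final List<MiniChannel> channels;

        MiniSelector(List<MiniChannel> channels) {
            this.channels = channels;
        }

        List<MiniChannel> channelsFor(String event) {
            return channels;
        }
    }

    // The processor asks the selector where an event belongs, then writes it there.
    static class MiniProcessor {
        private final MiniSelector selector;

        MiniProcessor(MiniSelector selector) {
            this.selector = selector;
        }

        void processEvent(String event) {
            for (MiniChannel channel : selector.channelsFor(event)) {
                channel.events.add(event);
            }
        }
    }

    // A source only holds the processor it was given and calls it per event.
    static class MiniSource {
        private MiniProcessor processor;

        void setProcessor(MiniProcessor processor) {
            this.processor = processor;
        }

        void receive(String event) {
            processor.processEvent(event);
        }
    }

    // Wires selector -> processor -> source, in the same order as loadSources().
    static List<MiniChannel> demo() {
        List<MiniChannel> channels =
            Arrays.asList(new MiniChannel("c1"), new MiniChannel("c2"));
        MiniSource source = new MiniSource();
        source.setProcessor(new MiniProcessor(new MiniSelector(channels)));
        source.receive("hello");
        return channels;
    }
}
```

After demo() runs, both stand-in channels hold the single event "hello". The source never touches a channel directly: it only knows the processor, and the processor only knows the selector, which is why one source can feed several channels without any change to its own code.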
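That completes our study of Flume's startup flow and its three major components. Given the limits of my own ability, some details could not be explored in depth; I hope to find time to continue the analysis later.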