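If you haven't yet read the articles on the startup process and the Channel component in the Flume-ng source code analysis series, you can follow the links below:
Flume-ng Source Code Analysis: The Startup Process
Flume-ng Source Code Analysis: The Channel Component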
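The Sink is the second component launched during startup, so today we look at it in detail.
In an agent, the Sink plays the role of consumer: it delivers events to a specific destination.
As usual, we start with the code. Sink is an interface whose most important method is process(), which handles the data taken from the Channel. Sink instances are created by SinkFactory.create().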
```java
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Sink extends LifecycleAware, NamedComponent {

  public void setChannel(Channel channel);

  public Channel getChannel();

  /* Processes events taken from the channel */
  public Status process() throws EventDeliveryException;

  public static enum Status {
    READY, BACKOFF
  }
}
```
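To make the READY/BACKOFF contract more concrete, here is a minimal, self-contained sketch of what a sink's process() typically does. It takes one event from its channel inside a transaction, commits on success, rolls back on failure, and returns BACKOFF when the channel is empty. ToyChannel, ListSink and the local Status enum are hypothetical stand-ins, not Flume classes. The real API obtains a Transaction from Channel.getTransaction() (with begin/commit/rollback/close) and throws EventDeliveryException instead of a runtime exception.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class SinkProcessDemo {
  public static void main(String[] args) {
    ToyChannel channel = new ToyChannel();
    channel.put("event-1");
    channel.put("event-2");
    ListSink sink = new ListSink(channel);

    sink.failNext = true; // simulate one failed delivery
    try {
      sink.process();
    } catch (IllegalStateException e) {
      System.out.println("delivery failed, events still in channel: " + channel.size());
    }
    while (sink.process() == Status.READY) {
      // keep draining until the sink reports BACKOFF
    }
    System.out.println("delivered: " + sink.delivered);
  }
}

// Hypothetical stand-in for Sink.Status.
enum Status { READY, BACKOFF }

// Hypothetical in-memory channel with a one-event "transaction" (not Flume's Channel).
class ToyChannel {
  private final Deque<String> queue = new ArrayDeque<>();
  private String inFlight; // event taken in the current transaction

  void put(String event) { queue.addLast(event); }
  void begin() { inFlight = null; }
  String take() { inFlight = queue.pollFirst(); return inFlight; } // null when empty
  void commit() { inFlight = null; }
  void rollback() { // return the taken event to the head of the queue
    if (inFlight != null) { queue.addFirst(inFlight); inFlight = null; }
  }
  int size() { return queue.size(); }
}

// Hypothetical sink that "delivers" events into a list.
class ListSink {
  private final ToyChannel channel;
  final List<String> delivered = new ArrayList<>();
  boolean failNext = false;

  ListSink(ToyChannel channel) { this.channel = channel; }

  Status process() {
    channel.begin();
    try {
      String event = channel.take();
      if (event == null) {
        channel.commit();
        return Status.BACKOFF; // nothing to consume: ask the runner to back off
      }
      if (failNext) {
        failNext = false;
        throw new IllegalStateException("destination unavailable");
      }
      delivered.add(event);
      channel.commit();
      return Status.READY;
    } catch (RuntimeException e) {
      channel.rollback(); // the event stays in the channel for a later retry
      throw e;
    }
  }
}
```

Rolling back on failure is what keeps an event from being lost when the destination is down. The event simply remains in the channel until a later process() call succeeds.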
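In the startup-process article we saw that Application does not start the Sink directly but a SinkRunner, which, as the name suggests, is a driver class. Let's look at its code, focusing on start():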
```java
public class SinkRunner implements LifecycleAware {
  ...
  @Override
  public void start() {
    // The processor (policy) was assigned when the SinkRunner was created
    SinkProcessor policy = getPolicy();
    policy.start();

    // PollingRunner drives policy.process() on its own thread
    runner = new PollingRunner();
    runner.policy = policy;
    runner.counterGroup = counterGroup;
    runner.shouldStop = new AtomicBoolean();

    runnerThread = new Thread(runner);
    runnerThread.setName("SinkRunner-PollingRunner-" + policy.getClass().getSimpleName());
    runnerThread.start();

    lifecycleState = LifecycleState.START;
  }
  ...
}
```
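Starting a SinkRunner means calling its start(). In start(), the runner first starts its SinkProcessor. It then launches a PollingRunner thread, named "SinkRunner-PollingRunner-" plus the processor's class name, that drives that processor. The SinkProcessor is already specified when the SinkRunner is created. If you want to know how the configuration file is processed, look at the classes in the conf package, in particular getConfiguration() in org.apache.flume.node.AbstractConfigurationProvider.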
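The PollingRunner thread is what calls policy.process() over and over. The sketch below models that loop under a simplifying assumption. When the processor reports BACKOFF, the thread sleeps for a linearly growing interval capped at a maximum, and a READY result resets the counter. ToyPollingRunner, its parameters and the Supplier standing in for SinkProcessor are hypothetical. Check PollingRunner.run() in the Flume source for the exact counters and default sleep values.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class PollingDemo {
  public static void main(String[] args) throws InterruptedException {
    AtomicBoolean stop = new AtomicBoolean();
    AtomicInteger calls = new AtomicInteger();
    // Pretend the processor has work for the first 3 calls, then the channel is empty.
    ToyPollingRunner runner = new ToyPollingRunner(
        () -> calls.incrementAndGet() <= 3 ? Status.READY : Status.BACKOFF,
        10, 50, stop);
    Thread thread = new Thread(runner, "SinkRunner-PollingRunner-Toy");
    thread.start();
    Thread.sleep(200);
    stop.set(true); // signal the loop to exit
    thread.join();
    System.out.println("process() was called " + calls.get() + " times");
  }
}

// Hypothetical stand-in for Sink.Status.
enum Status { READY, BACKOFF }

// Hypothetical, simplified model of SinkRunner's PollingRunner (not Flume code).
class ToyPollingRunner implements Runnable {
  private final Supplier<Status> policy; // stands in for SinkProcessor.process()
  private final long backoffIncrementMs;
  private final long maxBackoffMs;
  private final AtomicBoolean shouldStop;
  private long consecutiveBackoffs = 0;

  ToyPollingRunner(Supplier<Status> policy, long backoffIncrementMs,
                   long maxBackoffMs, AtomicBoolean shouldStop) {
    this.policy = policy;
    this.backoffIncrementMs = backoffIncrementMs;
    this.maxBackoffMs = maxBackoffMs;
    this.shouldStop = shouldStop;
  }

  // Linear backoff: n consecutive BACKOFFs sleep n * increment, capped at max.
  static long backoffMillis(long consecutive, long increment, long max) {
    return Math.min(consecutive * increment, max);
  }

  @Override
  public void run() {
    while (!shouldStop.get()) {
      if (policy.get() == Status.BACKOFF) {
        consecutiveBackoffs++;
        try {
          Thread.sleep(backoffMillis(consecutiveBackoffs, backoffIncrementMs, maxBackoffMs));
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // preserve interrupt status and exit
          return;
        }
      } else {
        consecutiveBackoffs = 0; // the processor did work: poll again immediately
      }
    }
  }
}
```

Backing off only on BACKOFF keeps the thread busy while events keep arriving. When the channel is idle, the thread stops spinning on an empty channel.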
Next, let's look at SinkProcessor.
```java
public interface SinkProcessor extends LifecycleAware, Configurable {

  Status process() throws EventDeliveryException;

  void setSinks(List<Sink> sinks);
}
```
SinkProcessor is an interface. Its implementations are produced by SinkProcessorFactory.getProcessor(), which is called from SinkGroup.configure(). That method is in turn invoked by loadSinkGroup() in AbstractConfigurationProvider.
```java
public class SinkGroup implements Configurable, ConfigurableComponent {
  List<Sink> sinks;
  SinkProcessor processor;
  SinkGroupConfiguration conf;

  public SinkGroup(List<Sink> groupSinks) {
    sinks = groupSinks;
  }

  public SinkProcessor getProcessor() {
    return processor;
  }

  @Override
  public void configure(ComponentConfiguration conf) {
    this.conf = (SinkGroupConfiguration) conf;
    // Build the processor from the sink group's processor context
    processor = SinkProcessorFactory.getProcessor(this.conf.getProcessorContext(), sinks);
  }
}
```
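Conceptually, SinkProcessorFactory.getProcessor() turns the processor context of a sink group into a concrete processor. The following hypothetical sketch uses a plain Map for the context and sink names instead of Sink objects. It picks an implementation by its type value (default, failover or load_balance, the names used in Flume's sink-group configuration) and hands it the group's sinks. Only these three names are handled, a missing type is treated as default, and none of this is the factory's actual code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

class ProcessorFactoryDemo {
  public static void main(String[] args) {
    // e.g. a sink group configured with processor.type = failover
    ToyProcessor p = ToyProcessorFactory.getProcessor(
        Map.of("type", "failover"), Arrays.asList("k1", "k2"));
    System.out.println(p.getClass().getSimpleName());
  }
}

// Hypothetical stand-in for SinkProcessor; sinks are just names here.
interface ToyProcessor {
  void setSinks(List<String> sinks);
}

class ToyDefaultProcessor implements ToyProcessor {
  String sink;

  @Override
  public void setSinks(List<String> sinks) {
    // Same rule as DefaultSinkProcessor.setSinks(): exactly one sink
    if (sinks.size() != 1) {
      throw new IllegalArgumentException("default processor handles exactly one sink");
    }
    sink = sinks.get(0);
  }
}

class ToyFailoverProcessor implements ToyProcessor {
  List<String> sinks;

  @Override
  public void setSinks(List<String> sinks) { this.sinks = sinks; }
}

class ToyLoadBalancingProcessor implements ToyProcessor {
  List<String> sinks;

  @Override
  public void setSinks(List<String> sinks) { this.sinks = sinks; }
}

class ToyProcessorFactory {
  // Maps a configured type name to a constructor for that processor.
  private static final Map<String, Supplier<ToyProcessor>> TYPES = Map.of(
      "default", ToyDefaultProcessor::new,
      "failover", ToyFailoverProcessor::new,
      "load_balance", ToyLoadBalancingProcessor::new);

  static ToyProcessor getProcessor(Map<String, String> context, List<String> sinks) {
    String type = context.getOrDefault("type", "default"); // sketch: no type means default
    Supplier<ToyProcessor> constructor = TYPES.get(type);
    if (constructor == null) {
      throw new IllegalArgumentException("unknown sink processor type: " + type);
    }
    ToyProcessor processor = constructor.get();
    processor.setSinks(sinks); // each processor validates the sinks it is given
    return processor;
  }
}
```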
Let's take DefaultSinkProcessor as an example:
```java
public class DefaultSinkProcessor implements SinkProcessor, ConfigurableComponent {
  private Sink sink;
  private LifecycleState lifecycleState;

  @Override
  public void start() {
    Preconditions.checkNotNull(sink, "DefaultSinkProcessor sink not set");
    sink.start(); // simply delegates to the single sink
    lifecycleState = LifecycleState.START;
  }

  @Override
  public void stop() {
    Preconditions.checkNotNull(sink, "DefaultSinkProcessor sink not set");
    sink.stop();
    lifecycleState = LifecycleState.STOP;
  }

  @Override
  public LifecycleState getLifecycleState() {
    return lifecycleState;
  }

  @Override
  public void configure(Context context) {
  }

  @Override
  public Status process() throws EventDeliveryException {
    return sink.process();
  }

  @Override
  public void setSinks(List<Sink> sinks) {
    // The default processor accepts exactly one sink
    Preconditions.checkNotNull(sinks);
    Preconditions.checkArgument(sinks.size() == 1, "DefaultSinkPolicy can "
        + "only handle one sink, "
        + "try using a policy that supports multiple sinks");
    sink = sinks.get(0);
  }

  @Override
  public void configure(ComponentConfiguration conf) {
  }
}
```
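From the code above we can see that the SinkProcessor still just calls the sink's start, stop and process methods. So what is the SinkProcessor for? Flume also provides FailoverSinkProcessor and LoadBalancingSinkProcessor. As the names suggest, one implements failover and the other load balancing. The different SinkProcessor implementations exist to apply different dispatch operations and policies across the sinks in a group, whereas DefaultSinkProcessor accepts exactly one sink, as its setSinks() check shows. A sink's start(), in turn, usually starts a thread that performs the actual consumption.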
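To show how processors differ only in how they pick a sink, here is a hedged sketch of the two strategies named above. Round-robin load balancing rotates the starting sink on each call and falls through to the next sink on failure. Priority failover always tries the highest-priority working sink first; in Flume's failover configuration a larger priority value wins. ToySink, RoundRobinProcessor and PriorityFailoverProcessor are hypothetical. The real processors add features this sketch omits, such as backing off failed sinks and failure penalties.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class ProcessorStrategyDemo {
  public static void main(String[] args) {
    ToySink k1 = new ToySink("k1", 10);
    ToySink k2 = new ToySink("k2", 5);

    RoundRobinProcessor balancer = new RoundRobinProcessor(Arrays.asList(k1, k2));
    for (int i = 0; i < 4; i++) {
      balancer.process();
    }
    System.out.println("round robin: k1=" + k1.processed + ", k2=" + k2.processed);

    PriorityFailoverProcessor failover = new PriorityFailoverProcessor(Arrays.asList(k1, k2));
    k1.down = true; // the preferred sink fails, so k2 takes over
    failover.process();
    System.out.println("failover: k1=" + k1.processed + ", k2=" + k2.processed);
  }
}

// Hypothetical stand-in for Sink.Status.
enum Status { READY, BACKOFF }

// Hypothetical toy sink: "down" simulates a failing destination.
class ToySink {
  final String name;
  final int priority;
  boolean down = false;
  int processed = 0;

  ToySink(String name, int priority) {
    this.name = name;
    this.priority = priority;
  }

  Status process() {
    if (down) {
      throw new IllegalStateException(name + " is down");
    }
    processed++;
    return Status.READY;
  }
}

// Round robin: rotate the starting sink on each call; on failure try the next one.
class RoundRobinProcessor {
  private final List<ToySink> sinks;
  private int next = 0;

  RoundRobinProcessor(List<ToySink> sinks) {
    if (sinks.isEmpty()) {
      throw new IllegalArgumentException("need at least one sink");
    }
    this.sinks = new ArrayList<>(sinks);
  }

  Status process() {
    int n = sinks.size();
    int start = next;
    next = (next + 1) % n;
    for (int i = 0; i < n; i++) {
      try {
        return sinks.get((start + i) % n).process();
      } catch (IllegalStateException e) {
        // this sink failed; fall through to the next one
      }
    }
    throw new IllegalStateException("all sinks failed");
  }
}

// Failover: always try sinks from highest to lowest priority value.
class PriorityFailoverProcessor {
  private final List<ToySink> byPriority;

  PriorityFailoverProcessor(List<ToySink> sinks) {
    byPriority = new ArrayList<>(sinks);
    byPriority.sort(Comparator.comparingInt((ToySink s) -> s.priority).reversed());
  }

  Status process() {
    for (ToySink sink : byPriority) {
      try {
        return sink.process();
      } catch (IllegalStateException e) {
        // preferred sink failed; fail over to the next one
      }
    }
    throw new IllegalStateException("all sinks failed");
  }
}
```

Both processors expose the same process() shape as DefaultSinkProcessor. This is why SinkRunner can drive any of them without knowing which policy is in use.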