Flume-ng Source Code Analysis: The Sink Component

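If you haven't yet read the earlier articles in the Flume-ng source code analysis series on the startup process and the Channel component, you can find them here:
Flume-ng Source Code Analysis: The Startup Process
Flume-ng Source Code Analysis: The Channel Component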

Sink is the second component started during the startup process, so today let's look at its details.

1 Sink

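In an agent, a Sink plays the role of consumer: it takes events from its Channel and delivers them to a specific destination.

As before, let's start with the code. It shows that Sink is an interface whose most important method is process(), which handles the data taken from the Channel. Sink instances are created by SinkFactory.create().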

@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Sink extends LifecycleAware, NamedComponent {
  public void setChannel(Channel channel);
  public Channel getChannel();
  /* Processes the events taken from the channel */
  public Status process() throws EventDeliveryException;
  public static enum Status {
    READY, BACKOFF
  }
}
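To illustrate the process() contract, here is a simplified, self-contained sketch. SimpleChannel, ListSink and SimpleSinkDemo are hypothetical stand-ins, not the org.apache.flume API. The sketch assumes that process() returns READY when it delivered an event and BACKOFF when the channel had nothing to give, so the caller should wait before calling again. Real Flume sinks also wrap each take() in a channel Transaction (begin, then commit, or rollback on failure, then close), which this sketch leaves out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical, simplified stand-ins for Flume's Channel and Sink.
// This is NOT the org.apache.flume API; it only mirrors the READY/BACKOFF contract.
class SimpleSinkDemo {

    enum Status { READY, BACKOFF }

    /** Minimal in-memory channel: take() returns null when no event is available. */
    static class SimpleChannel {
        private final Queue<String> events = new ArrayDeque<>();

        void put(String event) { events.add(event); }

        String take() { return events.poll(); }
    }

    /** Sink that "delivers" each event by appending it to a list. */
    static class ListSink {
        private SimpleChannel channel;
        final List<String> delivered = new ArrayList<>();

        void setChannel(SimpleChannel channel) { this.channel = channel; }

        /** Consumes at most one event per call. */
        Status process() {
            String event = channel.take();
            if (event == null) {
                // Nothing to deliver: tell the caller to back off before retrying.
                return Status.BACKOFF;
            }
            delivered.add(event);
            return Status.READY;
        }
    }

    public static void main(String[] args) {
        SimpleChannel channel = new SimpleChannel();
        channel.put("e1");
        channel.put("e2");

        ListSink sink = new ListSink();
        sink.setChannel(channel);

        System.out.println(sink.process()); // READY
        System.out.println(sink.process()); // READY
        System.out.println(sink.process()); // BACKOFF (channel is empty)
        System.out.println(sink.delivered); // [e1, e2]
    }
}
```

Returning a status instead of blocking keeps the sink simple: deciding how long to wait after a BACKOFF is left to whoever drives process().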

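From the startup-process article we know that Application does not start the Sink itself but a SinkRunner, which, as the name suggests, is a driver class. Let's look at its code, focusing on start():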

public class SinkRunner implements LifecycleAware {

  ...

  @Override
  public void start() {
    // Start the SinkProcessor ("policy") chosen when the SinkRunner was created
    SinkProcessor policy = getPolicy();

    policy.start();

    // The PollingRunner repeatedly calls policy.process() on its own thread
    runner = new PollingRunner();

    runner.policy = policy;
    runner.counterGroup = counterGroup;
    runner.shouldStop = new AtomicBoolean();

    runnerThread = new Thread(runner);
    runnerThread.setName("SinkRunner-PollingRunner-" +
        policy.getClass().getSimpleName());
    runnerThread.start();

    lifecycleState = LifecycleState.START;
  }
  ...

}

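Starting a SinkRunner simply means calling its start(). In start() we can see that it first starts a SinkProcessor (the policy), then creates a PollingRunner, hands it that processor, and runs it on a new thread named "SinkRunner-PollingRunner-" plus the processor's class name; it is this thread that repeatedly calls the processor's process(). The SinkProcessor was already specified when the SinkRunner was created. If you want to know how the configuration file is handled, look at the classes in the conf package, in particular getConfiguration() in org.apache.flume.node.AbstractConfigurationProvider.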
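The loop that the runner thread executes can be pictured with the simplified sketch below. It is not Flume's PollingRunner: PollingLoopDemo, PollingLoop, backoffDelay() and the sleep values are hypothetical. It assumes the loop keeps calling process() until asked to stop and, after each BACKOFF, sleeps for a delay that grows with the number of consecutive backoffs up to a cap, resetting once a call returns READY.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Hypothetical, simplified model of a SinkRunner-style polling loop.
// This is NOT Flume's PollingRunner; names and sleep values are illustrative.
class PollingLoopDemo {

    enum Status { READY, BACKOFF }

    /** Backoff grows linearly with consecutive BACKOFFs, capped at maxMs. */
    static long backoffDelay(long consecutiveBackoffs, long incrementMs, long maxMs) {
        return Math.min(consecutiveBackoffs * incrementMs, maxMs);
    }

    static class PollingLoop implements Runnable {
        private final Supplier<Status> processor; // stands in for SinkProcessor.process()
        private final long incrementMs;
        private final long maxMs;
        private final AtomicBoolean shouldStop = new AtomicBoolean(false);
        private long consecutiveBackoffs = 0;
        long totalBackoffs = 0;

        PollingLoop(Supplier<Status> processor, long incrementMs, long maxMs) {
            this.processor = processor;
            this.incrementMs = incrementMs;
            this.maxMs = maxMs;
        }

        void stop() { shouldStop.set(true); }

        @Override
        public void run() {
            while (!shouldStop.get()) {
                if (processor.get() == Status.BACKOFF) {
                    totalBackoffs++;
                    consecutiveBackoffs++;
                    try {
                        Thread.sleep(backoffDelay(consecutiveBackoffs, incrementMs, maxMs));
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt(); // preserve the interrupt and exit
                        return;
                    }
                } else {
                    consecutiveBackoffs = 0; // progress was made, reset the backoff
                }
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Scripted processor: READY twice, then BACKOFF forever (e.g. the channel ran dry).
        int[] calls = {0};
        PollingLoop loop = new PollingLoop(
                () -> ++calls[0] <= 2 ? Status.READY : Status.BACKOFF, 10, 50);

        Thread runnerThread = new Thread(loop, "SinkRunner-PollingRunner-demo");
        runnerThread.start();
        Thread.sleep(200);
        loop.stop();
        runnerThread.join();
        System.out.println("process() calls: " + calls[0] + ", backoffs: " + loop.totalBackoffs);
    }
}
```

The stop flag is an AtomicBoolean so that a stop() from another thread is seen by the loop, the same role shouldStop plays in the SinkRunner excerpt.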

Next, let's look at the SinkProcessor interface:

public interface SinkProcessor extends LifecycleAware, Configurable {
  Status process() throws EventDeliveryException;
  void setSinks(List<Sink> sinks);
}

SinkProcessor is an interface. Its implementations are created by SinkProcessorFactory.getProcessor(), which is called from SinkGroup.configure(); configure() in turn is invoked by loadSinkGroup() in AbstractConfigurationProvider.

public class SinkGroup implements Configurable, ConfigurableComponent {
  List<Sink> sinks;
  SinkProcessor processor;
  SinkGroupConfiguration conf;

  public SinkGroup(List<Sink> groupSinks) {
    sinks = groupSinks;
  }
  
  public SinkProcessor getProcessor() {
    return processor;
  }

  @Override
  public void configure(ComponentConfiguration conf) {
    this.conf = (SinkGroupConfiguration) conf;
    processor =
        SinkProcessorFactory.getProcessor(this.conf.getProcessorContext(),
            sinks);
  }
}
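SinkProcessorFactory.getProcessor() chooses the implementation from the group's processor context. A rough, hypothetical model of that lookup is sketched below: ProcessorFactoryDemo, its registry and the Processor interface are illustrative only, sinks are represented by their names, and falling back to "default" when no type is given is an assumption of the sketch. The type strings default, failover and load_balance are the aliases Flume's configuration uses for processor.type; the real factory can also accept a fully qualified class name.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, simplified model of choosing a sink processor by type string.
// This is NOT Flume's SinkProcessorFactory; sinks are represented by their names.
class ProcessorFactoryDemo {

    interface Processor {
        String describe();
    }

    private static final Map<String, Function<List<String>, Processor>> REGISTRY = new HashMap<>();

    static {
        REGISTRY.put("default", sinks -> {
            // Mirrors the single-sink check in DefaultSinkProcessor.setSinks()
            if (sinks.size() != 1) {
                throw new IllegalArgumentException("default processor handles exactly one sink");
            }
            return () -> "default(" + sinks.get(0) + ")";
        });
        REGISTRY.put("failover", sinks -> () -> "failover" + sinks);
        REGISTRY.put("load_balance", sinks -> () -> "load_balance" + sinks);
    }

    /** Looks up the processor for a type; a missing type falls back to "default" (sketch assumption). */
    static Processor getProcessor(String type, List<String> sinks) {
        String key = (type == null) ? "default" : type.toLowerCase();
        Function<List<String>, Processor> constructor = REGISTRY.get(key);
        if (constructor == null) {
            throw new IllegalArgumentException("unknown processor type: " + type);
        }
        return constructor.apply(sinks);
    }

    public static void main(String[] args) {
        System.out.println(getProcessor("failover", Arrays.asList("k1", "k2")).describe()); // failover[k1, k2]
        System.out.println(getProcessor(null, Arrays.asList("k1")).describe());             // default(k1)
    }
}
```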

Let's take DefaultSinkProcessor as an example:

public class DefaultSinkProcessor implements SinkProcessor, ConfigurableComponent {
  private Sink sink;
  private LifecycleState lifecycleState;

  @Override
  public void start() {
    Preconditions.checkNotNull(sink, "DefaultSinkProcessor sink not set");
    sink.start();
    lifecycleState = LifecycleState.START;
  }

  @Override
  public void stop() {
    Preconditions.checkNotNull(sink, "DefaultSinkProcessor sink not set");
    sink.stop();
    lifecycleState = LifecycleState.STOP;
  }

  @Override
  public LifecycleState getLifecycleState() {
    return lifecycleState;
  }

  @Override
  public void configure(Context context) {
  }

  @Override
  public Status process() throws EventDeliveryException {
    // No routing logic: delegate straight to the single configured sink
    return sink.process();
  }

  @Override
  public void setSinks(List<Sink> sinks) {
    Preconditions.checkNotNull(sinks);
    Preconditions.checkArgument(sinks.size() == 1, "DefaultSinkPolicy can "
        + "only handle one sink, "
        + "try using a policy that supports multiple sinks");
    sink = sinks.get(0);
  }

  @Override
  public void configure(ComponentConfiguration conf) {

  }

}

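From the code above we can see that the SinkProcessor still ends up calling the sink's start, stop and process methods. So what is the SinkProcessor for? Besides DefaultSinkProcessor, Flume provides FailoverSinkProcessor and LoadBalancingSinkProcessor: as the names suggest, one provides failover and the other load balancing. The different SinkProcessor implementations exist to apply different strategies for distributing events across the sinks in a group. A sink's own start() usually prepares the resources it needs (some sinks also start helper threads), while the actual consuming is driven by the SinkRunner's PollingRunner thread calling process().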
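To make the two strategies concrete, here is a simplified sketch of each. It is not Flume's FailoverSinkProcessor or LoadBalancingSinkProcessor: SinkStrategiesDemo, SimpleSink, failover() and RoundRobin are hypothetical, and the real processors also track failed sinks, apply backoff penalties, and (for load balancing) offer a random selector besides round-robin. The sketch assumes a sink signals failure by throwing from process().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified failover and round-robin strategies.
// These are NOT Flume's FailoverSinkProcessor / LoadBalancingSinkProcessor.
class SinkStrategiesDemo {

    /** Stand-in sink: process() returns normally on success and throws on failure. */
    interface SimpleSink {
        String name();
        void process() throws Exception;
    }

    /** Failover: try sinks in the given order (caller sorts by priority); first success wins. */
    static String failover(List<SimpleSink> byPriority) {
        Exception last = null;
        for (SimpleSink sink : byPriority) {
            try {
                sink.process();
                return sink.name();
            } catch (Exception e) {
                last = e; // this sink failed, fall back to the next one
            }
        }
        throw new IllegalStateException("all sinks failed", last);
    }

    /** Load balancing: rotate the starting sink on each call, skipping sinks that fail. */
    static class RoundRobin {
        private final List<SimpleSink> sinks;
        private int next = 0;

        RoundRobin(List<SimpleSink> sinks) {
            if (sinks.isEmpty()) {
                throw new IllegalArgumentException("need at least one sink");
            }
            this.sinks = new ArrayList<>(sinks);
        }

        String process() {
            Exception last = null;
            int n = sinks.size();
            for (int i = 0; i < n; i++) {
                int index = (next + i) % n;
                try {
                    sinks.get(index).process();
                    next = (index + 1) % n; // the following call starts after this sink
                    return sinks.get(index).name();
                } catch (Exception e) {
                    last = e;
                }
            }
            throw new IllegalStateException("all sinks failed", last);
        }
    }

    static SimpleSink ok(String name) {
        return new SimpleSink() {
            public String name() { return name; }
            public void process() { }
        };
    }

    static SimpleSink broken(String name) {
        return new SimpleSink() {
            public String name() { return name; }
            public void process() throws Exception { throw new Exception(name + " is down"); }
        };
    }

    public static void main(String[] args) {
        System.out.println(failover(Arrays.asList(broken("k1"), ok("k2")))); // k2

        // Four calls over three healthy sinks print: k1 k2 k3 k1
        RoundRobin rr = new RoundRobin(Arrays.asList(ok("k1"), ok("k2"), ok("k3")));
        for (int i = 0; i < 4; i++) {
            System.out.print(rr.process() + " ");
        }
        System.out.println();
    }
}
```

Both strategies share the same shape as DefaultSinkProcessor, a single process() call from the runner's point of view; only the choice of which sink handles that call differs.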
