Flume-ng源碼解析之Channel組件

若是還沒看過Flume-ng源碼解析之啓動流程,能夠點擊Flume-ng源碼解析之啓動流程 查看api

1 接口介紹

組件的分析順序是按照上一篇中啓動順序來分析的,首先是Channel,而後是Sink,最後是Source,在開始看組件源碼以前咱們先來看一下兩個重要的接口,一個是LifecycleAware ,另外一個是NamedComponentapp

1.1 LifecycleAware

@InterfaceAudience.Public
@InterfaceStability.Stable
public interface LifecycleAware {

  public void start();

  public void stop();

  public LifecycleState getLifecycleState();

}

很是簡單就是三個方法,start()、stop()和getLifecycleState,這個接口是flume好多類都要實現的接口,包括Flume-ng源碼解析之啓動流程
所中提到PollingPropertiesFileConfigurationProvider(),只要涉及到生命週期的都會實現該接口,固然組件們也是要實現的!ide

1.2 NamedComponent

@InterfaceAudience.Public
@InterfaceStability.Stable
public interface NamedComponent {

  public void setName(String name);

  public String getName();

}

這個沒什麼好講的,就是用來設置名字的。ui

2 Channel

做爲Flume三大核心組件之一的Channel,咱們有必要來看看它的構成:this

@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Channel extends LifecycleAware, NamedComponent {

  public void put(Event event) throws ChannelException;
  public Event take() throws ChannelException;
  public Transaction getTransaction();
}

那麼從上面的接口中咱們能夠看到Channel的主要功能就是put()和take(),那麼咱們就來看一下它的具體實現。這裏咱們選擇MemoryChannel做爲例子,可是MemoryChannel太長了,咱們就截取一小段來看看.net

public class MemoryChannel extends BasicChannelSemantics {
    private static Logger LOGGER = LoggerFactory.getLogger(MemoryChannel.class);
    private static final Integer defaultCapacity = Integer.valueOf(100);
    private static final Integer defaultTransCapacity = Integer.valueOf(100);
    
    public MemoryChannel() {
    }

    ...
}

咱們又看到它繼承了BasicChannelSemantics ,從名字咱們能夠看出它是一個基礎的Channel,咱們繼續看看看它的實現debug

@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class BasicChannelSemantics extends AbstractChannel {

  private ThreadLocal<BasicTransactionSemantics> currentTransaction
      = new ThreadLocal<BasicTransactionSemantics>();

  private boolean initialized = false;

  protected void initialize() {}

  protected abstract BasicTransactionSemantics createTransaction();

  @Override
  public void put(Event event) throws ChannelException {
    BasicTransactionSemantics transaction = currentTransaction.get();
    Preconditions.checkState(transaction != null,
        "No transaction exists for this thread");
    transaction.put(event);
  }

  @Override
  public Event take() throws ChannelException {
    BasicTransactionSemantics transaction = currentTransaction.get();
    Preconditions.checkState(transaction != null,
        "No transaction exists for this thread");
    return transaction.take();
  }

  @Override
  public Transaction getTransaction() {

    if (!initialized) {
      synchronized (this) {
        if (!initialized) {
          initialize();
          initialized = true;
        }
      }
    }

    BasicTransactionSemantics transaction = currentTransaction.get();
    if (transaction == null || transaction.getState().equals(
            BasicTransactionSemantics.State.CLOSED)) {
      transaction = createTransaction();
      currentTransaction.set(transaction);
    }
    return transaction;
  }
}

找了許久,終於發現了put()和take(),可是仔細一看,它們內部調用的是BasicTransactionSemantics 的put()和take(),有點失望,繼續來看看BasicTransactionSemanticscode

public abstract class BasicTransactionSemantics implements Transaction {

  private State state;
  private long initialThreadId;

  protected void doBegin() throws InterruptedException {}
  protected abstract void doPut(Event event) throws InterruptedException;
  protected abstract Event doTake() throws InterruptedException;
  protected abstract void doCommit() throws InterruptedException;
  protected abstract void doRollback() throws InterruptedException;
  protected void doClose() {}

  protected BasicTransactionSemantics() {
    state = State.NEW;
    initialThreadId = Thread.currentThread().getId();
  }

  protected void put(Event event) {
    Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
        "put() called from different thread than getTransaction()!");
    Preconditions.checkState(state.equals(State.OPEN),
        "put() called when transaction is %s!", state);
    Preconditions.checkArgument(event != null,
        "put() called with null event!");

    try {
      doPut(event);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new ChannelException(e.toString(), e);
    }
  }

  protected Event take() {
    Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
        "take() called from different thread than getTransaction()!");
    Preconditions.checkState(state.equals(State.OPEN),
        "take() called when transaction is %s!", state);

    try {
      return doTake();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return null;
    }
  }

  protected State getState() {
    return state;
  }

  ...//咱們這裏只是討論put和take,因此一些暫時不涉及的方法就被我幹掉,有興趣恩典朋友能夠自行閱讀

  protected static enum State {
    NEW, OPEN, COMPLETED, CLOSED
  }
}

又是一個抽象類,put()和take()內部調用的仍是抽象方法doPut()和doTake(),看到這裏,我相信沒有耐心的同窗已經崩潰了,可是就差最後一步了,既然是抽象類,那麼最終Channel所使用的確定是它的一個實現類,這時候咱們能夠回到一開始使用的MemoryChannel,到裏面找找有沒有線索,一看,MemoryChannel中就藏着個內部類component

private class MemoryTransaction extends BasicTransactionSemantics {
    private LinkedBlockingDeque<Event> takeList;
    private LinkedBlockingDeque<Event> putList;
    private final ChannelCounter channelCounter;
    private int putByteCounter = 0;
    private int takeByteCounter = 0;

    public MemoryTransaction(int transCapacity, ChannelCounter counter) {
      putList = new LinkedBlockingDeque<Event>(transCapacity);
      takeList = new LinkedBlockingDeque<Event>(transCapacity);

      channelCounter = counter;
    }

    @Override
    protected void doPut(Event event) throws InterruptedException {
      channelCounter.incrementEventPutAttemptCount();
      int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);

      if (!putList.offer(event)) {
        throw new ChannelException(
            "Put queue for MemoryTransaction of capacity " +
            putList.size() + " full, consider committing more frequently, " +
            "increasing capacity or increasing thread count");
      }
      putByteCounter += eventByteSize;
    }

    @Override
    protected Event doTake() throws InterruptedException {
      channelCounter.incrementEventTakeAttemptCount();
      if (takeList.remainingCapacity() == 0) {
        throw new ChannelException("Take list for MemoryTransaction, capacity " +
            takeList.size() + " full, consider committing more frequently, " +
            "increasing capacity, or increasing thread count");
      }
      if (!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) {
        return null;
      }
      Event event;
      synchronized (queueLock) {
        event = queue.poll();
      }
      Preconditions.checkNotNull(event, "Queue.poll returned NULL despite semaphore " +
          "signalling existence of entry");
      takeList.put(event);

      int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);
      takeByteCounter += eventByteSize;

      return event;
    }

   //...依然刪除暫時不須要的方法

  }

在這個類中咱們能夠看到doPut()和doTake()的實現方法,也明白MemoryChannel的put()和take()最終調用的是MemoryTransaction 的doPut()和doTake()。blog

有朋友看到這裏覺得此次解析就要結束了,其實好戲還在後頭,Channel中還有兩個重要的類ChannelProcessor和ChannelSelector,耐心地聽我慢慢道來。

3 ChannelProcessor

ChannelProcessor 的做用就是執行put操做,將數據放到channel裏面。每一個ChannelProcessor實例都會配備一個ChannelSelector來決定event要put到那個channl當中

public class ChannelProcessor implements Configurable {
    private static final Logger LOG = LoggerFactory.getLogger(ChannelProcessor.class);
    private final ChannelSelector selector;
    private final InterceptorChain interceptorChain;

    public ChannelProcessor(ChannelSelector selector) {
        this.selector = selector;
        this.interceptorChain = new InterceptorChain();
    }

    public void initialize() {
        this.interceptorChain.initialize();
    }

    public void close() {
        this.interceptorChain.close();
    }

    public void configure(Context context) {
        this.configureInterceptors(context);
    }

    private void configureInterceptors(Context context) {
        //配置攔截器
    }

    public ChannelSelector getSelector() {
        return this.selector;
    }

    public void processEventBatch(List<Event> events) {
        ...
        while(i$.hasNext()) {
            Event optChannel = (Event)i$.next();
            List tx = this.selector.getRequiredChannels(optChannel);

            ...//將event放到Required隊列

            t1 = this.selector.getOptionalChannels(optChannel);

            Object eventQueue;
            ...//將event放到Optional隊列
           
        }

        ...//event的分配操做

    }

    public void processEvent(Event event) {
        event = this.interceptorChain.intercept(event);
        if(event != null) {
            List requiredChannels = this.selector.getRequiredChannels(event);
            Iterator optionalChannels = requiredChannels.iterator();

            ...//event的分配操做

            List optionalChannels1 = this.selector.getOptionalChannels(event);
            Iterator i$1 = optionalChannels1.iterator();

            ...//event的分配操做
        }
    }
}

爲了簡化代碼,我進行了一些刪除,只保留須要講解的部分,說白了Channel中的兩個寫入方法,都是須要從做爲參數傳入的selector中獲取對應的channel來執行event的put操做。接下來咱們來看看ChannelSelector

4 ChannelSelector

ChannelSelector是一個接口,咱們能夠經過ChannelSelectorFactory來建立它的子類,Flume提供了兩個實現類MultiplexingChannelSelector和ReplicatingChannelSelector。

public interface ChannelSelector extends NamedComponent, Configurable {
    void setChannels(List<Channel> var1);

    List<Channel> getRequiredChannels(Event var1);

    List<Channel> getOptionalChannels(Event var1);

    List<Channel> getAllChannels();
}

經過ChannelSelectorFactory 的create來建立,create中調用getSelectorForType來得到一個selector,經過配置文件中的type來建立相應的子類

public class ChannelSelectorFactory {

  private static final Logger LOGGER = LoggerFactory.getLogger(
      ChannelSelectorFactory.class);

  public static ChannelSelector create(List<Channel> channels,
      Map<String, String> config) {

      ...
  }

  public static ChannelSelector create(List<Channel> channels,
      ChannelSelectorConfiguration conf) {
    String type = ChannelSelectorType.REPLICATING.toString();
    if (conf != null) {
      type = conf.getType();
    }
    ChannelSelector selector = getSelectorForType(type);
    selector.setChannels(channels);
    Configurables.configure(selector, conf);
    return selector;
  }

  private static ChannelSelector getSelectorForType(String type) {
    if (type == null || type.trim().length() == 0) {
      return new ReplicatingChannelSelector();
    }

    String selectorClassName = type;
    ChannelSelectorType  selectorType = ChannelSelectorType.OTHER;

    try {
      selectorType = ChannelSelectorType.valueOf(type.toUpperCase(Locale.ENGLISH));
    } catch (IllegalArgumentException ex) {
      LOGGER.debug("Selector type {} is a custom type", type);
    }

    if (!selectorType.equals(ChannelSelectorType.OTHER)) {
      selectorClassName = selectorType.getChannelSelectorClassName();
    }

    ChannelSelector selector = null;

    try {
      @SuppressWarnings("unchecked")
      Class<? extends ChannelSelector> selectorClass =
          (Class<? extends ChannelSelector>) Class.forName(selectorClassName);
      selector = selectorClass.newInstance();
    } catch (Exception ex) {
      throw new FlumeException("Unable to load selector type: " + type
          + ", class: " + selectorClassName, ex);
    }

    return selector;
  }

}

對於這兩種Selector簡單說一下:

1)MultiplexingChannelSelector
下面是一個channel selector 配置文件

agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.optional.CA = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1

MultiplexingChannelSelector類中定義了三個屬性,用於存儲不一樣類型的channel

private Map<String, List<Channel>> channelMapping;
    private Map<String, List<Channel>> optionalChannels;
    private List<Channel> defaultChannels;

那麼具體分配原則以下:

  • 若是設置了maping,那麼會event確定會給指定的channel,若是同時設置了optional,也會發送給optionalchannel
  • 若是沒有設置maping,設置default,那麼event會發送給defaultchannel,若是還同時設置了optional,那麼也會發送給optionalchannel
  • 若是maping和default都沒指定,若是有指定option,那麼會發送給optionalchannel,可是發送給optionalchannel不會進行失敗重試

2)ReplicatingChannelSelector

分配原則比較簡單

  • 若是是replicating的話,那麼若是沒有指定optional,那麼所有channel都有,若是某個channel指定爲option的話,那麼就要從requiredChannel移除,只發送給optionalchannel

5 總結:

做爲一個承上啓下的組件,Channel的做用就是將source來的數據經過本身流向sink,那麼ChannelProcessor就起到將event put到分配好的channel中,而分配的規則是由selector決定的,flume提供的selector有multiplexing和replicating兩種。因此ChannelProcessor通常都是在Source中被調用。那麼Channel的take()確定是在Sink中調用的。

相關文章
相關標籤/搜索