Flume-NG中的Channel與Transaction關係(原創)

  在sink和source中(無論是內置仍是自定義的),基本都有以下代碼,這些代碼在sink中的process方法中,而在source中本身不須要去寫,在source中getChannelProcessor().processEventBatch(events)方法中會自動建立下面相似的:  緩存

    ...
    Channel channel = getChannel();
    Transaction transaction = channel.getTransaction();
    Event event = null;
    Status result = Status.READY;
    transaction.begin();
    ...
    event = channel.take();//getChannelProcessor().processEvent(event);,前者用於sink後者用於source
    ...
    transaction.commit();
    transaction.rollback()
    transaction.close();
    ...

  那麼有些人就要問了?從上述代碼中彷佛只須要獲取channel就能夠了,由於獲取數據時只須要event = channel.take()或者ide

getChannelProcessor().processEvent(event)?這樣對嗎?你能夠去掉transaction試試,結果顯示是不行的,出錯!

  那麼爲何呢?這確實有點讓人疑惑,但實際上channel.take()操做是transaction.doTake()。也就是實際的put和take等操做都是在transaction中進行的,所以要用channel必需要先建立transcation纔可使用。而channel.getTransaction()方法就是獲取(已經建立)或建立(尚未)transcation,BasicChannelSemantics的相對應代碼以下:  this

@Override
  public Transaction getTransaction() {

    if (!initialized) {
      synchronized (this) {
        if (!initialized) {
          initialize();
          initialized = true;
        }
      }
    }

    BasicTransactionSemantics transaction = currentTransaction.get();//獲取transcation
    if (transaction == null || transaction.getState().equals(//若是transaction不存在或者已關閉就建立
            BasicTransactionSemantics.State.CLOSED)) {
      transaction = createTransaction();//建立
      currentTransaction.set(transaction);//賦值給currentTransaction
    }
    return transaction;
  }

  該方法在全部channel的父類BasicChannelSemantics中,而後在具體實現的channel類中須要實現protected abstract BasicTransactionSemantics createTransaction()這個抽象方法來獲取相應的transaction對象。BasicChannelSemantics把transaction.take()和transaction.put(event)方法進一步封裝成take()和put(event)方法,這倆方法就是暴露在sink或者source中的channel.take()和channel.put(event)方法。spa

 @Override
  public void put(Event event) throws ChannelException {
    BasicTransactionSemantics transaction = currentTransaction.get();
    Preconditions.checkState(transaction != null,
        "No transaction exists for this thread");
    transaction.put(event);
  }

  @Override
  public Event take() throws ChannelException {
    BasicTransactionSemantics transaction = currentTransaction.get();
    Preconditions.checkState(transaction != null,
        "No transaction exists for this thread");
    return transaction.take();
  }

  由此,能夠看出工做行程了吧!code

  Transaction transaction = channel.getTransaction();這一句至少要執行一次,由於執行一次以後就會將transcation對象緩存到currentTransaction中,後續就不會再建立transaction了。
相關文章
相關標籤/搜索