在sink和source中(無論是內置仍是自定義的),基本都有以下代碼,這些代碼在sink中的process方法中,而在source中本身不須要去寫,在source中getChannelProcessor().processEventBatch(events)方法中會自動建立下面相似的: 緩存
... Channel channel = getChannel(); Transaction transaction = channel.getTransaction(); Event event = null; Status result = Status.READY; transaction.begin(); ... event = channel.take();//getChannelProcessor().processEvent(event);,前者用於sink後者用於source ... transaction.commit(); transaction.rollback() transaction.close(); ...
那麼有些人就要問了?從上述代碼中彷佛只須要獲取channel就能夠了,由於獲取數據時只須要event = channel.take()或者ide
getChannelProcessor().processEvent(event)?這樣對嗎?你能夠去掉transaction試試,結果顯示是不行的,出錯!
那麼爲何呢?這確實有點讓人疑惑,但實際上channel.take()操做是transaction.doTake()。也就是實際的put和take等操做都是在transaction中進行的,所以要用channel必需要先建立transcation纔可使用。而channel.getTransaction()方法就是獲取(已經建立)或建立(尚未)transcation,BasicChannelSemantics的相對應代碼以下: this
@Override public Transaction getTransaction() { if (!initialized) { synchronized (this) { if (!initialized) { initialize(); initialized = true; } } } BasicTransactionSemantics transaction = currentTransaction.get();//獲取transcation if (transaction == null || transaction.getState().equals(//若是transaction不存在或者已關閉就建立 BasicTransactionSemantics.State.CLOSED)) { transaction = createTransaction();//建立 currentTransaction.set(transaction);//賦值給currentTransaction } return transaction; }
該方法在全部channel的父類BasicChannelSemantics中,而後在具體實現的channel類中須要實現protected abstract BasicTransactionSemantics createTransaction()這個抽象方法來獲取相應的transaction對象。BasicChannelSemantics把transaction.take()和transaction.put(event)方法進一步封裝成take()和put(event)方法,這倆方法就是暴露在sink或者source中的channel.take()和channel.put(event)方法。spa
@Override public void put(Event event) throws ChannelException { BasicTransactionSemantics transaction = currentTransaction.get(); Preconditions.checkState(transaction != null, "No transaction exists for this thread"); transaction.put(event); } @Override public Event take() throws ChannelException { BasicTransactionSemantics transaction = currentTransaction.get(); Preconditions.checkState(transaction != null, "No transaction exists for this thread"); return transaction.take(); }
由此,能夠看出工做行程了吧!code
Transaction transaction = channel.getTransaction();這一句至少要執行一次,由於執行一次以後就會將transcation對象緩存到currentTransaction中,後續就不會再建立transaction了。