Flume-NG中Transaction併發性探究

  咱們曾經在Flume-NG中的Channel與Transaction關係(原創)這篇文章中說了channel和Transaction的關係,可是在source和sink中都會使用Transaction,那麼Transaction的併發性如何?html

  Transaction是介於channel和source、sink直接的一層緩存,爲了安全性和可靠性source、sink不能直接訪問channel,只能訪問在他之上的Transaction,經過Transaction間接操做channel中的數據。緩存

  這節咱們以memory channel和file channel來研究一下Flume-NG的Transaction併發性。安全

  首先全部channel的父類都是BasicChannelSemantics,全部Transaction的父類都是BasicTransactionSemantics,mem channel中的Transaction是MemoryTransaction是內部私有類;file channel的Transaction是FileBackedTransaction。通常來講本身定義channel須要實現本身的Transaction。多線程

  咱們在看源碼時發現FileBackedTransaction不容許take和put同時操做,在其doCommit和doRollback方法中都有限制使得,究其緣由是:。而mem channel的Transaction則沒有諸多限制。併發

  咱們在source和sink中見到的getTransaction()獲取的Transaction是同一個嗎?若是不是併發性是怎麼保證的?第一個很明顯不是同一個,試想若是都是同一個,那麼不一樣組件好比一個source和一個sink都會有Transaction.close操做,將會關閉事務,那關閉晚的還如何commit?咱們再來看下getTransaction()代碼:  ide

 1   /**
 2    * <p>
 3    * Initializes the channel if it is not already, then checks to see
 4    * if there is an open transaction for this thread, creating a new
 5    * one via <code>createTransaction</code> if not.
 6    * @return the current <code>Transaction</code> object for the
 7    *     calling thread
 8    * </p>
 9    */
10   @Override
11   public Transaction getTransaction() {
12 
13     if (!initialized) {
14       synchronized (this) {
15         if (!initialized) {
16           initialize();
17           initialized = true;
18         }
19       }
20     }
21 
22     BasicTransactionSemantics transaction = currentTransaction.get();
23     if (transaction == null || transaction.getState().equals(
24             BasicTransactionSemantics.State.CLOSED)) {
25       transaction = createTransaction();
26       currentTransaction.set(transaction);
27     }
28     return transaction;
29   }

  上面咱們能夠看出來,若是transaction還未初始化或者transaction的狀態是CLOSED(就是執行了close()方法改了狀態),說明須要經過createTransaction()新建一個Transaction,createTransaction()這個方法在子類中實現的。咱們來看看mem和file的createTransaction()方法的代碼,先看mem的:高併發

1 @Override
2   protected BasicTransactionSemantics createTransaction() {
3     return new MemoryTransaction(transCapacity, channelCounter);
4   }

  直接就返回了本身的Transaction對象,在看file的createTransaction()方法的代碼:性能

 1 @Override
 2   protected BasicTransactionSemantics createTransaction() {
 3     if(!open) {
 4       String msg = "Channel closed " + channelNameDescriptor;
 5       if(startupError != null) {
 6         msg += ". Due to " + startupError.getClass().getName() + ": " +
 7             startupError.getMessage();
 8         throw new IllegalStateException(msg, startupError);
 9       }
10       throw new IllegalStateException(msg);
11     }
12     FileBackedTransaction trans = transactions.get();
13     if(trans != null && !trans.isClosed()) {        //在這保證put和take只能一個時刻有一個
14       Preconditions.checkState(false,
15           "Thread has transaction which is still open: " +
16               trans.getStateAsString()  + channelNameDescriptor);
17     }
18     trans = new FileBackedTransaction(log, TransactionIDOracle.next(),
19         transactionCapacity, keepAlive, queueRemaining, getName(),
20         channelCounter);
21     transactions.set(trans);
22     return trans;
23   }

  這個就比mem的複雜了點,畢竟代碼多了很多。優化

  在看上面的getTransaction()方法,若是已經建立了一個Transaction則會放入currentTransaction中,而後之後再調用getTransaction()就會經過currentTransaction返回currentTransaction.get(),這莫不是同一個Transaction嗎?那就好像有點不對了,對吧,那究竟是怎麼回事呢?this

  關鍵在於currentTransaction這個東西,咱們看聲明:private ThreadLocal<BasicTransactionSemantics> currentTransaction = new ThreadLocal<BasicTransactionSemantics>()是ThreadLocal的實例,可能有不少人不瞭解這個東西,其實我也不瞭解!!簡單來講:ThreadLocal使得各線程可以保持各自獨立的一個對象,ThreadLocal並非一個Thread,而是Thread的局部變量,爲解決多線程程序的併發問題提供了一種新的思路,詳細請谷歌、百度之。ThreadLocal有一個ThreadLocalMap靜態內部類,你能夠簡單理解爲一個MAP,這個‘Map’爲每一個線程複製一個變量的‘拷貝’存儲其中,這個「Map」的key是當前線程的ID,value就是set的變量,而get方法會依據當前線程ID從ThreadLocalMap中獲取對應的變量,我們這裏就是Transaction。這下明白了吧,每一個source和sink都會有單獨的線程來驅動的,因此都有各自的Transaction,是不一樣的,所以也就能夠併發了(針對memory channel)。

  可是上面file的createTransaction()方法爲何是那樣的?由於咱們說了file的Transaction不能同時put和take(同一個Transaction通常只會作一個事就是put或者take),也就是不存在併發性的,因此在file channel中的transactions也設置爲了private final ThreadLocal<FileBackedTransaction> transactions =new ThreadLocal<FileBackedTransaction>(),因爲channel也是單獨的線程驅使,因此這個transactions中始終只存在一對值就是file channel的線程ID和建立的Transaction,若是不一樣sink或者source調用getTransaction()時會試圖經過createTransaction()方法來建立新的Transaction可是file的createTransaction()方法因爲已經有了一個Transaction,在其關閉以前是不會贊成 再次建立的,因此就只能等待這個Transaction關閉了,所以也就保證了put和take不會同時存在了。也就沒有併發性了,性能天然大受影響。

  那file channel爲何不讓put和take同時操做呢?這個問題很值得研究,一:put、take、commit、rollback都會獲取log的共享鎖,一旦獲取其餘就只能讀,獲取鎖的目的就是這四個操做都要寫入log文件;二,put操做並不會將寫入log的event的指針放到queue中,而是在commit中才會放到queue中;3、take的操做會直接從queue中取數據,這時若是put已經commit就能夠獲取數據,若是沒有則會返回null;4、因爲四個操做都會獲取log鎖,致使實際上寫達不到併發,並且這個log鎖使得即便是寫不一樣的數據文件也不可能,由於只有這一個鎖,不是每一個數據文件一個鎖(數據文件的個數是動態的這個很差作);5、若take和put同時操做會使得可能交替執行獲取鎖,此時可能put沒commit而queue中無數據,take獲取鎖以後也沒什麼意義並且也是輪流不是並行,只會下降put和take的性能,好比put和take各自單獨只需1s便可,可是這樣可能須要2s甚至更長時間(take一直在等待put的commit)才能完成。綜上不讓put和take同時操做比較合理。

  可是有沒有更好的方案能夠提升file的性能呢?由於file是基於文件的性能不可能很高,更爲合理的辦法是合理提升併發性,能夠優化的一個方案是put、take、commit、rollback單獨以文件存放,並設置相應的多個鎖,可是文件的動態變化以及put和put、take和take、commit和commit、rollback和rollback之間的併發性又難以實現了,彷佛只適合take和put的併發,這樣貌似會使得file channel更復雜了,可是性能應該會提升一些,會不會得不償失啊?

 

  還有一個問題就是:file channel中的createTransaction()方法若是再次建立Transaction,而先前建立的並未關閉,會執行Preconditions.checkState(false,"Thread has transaction which is still open: " +trans.getStateAsString()+ channelNameDescriptor)會直接拋出異常,可是彷佛日誌中沒有相似的異常啊,並且進程也並未中斷,可是顯然使用了file channel的flume,sink和source能夠正常運行,這是怎麼搞得?

相關文章
相關標籤/搜索