咱們曾經在Flume-NG中的Channel與Transaction關係(原創)這篇文章中說了channel和Transaction的關係,可是在source和sink中都會使用Transaction,那麼Transaction的併發性如何?html
Transaction是介於channel和source、sink直接的一層緩存,爲了安全性和可靠性source、sink不能直接訪問channel,只能訪問在他之上的Transaction,經過Transaction間接操做channel中的數據。緩存
這節咱們以memory channel和file channel來研究一下Flume-NG的Transaction併發性。安全
首先全部channel的父類都是BasicChannelSemantics,全部Transaction的父類都是BasicTransactionSemantics,mem channel中的Transaction是MemoryTransaction是內部私有類;file channel的Transaction是FileBackedTransaction。通常來講本身定義channel須要實現本身的Transaction。多線程
咱們在看源碼時發現FileBackedTransaction不容許take和put同時操做,在其doCommit和doRollback方法中都有限制使得,究其緣由是:。而mem channel的Transaction則沒有諸多限制。併發
咱們在source和sink中見到的getTransaction()獲取的Transaction是同一個嗎?若是不是併發性是怎麼保證的?第一個很明顯不是同一個,試想若是都是同一個,那麼不一樣組件好比一個source和一個sink都會有Transaction.close操做,將會關閉事務,那關閉晚的還如何commit?咱們再來看下getTransaction()代碼: ide
1 /** 2 * <p> 3 * Initializes the channel if it is not already, then checks to see 4 * if there is an open transaction for this thread, creating a new 5 * one via <code>createTransaction</code> if not. 6 * @return the current <code>Transaction</code> object for the 7 * calling thread 8 * </p> 9 */ 10 @Override 11 public Transaction getTransaction() { 12 13 if (!initialized) { 14 synchronized (this) { 15 if (!initialized) { 16 initialize(); 17 initialized = true; 18 } 19 } 20 } 21 22 BasicTransactionSemantics transaction = currentTransaction.get(); 23 if (transaction == null || transaction.getState().equals( 24 BasicTransactionSemantics.State.CLOSED)) { 25 transaction = createTransaction(); 26 currentTransaction.set(transaction); 27 } 28 return transaction; 29 }
上面咱們能夠看出來,若是transaction還未初始化或者transaction的狀態是CLOSED(就是執行了close()方法改了狀態),說明須要經過createTransaction()新建一個Transaction,createTransaction()這個方法在子類中實現的。咱們來看看mem和file的createTransaction()方法的代碼,先看mem的:高併發
1 @Override 2 protected BasicTransactionSemantics createTransaction() { 3 return new MemoryTransaction(transCapacity, channelCounter); 4 }
直接就返回了本身的Transaction對象,在看file的createTransaction()方法的代碼:性能
1 @Override 2 protected BasicTransactionSemantics createTransaction() { 3 if(!open) { 4 String msg = "Channel closed " + channelNameDescriptor; 5 if(startupError != null) { 6 msg += ". Due to " + startupError.getClass().getName() + ": " + 7 startupError.getMessage(); 8 throw new IllegalStateException(msg, startupError); 9 } 10 throw new IllegalStateException(msg); 11 } 12 FileBackedTransaction trans = transactions.get(); 13 if(trans != null && !trans.isClosed()) { //在這保證put和take只能一個時刻有一個 14 Preconditions.checkState(false, 15 "Thread has transaction which is still open: " + 16 trans.getStateAsString() + channelNameDescriptor); 17 } 18 trans = new FileBackedTransaction(log, TransactionIDOracle.next(), 19 transactionCapacity, keepAlive, queueRemaining, getName(), 20 channelCounter); 21 transactions.set(trans); 22 return trans; 23 }
這個就比mem的複雜了點,畢竟代碼多了很多。優化
在看上面的getTransaction()方法,若是已經建立了一個Transaction則會放入currentTransaction中,而後之後再調用getTransaction()就會經過currentTransaction返回currentTransaction.get(),這莫不是同一個Transaction嗎?那就好像有點不對了,對吧,那究竟是怎麼回事呢?this
關鍵在於currentTransaction這個東西,咱們看聲明:private ThreadLocal<BasicTransactionSemantics> currentTransaction = new ThreadLocal<BasicTransactionSemantics>()是ThreadLocal的實例,可能有不少人不瞭解這個東西,其實我也不瞭解!!簡單來講:ThreadLocal使得各線程可以保持各自獨立的一個對象,ThreadLocal並非一個Thread,而是Thread的局部變量,爲解決多線程程序的併發問題提供了一種新的思路,詳細請谷歌、百度之。ThreadLocal有一個ThreadLocalMap靜態內部類,你能夠簡單理解爲一個MAP,這個‘Map’爲每一個線程複製一個變量的‘拷貝’存儲其中,這個「Map」的key是當前線程的ID,value就是set的變量,而get方法會依據當前線程ID從ThreadLocalMap中獲取對應的變量,我們這裏就是Transaction。這下明白了吧,每一個source和sink都會有單獨的線程來驅動的,因此都有各自的Transaction,是不一樣的,所以也就能夠併發了(針對memory channel)。
可是上面file的createTransaction()方法爲何是那樣的?由於咱們說了file的Transaction不能同時put和take(同一個Transaction通常只會作一個事就是put或者take),也就是不存在併發性的,因此在file channel中的transactions也設置爲了private final ThreadLocal<FileBackedTransaction> transactions =new ThreadLocal<FileBackedTransaction>(),因爲channel也是單獨的線程驅使,因此這個transactions中始終只存在一對值就是file channel的線程ID和建立的Transaction,若是不一樣sink或者source調用getTransaction()時會試圖經過createTransaction()方法來建立新的Transaction可是file的createTransaction()方法因爲已經有了一個Transaction,在其關閉以前是不會贊成 再次建立的,因此就只能等待這個Transaction關閉了,所以也就保證了put和take不會同時存在了。也就沒有併發性了,性能天然大受影響。
那file channel爲何不讓put和take同時操做呢?這個問題很值得研究,一:put、take、commit、rollback都會獲取log的共享鎖,一旦獲取其餘就只能讀,獲取鎖的目的就是這四個操做都要寫入log文件;二,put操做並不會將寫入log的event的指針放到queue中,而是在commit中才會放到queue中;3、take的操做會直接從queue中取數據,這時若是put已經commit就能夠獲取數據,若是沒有則會返回null;4、因爲四個操做都會獲取log鎖,致使實際上寫達不到併發,並且這個log鎖使得即便是寫不一樣的數據文件也不可能,由於只有這一個鎖,不是每一個數據文件一個鎖(數據文件的個數是動態的這個很差作);5、若take和put同時操做會使得可能交替執行獲取鎖,此時可能put沒commit而queue中無數據,take獲取鎖以後也沒什麼意義並且也是輪流不是並行,只會下降put和take的性能,好比put和take各自單獨只需1s便可,可是這樣可能須要2s甚至更長時間(take一直在等待put的commit)才能完成。綜上不讓put和take同時操做比較合理。
可是有沒有更好的方案能夠提升file的性能呢?由於file是基於文件的性能不可能很高,更爲合理的辦法是合理提升併發性,能夠優化的一個方案是put、take、commit、rollback單獨以文件存放,並設置相應的多個鎖,可是文件的動態變化以及put和put、take和take、commit和commit、rollback和rollback之間的併發性又難以實現了,彷佛只適合take和put的併發,這樣貌似會使得file channel更復雜了,可是性能應該會提升一些,會不會得不償失啊?
還有一個問題就是:file channel中的createTransaction()方法若是再次建立Transaction,而先前建立的並未關閉,會執行Preconditions.checkState(false,"Thread has transaction which is still open: " +trans.getStateAsString()+ channelNameDescriptor)會直接拋出異常,可是彷佛日誌中沒有相似的異常啊,並且進程也並未中斷,可是顯然使用了file channel的flume,sink和source能夠正常運行,這是怎麼搞得?