SpillableMemoryChannel是1.5版本新增的一個channel。這個channel優先將evnet放在內存中,一旦內存達到設定的容量就使用file channel寫入磁盤。而後讀的時候會按照順序讀取:會經過一個DrainOrderQueue來保證不論是內存中的仍是溢出(本文的「溢出」指的是內存channel已滿,須要使用file channel存儲數據)文件中的順序。這個Channel是memory channel和file channel的一個折中,雖然在內存中的數據仍然可能由於進程的忽然中斷而丟失,可是相對於memory channel而言一旦sink處理速度跟不上不至於丟失數據(後者一旦滿了爆發異常會丟失後續的數據),提升了數據的可靠性;相對於file channel而言天然是大大提升了速度,可是可靠性較file channel有所下降。html
咱們來看一下SpillableMemoryChannel的繼承結構:SpillableMemoryChannel extends FileChannel,原來SpillableMemoryChannel是file的子類,天熱具備file channel的特性。可是它的BasicTransactionSemantics是本身實現的。接下來咱們來分析分析這個channel,這個channel能夠當作是兩個channel。相關內容傳送門:Flume-NG源碼閱讀之FileChannel 和 flume-ng源碼閱讀memory-channel(原創) 。數組
Stack
,在用做隊列時快於 LinkedList,可是不是線程安全的不支持多線程併發操做;put操做老是對queue中的最後(尾)一個元素操做,take操做老是對queue中第一個(頭)操做;put時,若是是內存channel,在queue增長的就是正數,若是是溢出操做增長的就是負數,內存和溢出分別對應queue中不一樣的元素(能夠分類去讀);take時,若是從內存中取數據,就會使得queue第一個元素的值不斷縮小(正數)至0,而後刪除這個元素,若是是從溢出文件中取數據則會使得queue中第一個元素不斷增大(負數)至0,而後刪除這個元素;這樣就會造成流,使得put不斷追加數據到流中,take不斷從流中取數據,這個流就是有序的,且流中元素其實就是內存中的evnet個數和溢出文件中event的個數。
1 public static class DrainOrderQueue { 2 public ArrayDeque<MutableInteger> queue = new ArrayDeque<MutableInteger>(1000); 3 4 public int totalPuts = 0; // for debugging only 5 private long overflowCounter = 0; // # of items in overflow channel 6 7 public String dump() { 8 StringBuilder sb = new StringBuilder(); 9 10 sb.append(" [ "); 11 for (MutableInteger i : queue) { 12 sb.append(i.intValue()); 13 sb.append(" "); 14 } 15 sb.append("]"); 16 return sb.toString(); 17 } 18 19 public void putPrimary(Integer eventCount) { 20 totalPuts += eventCount; 21 if ( (queue.peekLast() == null) || queue.getLast().intValue() < 0) { //獲取,但不移除此雙端隊列的最後一個元素;若是此雙端隊列爲空,則返回 null 22 queue.addLast(new MutableInteger(eventCount)); 23 } else { 24 queue.getLast().add(eventCount);//獲取,但不移除此雙端隊列的第一個元素。 25 } 26 } 27 28 public void putFirstPrimary(Integer eventCount) { 29 if ( (queue.peekFirst() == null) || queue.getFirst().intValue() < 0) { //獲取,但不移除此雙端隊列的第一個元素;若是此雙端隊列爲空,則返回 null。 30 queue.addFirst(new MutableInteger(eventCount)); 31 } else { 32 queue.getFirst().add(eventCount);//獲取,但不移除此雙端隊列的第一個元素。 33 } 34 } 35 36 public void putOverflow(Integer eventCount) { 37 totalPuts += eventCount; 38 if ( (queue.peekLast() == null) || queue.getLast().intValue() > 0) { 39 queue.addLast(new MutableInteger(-eventCount)); 40 } else { 41 queue.getLast().add(-eventCount); 42 } 43 overflowCounter += eventCount; 44 } 45 46 public void putFirstOverflow(Integer eventCount) { 47 if ( (queue.peekFirst() == null) || queue.getFirst().intValue() > 0) { 48 queue.addFirst(new MutableInteger(-eventCount)); 49 } else { 50 queue.getFirst().add(-eventCount); 51 } 52 overflowCounter += eventCount; 53 } 54 55 public int front() { 56 return queue.getFirst().intValue(); 57 } 58 59 public boolean isEmpty() { 60 return queue.isEmpty(); 61 } 62 63 public void takePrimary(int takeCount) { 64 MutableInteger headValue = queue.getFirst(); 65 66 // this condition is optimization to avoid redundant conversions of 67 // int -> Integer -> string in hot path 68 if (headValue.intValue() < takeCount) { 69 throw new IllegalStateException("Cannot take " + takeCount + 70 " from " + headValue.intValue() + " in DrainOrder Queue"); 71 } 72 73 headValue.add(-takeCount); 74 if (headValue.intValue() == 0) { 75 queue.removeFirst(); 76 } 77 } 78 79 public void takeOverflow(int takeCount) { 80 MutableInteger headValue = queue.getFirst(); 81 if(headValue.intValue() > -takeCount) { 82 throw new IllegalStateException("Cannot take " + takeCount + " from " 83 + headValue.intValue() + " in DrainOrder Queue head " ); 84 } 85 86 headValue.add(takeCount); 87 if (headValue.intValue() == 0) { 88 queue.removeFirst(); //獲取並移除此雙端隊列第一個元素。 89 } 90 overflowCounter -= takeCount; 91 } 92 93 }
咱們一個方法一個方法的來剖析這個類:安全
(1)dump(),這個方法比較簡單就是得到queue中全部元素的數據量;多線程
(2)putPrimary(Integer eventCount),這個方法用在put操做的commit時,在commitPutsToPrimary()方法中被調用,表示向內存提交數據。這個方法會嘗試獲取queue中最後一個元素,若是爲空(說明沒數據)或者元素數值小於0(說明這個元素是面向溢出文件的),就新建一個元素賦值這個事務的event數量加入queue;不然表示當前是的元素表徵的是內存中的event數量,直接累加便可。併發
(3)putFirstPrimary(Integer eventCount),在doRollback()回滾的時候被調用,表示將takeList中的數據放回內存memQueue的頭。這個方法會嘗試獲取queue中第一個元素,若是爲空(說明沒數據)或者元素數值小於0(說明這個元素是面向溢出文件的),就新建一個元素賦值takeList的event數量加入queue;不然表示當前是的元素表徵的是內存中的event數量,直接累加便可。app
(4)putOverflow(Integer eventCount),這個方法發生在put操做的commit時,在commitPutsToOverflow_core方法和start()方法中,後者是設置初始量,前者表示內存channel已滿要溢出到file channel。這個方法會嘗試獲取queue中最後一個元素,若是爲空(說明沒數據)或者元素數值大於0(表示這個元素是面向內存的),就新建一個元素賦值這個事務的event數量加入queue,這裏賦值爲負數;不然表示當前是的元素表徵的是溢出文件中的event數量,直接累加負數便可。ide
(5)putFirstOverflow(Integer eventCount),在doRollback()回滾的時候被調用,表示將takeList中event的數量放回溢出文件。這個方法會嘗試獲取queue中第一個元素,若是爲空(說明沒數據)或者元素數值大於0(表示這個元素是面向內存的),就新建一個元素賦值這個事務的 event數量加入queue,這裏賦值爲負數;不然表示當前是的元素表徵的是溢出到文件中的event數量,直接累加負數便可。ui
(6)front(),返回queue中第一個元素的值this
(7)takePrimary(int takeCount),這個方法在doTake()中被調用,表示take發生以後,要將內存中的event數量減takeCount(這個值通常都是1,即每次取一個)。這個方法會獲取第一個元素的值(表示內存channel中有多少event),若是這個值比takeCount小,說明內存中沒有足夠的數量,這種狀況不該該發生,報錯;不然將這個元素的值減去takeCount,表示已取出takeCount個。最後若是這個元素的值爲0,則從queue中刪除這個元素。注意這裏雖然是能夠取takeCount個,可是源碼調用這個參數都是一次取1個而已。spa
(8)takeOverflow(int takeCount),這個方法在doTake()中被調用,表示take發生以後,要將溢出文件中的event數量加上takeCount(這個值通常都是1,即每次取一個)。這個 方法會獲取第一個元素的值(表示溢出文件中有多少event),若是這個值比takeCount的負值大,說明文件中沒有足夠的數量,這種狀況不該該發生,報錯;不然將這個元素的值加上takeCount,表示已取出takeCount個。最後若是這個元素的值爲0,則從queue中刪除這個元素。注意這裏雖然是能夠取 takeCount個,可是源碼調用這個參數都是一次取1個而已。
1 protected void doPut(Event event) throws InterruptedException { 2 channelCounter.incrementEventPutAttemptCount(); 3 4 putCalled = true; //說明是在put操做 5 int eventByteSize = (int)Math.ceil(estimateEventSize(event)/ avgEventSize);//獲取這個event能夠佔用幾個slot 6 if (!putList.offer(event)) { //加入putList 7 throw new ChannelFullException("Put queue in " + getName() + 8 " channel's Transaction having capacity " + putList.size() + 9 " full, consider reducing batch size of sources"); 10 } 11 putListByteCount += eventByteSize; 12 }
這個方法比較簡單,就是put開始;設置putCalled爲true表示put操做;計算佔用slot個數;將event放入putList等待commit操做;putListByteCount加上這個evnet佔用的slot數。
1 protected Event doTake() throws InterruptedException { 2 channelCounter.incrementEventTakeAttemptCount(); 3 if (!totalStored.tryAcquire(overflowTimeout, TimeUnit.SECONDS)) { 4 LOGGER.debug("Take is backing off as channel is empty."); 5 return null; 6 } 7 boolean takeSuceeded = false; 8 try { 9 Event event; 10 synchronized(queueLock) { 11 int drainOrderTop = drainOrder.front(); 12 13 if (!takeCalled) { 14 takeCalled = true; 15 if (drainOrderTop < 0) { 16 useOverflow = true; 17 overflowTakeTx = getOverflowTx(); //獲取file channle的事務 18 overflowTakeTx.begin(); 19 } 20 } 21 22 if (useOverflow) { 23 if (drainOrderTop > 0) { 24 LOGGER.debug("Take is switching to primary"); 25 return null; // takes should now occur from primary channel 26 } 27 28 event = overflowTakeTx.take(); 29 ++takeCount; 30 drainOrder.takeOverflow(1); 31 } else { 32 if (drainOrderTop < 0) { 33 LOGGER.debug("Take is switching to overflow"); 34 return null; // takes should now occur from overflow channel 35 } 36 37 event = memQueue.poll(); //獲取並移除此雙端隊列所表示的隊列的頭(換句話說,此雙端隊列的第一個元素);若是此雙端隊列爲空,則返回 null。 38 ++takeCount; 39 drainOrder.takePrimary(1); 40 Preconditions.checkNotNull(event, "Queue.poll returned NULL despite" 41 + " semaphore signalling existence of entry"); 42 } 43 } 44 45 int eventByteSize = (int)Math.ceil(estimateEventSize(event)/ avgEventSize); 46 if (!useOverflow) { 47 // takeList is thd pvt, so no need to do this in synchronized block 48 takeList.offer(event); 49 } 50 51 takeListByteCount += eventByteSize; 52 takeSuceeded = true; 53 return event; 54 } finally { 55 if(!takeSuceeded) { 56 totalStored.release(); 57 } 58 } 59 }
因爲ArrayDeque是非線程安全的(memQueue就是ArrayDeque),因此take操做從memQueue獲取數據時,要獨佔memQueue。任何對memQueue都要進行同步,這裏是同步queueLock。
doTake方法會先檢查totalStored中有無許可,即channel中有無數據;而後同步;再獲取drainOrder的頭元素,若是takeCalled爲false(初始爲false),則設置其爲true,再判斷獲取到的drainOrder頭元素的值是否爲負數,負數說明數據在溢出文件中,設置useOverflow爲true表示要從溢出文件中讀取數據而且獲取file channel的FileBackedTransaction賦值給overflowTakeTx,begin()能夠獲取數據。若是useOverflow爲true則轉到調用overflowTakeTx.take獲取event,而後takeCount自增1,調用drainOrder.takeOverflow(1)修改隊列中溢出event數量的值。若是useOverflow爲false說明數據在內存中,直接調用memQueue.poll()得到event,而後takeCount自增1,調用drainOrder.takePrimary(1)修改隊列中內存中evnet數量的值。而後計算這個event佔用的slot數。若是是從內存channel中讀取的event則將其放入takeList中;takeListByteCount加上這個evnet佔用的slot數。最後返回event。
1 protected void doRollback() { 2 LOGGER.debug("Rollback() of " + 3 (takeCalled ? " Take Tx" : (putCalled ? " Put Tx" : "Empty Tx"))); 4 5 if (putCalled) { 6 if (overflowPutTx!=null) { 7 overflowPutTx.rollback(); 8 } 9 if (!useOverflow) { 10 bytesRemaining.release(putListByteCount); 11 putList.clear(); 12 } 13 putListByteCount = 0; 14 } else if (takeCalled) { 15 synchronized(queueLock) { 16 if (overflowTakeTx!=null) { 17 overflowTakeTx.rollback(); 18 } 19 if (useOverflow) { 20 drainOrder.putFirstOverflow(takeCount); 21 } else { 22 int remainingCapacity = memoryCapacity - memQueue.size(); 23 Preconditions.checkState(remainingCapacity >= takeCount, 24 "Not enough space in memory queue to rollback takes. This" + 25 " should never happen, please report"); 26 while (!takeList.isEmpty()) { 27 memQueue.addFirst(takeList.removeLast()); 28 } 29 drainOrder.putFirstPrimary(takeCount); 30 } 31 } 32 totalStored.release(takeCount); 33 } else { 34 overflowTakeTx.rollback(); 35 } 36 channelCounter.setChannelSize(memQueue.size() + drainOrder.overflowCounter); 37 }
若是putCalled爲true,則代表正在進行的是put操做。若是overflowPutTx不爲null,說明是在溢出,執行overflowPutTx的roolback方法進行回滾。若是沒有溢出,則bytesRemaining釋放putListByteCount許可,表示騰出putListByteCount個slot;清空putList;最後將putListByteCount置爲0。若是takeCalled爲true,說明正在進行的操做是take,若是overflowTakeTx不爲null,說明是在溢出,執行overflowTakeTx的roolback方法進行回滾;若是在溢出,則調用drainOrder.putFirstOverflow(takeCount)修改queue中溢出文件中的event的數量;若是在使用內存channel,則計算出內存channel中還能夠最多存儲event的數量,若是這個數量小於takeCount,則報錯,不然將takeList中的全部event加入memQueue的頭部,執行drainOrder.putFirstPrimary(takeCount)來修改queue中內存channel存放的event的數量;而後totalStored釋放takeCount個許可,表示內存channel中增長了takeCount個event。
5、stop方法,會調用父類file channel中的stop方法。
6、createTransaction()方法,直接返回一個SpillableMemoryTransaction對象。這說明take和put能夠併發執行,可是當涉及到memQueue時,仍是須要同步。
至此,這個新的channel介紹完了。整體來講SpillableMemoryChannel是精心設計的一個channel,兼顧Flume內置的file channel和memory channel的優勢,又增長了一個選擇,大夥可根據須要選擇合適的channel。