Flume-NG(1.5版本)中SpillableMemoryChannel源碼級分析

  SpillableMemoryChannel是1.5版本新增的一個channel。這個channel優先將evnet放在內存中,一旦內存達到設定的容量就使用file channel寫入磁盤。而後讀的時候會按照順序讀取:會經過一個DrainOrderQueue來保證不論是內存中的仍是溢出(本文的「溢出」指的是內存channel已滿,須要使用file channel存儲數據)文件中的順序。這個Channel是memory channel和file channel的一個折中,雖然在內存中的數據仍然可能由於進程的忽然中斷而丟失,可是相對於memory channel而言一旦sink處理速度跟不上不至於丟失數據(後者一旦滿了爆發異常會丟失後續的數據),提升了數據的可靠性;相對於file channel而言天然是大大提升了速度,可是可靠性較file channel有所下降。html

  咱們來看一下SpillableMemoryChannel的繼承結構:SpillableMemoryChannel extends FileChannel,原來SpillableMemoryChannel是file的子類,天熱具備file channel的特性。可是它的BasicTransactionSemantics是本身實現的。接下來咱們來分析分析這個channel,這個channel能夠當作是兩個channel。相關內容傳送門:Flume-NG源碼閱讀之FileChannel 和 flume-ng源碼閱讀memory-channel(原創) 。數組

   1、首先來看configure(Context context)方法,這個方法是對這個channel進行配置。一些主要參數介紹:
  (1)Semaphore totalStored,這兩個channel【內存channel(並非flume內置的memory channel,這裏是新實現的一個,本文中的「內存channel」若無說明就是新實現的這個)和溢出而使用的file channel】中event數量的總和的信號量,初始爲0;
  (2)ArrayDeque<Event> memQueue,這就是這裏的內存channel,使用能夠改變大小的數組雙端隊列ArrayDeque,存儲event數據;
  (3)int memoryCapacity(對應參數名 "memoryCapacity"),內存channel中存儲的event的最大數量;
  (4)Semaphore memQueRemaining,內存channel剩餘的可存儲event的數量的信號量,初始大小爲memoryCapacity;
  (5)int overflowTimeout(對應參數名 "overflowTimeout"),溢出超時時間,指的是內存channel滿了以後,切換到file channel的等待時間,默認是3s;
  (6)double overflowDeactivationThreshold(對應參數名 "overflowDeactivationThreshold"),指的是中止溢出的閾值------內存channel剩餘內存(這裏指可再存儲的event數量),默認5%;
  (7)volatile int byteCapacityBufferPercentage(對應參數名 "byteCapacityBufferPercentage"),用來限制內存channel使用物理內存量,默認20;
  (8)volatile double avgEventSize()(對應參數名 "avgEventSize"),指定每一個event的大小,用來計算內存channel可使用的slot總數量,會把event量化爲slot,而不是字節,默認500;
  (9)volatile int byteCapacity(對應參數名 "byteCapacity"),slot數量,默認是JVM可以使用的最大物理內存(可經過配置 "byteCapacity"參數來控制物理內存使用 )的80% * (1 - byteCapacityBufferPercentage * .01 )) / avgEventSize得來;
  (10)Semaphore bytesRemaining,內存channel中剩餘可以使用的slot數量信號量,初始大小是byteCapacity;
  (11)volatile int lastByteCapacity,動態加載配置文件時纔會有用,記錄上一次的ByteCapacity,用於修改bytesRemaining信號量的大小;
  (12)int overflowCapacity(對應參數名 "overflowCapacity"),用於設置file channel的容量,默認是1億;
  此外,boolean overflowDisabled用來是否禁用溢出,只要overflowCapacity不小於1就不會禁用;boolean overflowActivated表示是否可使用溢出,默認是false;還會對對file channel的 "keep-alive"設置爲0;最後會經過super.configure(context)來對file channel進行配置。對於file channel的配置信息能夠和SpillableMemoryChannel的配置信息在一塊兒配置。
  2、start()方法,首先會super.start()啓動file channel,獲取file中溢出的數據量overFlowCount,重置totalStored和 DrainOrderQueue對象drainOrder,內存channel的start是不會有數據的。
  3、須要講一下DrainOrderQueue drainOrder = new DrainOrderQueue()。咱們知道SpillableMemoryChannel實際上是由兩個channel組成,分別是內存channel和file channel,所以數據也會分佈在內存和磁盤文件之中,那咱們take時,是什麼機制呢?換句話說就是何時讀內存中的數據,何時讀磁盤上文件的數據?take的順序怎麼樣呢?咱們但願take的順序和put的順序同樣,先put的應該先take,因此咱們應該給全部的put(包括內存和文件)進行「編號」使得能夠有序的take,還要注意的就是須要標示這個take是應該從內存仍是file中去讀。爲此設計了DrainOrderQueue類,來使得有序的put和take。
  這個類設計的狠精巧,是保證take和put正常合理操做的關鍵。在講以前先大概說一下原理:這個類的關鍵屬性是ArrayDeque<MutableInteger> queue,這也是一個ArrayDeque,ArrayDeque特性是數組可變且大小不受限制,可在頭尾操做,此類極可能在用做堆棧時快於 Stack,在用做隊列時快於 LinkedList,可是不是線程安全的不支持多線程併發操做;put操做老是對queue中的最後(尾)一個元素操做,take操做老是對queue中第一個(頭)操做;put時,若是是內存channel,在queue增長的就是正數,若是是溢出操做增長的就是負數,內存和溢出分別對應queue中不一樣的元素(能夠分類去讀);take時,若是從內存中取數據,就會使得queue第一個元素的值不斷縮小(正數)至0,而後刪除這個元素,若是是從溢出文件中取數據則會使得queue中第一個元素不斷增大(負數)至0,而後刪除這個元素;這樣就會造成流,使得put不斷追加數據到流中,take不斷從流中取數據,這個流就是有序的,且流中元素其實就是內存中的evnet個數和溢出文件中event的個數。
  好了,DrainOrderQueue詳細代碼以下:
 1   public static class DrainOrderQueue {
 2     public ArrayDeque<MutableInteger> queue = new ArrayDeque<MutableInteger>(1000);
 3 
 4     public int totalPuts = 0;  // for debugging only
 5     private long overflowCounter = 0; // # of items in overflow channel
 6 
 7     public  String dump() {
 8       StringBuilder sb = new StringBuilder();
 9 
10       sb.append("  [ ");
11       for (MutableInteger i : queue) {
12         sb.append(i.intValue());
13         sb.append(" ");
14       }
15       sb.append("]");
16       return  sb.toString();
17     }
18 
19     public void putPrimary(Integer eventCount) {
20       totalPuts += eventCount;
21       if (  (queue.peekLast() == null) || queue.getLast().intValue() < 0) {    //獲取,但不移除此雙端隊列的最後一個元素;若是此雙端隊列爲空,則返回 null
22         queue.addLast(new MutableInteger(eventCount));
23       } else {
24         queue.getLast().add(eventCount);//獲取,但不移除此雙端隊列的第一個元素。
25       }
26     }
27 
28     public void putFirstPrimary(Integer eventCount) {
29       if ( (queue.peekFirst() == null) || queue.getFirst().intValue() < 0) {    //獲取,但不移除此雙端隊列的第一個元素;若是此雙端隊列爲空,則返回 null。
30         queue.addFirst(new MutableInteger(eventCount));
31       } else {
32         queue.getFirst().add(eventCount);//獲取,但不移除此雙端隊列的第一個元素。
33       }
34     }
35 
36     public void putOverflow(Integer eventCount) {
37       totalPuts += eventCount;
38       if ( (queue.peekLast() == null) ||  queue.getLast().intValue() > 0) {
39         queue.addLast(new MutableInteger(-eventCount));
40       } else {
41         queue.getLast().add(-eventCount);
42       }
43       overflowCounter += eventCount;
44     }
45 
46     public void putFirstOverflow(Integer eventCount) {
47       if ( (queue.peekFirst() == null) ||  queue.getFirst().intValue() > 0) {
48         queue.addFirst(new MutableInteger(-eventCount));
49       }  else {
50         queue.getFirst().add(-eventCount);
51       }
52       overflowCounter += eventCount;
53     }
54 
55     public int front() {
56       return queue.getFirst().intValue();
57     }
58 
59     public boolean isEmpty() {
60       return queue.isEmpty();
61     }
62 
63     public void takePrimary(int takeCount) {
64       MutableInteger headValue = queue.getFirst();
65 
66       // this condition is optimization to avoid redundant conversions of
67       // int -> Integer -> string in hot path
68       if (headValue.intValue() < takeCount)  {
69         throw new IllegalStateException("Cannot take " + takeCount +
70                 " from " + headValue.intValue() + " in DrainOrder Queue");
71       }
72 
73       headValue.add(-takeCount);
74       if (headValue.intValue() == 0) {
75         queue.removeFirst();
76       }
77     }
78 
79     public void takeOverflow(int takeCount) {
80       MutableInteger headValue = queue.getFirst();
81       if(headValue.intValue() > -takeCount) {
82         throw new IllegalStateException("Cannot take " + takeCount + " from "
83                 + headValue.intValue() + " in DrainOrder Queue head " );
84       }
85 
86       headValue.add(takeCount);
87       if (headValue.intValue() == 0) {
88         queue.removeFirst();    //獲取並移除此雙端隊列第一個元素。
89       }
90       overflowCounter -= takeCount;
91     }
92 
93   }
View Code

  咱們一個方法一個方法的來剖析這個類:安全

  (1)dump(),這個方法比較簡單就是得到queue中全部元素的數據量;多線程

  (2)putPrimary(Integer eventCount),這個方法用在put操做的commit時,在commitPutsToPrimary()方法中被調用,表示向內存提交數據。這個方法會嘗試獲取queue中最後一個元素,若是爲空(說明沒數據)或者元素數值小於0(說明這個元素是面向溢出文件的),就新建一個元素賦值這個事務的event數量加入queue;不然表示當前是的元素表徵的是內存中的event數量,直接累加便可。併發

  (3)putFirstPrimary(Integer eventCount),在doRollback()回滾的時候被調用,表示將takeList中的數據放回內存memQueue的頭。這個方法會嘗試獲取queue中第一個元素,若是爲空(說明沒數據)或者元素數值小於0(說明這個元素是面向溢出文件的),就新建一個元素賦值takeList的event數量加入queue;不然表示當前是的元素表徵的是內存中的event數量,直接累加便可。app

  (4)putOverflow(Integer eventCount),這個方法發生在put操做的commit時,在commitPutsToOverflow_core方法和start()方法中,後者是設置初始量,前者表示內存channel已滿要溢出到file channel。這個方法會嘗試獲取queue中最後一個元素,若是爲空(說明沒數據)或者元素數值大於0(表示這個元素是面向內存的),就新建一個元素賦值這個事務的event數量加入queue,這裏賦值爲負數;不然表示當前是的元素表徵的是溢出文件中的event數量,直接累加負數便可。ide

  (5)putFirstOverflow(Integer eventCount),在doRollback()回滾的時候被調用,表示將takeList中event的數量放回溢出文件。這個方法會嘗試獲取queue中第一個元素,若是爲空(說明沒數據)或者元素數值大於0(表示這個元素是面向內存的),就新建一個元素賦值這個事務的 event數量加入queue,這裏賦值爲負數;不然表示當前是的元素表徵的是溢出到文件中的event數量,直接累加負數便可。ui

  (6)front(),返回queue中第一個元素的值this

  (7)takePrimary(int takeCount),這個方法在doTake()中被調用,表示take發生以後,要將內存中的event數量減takeCount(這個值通常都是1,即每次取一個)。這個方法會獲取第一個元素的值(表示內存channel中有多少event),若是這個值比takeCount小,說明內存中沒有足夠的數量,這種狀況不該該發生,報錯;不然將這個元素的值減去takeCount,表示已取出takeCount個。最後若是這個元素的值爲0,則從queue中刪除這個元素。注意這裏雖然是能夠取takeCount個,可是源碼調用這個參數都是一次取1個而已。spa

  (8)takeOverflow(int takeCount),這個方法在doTake()中被調用,表示take發生以後,要將溢出文件中的event數量加上takeCount(這個值通常都是1,即每次取一個)。這個 方法會獲取第一個元素的值(表示溢出文件中有多少event),若是這個值比takeCount的負值大,說明文件中沒有足夠的數量,這種狀況不該該發生,報錯;不然將這個元素的值加上takeCount,表示已取出takeCount個。最後若是這個元素的值爲0,則從queue中刪除這個元素。注意這裏雖然是能夠取 takeCount個,可是源碼調用這個參數都是一次取1個而已。

  4、這個channel的BasicTransactionSemantics:SpillableMemoryTransaction,這是每一個channel的必須實現的可靠性保證。這個類也有一些屬性:
  (1)BasicTransactionSemantics overflowTakeTx = null,這個是file channel的事務FileBackedTransaction,表示take操做從溢出文件中獲取event;
  (2)BasicTransactionSemantics overflowPutTx = null,這個是file channel的事務FileBackedTransaction,表示put操做溢出到磁盤文件;
  (3)boolean useOverflow = false,是否使用溢出;
  (4)boolean putCalled = false,put操做,初次put的時候會置爲true;
  (5)boolean takeCalled = false,take操做,初次take的時候會置爲true;
  (6)int largestTakeTxSize = 5000,不是常量,能夠再分配;
  (7)int largestPutTxSize = 5000,不是常量,能夠再分配;
  (8)Integer overflowPutCount = 0,此次事務溢出的event的數量;
  (9)int putListByteCount = 0,此次事務putList全部event佔用字節總和;
  (10)int takeListByteCount = 0,此次事務takeList全部event佔用字節總和;
  (11)int takeCount = 0,此次事務take操做的個數;
  (12)ArrayDeque<Event> takeList,從memQueue拿出來的event暫存之所;
  (13)ArrayDeque<Event> putList,放入memQueue以前event的暫存之所; 
  按照國際慣例必須實現的4個方法:
  A、doPut(Event event),代碼以下:
 1 protected void doPut(Event event) throws InterruptedException {
 2       channelCounter.incrementEventPutAttemptCount();
 3 
 4       putCalled = true;    //說明是在put操做
 5       int eventByteSize = (int)Math.ceil(estimateEventSize(event)/ avgEventSize);//獲取這個event能夠佔用幾個slot
 6       if (!putList.offer(event)) {    //加入putList
 7         throw new ChannelFullException("Put queue in " + getName() +
 8                 " channel's Transaction having capacity " + putList.size() +
 9                 " full, consider reducing batch size of sources");
10       }
11       putListByteCount += eventByteSize;
12     }
View Code

  這個方法比較簡單,就是put開始;設置putCalled爲true表示put操做;計算佔用slot個數;將event放入putList等待commit操做;putListByteCount加上這個evnet佔用的slot數。

  B、doTake(),代碼以下: 
 1 protected Event doTake() throws InterruptedException {
 2       channelCounter.incrementEventTakeAttemptCount();
 3       if (!totalStored.tryAcquire(overflowTimeout, TimeUnit.SECONDS)) {
 4         LOGGER.debug("Take is backing off as channel is empty.");
 5         return null;
 6       }
 7       boolean takeSuceeded = false;
 8       try {
 9         Event event;
10         synchronized(queueLock) {
11           int drainOrderTop = drainOrder.front();
12 
13           if (!takeCalled) {
14             takeCalled = true;
15             if (drainOrderTop < 0) {
16               useOverflow = true;
17               overflowTakeTx = getOverflowTx();        //獲取file channle的事務
18               overflowTakeTx.begin();
19             }
20           }
21 
22           if (useOverflow) {
23             if (drainOrderTop > 0) {
24               LOGGER.debug("Take is switching to primary");
25               return null;       // takes should now occur from primary channel
26             }
27 
28             event = overflowTakeTx.take();
29             ++takeCount;
30             drainOrder.takeOverflow(1);
31           } else {
32             if (drainOrderTop < 0) {
33               LOGGER.debug("Take is switching to overflow");
34               return null;      // takes should now occur from overflow channel
35             }
36 
37             event = memQueue.poll();    //獲取並移除此雙端隊列所表示的隊列的頭(換句話說,此雙端隊列的第一個元素);若是此雙端隊列爲空,則返回 null。
38             ++takeCount;
39             drainOrder.takePrimary(1);
40             Preconditions.checkNotNull(event, "Queue.poll returned NULL despite"
41                     + " semaphore signalling existence of entry");
42           }
43         }
44 
45         int eventByteSize = (int)Math.ceil(estimateEventSize(event)/ avgEventSize);
46         if (!useOverflow) {
47           // takeList is thd pvt, so no need to do this in synchronized block
48           takeList.offer(event);
49         }
50 
51         takeListByteCount += eventByteSize;
52         takeSuceeded = true;
53         return event;
54       } finally {
55         if(!takeSuceeded) {
56           totalStored.release();
57         }
58       }
59     }
View Code

   因爲ArrayDeque是非線程安全的(memQueue就是ArrayDeque),因此take操做從memQueue獲取數據時,要獨佔memQueue。任何對memQueue都要進行同步,這裏是同步queueLock。

  doTake方法會先檢查totalStored中有無許可,即channel中有無數據;而後同步;再獲取drainOrder的頭元素,若是takeCalled爲false(初始爲false),則設置其爲true,再判斷獲取到的drainOrder頭元素的值是否爲負數,負數說明數據在溢出文件中,設置useOverflow爲true表示要從溢出文件中讀取數據而且獲取file channel的FileBackedTransaction賦值給overflowTakeTx,begin()能夠獲取數據。若是useOverflow爲true則轉到調用overflowTakeTx.take獲取event,而後takeCount自增1,調用drainOrder.takeOverflow(1)修改隊列中溢出event數量的值。若是useOverflow爲false說明數據在內存中,直接調用memQueue.poll()得到event,而後takeCount自增1,調用drainOrder.takePrimary(1)修改隊列中內存中evnet數量的值。而後計算這個event佔用的slot數。若是是從內存channel中讀取的event則將其放入takeList中;takeListByteCount加上這個evnet佔用的slot數。最後返回event。

  C、doCommit()方法,若是putCalled爲true就會調用putCommit()方法來處理put的操做,若是takeCalled爲true就調用takeCommit()方法來處理take操做。
  一、putCommit()方法,會首先依據overflowActivated的真假來設置超時時間。內存channel的溢出狀況由兩個信號量控制memQueRemaining和bytesRemaining,前者控制着event的數量,後者控制着物理內存的使用狀況,若是這二者中的任何一個不知足都會觸發溢出,溢出會設置overflowActivated = true;useOverflow = true,若是useOverflow爲true,就調用commitPutsToOverflow()方法來處理溢出,這個方法會建立一個file channel的FileBackedTransaction賦值給overflowPutTx,begin能夠put數據,而後依次將putList中的event經過overflowPutTx.put(event)放入file channel中,調用commitPutsToOverflow_core方法來處理overflowPutTx提交事務,再調用drainOrder.putOverflow(putList.size())修改queue中溢出文件中event的數量,若是在overflowPutTx提交過程當中失敗,最多再嘗試一次,中間等待overflowTimeout秒。返回到commitPutsToOverflow方法,將totalStored釋放putList.size的許可,溢出數量overflowPutCount增長putList.size。到這溢出的狀況完成。若是putCommit()useOverflow爲false則說明event在內存channel中,會調用commitPutsToPrimary()來處理,這個方法會將putList中的全部event放入memQueue中,而後調用drainOrder.putPrimary(putList.size())修改queue中內存中event的數量,修改maxMemQueueSize的值,將totalStored釋放putList.size的許可
  二、takeCommit()方法,若是overflowTakeTx不爲null,說明是從溢出文件取得的event,就調用commit方法提交事務。而後得到內存channel剩餘空間的百分比,包括兩部分之和,一部分是內存channel還能夠再存儲evnet的數量,另外一部分就是takeCount,他們倆之和與memoryCapacity(不能爲0)之比就是百分比memoryPercentFree。若是overflowActivated爲true且memoryPercentFree不小於overflowDeactivationThreshold,說明內存中剩餘空間已經達到了中止溢出的閾值,就設置overflowActivated爲false中止溢出,這樣其實會致使內存滿了以後等待溢出的時間加長。若是take操做是從內存channel中取數據,memQueRemaining會釋放takeCount個許可,表示騰出takeCount個空間;bytesRemaining會釋放takeListByteCount個許可,表示騰出takeListByteCount個slot。
  D、doRollback(),代碼以下:
 1 protected void doRollback() {
 2       LOGGER.debug("Rollback() of " +
 3               (takeCalled ? " Take Tx" : (putCalled ? " Put Tx" : "Empty Tx")));
 4 
 5       if (putCalled) {
 6         if (overflowPutTx!=null) {
 7           overflowPutTx.rollback();
 8         }
 9         if (!useOverflow) {
10           bytesRemaining.release(putListByteCount);
11           putList.clear();
12         }
13         putListByteCount = 0;
14       } else if (takeCalled) {
15         synchronized(queueLock) {
16           if (overflowTakeTx!=null) {
17             overflowTakeTx.rollback();
18           }
19           if (useOverflow) {
20             drainOrder.putFirstOverflow(takeCount);
21           } else {
22             int remainingCapacity = memoryCapacity - memQueue.size();
23             Preconditions.checkState(remainingCapacity >= takeCount,
24                     "Not enough space in memory queue to rollback takes. This" +
25                             " should never happen, please report");
26             while (!takeList.isEmpty()) {
27               memQueue.addFirst(takeList.removeLast());
28             }
29             drainOrder.putFirstPrimary(takeCount);
30           }
31         }
32         totalStored.release(takeCount);
33       } else {
34         overflowTakeTx.rollback();
35       }
36       channelCounter.setChannelSize(memQueue.size() + drainOrder.overflowCounter);
37     }
View Code

   若是putCalled爲true,則代表正在進行的是put操做。若是overflowPutTx不爲null,說明是在溢出,執行overflowPutTx的roolback方法進行回滾。若是沒有溢出,則bytesRemaining釋放putListByteCount許可,表示騰出putListByteCount個slot;清空putList;最後將putListByteCount置爲0。若是takeCalled爲true,說明正在進行的操做是take,若是overflowTakeTx不爲null,說明是在溢出,執行overflowTakeTx的roolback方法進行回滾;若是在溢出,則調用drainOrder.putFirstOverflow(takeCount)修改queue中溢出文件中的event的數量;若是在使用內存channel,則計算出內存channel中還能夠最多存儲event的數量,若是這個數量小於takeCount,則報錯,不然將takeList中的全部event加入memQueue的頭部,執行drainOrder.putFirstPrimary(takeCount)來修改queue中內存channel存放的event的數量;而後totalStored釋放takeCount個許可,表示內存channel中增長了takeCount個event。

  5、stop方法,會調用父類file channel中的stop方法。

  6、createTransaction()方法,直接返回一個SpillableMemoryTransaction對象。這說明take和put能夠併發執行,可是當涉及到memQueue時,仍是須要同步。

 

  至此,這個新的channel介紹完了。整體來講SpillableMemoryChannel是精心設計的一個channel,兼顧Flume內置的file channel和memory channel的優勢,又增長了一個選擇,大夥可根據須要選擇合適的channel。

相關文章
相關標籤/搜索