[從源碼學設計] Flume 之 memory channel

[從源碼學設計] Flume 之 memory channel

0x00 摘要

在使用Flume時,有時遇到以下錯誤信息:Space for commit to queue couldn't be acquiredhtml

究其緣由,是在memory channel的使用中出現了問題。java

本文就以此爲切入點,帶你們一塊兒剖析下 Flume 中 MemoryChannel 的實現數據庫

0x01 業務範疇

1.1 用途和特色

Flume的用途:高可用的,高可靠的,分佈式的海量日誌採集、聚合和傳輸的系統。segmentfault

這裏咱們介紹與本文相關的特色:緩存

  • Flume的管道是基於事務,保證了數據在傳送和接收時的一致性.
  • Flume是可靠的,容錯性高的,可升級的,易管理的,而且可定製的
  • 當收集數據的速度超過將寫入數據的時候,也就是當收集信息遇到峯值時,這時候收集的信息很是大,甚至超過了系統的寫入數據能力,這時候,Flume會在數據生產者和數據收容器間作出調整,保證其可以在二者之間提供平穩的數據.

1.2 Channel

這裏就要介紹channel的概念。channel是一種短暫的存儲容器,它將從source處接收到的event格式的數據緩存起來,直到它們被sinks消費掉,它在source和sink間起着橋樑的做用,channel是一個完整的事務,這一點保證了數據在收發的時候的一致性。而且它能夠和任意數量的source和sink連接。安全

支持的類型主要有: JDBC channel , File System channel , Memory channel等,大體區別以下:網絡

  • Memory Channel:events存儲在Java Heap,即內存隊列中(內存的大小是能夠指定的)。對於流量較高和因爲agent故障而準備丟失數據的流程來講,這是一個理想的選擇;
  • File Channel:event保存在本地文件中,可靠性高,但吞吐量低於Memory Channel;
  • JDBC Channel :event存儲在持久化存儲庫中(其背後是一個數據庫),JDBC channel目前支持嵌入式Derby。這是一個持續的channel,對於可恢復性很是重要的流程來講是理想的選擇;
  • Kafka Channel:events存儲在Kafka集羣中。Kafka提供高可用性和高可靠性,因此當agent或者kafka broker 崩潰時,events能立刻被其餘sinks可用。

本文主要涉及Memory Channel,因此看看其特性。架構

  • 好處:速度快,吞吐量大;
  • 壞處:根據計算機工做的原理就能夠得知,凡是在內存中計算的數據,只要電腦出現故障致使停機,那麼內存中數據是不會進行保存的;
  • 所適用的場景:高吞吐量,容許數據丟失的業務中;

1.3 研究重點

由此,咱們能夠總結出來 Flume 的一些重點功能:併發

  • 可靠的,容錯性高的;
  • 實現事務;
  • 速度快,吞吐量大;
  • 能夠調節收集的速度以解決生產者消費者不一致;
  • 可升級的,易管理,可定製的;

由於MemoryChannel屬於Flume的重要模塊,因此,咱們本文就看看是MemoryChannel是如何確保Flume以上特色的,這也是本文的學習思路。app

1.4 實際可以學到什麼

如何回滾,使用鎖,信號量 ,動態擴容,如何解決生產者消費者不一致問題。

1.5 總述

MemoryChannel仍是比較簡單的,主要是經過MemoryTransaction中的putList、takeList與MemoryChannel中的queue進行數據流轉和事務控制,這裏的queue至關於持久化層,只不過放到了內存中,若是是FileChannel的話,會把這個queue放到本地文件中。

MemoryChannel受內存空間的影響,若是數據產生的過快,同時獲取信號量超時容易形成數據的丟失。並且Flume進程掛掉,數據也會丟失。

具體是:

  • 維持一個隊列,隊列的兩端分別是source和sink。
  • source使用doPut方法往putList插入Event
  • sink使用doTake方法從queue中獲取event放入takeList,而且提供rollback方法,用於回滾。
  • commit方法做用是把putList中的event一次性寫到queue;

下面表示了Event在一個使用了MemoryChannel的agent中數據流向:

source ---> putList ---> queue ---> takeList ---> sink

爲了你們更好的理解,咱們提早把最終圖例發到這裏。

具體以下圖:

+----------+                                                                          +-------+
|  Source  |    +----------------------------------------------------------------+    | Sink  |
+-----+----+    | [MemoryChannel]                                                |    +---+---+
      |         |   +--------------------------------------------------------+   |        ^
      |         |   | [MemoryTransaction]                                    |   |        |
      |         |   |                                                        |   |        |
      |         |   |                                                        |   |        |
      |         |   |    channelCounter                                      |   |        |
      |         |   |                                                        |   |        |
      |         |   |    putByteCounter                     takeByteCounter  |   |        |
      |         |   |                                                        |   |        |
      |         |   |    +-----------+                      +------------+   |   |doTake  |
      +----------------> |  putList  |                      |  takeList  +----------------+
      doPut     |   |    +----+--+---+                      +----+---+---+   |   |
                |   |         |  ^                               |   ^       |   |
                |   |         |  |                               |   |       |   |
                |   +--------------------------------------------------------+   |
                |             |  |                               |   | poll      |
                |             |  |                               |   |           |
                |             |  |  rollback         rollback    |   |           |
                |             |  +--------------+  +-------------+   |           |
                |             |                 |  |                 |           |
                |             |                 |  v                 |           |
                |             |  doCommit    +--+--+---+  doCommit   |           |
                |             +------------> |  queue  | +-----------+           |
                |                            +---------+                         |
                +----------------------------------------------------------------+

手機上如圖:

img

0x02 定義

咱們要看看MemoryChannel重要變量的定義,這裏咱們沒有按照代碼順序來,而是從新整理。

2.1 接口

MemorChannel中最重要的部分主要是Channel、Transaction 和Configurable三個接口。

Channel接口 主要聲明瞭Channel中的三個方法,就是隊列基本功能

public void put(Event event) throws ChannelException; //從指定的Source中得到Event放入指定的Channel中
public Event take() throws ChannelException;     //從Channel中取出event放入Sink中
public Transaction getTransaction();             //得到當前Channel的事務實例

Transaction接口 主要聲明瞭flume中事務機制的四個方法,就是事務功能

enum TransactionState { Started, Committed, RolledBack, Closed }    //枚舉類型,指定了事務的四種狀態,事務開始、提交、失敗回滾、關閉
void begin(); 
void commit();
void rollback();
void close();

Configurable接口 主要是和flume配置組件相關的,須要從flume配置系統獲取配置信息的任何組件,都必須實現該接口。該接口中只聲明瞭一個context方法,用於獲取配置信息。

大致邏輯以下:

+-----------+          +--------------+        +---------------+
|           |          |              |        |               |
|  Channel  |          |  Transaction |        | Configurable  |
|           |          |              |        |               |
+-----------+          +--------------+        +---------------+

     ^                        ^                        ^
     |                        |                        |
     |                        |                        |
     |                        |                        |
     |          +-------------+--------------+         |
     |          |                            |         |
     |          |        MemorChannel        +---------+
     +-------+  |                            |
                |                            |
                |                            |
                |                            |
                |                            |
                |                            |
                |                            |
                +----------------------------+

下面咱們具體講講成員變量。

2.2 配置參數

首先是一系列業務配置參數。

//定義隊列中一次容許的事件總數
  private static final Integer defaultCapacity = 100;
  
  //定義一個事務中容許的事件總數
  private static final Integer defaultTransCapacity = 100;

  //將物理內存轉換成槽(slot)數,默認是100
  private static final double byteCapacitySlotSize = 100;
  
  //定義隊列中事件所使用空間的最大字節數(默認是JVM最大可用內存的0.8)
  private static final Long defaultByteCapacity = (long)(Runtime.getRuntime().maxMemory() * .80);

  //定義byteCapacity和預估Event大小之間的緩衝區百分比:
  private static final Integer defaultByteCapacityBufferPercentage = 20;
  
  //添加或者刪除一個event的超時時間,單位秒:
  private static final Integer defaultKeepAlive = 3;

  // maximum items in a transaction queue
  private volatile Integer transCapacity;
  private volatile int keepAlive;
  private volatile int byteCapacity;
  private volatile int lastByteCapacity;
  private volatile int byteCapacityBufferPercentage;
  private ChannelCounter channelCounter;

這些參數基本都在configure(Context context)中設置,基本邏輯以下:

  • 設置 capacity:MemroyChannel的容量,默認是100。

  • 設置 transCapacity:每一個事務最大的容量,也就是每一個事務可以獲取的最大Event數量。默認也是100。事務容量必須小於等於Channel Queue容量。

  • 設置 byteCapacityBufferPercentage:用來肯定byteCapacity的一個百分比參數,即咱們定義的字節容量和實際事件容量的百分比,由於咱們定義的字節容量主要考慮Event body,而忽略Event header,所以須要減去Event header部分的內存佔用,能夠認爲該參數定義了Event header佔了實際字節容量的百分比,默認20%;

  • 設置 byteCapacity:byteCapacity等於設置的byteCapacity值或堆的80%乘以1減去byteCapacityBufferPercentage的百分比,而後除以100。具體是首先讀取配置文件定義的byteCapacity,若是沒有定義,則使用默認defaultByteCapacity,而defaultByteCapacity默認是JVM物理內存的80%(Runtime.getRuntime().maxMemory() * .80);那麼實際byteCapacity=定義的byteCapacity * (1- Event header百分比)/ byteCapacitySlotSize;byteCapacitySlotSize默認100,即計算百分比的一個係數。

  • 設置 keep-alive:增長和刪除一個Event的超時時間(單位:秒)。

  • 設置初始化 LinkedBlockingDeque對象,大小爲capacity。以及各類信號量對象。

  • 最後初始化計數器。

配置代碼摘要以下:

public void configure(Context context) {
    capacity = context.getInteger("capacity", defaultCapacity);
    transCapacity = context.getInteger("transactionCapacity", defaultTransCapacity);
    byteCapacityBufferPercentage = context.getInteger("byteCapacityBufferPercentage",
                                                       defaultByteCapacityBufferPercentage);
    byteCapacity = (int) ((context.getLong("byteCapacity", defaultByteCapacity).longValue() *(1 - byteCapacityBufferPercentage * .01)) / byteCapacitySlotSize);
    if (byteCapacity < 1) {
        byteCapacity = Integer.MAX_VALUE;
    }
    keepAlive = context.getInteger("keep-alive", defaultKeepAlive);
    resizeQueue(capacity);
    if (channelCounter == null) {
      channelCounter = new ChannelCounter(getName());
    }
}

2.2.1 channel屬性

ChannelCounter 須要單獨說一下。其就是把channel的一些屬性封裝了一下,初始化了一個ChannelCounter,是一個計數器,記錄如當前隊列放入Event數、取出Event數、成功數等。

private ChannelCounter channelCounter;

定義以下:

public class ChannelCounter extends MonitoredCounterGroup implements
    ChannelCounterMBean {
  private static final String COUNTER_CHANNEL_SIZE = "channel.current.size";
  private static final String COUNTER_EVENT_PUT_ATTEMPT ="channel.event.put.attempt";
  private static final String COUNTER_EVENT_TAKE_ATTEMPT = "channel.event.take.attempt";
  private static final String COUNTER_EVENT_PUT_SUCCESS = "channel.event.put.success";
  private static final String COUNTER_EVENT_TAKE_SUCCESS = "channel.event.take.success";
  private static final String COUNTER_CHANNEL_CAPACITY = "channel.capacity";
}

2.4 Semaphore和Queue

其次是Semaphore和Queue。主要就是用來協助制事務。

MemoryChannel有三個信號量用來控制事務,防止容量越界:queueStored,queueRemaining,bytesRemaining。

  • queueLock:建立一個Object當作隊列鎖,操做隊列的時候保證數據的一致性;
  • queue:使用LinkedBlockingDeque queue維持一個隊列,隊列的兩端分別是source和sink;
  • queueStored:來保存queue中當前的保存的event的數目,即已經存儲的容量大小,後面tryAcquire方法能夠判斷是否能夠take到一個event;
  • queueRemaining:來保存queue中當前可用的容量,即空閒的容量大小,能夠用來判斷當前是否有能夠提交必定數量的event到queue中;
  • bytesRemaining : 表示可使用的內存大小。該大小就是計算後的byteCapacity值。
private Object queueLock = new Object();

  @GuardedBy(value = "queueLock")
  private LinkedBlockingDeque<Event> queue;

  private Semaphore queueRemaining;

  private Semaphore queueStored;

  private Semaphore bytesRemaining;// 表示可使用的內存大小。該大小就是計算後的byteCapacity值。

2.5 MemoryTransaction

內部類MemoryTransaction是整個事務保證最重要的類。

MemoryTransaction用來接收數據和事務控制。該類繼承BasicTransactionSemantics類。

MemoryTransaction維護了兩個隊列,一個用於Source的put,一個用於Sink的take,容量大小爲事務的容量(transCapacity)。

  • takeList:take事務用到的隊列;阻塞雙端隊列,從channel中取event先放入takeList,輸送到sink,commit成功,從channel queue中刪除;
  • putList:put事務用到的隊列;從source 會先放至putList,而後commit傳送到channel queue隊列;
  • channelCounter:channel屬性;ChannelCounter類定義了監控指標數據的一些屬性方法
  • putByteCounter:put字節數計數器;
  • takeByteCounter:take字節計數器;
private class MemoryTransaction extends BasicTransactionSemantics {
    private LinkedBlockingDeque<Event> takeList;
    private LinkedBlockingDeque<Event> putList;
    private final ChannelCounter channelCounter;
    private int putByteCounter = 0;
    private int takeByteCounter = 0;
}

不管是Sink,仍是Source都會調用getTransaction()方法,獲取當前Channel的事務實例。

接口與成員變量大體邏輯能夠理解以下,其中 Channel 的 API 表示這裏是 MemorChannel 的對外 API:

+-----------+                    +--------------+                  +---------------+
|           |                    |              |                  |               |
|  Channel  |                    |  Transaction |                  | Configurable  |
|           |                    |              |                  |               |
+---+-------+                    +--------------+                  +---------------+
    ^
    |                                    ^                                  ^
    |                                    |                                  |
    |                                    |                                  |
    |        +--------------------------------------------------------+     |
    |        |                           |                            |     |
    |        |  MemoryChannel            |                            |     |
    |        |                           +                            |     |
    |        |                                                        |     |
    |        |                    MemoryTransaction                   |     |
    |        |                                                        |     |
    |        |                    Semaphore / Queue                   |     |
    |        |                                                        |     |
    +--------+                                                        |     |
     API     |                                                        |     |
             |                                                        |     |
             |                               Config Parameters +------------+
             |                                                        |
             |                                                        |
             +--------------------------------------------------------+

0x03 使用

看了上面講的,估計你們仍是會暈,由於成員變量和概念實在是太多了,因此咱們從使用入手分析。

前面提到,memory channel內部有三個隊列,分別是putList,queue,takeList。其中putList,takeList在MemoryTransaction之中。

3.1 channel如何使用

channel之上有一把鎖,當source主動向channel放數據或者sink主動從channel取數據時,會搶鎖,誰取到鎖,誰就能夠操做channel。

每次使用時會首先調用tx.begin()開始事務,也就是獲取鎖。而後調用tx.commit()提交數據或者調用tx.rollback()取消操做。

這裏須要注意的是:Source, Sink 都是死循環,搶同一個鎖。因此就會有消費者,生產者速度不一致的狀況,因此就須要有 一個內部的 buffer,就是咱們的Queue。

3.2 source往channel放數據

這是一個死循環,source一直試圖獲取channel鎖,而後從kafka獲取數據,放入channel中,那每次放入多少個數據呢?在KafkaSource.java中,代碼是這樣的:

while (eventList.size() < batchUpperLimit &&
		System.currentTimeMillis() < maxBatchEndTime) {
}

含義就是:每次最多放batchUpperLimit或最多等待maxBatchEndTime的時間,就結束向channel放數據。

當獲取了足夠的數據,首先放入putList中,而後就會調用tx.commit()將putList的所有數據放入queue中。

3.3 sink從channel取數據

也是一個死循環,sink一直試圖獲取channel鎖,而後從channel取一批數據,放入sink和takeList(僅僅用於回滾,在調用rollback時takeList的數據會回滾到queue中)。每次取多少個event呢?以HDFSEventSink爲例,代碼以下:

for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
    Event event = channel.take();
    if (event == null) 
    		break;
}

batchSize的大小默認是100,由hdfs.batchSize控制。

具體以下:

+--------------->
                                   ^                 |
                                   |                 |   while(1)
                                   |                 v
   +-----------+                   |            +----+----+
   |  Source   |                   |  take      |  Sink   |
   |           |                   |            |         |
   +-----+-----+                   |            +---------+
         |                         |
         |           +-------------+--+
         |           | Channel        |
         |           |                |
While(1) |           |                |
         |           |       buffer   |
         |           +----------------+
         |
         |                 ^
         |                 |
         |                 |  put
         v ----------------^

0x04 實現事務

此處回答了前面提到的兩個重點:

  • 可靠的,容錯性高的;
  • 實現事務;

其實就是用事務保證整個流程的高可靠,其核心就在從source抽取數據到channel,從channel抽取到sink,當sink被消費後channel數據刪除的這三個環節。而這些環節在flume中被統一的用事務管理起來。能夠說,這是flume高可靠的關鍵一點

具體涉及到的幾個點以下:

  • MemoryTransaction是實現事務的核心。每次使用時會首先調用tx.begin()開始事務,也就是獲取鎖。而後調用tx.commit()提交數據或者調用tx.rollback()取消操做。
  • MemoryChannel時設計時考慮了兩個容量:Channel Queue容量和事務容量,而這兩個容量涉及到了數量容量和字節數容量。
  • MemoryChannel 會根據事務容量 transCapacity 建立兩個阻塞雙端隊列putList和takeList,這兩個隊列(至關於兩個臨時緩衝隊列)主要就是用於事務處理的。即,每一個事務都有一個Take List和Put List分別用於存儲事務相關的取數據和放數據,等事務提交時才徹底同步到Channel Queue,或者失敗把取數據回滾到Channel Queue。
    • 首先由一個Channel Queue用於存儲整個Channel的Event數據;
    • 當從Source往 Channel中放事件event 時,會先將event放入 putList 隊列,而後將putList隊列中的event 放入 MemoryChannel的queue中。
    • 當從 Channel 中將數據傳送給 Sink 時,則會將event先放入 takeList 隊列中,而後從takeList隊列中將event送入Sink,不管是 put 仍是 take 發生異常,都會調用 rollback 方法回滾事務。
    • 回滾時,會先給 Channel 加鎖防止回滾時有其餘線程訪問,若takeList 不爲空, 就將寫入 takeList中的event再次放入 Channel 中,而後移除 putList 中的全部event(即就是丟棄寫入putList臨時隊列的 event)。
  • 由於多個事務要操做Channel Queue,還要考慮Channel Queue的動態擴容問題,所以MemoryChannel使用了鎖來實現;而容量問題則使用了信號量來實現。

咱們下面具體走一下這個流程。

4.1 put事務

此事務發生在在Source到Channel之間,是從指定的Source中得到Event放入指定的Channel中,具體包括:

  • doPut:將批數據先寫入臨時緩衝區 putList;
  • doCommit:檢查 channel 內存隊列是否足夠合併;
  • doRollback:channel 內存隊列空間不足,回滾數據;

以下調用。

try {
    tx.begin();
    //底層就是調用的doPut方法
    // Source寫事件調用put方法
    reqChannel.put(event);
    tx.commit();
  } catch (Throwable t) {
    // 發生異常則回滾事務
    tx.rollback();
    if (t instanceof Error) {
      throw (Error) t;
    } else if (t instanceof ChannelException) {
      throw (ChannelException) t;
    } else {
      throw new ChannelException("Unable to put event on required " +
          "channel: " + reqChannel, t);
    }
  } finally {
    if (tx != null) {
      tx.close();
    }
  }

下面分析doPut方法。

doPut邏輯以下:

  • 計算event大概佔用的slot數;
  • offer方法往putList中添加event,等事務提交時轉移到Channel Queue,若是滿了則直接拋異常回滾事務;
  • 累加這一條event所佔用的slot空間,以便以後作字節容量限制。

具體代碼以下:

protected void doPut(Event event) throws InterruptedException {    
      //增長放入事件計數器
      channelCounter.incrementEventPutAttemptCount();
      //estimateEventSize計算當前Event body大小
      int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);
    
      /*
       * offer若當即可行且不違反容量限制,則將指定的元素插入putList阻塞雙端隊列中(隊尾),
       * 並在成功時返回,若是當前沒有空間可用,則拋異常回滾事務 
       * */
      if (!putList.offer(event)) {
        throw new ChannelException(
            "Put queue for MemoryTransaction of capacity " +
            putList.size() + " full, consider committing more frequently, " +
            "increasing capacity or increasing thread count");
      }
 
      //記錄Event的byte值
      putByteCounter += eventByteSize;
}

具體以下圖,咱們暫時忽略commit與rollback:

+----------+
|  Source  |    +---------------------------+
+-----+----+    | [MemoryChannel]           |
      |         |   +---------------------+ |
      |         |   | [MemoryTransaction] | |
      |         |   |                     | |
      |         |   |                     | |
      |         |   |    channelCounter   | |
      |         |   |                     | |
      |         |   |    putByteCounter   | |
      |         |   |                     | |
      |         |   |    +-----------+    | |
      +----------------> |  putList  |    | |
      doPut     |   |    +-----------+    | |
                |   +---------------------+ |
                +---------------------------+

4.2 take事務

此事務發生在Channel到Sink之間,主要是從Channel中取出event放入Sink中,具體包括。

  • doTake:將數據取到臨時緩衝區 takeList,並將數據發送到 HDFS;
  • doCommit:若是數據所有發送成功,則清除臨時緩衝區 takeList;
  • doRollback:數據發送過程當中若是出現異常,rollback 將臨時緩衝區 takeList 中的數據歸還給 channel 內存隊列;

以下調用:

transaction = channel.getTransaction();
transaction.begin();
 
......
  
event = channel.take();
 
......
  
transaction.commit();

邏輯以下:

  • 判斷takeList中是否還有空間,若是沒有空間則拋出異常;
  • 判斷當前MemoryChannel中的queue中是否還有空間,這裏經過信號量來判斷;
  • 從queue頭部彈出一條消息,放入takeList中;
  • 估算這條Event所佔空間(slot數),累加takeList中的字節數;
  • 將取出來的這條Event返回;

doTake具體代碼以下:

protected Event doTake() throws InterruptedException {
        
      channelCounter.incrementEventTakeAttemptCount();//將正在從channel中取出的event計數器原子的加一,即增長取出事件計數器
  
 		 //若是takeList隊列沒有剩餘容量,即當前事務已經消費了最大容量的Event,拋異常
      if (takeList.remainingCapacity() == 0) {//takeList隊列剩餘容量爲0
        throw new ChannelException("Take list for MemoryTransaction, capacity " +
            takeList.size() + " full, consider committing more frequently, " +
            "increasing capacity, or increasing thread count");
      }
      
     //嘗試獲取一個信號量獲取許可,若是能夠獲取到許可的話,證實queue隊列有空間,超時直接返回null
      if (!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) {
        return null;
      }
 
      Event event;
      synchronized (queueLock) {
        event = queue.poll();   //獲取並移除MemoryChannel雙端隊列表示的隊列的頭部(也就是隊列的第一個元素),隊列爲空返回null,同一時間只能有一個線程訪問,加鎖同步
      }
      
 			//由於信號量的保證,Channel Queue不該該返回null,出現了就不正常了
      Preconditions.checkNotNull(event, "Queue.poll returned NULL despite semaphore " +
          "signalling existence of entry");

      takeList.put(event);  //將取出的event暫存到事務的takeList隊列
 
  		//計算當前Event body大小並增長取出隊列字節數計數器
   		/* 計算event的byte大小 */
      int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);
			//更新takeByteCounter大小 
      takeByteCounter += eventByteSize;

      return event;
}

因而咱們把take事務加入,咱們暫時忽略commit與rollback。具體以下圖,目前兩個事務是沒有聯繫的:

+----------+                                                                    +-------+
|  Source  |    +---------------------------------------------------------+     | Sink  |
+-----+----+    | [MemoryChannel]                                         |     +---+---+
      |         |   +--------------------------------------------------+  |         ^
      |         |   | [MemoryTransaction]                              |  |         |
      |         |   |                                                  |  |         |
      |         |   |                                                  |  |         |
      |         |   |    channelCounter                                |  |         |
      |         |   |                                                  |  |         |
      |         |   |    putByteCounter               takeByteCounter  |  |         |
      |         |   |                                                  |  |         |
      |         |   |    +-----------+                +------------+   |  | doTake  |
      +----------------> |  putList  |                |  takeList  +----------------+
      doPut     |   |    +-----------+                +------+-----+   |  |
                |   |                                        ^         |  |
                |   |                                        |         |  |
                |   +--------------------------------------------------+  |
                |                                            |            |
                |                                            |            |
                |                                            |            |
                |                      +---------+  poll     |            |
                |                      |  queue  | +---------+            |
                |                      +---------+                        |
                +---------------------------------------------------------+

4.3 提交事務

commit階段主要作的事情是提交事務,此代碼繁雜在於其包括了兩個方面的操做:

  • 從putList拿數據到Queue;
  • 處理 takelist後續操做,就是根據此時具體狀況調整各類數值;

commit其邏輯以下:

  • 計算takeList中Event數與putList中的Event差值;int remainingChange = takeList.size() - putList.size();
  • 差值小於0,說明takeList小,也就是向該MemoryChannel放的數據比取的數據要多,因此須要判斷該MemoryChannel是否有空間來放;
    • 首先經過信號量來判斷是否還有剩餘空間;這一步tryAcquire方法會將bytesRemaining的值減去putByteCounter的值,若是bytesRemaining原來的值大於putByteCounter則返回true;
    • 而後判斷,在給定的keepAlive時間內,可否獲取到充足的queue空間;
  • 若是上面的兩個判斷都過了,那麼把putList中的Event放到該MemoryChannel中的queue中;
    • 將putList中的Event循環放入queue中;
    • 面的工做完成後,清空putList和takeList,一次事務完成;
  • 而後將兩個計數器置零;
  • 將queueStored的值加上puts的值,更新信號量;
  • 若是takeList比putList大,說明該MemoryChannel中queue的數量應該是減小了,因此把(takeList-putList)的差值加到信號量queueRemaining;
  • 更新channelCounter中的三個變量;

具體以下:

protected void doCommit() throws InterruptedException {    

  		//計算改變的Event數量,即取出數量-放入數量;若是放入的多,那麼改變的Event數量將是負數
    	//若是takeList更小,說明該MemoryChannel放的數據比取的數據要多,因此須要判斷該MemoryChannel是否有空間來放
      int remainingChange = takeList.size() - putList.size();  //takeList.size()能夠當作source,putList.size()當作sink

 			//若是remainingChange小於0,則須要獲取Channel Queue剩餘容量的信號量
      if (remainingChange < 0) { //sink的消費速度慢於source的產生速度
   			//利用bytesRemaining信號量判斷是否有足夠空間接收putList中的events所佔的空間    
				//putByteCounter是須要推到channel中的數據大小,bytesRemainingchannel是容量剩餘
				//獲取putByteCounter個字節容量信號量,若是失敗說明超過字節容量限制了,回滾事務
        if (!bytesRemaining.tryAcquire(putByteCounter, keepAlive, TimeUnit.SECONDS)) {
         	//channel 數據大小容量不足,事物不能提交
          throw new ChannelException("Cannot commit transaction. Byte capacity " +
              "allocated to store event body " + byteCapacity * byteCapacitySlotSize +
              "reached. Please increase heap space/byte capacity allocated to " +
              "the channel as the sinks may not be keeping up with the sources");
        }

    		//獲取Channel Queue的-remainingChange個信號量用於放入-remainingChange個Event,若是獲取不到,則釋放putByteCounter個字節容量信號量,並拋出異常回滾事務
        //由於source速度快於sink速度,需判斷queue是否還有空間接收event
        if (!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) {
       	 //remainingChange若是是負數的話,說明source的生產速度,大於sink的消費速度,且這個速度大於channel所能承載的值
          bytesRemaining.release(putByteCounter);
          throw new ChannelFullException("Space for commit to queue couldn't be acquired." +
              " Sinks are likely not keeping up with sources, or the buffer size is too tight");
        }
      }
     
      int puts = putList.size(); //事務期間生產的event
      int takes = takeList.size();  //事務期間等待消費的event
  
   		//若是上述兩個信號量都有空間的話,那麼把putList中的Event放到該MemoryChannel中的queue中。
     	//鎖住隊列開始,進行數據的流轉
      synchronized (queueLock) {//操做Channel Queue時必定要鎖定queueLock     
        if (puts > 0) {
          while (!putList.isEmpty()) { //若是有Event,則循環放入Channel Queue
            if (!queue.offer(putList.removeFirst())) {  
          		//若是放入Channel Queue失敗了,說明信號量控制出問題了,這種狀況不該該發生  
              throw new RuntimeException("Queue add failed, this shouldn't be able to happen");
            }
          }
        }
        //以上步驟執行成功,清空事務的putList和takeList
        putList.clear();  
        takeList.clear();
      }
  
    	//更新queue大小控制的信號量bytesRemaining
		  //釋放takeByteCounter個字節容量信號量
		  bytesRemaining.release(takeByteCounter);
		  //重置字節計數器
		  takeByteCounter = 0;
		  putByteCounter = 0;

 			//釋放puts個queueStored信號量,這樣doTake方法就能夠獲取數據了
      queueStored.release(puts);  //從queueStored釋放puts個信號量

      //釋放remainingChange個queueRemaining信號量    
      if (remainingChange > 0) {
        queueRemaining.release(remainingChange);
      }
      
  		//ChannelCounter一些數據計數
      if (puts > 0) { //更新成功放入Channel中的events監控指標數據
        channelCounter.addToEventPutSuccessCount(puts);
      }
      if (takes > 0) {  //更新成功從Channel中取出的events的數量
        channelCounter.addToEventTakeSuccessCount(takes);
      }

      channelCounter.setChannelSize(queue.size());
}

此處涉及到兩個信號量:

queueStored表示Channel Queue已存儲事件容量(已存儲的事件數量),隊列取出事件時-1,放入事件成功時+N,取出失敗時-N,即Channel Queue存儲了多少事件。

  • queueStored信號量默認爲0。
  • 當doTake取出Event時減小一個queueStored信號量。
  • 當doCommit提交事務時須要增長putList 隊列大小的queueStored信號量。
  • 當doRollback回滾事務時須要減小takeList隊列大小的queueStored信號量。

queueRemaining表示Channel Queue可存儲事件容量(可存儲的事件數量),取出事件成功時+N,放入事件成功時-N。

  • queueRemaining信號量默認爲Channel Queue容量。其在提交事務時首先經過remainingChange = takeList.size() - putList.size()計算得到須要增長多少變動事件;
  • 若是小於0表示放入的事件比取出的多,表示有 remainingChange個事件放入,此時應該減小queueRemaining信號量;
  • 而若是大於0,則表示取出的事件比放入的多,表示有queueRemaining個事件取出,此時應該增長queueRemaining信號量;即消費事件時減小信號量,生產事件時增長信號量。

bytesRemaining是字節容量信號量,超出容量則回滾事務。

具體以下圖,如今總體業務已經走通:

+----------+                                                                          +-------+
|  Source  |    +---------------------------------------------------------------+     | Sink  |
+-----+----+    | [MemoryChannel]                                               |     +---+---+
      |         |   +--------------------------------------------------------+  |         ^
      |         |   | [MemoryTransaction]                                    |  |         |
      |         |   |                                                        |  |         |
      |         |   |                                                        |  |         |
      |         |   |    channelCounter                                      |  |         |
      |         |   |                                                        |  |         |
      |         |   |    putByteCounter                     takeByteCounter  |  |         |
      |         |   |                                                        |  |         |
      |         |   |    +-----------+                      +------------+   |  | doTake  |
      +----------------> |  putList  |                      |  takeList  +----------------+
      doPut     |   |    +----+------+                      +------+-----+   |  |
                |   |         |                                    ^         |  |
                |   |         |                                    |         |  |
                |   +--------------------------------------------------------+  |
                |             |                                    | poll       |
                |             |                                    |            |
                |             |                                    |            |
                |             |  doCommit    +---------+  doCommit |            |
                |             +------------> |  queue  | +---------+            |
                |                            +---------+                        |
                +---------------------------------------------------------------+

手機以下圖:

img

4.4 回滾事務

當一個事務失敗時,會進行回滾,即調用本方法。在回滾時,須要把takeList中暫存的事件回滾到Channel Queue,並回滾queueStored信號量。具體邏輯以下:

  • 獲得takeList中的Event數量 int takes = takeList.size();
  • 首先把takeList中的Event放回到MemoryChannel中的queue中;
    • 先判斷queue中可否有足夠的空間將takeList的Events放回去;
    • 從takeList的尾部依次取出Event,放入queue的頭部;
    • 而後清空putList;
  • 由於清空了putList,因此須要把putList所佔用的空間大小添加到bytesRemaining中;

具體代碼以下:

protected void doRollback() {
    	//獲取takeList的大小,而後bytesRemaining中釋放
      int takes = takeList.size();
    	//將takeList中的Event從新放回到queue隊列中。
      synchronized (queueLock)  { //操做Channel Queue時必定鎖住queueLock
      	//前置條件判斷,檢查是否有足夠容量回滾事務
        Preconditions.checkState(queue.remainingCapacity() >= takeList.size(),
            "Not enough space in memory channel " +
            "queue to rollback takes. This should never happen, please report");
    		//回滾事務的takeList隊列到Channel Queue
        while (!takeList.isEmpty()) {  //takeList不爲空,將其events所有放回queue
          //removeLast()獲取並移除此雙端隊列的最後一個元素
          queue.addFirst(takeList.removeLast());
        }
      	//最後清空putList
    	   putList.clear();
      }
      
		//清空了putList,因此須要把putList佔用的空間添加到bytesRemaining中
    //即,釋放putByteCounter個bytesRemaining信號量
    bytesRemaining.release(putByteCounter);
    //計數器重置
    putByteCounter = 0;
    takeByteCounter = 0;
    //釋放takeList隊列大小個已存儲事件容量
    queueStored.release(takes);
    channelCounter.setChannelSize(queue.size());
}

具體以下圖:

+----------+                                                                          +-------+
|  Source  |    +----------------------------------------------------------------+    | Sink  |
+-----+----+    | [MemoryChannel]                                                |    +---+---+
      |         |   +--------------------------------------------------------+   |        ^
      |         |   | [MemoryTransaction]                                    |   |        |
      |         |   |                                                        |   |        |
      |         |   |                                                        |   |        |
      |         |   |    channelCounter                                      |   |        |
      |         |   |                                                        |   |        |
      |         |   |    putByteCounter                     takeByteCounter  |   |        |
      |         |   |                                                        |   |        |
      |         |   |    +-----------+                      +------------+   |   |doTake  |
      +----------------> |  putList  |                      |  takeList  +----------------+
      doPut     |   |    +----+--+---+                      +----+---+---+   |   |
                |   |         |  ^                               |   ^       |   |
                |   |         |  |                               |   |       |   |
                |   +--------------------------------------------------------+   |
                |             |  |                               |   | poll      |
                |             |  |                               |   |           |
                |             |  |  rollback         rollback    |   |           |
                |             |  +--------------+  +-------------+   |           |
                |             |                 |  |                 |           |
                |             |                 |  v                 |           |
                |             |  doCommit    +--+--+---+  doCommit   |           |
                |             +------------> |  queue  | +-----------+           |
                |                            +---------+                         |
                +----------------------------------------------------------------+

手機上如圖:

img

0x05 動態擴容

此小節回答了以下問題:

  • 可升級的,易管理,可定製的;

MemoryChannel 中使用鎖配合信號實現動態增減容量

MemoryChannel會經過configure方法獲取配置文件系統,初始化MemoryChannel,其中對於配置信息的讀取有兩種方法,只在啓動時讀取一次或者動態的加載配置文件,動態讀取配置文件時若修改了Channel 的容量大小,則會調用 resizeQueue 方法進行調整,以下:

if (queue != null) { //queue不爲null,則爲動態修改配置文件時,從新指定了capacity
      try {
        resizeQueue(capacity);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    } else { //初始化queue,根據指定的capacity申請雙向阻塞隊列,並初始化信號量
      synchronized (queueLock) {
        queue = new LinkedBlockingDeque<Event>(capacity);
        queueRemaining = new Semaphore(capacity);
        queueStored = new Semaphore(0);
      }
    }

動態調整 Channel 容量主要分爲三種狀況:

  • 新老容量相同,則直接返回;

  • 老容量大於新容量,縮容,需先給未被佔用的空間加鎖,防止在縮容時有線程再往其寫數據,而後建立新容量的隊列,將本來隊列加入中全部的 event 添加至新隊列中;

  • 老容量小於新容量,擴容,而後建立新容量的隊列,將本來隊列加入中全部的 event 添加至新隊列中。

具體代碼以下:

private void resizeQueue(int capacity) throws InterruptedException {
    int oldCapacity;
//首先計算擴容前的Channel Queue的容量
    //計算本來的Channel Queue的容量
    synchronized (queueLock) {
      //老的容量=隊列現有餘額+在事務被處理了可是是未被提交的容量
      oldCapacity = queue.size() + queue.remainingCapacity();
    }

    //新容量和老容量相等,不須要調整返回
    if (oldCapacity == capacity) {//若是老容量大於新容量,縮容
      return;
    } else if (oldCapacity > capacity) {
      //縮容
    //首先要預佔老容量-新容量的大小,以便縮容容量
     //首先要預佔用未被佔用的容量,防止其餘線程進行操做
     //嘗試佔用即將縮減的空間,以防被他人佔用
      if (!queueRemaining.tryAcquire(oldCapacity - capacity, keepAlive, TimeUnit.SECONDS)) {

   //若是獲取失敗,默認是記錄日誌而後忽略
        LOGGER.warn("Couldn't acquire permits to downsize the queue, resizing has been aborted");
      } else {
        //直接縮容量
        //鎖定queueLock進行縮容,先建立新capacity的雙端阻塞隊列,而後複製老Queue數據。線程安全
  //不然,直接縮容,而後複製老Queue的數據,縮容時須要鎖定queueLock,由於這一系列操做要線程安全

        synchronized (queueLock) {
          LinkedBlockingDeque<Event> newQueue = new LinkedBlockingDeque<Event>(capacity);
          newQueue.addAll(queue);
          queue = newQueue;
        }
      }
    } else { //擴容,加鎖,建立新newQueue,複製老queue數據
     //擴容
      synchronized (queueLock) {
        LinkedBlockingDeque<Event> newQueue = new LinkedBlockingDeque<Event>(capacity);
        newQueue.addAll(queue);
        queue = newQueue;
      }
//增長/減小Channel Queue的新的容量

      //釋放capacity - oldCapacity個許可,即就是增長這麼多可用許可
      queueRemaining.release(capacity - oldCapacity);
    }
  }

0x06 丟失數據的可能

回到本文最初的錯誤信息:Space for commit to queue couldn't be acquired

這說明Flume是會出現數據相關問題的。咱們首先分析此問題。

6.1 錯誤

6.1.1 異常緣由

由於「source往putList放數據,而後提交到queue中」與「sink從channel中取數據到sink和takeList,而後再從putList取數據到queue中」這兩部分是分開來,任他們自由搶鎖,因此,當前者屢次搶到鎖,後者沒有搶到鎖,同時queue的大小又過小,撐不住屢次往裏放數據,就會致使觸發這個異常。

6.1.2 失敗處理

正常狀況下,若是遇到此問題,flume會暫停source向channel放數據,等待幾秒鐘,這期間sink應該會消費channel中的數據,當source再次開始想channel放數據時channel就有足夠的空間了。

可是若是一直出現異常,就須要啓用解決方案。

6.1.3 解決方案

解決這個問題最直接的辦法就是增大queue的大小,增大capacity和transacCapacity之間的差距,queue能撐住屢次往裏面放數據便可。

6.2 丟失數據的可能

下面咱們看看Flume使用中,丟失數據的可能。

6.2.1 事務保證

根據Flume的架構原理,採用FileChannel的Flume是不可能丟失數據的,由於其內部有完善的事務機制(ACID)。

  • Source到Channel是事務性的,
  • Channel到Sink也是事務性的,

這兩個環節都不可能丟失數據。

6.2.2 管道容量

一旦管道中全部Flume Agent的容量之和被使用完,Flume 將再也不接受來自客戶端的數據。此時,客戶端須要緩衝數據,不然數據可能會丟失。所以,配置管道可以處理最大預期的停機時間是很是重要的。

6.2.3 MemoryChannel

Channel採用MemoryChannel時候,會出現丟失。

  • MemoryChannel受內存空間的影響,若是數據產生的過快,同時獲取信號量超時容易形成數據的丟失。此時Source再也不寫入數據,形成未寫入的數據丟失;就是本文的狀況;
  • Flume進程掛掉,數據也會丟失,由於以前數據在內存中;

因此若是想要不丟失數據,須要採用File channel。

Memory Channel 是一個內存緩衝區,所以若是Java23 虛擬機(JVM)或機器從新啓動,任何緩衝區中的數據將丟失。另外一方面,File Channel是在磁盤上的。即便JVM 或機器從新啓動,File Channel 也不丟失數據,只要磁盤上存儲的數據仍然是起做用的和可訪問的。機器和Agent 一旦開始運行,任何存儲在FileChannel 中的數據將最終被訪問。

6.2.4 數據重複

在Channel發送到Sink這階段,容易出現數據重複問題。

好比:若是flush到HDFS的時候,數據flush了一半以後出問題了,這意味着已經有一半的數據已經發送到HDFS上面了,如今出了問題,一樣須要調用doRollback方法來進行回滾。

回滾並無「一半」之說,它只會把整個takeList中的數據返回給channel,而後繼續進行數據的讀寫。這樣開啓下一個事務的時候就容易形成數據重複的問題。

因此,在某種程度上,flume對數據進行採集傳輸的時候,它有可能會形成數據的重複,可是其數據不丟失

Flume 保證事件至少一次被送到它們的目的地,只有一次傾力寫數據,且不存在任何類型的故障事件只被寫一次。可是像網絡超時或部分寫入存儲系統的錯誤,可能致使事件不止被寫一次,由於Flume 將重試寫操做直到它們徹底成功。網絡超時可能表示寫操做的失敗,或者只是機器運行緩慢。若是是機器運行緩慢,當Flume 重試這將致使重複。所以,確保每一個事件都有某種形式的惟一標識符一般是一個好主意,若是須要,最終能夠用來刪除事件數據。

0xFF 參考

基於Flume的美團日誌收集系統(一)架構和設計

基於Flume的美團日誌收集系統(二)改進和優化

事件序列化器 Flume 的無數據丟失保證,Channel 和事務

flume MemoryChannel分析

Flume 1.7 源碼分析(一)源碼編譯
Flume 1.7 源碼分析(二)總體架構
Flume 1.7 源碼分析(三)程序入口
Flume 1.7 源碼分析(四)從Source寫數據到Channel
Flume 1.7 源碼分析(五)從Channel獲取數據寫入Sink

Flume - MemoryChannel源碼解析

flume到底會丟數據嗎?其可靠性如何?——輕鬆搞懂Flume事務機制

Flume會不會丟失數據?

flume MemoryChannel分析

Flume架構與源碼分析-MemoryChannel事務實現

flume「Space for commit to queue couldn't be acquired」異常產生分析

源碼趣事-flume-隊列動態擴容及容量使用

併發性標註 @GuardedBy @NotThreadSafe @ThreadSafe

秒懂,Java 註解 (Annotation)你能夠這樣學

Flume之MemoryChannel源碼解讀

Flume MemoryChannel源碼分析

搞懂分佈式技術17,18:分佈式事務總結

相關文章
相關標籤/搜索