Flume MemoryChannel Source Code Analysis

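Flume is part of the Hadoop ecosystem and is arguably the most full-featured data collection system in it. Its model is also fairly simple: agents are chained one after another to connect a data source to its final destination (usually HDFS). The figure below shows how data flows through Flume.
[Figure: Flume architecture]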

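Today I want to talk about the Channel component, and more precisely walk through MemoryChannel. For other concepts such as source and sink, please refer to the official documentation.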

Note:

The Flume source code quoted in this article is from version 1.6.0.

Event

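An Event is Flume's abstraction of a piece of data. It has two parts, a header and a body, much like the header and body of an HTTP message.

Flume handles data in units of Events; sources and sinks convert between raw data and Events automatically whenever necessary.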
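To make the header/body split concrete, here is a minimal stand-alone sketch of an event holder. The class `SimpleEventSketch` and its helpers are hypothetical; in real Flume the abstraction is the `org.apache.flume.Event` interface, whose headers are a `Map<String, String>` and whose body is a `byte[]`. The `fromLine` helper only imitates what a text-oriented source might do with one line of input.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class SimpleEventSketch {
    // Headers: small key/value metadata, comparable to HTTP headers.
    private final Map<String, String> headers;
    // Body: the opaque payload bytes, comparable to an HTTP body.
    private final byte[] body;

    public SimpleEventSketch(Map<String, String> headers, byte[] body) {
        this.headers = new HashMap<>(headers);
        this.body = body.clone();
    }

    public Map<String, String> getHeaders() {
        return Collections.unmodifiableMap(headers);
    }

    public byte[] getBody() {
        return body.clone();
    }

    // Hypothetical helper: how a line-oriented source might turn raw text into an event.
    public static SimpleEventSketch fromLine(String line, String host) {
        Map<String, String> h = new HashMap<>();
        h.put("host", host);
        return new SimpleEventSketch(h, line.getBytes(StandardCharsets.UTF_8));
    }

    // Hypothetical helper: how a sink might turn the event body back into raw text.
    public String bodyAsString() {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        SimpleEventSketch e = fromLine("GET /index.html 200", "web-01");
        System.out.println(e.getHeaders() + " " + e.bodyAsString());
    }
}
```

Copying the map and the byte array keeps the sketch's event immutable from the outside, which avoids surprises when the same event object is handed from one component to another.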

Channel

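The Channel acts as a buffer between the Source and the Sink. It lowers the coupling between them: the source only has to send data to the Channel, and the sink only has to take data from it.
In addition, with a Channel in between, it is easy to see that:

  • sources and sinks can be in an N-to-N relationship

  • a source can send data faster than a sink takes it (as long as the Channel is not full)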
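The second conclusion is the classic bounded-buffer property. The sketch below uses a plain `java.util.concurrent.LinkedBlockingDeque` (the same collection MemoryChannel uses internally) as a stand-in channel: each tick a fast "source" offers several events and a slow "sink" takes fewer, and puts only start failing once the buffer is full. The class name, capacity and rates are made-up illustration values, not Flume settings.

```java
import java.util.concurrent.LinkedBlockingDeque;

public class BoundedBufferSketch {
    // Simulates `rounds` ticks: each tick the source offers `putsPerTick` events
    // and the sink takes `takesPerTick` events. Returns the number of rejected puts.
    public static int simulate(int capacity, int rounds, int putsPerTick, int takesPerTick) {
        LinkedBlockingDeque<String> channel = new LinkedBlockingDeque<>(capacity);
        int rejected = 0;
        int seq = 0;
        for (int r = 0; r < rounds; r++) {
            for (int i = 0; i < putsPerTick; i++) {
                // offer() returns false instead of blocking when the deque is full
                if (!channel.offer("event-" + seq++)) {
                    rejected++;
                }
            }
            for (int i = 0; i < takesPerTick; i++) {
                channel.poll(); // returns null when empty: the sink simply gets nothing
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        // Source puts 3 per tick, sink takes 1: the surplus is absorbed until the buffer fills up.
        System.out.println("rejected after 3 ticks: " + simulate(10, 3, 3, 1));
        System.out.println("rejected after 10 ticks: " + simulate(10, 10, 3, 1));
    }
}
```

With a capacity of 10, no put is rejected during the first ticks even though the source is three times faster; rejections only begin once the backlog reaches the capacity.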

Transaction

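Channels use a transaction mechanism to guarantee data integrity. These transactions are similar to database transactions but not identical; their semantics are shown in the figure below: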
[Figure: channel transaction]

The source side places data into the Channel through a commit, and the sink side takes data out of the Channel through a commit.

So how do transactions guarantee data integrity? Consider the following case with two agents:
[Figure: data flow between two agents]

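Data flow:

  1. Source 1 produces an Event and places it into Channel 1 with a "put" followed by a "commit".

  2. Sink 1 removes the Event from Channel 1 with a "take" and sends it to Source 2.

  3. Source 2 places the Event into Channel 2 with a "put" followed by a "commit".

  4. Source 2 sends a success signal to Sink 1, and Sink 1 then commits the "take" from step 2 (which is what actually deletes the Event from Channel 1).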
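The four steps can be traced with two in-memory deques standing in for Channel 1 and Channel 2. This is a hypothetical single-threaded simulation, not Flume code: a "take" is modeled by moving the event into a pending list (like the takeList discussed later), and only the final commit removes it for good. After every step it checks the property stated in the note below, i.e. that some channel still holds the event, either committed or as an uncommitted take that a rollback would restore.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class TwoAgentHandoffSketch {
    final Deque<String> channel1 = new ArrayDeque<>();
    final List<String> sink1Pending = new ArrayList<>();   // taken from channel 1, not yet committed
    final List<String> source2Pending = new ArrayList<>(); // put towards channel 2, not yet committed
    final Deque<String> channel2 = new ArrayDeque<>();

    // Channel 1 still "holds" an uncommitted take, because a rollback would restore it.
    boolean heldByAChannel(String e) {
        return channel1.contains(e) || sink1Pending.contains(e) || channel2.contains(e);
    }

    // Runs steps 1-4 and records, after each step, whether some channel holds the event.
    public List<String> run(String event) {
        List<String> log = new ArrayList<>();
        // Step 1: source 1 puts and commits the event into channel 1.
        channel1.addLast(event);
        log.add("step1:" + heldByAChannel(event));
        // Step 2: sink 1 takes the event (uncommitted) and sends it to source 2.
        sink1Pending.add(channel1.pollFirst());
        log.add("step2:" + heldByAChannel(event));
        // Step 3: source 2 puts and commits the event into channel 2.
        source2Pending.add(event);
        channel2.addAll(source2Pending);
        source2Pending.clear();
        log.add("step3:" + heldByAChannel(event));
        // Step 4: source 2 acknowledges, sink 1 commits its take: the event leaves channel 1 for good.
        sink1Pending.remove(event);
        log.add("step4:" + heldByAChannel(event));
        return log;
    }

    public static void main(String[] args) {
        TwoAgentHandoffSketch s = new TwoAgentHandoffSketch();
        System.out.println(s.run("e1"));
    }
}
```

Note that between steps 3 and 4 the event is held by both channels at once; this is why the guarantee is "at least once" rather than "exactly once" if the acknowledgement is lost and sink 1 rolls back.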

Note:

At any moment, the Event is intact and valid in at least one Channel.

Memory Channel

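Flume ships three main Channel implementations:

  • Memory Channel: events are kept on the Java heap. Recommended when losing a small amount of data is acceptable, since events still in memory are lost if the agent process dies.

  • File Channel: events are stored in local files. It is highly reliable, but its throughput is lower than that of Memory Channel.

  • JDBC Channel: events are stored in a relational database. Generally not recommended.

The channels differ mainly in where they store Events. Today I'll focus on the source code of the comparatively simple Memory Channel.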

Let's first look at the more important member variables of MemoryChannel:

// lock to guard queue, mainly needed to keep it locked down during resizes
// it should never be held through a blocking operation
private Object queueLock = new Object();

// queue is where the Memory Channel stores Events; it is implemented with a LinkedBlockingDeque
@GuardedBy(value = "queueLock")
private LinkedBlockingDeque<Event> queue;

// The two semaphores below are used for synchronization: queueRemaining tracks the free space in queue, queueStored tracks the number of Events stored in queue

// invariant that tracks the amount of space remaining in the queue(with all uncommitted takeLists deducted)
// we maintain the remaining permits = queue.remaining - takeList.size()
// this allows local threads waiting for space in the queue to commit without denying access to the
// shared lock to threads that would make more space on the queue
private Semaphore queueRemaining;
// used to make "reservations" to grab data from the queue.
// by using this we can block for a while to get data without locking all other threads out
// like we would if we tried to use a blocking call on queue
private Semaphore queueStored;

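// The following variables correspond to the Memory Channel settings in the configuration file
// maximum number of Events in one transaction
private volatile Integer transCapacity;
// timeout, in seconds, for adding Events to or removing them from queue
private volatile int keepAlive;
// maximum space that the bodies of all Events in queue may occupy, counted in slots of byteCapacitySlotSize bytes
private volatile int byteCapacity;
// byteCapacity from the previous configuration, used when the capacity is resized
private volatile int lastByteCapacity;
// percentage of byteCapacity set aside as a buffer for the Event headers
private volatile int byteCapacityBufferPercentage;
// semaphore tracking the space remaining in byteCapacity
private Semaphore bytesRemaining;
// records Memory Channel metrics; with monitoring configured, these can be used to observe how Flume is running
private ChannelCounter channelCounter;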
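The byte-capacity fields are easier to read with numbers. The sketch below reflects my reading of `MemoryChannel.configure()` and `estimateEventSize()` in 1.6.0 and should be treated as an approximation: the configured byteCapacity (by default 80% of the JVM's maximum heap) is reduced by `byteCapacityBufferPercentage` (default 20) to leave room for headers, then divided into 100-byte slots, and each event is charged ceil(bodyLength / 100) slots, with an empty body counted as 1 byte. The class and method names are hypothetical.

```java
public class ByteCapacitySketch {
    // Assumed slot size, mirroring MemoryChannel's byteCapacitySlotSize of 100 bytes.
    static final double SLOT_SIZE = 100.0;

    // Usable capacity in slots after reserving bufferPercentage% of byteCapacity for headers.
    public static int capacityInSlots(long byteCapacity, int bufferPercentage) {
        return (int) ((byteCapacity * (1 - bufferPercentage * 0.01)) / SLOT_SIZE);
    }

    // Slots charged for one event: only the body is counted, and an empty body counts as 1 byte.
    public static int slotsFor(int bodyLength) {
        long size = bodyLength > 0 ? bodyLength : 1;
        return (int) Math.ceil(size / SLOT_SIZE);
    }

    public static void main(String[] args) {
        long byteCapacity = 1_000_000L; // example value: 1,000,000 bytes configured
        System.out.println(capacityInSlots(byteCapacity, 20) + " slots"); // 8000 slots
        System.out.println(slotsFor(0) + " " + slotsFor(100) + " " + slotsFor(150)); // 1 1 2
    }
}
```

Counting in coarse slots lets the channel track memory with an `int`-sized Semaphore even for large heaps, at the price of rounding every event up to the next 100 bytes.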

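Next, let's focus on MemoryTransaction inside MemoryChannel. It is an implementation of the Transaction interface, and according to the Transaction documentation, every Transaction is used with the same pattern: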

Channel ch = ...
Transaction tx = ch.getTransaction();
try {
  tx.begin();
  ...
  // ch.put(event) or ch.take()
  ...
  tx.commit();
} catch (ChannelException ex) {
  tx.rollback();
  ...
} finally {
  tx.close();
}

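As you can see, a Transaction revolves around four operations: put, take, commit and rollback (put and take are called on the Channel, which forwards them to the transaction bound to the current thread). When implementing a Transaction, these four are what we mainly have to write.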

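To make it easier for developers to write their own Transaction, Flume provides the helper class BasicTransactionSemantics. A developer only needs to extend it and implement the corresponding doPut, doTake, doCommit and doRollback methods; MemoryChannel's MemoryTransaction is built exactly this way.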
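This division of labor is the template-method pattern: the base class owns the public begin/put/take/commit/rollback/close entry points and their state checks, and the subclass only fills in the doXxx hooks. The following is a heavily simplified, hypothetical imitation (`MiniTransactionSemantics`, `State` and `ListTransaction` are invented names); the real BasicTransactionSemantics does more, such as binding the transaction to a thread, so do not read this as its actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

public class TemplateTransactionSketch {

    public enum State { NEW, OPEN, COMPLETED, CLOSED }

    // Hypothetical base class: public methods enforce the state machine, subclasses implement the hooks.
    public static abstract class MiniTransactionSemantics<E> {
        private State state = State.NEW;

        public final void begin() { check(State.NEW); state = State.OPEN; }
        public final void put(E event) { check(State.OPEN); doPut(event); }
        public final E take() { check(State.OPEN); return doTake(); }
        public final void commit() { check(State.OPEN); doCommit(); state = State.COMPLETED; }
        public final void rollback() { check(State.OPEN); doRollback(); state = State.COMPLETED; }

        public final void close() {
            if (state == State.OPEN) {
                throw new IllegalStateException("close() called before commit() or rollback()");
            }
            state = State.CLOSED;
        }

        private void check(State expected) {
            if (state != expected) {
                throw new IllegalStateException("expected " + expected + " but was " + state);
            }
        }

        protected abstract void doPut(E event);
        protected abstract E doTake();
        protected abstract void doCommit();
        protected abstract void doRollback();
    }

    // Minimal subclass: puts become visible on commit, takes are undone on rollback.
    public static class ListTransaction extends MiniTransactionSemantics<String> {
        private final List<String> store;
        private final List<String> pending = new ArrayList<>();
        private final List<String> taken = new ArrayList<>();

        public ListTransaction(List<String> store) { this.store = store; }

        @Override protected void doPut(String event) { pending.add(event); }

        @Override protected String doTake() {
            if (store.isEmpty()) return null;
            String e = store.remove(0);
            taken.add(e);
            return e;
        }

        @Override protected void doCommit() { store.addAll(pending); pending.clear(); taken.clear(); }

        @Override protected void doRollback() { store.addAll(0, taken); taken.clear(); pending.clear(); }
    }

    public static void main(String[] args) {
        List<String> store = new ArrayList<>();
        ListTransaction tx = new ListTransaction(store);
        try {
            tx.begin();
            tx.put("e1");
            tx.put("e2");
            tx.commit();
        } catch (RuntimeException ex) {
            tx.rollback();
        } finally {
            tx.close();
        }
        System.out.println(store);
    }
}
```

Because the public methods are `final`, a subclass cannot skip the state checks; it can only decide where events are buffered and how commit and rollback move them.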

private class MemoryTransaction extends BasicTransactionSemantics {
    // Like MemoryChannel, uses LinkedBlockingDeques internally to hold the Events that have not been committed yet
    private LinkedBlockingDeque<Event> takeList;
    private LinkedBlockingDeque<Event> putList;
    private final ChannelCounter channelCounter;
    // Total size (in byte-capacity slots) of the Events put and taken in this transaction
    private int putByteCounter = 0;
    private int takeByteCounter = 0;

    public MemoryTransaction(int transCapacity, ChannelCounter counter) {
      // Bound both the put list and the take list by transCapacity
      putList = new LinkedBlockingDeque<Event>(transCapacity);
      takeList = new LinkedBlockingDeque<Event>(transCapacity);

      channelCounter = counter;
    }

    @Override
    protected void doPut(Event event) throws InterruptedException {
      // doPut: check whether putList still has room; if so, append the Event and update putByteCounter.
      // If there is no room left, throw a ChannelException immediately.
      channelCounter.incrementEventPutAttemptCount();
      int eventByteSize = (int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize);

      if (!putList.offer(event)) {
        throw new ChannelException(
          "Put queue for MemoryTransaction of capacity " +
            putList.size() + " full, consider committing more frequently, " +
            "increasing capacity or increasing thread count");
      }
      putByteCounter += eventByteSize;
    }

    @Override
    protected Event doTake() throws InterruptedException {
      // doTake: first check whether takeList still has room
      channelCounter.incrementEventTakeAttemptCount();
      if(takeList.remainingCapacity() == 0) {
        throw new ChannelException("Take list for MemoryTransaction, capacity " +
            takeList.size() + " full, consider committing more frequently, " +
            "increasing capacity, or increasing thread count");
      }
      // Then use the queueStored semaphore to check, waiting up to keepAlive seconds, whether queue holds an Event to take
      if(!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) {
        return null;
      }
      Event event;
      // Take one Event from the MemoryChannel's queue
      synchronized(queueLock) {
        event = queue.poll();
      }
      Preconditions.checkNotNull(event, "Queue.poll returned NULL despite semaphore " +
          "signalling existence of entry");
      // Add it to takeList, then update takeByteCounter
      takeList.put(event);

      int eventByteSize = (int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize);
      takeByteCounter += eventByteSize;

      return event;
    }

    @Override
    protected void doCommit() throws InterruptedException {
      // Commits one transaction.
      // First compare the sizes of takeList and putList
      int remainingChange = takeList.size() - putList.size();
      // If takeList is smaller, more Events are being put into this MemoryChannel than taken out, so check whether it has room for them
      if(remainingChange < 0) {
        // 1. First use the bytesRemaining semaphore to check, within keepAlive, whether enough byte capacity remains
        if(!bytesRemaining.tryAcquire(putByteCounter, keepAlive,
          TimeUnit.SECONDS)) {
          throw new ChannelException("Cannot commit transaction. Byte capacity " +
            "allocated to store event body " + byteCapacity * byteCapacitySlotSize +
            "reached. Please increase heap space/byte capacity allocated to " +
            "the channel as the sinks may not be keeping up with the sources");
        }
        // 2. Then check whether enough queue slots can be acquired within keepAlive; if not, give back the bytes acquired in step 1
        if(!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) {
          bytesRemaining.release(putByteCounter);
          throw new ChannelFullException("Space for commit to queue couldn't be acquired." +
              " Sinks are likely not keeping up with sources, or the buffer size is too tight");
        }
      }
      int puts = putList.size();
      int takes = takeList.size();
      // If both checks passed, move the Events in putList into the MemoryChannel's queue
      synchronized(queueLock) {
        if(puts > 0 ) {
          while(!putList.isEmpty()) {
            if(!queue.offer(putList.removeFirst())) {
              throw new RuntimeException("Queue add failed, this shouldn't be able to happen");
            }
          }
        }
        // Clear this transaction's putList and takeList to release resources
        putList.clear();
        takeList.clear();
      }
      // Update bytesRemaining: the taken Events have now left the channel for good (takeList was cleared), so return takeByteCounter to it
      bytesRemaining.release(takeByteCounter);
      takeByteCounter = 0;
      putByteCounter = 0;
      // The Events in putList were moved into queue, so add puts to queueStored
      queueStored.release(puts);
      // If takeList was larger than putList, queue has shrunk, so return the difference (takes - puts) to queueRemaining
      if(remainingChange > 0) {
        queueRemaining.release(remainingChange);
      }
      if (puts > 0) {
        channelCounter.addToEventPutSuccessCount(puts);
      }
      if (takes > 0) {
        channelCounter.addToEventTakeSuccessCount(takes);
      }

      channelCounter.setChannelSize(queue.size());
    }

    @Override
    protected void doRollback() {
      // Called when a transaction fails and has to be rolled back.
      // First, return the Events in takeList to the head of the MemoryChannel's queue
      int takes = takeList.size();
      synchronized(queueLock) {
        Preconditions.checkState(queue.remainingCapacity() >= takeList.size(), "Not enough space in memory channel " +
            "queue to rollback takes. This should never happen, please report");
        while(!takeList.isEmpty()) {
          queue.addFirst(takeList.removeLast());
        }
        // Then discard putList
        putList.clear();
      }
      // putList was discarded, so release putByteCounter back to bytesRemaining
      bytesRemaining.release(putByteCounter);
      putByteCounter = 0;
      takeByteCounter = 0;
      // The Events in takeList were returned to queue, so add the size of takeList to queueStored
      queueStored.release(takes);
      channelCounter.setChannelSize(queue.size());
    }

  }
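To watch the queueRemaining/queueStored bookkeeping of doTake, doCommit and doRollback without the rest of Flume, here is a stripped-down, single-file imitation. It is a sketch under simplifying assumptions: no byte capacity (bytesRemaining), no metrics, no thread binding, millisecond timeouts, and all names (`ToyMemoryChannel`, `Tx`, `remainingPermits`) are hypothetical. It keeps the invariant from the field comments above: queueRemaining permits = free space in queue minus the size of all uncommitted takeLists.

```java
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class ToyMemoryChannel {
    private final Object queueLock = new Object();
    private final LinkedBlockingDeque<String> queue;
    // Free slots in queue, minus the slots still held by uncommitted takes.
    private final Semaphore queueRemaining;
    // Committed Events that are available to take.
    private final Semaphore queueStored = new Semaphore(0);
    private final int transCapacity;
    private final long keepAliveMillis;

    public ToyMemoryChannel(int capacity, int transCapacity, long keepAliveMillis) {
        this.queue = new LinkedBlockingDeque<>(capacity);
        this.queueRemaining = new Semaphore(capacity);
        this.transCapacity = transCapacity;
        this.keepAliveMillis = keepAliveMillis;
    }

    public int size() { synchronized (queueLock) { return queue.size(); } }
    public int remainingPermits() { return queueRemaining.availablePermits(); }
    public Tx newTransaction() { return new Tx(); }

    public class Tx {
        private final LinkedBlockingDeque<String> putList = new LinkedBlockingDeque<>(transCapacity);
        private final LinkedBlockingDeque<String> takeList = new LinkedBlockingDeque<>(transCapacity);

        public void put(String event) {
            if (!putList.offer(event)) throw new IllegalStateException("putList full");
        }

        // Returns null if no committed Event shows up within keepAlive.
        public String take() throws InterruptedException {
            if (takeList.remainingCapacity() == 0) throw new IllegalStateException("takeList full");
            if (!queueStored.tryAcquire(keepAliveMillis, TimeUnit.MILLISECONDS)) return null;
            String event;
            synchronized (queueLock) { event = queue.poll(); }
            takeList.put(event); // still owned by the channel until commit
            return event;
        }

        public void commit() throws InterruptedException {
            int remainingChange = takeList.size() - putList.size();
            // Net growth: reserve the extra slots first, or fail before touching queue.
            if (remainingChange < 0
                    && !queueRemaining.tryAcquire(-remainingChange, keepAliveMillis, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("channel full");
            }
            int puts = putList.size();
            synchronized (queueLock) {
                while (!putList.isEmpty()) {
                    queue.addLast(putList.removeFirst()); // cannot overflow thanks to the permits
                }
                takeList.clear();
            }
            queueStored.release(puts);
            // Net shrink: the slots of committed takes are now really free.
            if (remainingChange > 0) queueRemaining.release(remainingChange);
        }

        public void rollback() {
            int takes = takeList.size();
            synchronized (queueLock) {
                // Return taken Events to the head of queue in their original order.
                while (!takeList.isEmpty()) queue.addFirst(takeList.removeLast());
                putList.clear();
            }
            queueStored.release(takes);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ToyMemoryChannel ch = new ToyMemoryChannel(3, 2, 10);
        Tx source = ch.newTransaction();
        source.put("e1");
        source.put("e2");
        source.commit(); // queue = [e1, e2]
        Tx sink = ch.newTransaction();
        String e = sink.take(); // e1 moves to takeList
        sink.rollback(); // e1 goes back to the head of queue
        System.out.println(e + " size=" + ch.size() + " permits=" + ch.remainingPermits());
    }
}
```

Note that an uncommitted take does not return its slot to queueRemaining; that only happens in commit. This is what guarantees a later rollback can always push the Events back into queue without overflowing it.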

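MemoryChannel's logic is relatively simple: it mainly moves Events between the putList and takeList of a MemoryTransaction and the MemoryChannel's queue. The queue plays the role of the persistence layer, except that it lives in memory; a FileChannel would keep this queue in local files instead. The following shows how an Event flows through an agent that uses a MemoryChannel: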

source ---> putList ---> queue ---> takeList ---> sink

One more thing worth noting: these transactions can be nested, as the figure below shows:
[Figure: nested transactions]

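When two agents are chained, the sink's transaction encloses the downstream source's transaction, which confirms what was said earlier:

At any moment, the Event is intact and valid in at least one Channel.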
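The nesting can be written out as an outer and an inner step: the sink's transaction on the upstream channel commits only after the downstream put has committed, so a downstream failure leads to an upstream rollback and the Event stays where it was. Below is a hypothetical sketch with two `java.util.ArrayDeque`s standing in for the channels and a boolean flag simulating a downstream failure (for example, a full Channel 2); it illustrates the control flow only and is not how Flume's sinks and RPC sources are actually coded.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class NestedTransactionSketch {
    final Deque<String> channel1 = new ArrayDeque<>();
    final Deque<String> channel2 = new ArrayDeque<>();

    // Inner transaction: the downstream source puts and commits into channel 2.
    void downstreamPut(String event, boolean failDownstream) {
        if (failDownstream) {
            throw new IllegalStateException("channel 2 full");
        }
        channel2.addLast(event);
    }

    // Outer transaction: sink 1 takes from channel 1 and commits only after the inner commit succeeded.
    boolean forwardOne(boolean failDownstream) {
        String event = channel1.pollFirst(); // uncommitted take: a rollback restores it
        if (event == null) {
            return false;
        }
        try {
            downstreamPut(event, failDownstream); // nested transaction
            return true; // outer commit: the take becomes permanent
        } catch (IllegalStateException ex) {
            channel1.addFirst(event); // outer rollback: the event returns to the head of channel 1
            return false;
        }
    }

    public static void main(String[] args) {
        NestedTransactionSketch s = new NestedTransactionSketch();
        s.channel1.add("e1");
        System.out.println(s.forwardOne(true) + " " + s.channel1 + " " + s.channel2);
        System.out.println(s.forwardOne(false) + " " + s.channel1 + " " + s.channel2);
    }
}
```

After the failed attempt the event is still at the head of channel 1, and the retry moves it to channel 2, so the event is never lost in between.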
