在使用Flume時,有時遇到以下錯誤信息:Space for commit to queue couldn't be acquired。html
究其緣由,是在memory channel的使用中出現了問題。java
本文就以此爲切入點,帶你們一塊兒剖析下 Flume 中 MemoryChannel 的實現數據庫
Flume的用途:高可用的,高可靠的,分佈式的海量日誌採集、聚合和傳輸的系統。segmentfault
這裏咱們介紹與本文相關的特色:緩存
這裏就要介紹channel的概念。channel是一種短暫的存儲容器,它將從source處接收到的event格式的數據緩存起來,直到它們被sinks消費掉,它在source和sink間起着橋樑的做用,channel是一個完整的事務,這一點保證了數據在收發的時候的一致性。而且它能夠和任意數量的source和sink連接。安全
支持的類型主要有: JDBC channel , File System channel , Memory channel等,大體區別以下:網絡
本文主要涉及Memory Channel,因此看看其特性。架構
由此,咱們能夠總結出來 Flume 的一些重點功能:併發
由於MemoryChannel屬於Flume的重要模塊,因此,咱們本文就看看是MemoryChannel是如何確保Flume以上特色的,這也是本文的學習思路。app
如何回滾,使用鎖,信號量 ,動態擴容,如何解決生產者消費者不一致問題。
MemoryChannel仍是比較簡單的,主要是經過MemoryTransaction中的putList、takeList與MemoryChannel中的queue進行數據流轉和事務控制,這裏的queue至關於持久化層,只不過放到了內存中,若是是FileChannel的話,會把這個queue放到本地文件中。
MemoryChannel受內存空間的影響,若是數據產生的過快,同時獲取信號量超時容易形成數據的丟失。並且Flume進程掛掉,數據也會丟失。
具體是:
下面表示了Event在一個使用了MemoryChannel的agent中數據流向:
source ---> putList ---> queue ---> takeList ---> sink
爲了你們更好的理解,咱們提早把最終圖例發到這裏。
具體以下圖:
+----------+ +-------+ | Source | +----------------------------------------------------------------+ | Sink | +-----+----+ | [MemoryChannel] | +---+---+ | | +--------------------------------------------------------+ | ^ | | | [MemoryTransaction] | | | | | | | | | | | | | | | | | | channelCounter | | | | | | | | | | | | putByteCounter takeByteCounter | | | | | | | | | | | | +-----------+ +------------+ | |doTake | +----------------> | putList | | takeList +----------------+ doPut | | +----+--+---+ +----+---+---+ | | | | | ^ | ^ | | | | | | | | | | | +--------------------------------------------------------+ | | | | | | poll | | | | | | | | | | rollback rollback | | | | | +--------------+ +-------------+ | | | | | | | | | | | v | | | | doCommit +--+--+---+ doCommit | | | +------------> | queue | +-----------+ | | +---------+ | +----------------------------------------------------------------+
手機上如圖:
咱們要看看MemoryChannel重要變量的定義,這裏咱們沒有按照代碼順序來,而是從新整理。
MemorChannel中最重要的部分主要是Channel、Transaction 和Configurable三個接口。
Channel接口 主要聲明瞭Channel中的三個方法,就是隊列基本功能:
public void put(Event event) throws ChannelException; //從指定的Source中得到Event放入指定的Channel中 public Event take() throws ChannelException; //從Channel中取出event放入Sink中 public Transaction getTransaction(); //得到當前Channel的事務實例
Transaction接口 主要聲明瞭flume中事務機制的四個方法,就是事務功能:
enum TransactionState { Started, Committed, RolledBack, Closed } //枚舉類型,指定了事務的四種狀態,事務開始、提交、失敗回滾、關閉 void begin(); void commit(); void rollback(); void close();
Configurable接口 主要是和flume配置組件相關的,須要從flume配置系統獲取配置信息的任何組件,都必須實現該接口。該接口中只聲明瞭一個context方法,用於獲取配置信息。
大致邏輯以下:
+-----------+ +--------------+ +---------------+ | | | | | | | Channel | | Transaction | | Configurable | | | | | | | +-----------+ +--------------+ +---------------+ ^ ^ ^ | | | | | | | | | | +-------------+--------------+ | | | | | | | MemorChannel +---------+ +-------+ | | | | | | | | | | | | | | +----------------------------+
下面咱們具體講講成員變量。
首先是一系列業務配置參數。
//定義隊列中一次容許的事件總數 private static final Integer defaultCapacity = 100; //定義一個事務中容許的事件總數 private static final Integer defaultTransCapacity = 100; //將物理內存轉換成槽(slot)數,默認是100 private static final double byteCapacitySlotSize = 100; //定義隊列中事件所使用空間的最大字節數(默認是JVM最大可用內存的0.8) private static final Long defaultByteCapacity = (long)(Runtime.getRuntime().maxMemory() * .80); //定義byteCapacity和預估Event大小之間的緩衝區百分比: private static final Integer defaultByteCapacityBufferPercentage = 20; //添加或者刪除一個event的超時時間,單位秒: private static final Integer defaultKeepAlive = 3; // maximum items in a transaction queue private volatile Integer transCapacity; private volatile int keepAlive; private volatile int byteCapacity; private volatile int lastByteCapacity; private volatile int byteCapacityBufferPercentage; private ChannelCounter channelCounter;
這些參數基本都在configure(Context context)中設置,基本邏輯以下:
設置 capacity:MemroyChannel的容量,默認是100。
設置 transCapacity:每一個事務最大的容量,也就是每一個事務可以獲取的最大Event數量。默認也是100。事務容量必須小於等於Channel Queue容量。
設置 byteCapacityBufferPercentage:用來肯定byteCapacity的一個百分比參數,即咱們定義的字節容量和實際事件容量的百分比,由於咱們定義的字節容量主要考慮Event body,而忽略Event header,所以須要減去Event header部分的內存佔用,能夠認爲該參數定義了Event header佔了實際字節容量的百分比,默認20%;
設置 byteCapacity:byteCapacity等於設置的byteCapacity值或堆的80%乘以1減去byteCapacityBufferPercentage的百分比,而後除以100。具體是首先讀取配置文件定義的byteCapacity,若是沒有定義,則使用默認defaultByteCapacity,而defaultByteCapacity默認是JVM物理內存的80%(Runtime.getRuntime().maxMemory() * .80);那麼實際byteCapacity=定義的byteCapacity * (1- Event header百分比)/ byteCapacitySlotSize;byteCapacitySlotSize默認100,即計算百分比的一個係數。
設置 keep-alive:增長和刪除一個Event的超時時間(單位:秒)。
設置初始化 LinkedBlockingDeque對象,大小爲capacity。以及各類信號量對象。
最後初始化計數器。
配置代碼摘要以下:
public void configure(Context context) { capacity = context.getInteger("capacity", defaultCapacity); transCapacity = context.getInteger("transactionCapacity", defaultTransCapacity); byteCapacityBufferPercentage = context.getInteger("byteCapacityBufferPercentage", defaultByteCapacityBufferPercentage); byteCapacity = (int) ((context.getLong("byteCapacity", defaultByteCapacity).longValue() *(1 - byteCapacityBufferPercentage * .01)) / byteCapacitySlotSize); if (byteCapacity < 1) { byteCapacity = Integer.MAX_VALUE; } keepAlive = context.getInteger("keep-alive", defaultKeepAlive); resizeQueue(capacity); if (channelCounter == null) { channelCounter = new ChannelCounter(getName()); } }
ChannelCounter 須要單獨說一下。其就是把channel的一些屬性封裝了一下,初始化了一個ChannelCounter,是一個計數器,記錄如當前隊列放入Event數、取出Event數、成功數等。
private ChannelCounter channelCounter;
定義以下:
public class ChannelCounter extends MonitoredCounterGroup implements ChannelCounterMBean { private static final String COUNTER_CHANNEL_SIZE = "channel.current.size"; private static final String COUNTER_EVENT_PUT_ATTEMPT ="channel.event.put.attempt"; private static final String COUNTER_EVENT_TAKE_ATTEMPT = "channel.event.take.attempt"; private static final String COUNTER_EVENT_PUT_SUCCESS = "channel.event.put.success"; private static final String COUNTER_EVENT_TAKE_SUCCESS = "channel.event.take.success"; private static final String COUNTER_CHANNEL_CAPACITY = "channel.capacity"; }
其次是Semaphore和Queue。主要就是用來協助制事務。
MemoryChannel有三個信號量用來控制事務,防止容量越界:queueStored,queueRemaining,bytesRemaining。
private Object queueLock = new Object(); @GuardedBy(value = "queueLock") private LinkedBlockingDeque<Event> queue; private Semaphore queueRemaining; private Semaphore queueStored; private Semaphore bytesRemaining;// 表示可使用的內存大小。該大小就是計算後的byteCapacity值。
內部類MemoryTransaction
是整個事務保證最重要的類。
MemoryTransaction用來接收數據和事務控制。該類繼承BasicTransactionSemantics類。
MemoryTransaction維護了兩個隊列,一個用於Source的put,一個用於Sink的take,容量大小爲事務的容量(transCapacity)。
private class MemoryTransaction extends BasicTransactionSemantics { private LinkedBlockingDeque<Event> takeList; private LinkedBlockingDeque<Event> putList; private final ChannelCounter channelCounter; private int putByteCounter = 0; private int takeByteCounter = 0; }
不管是Sink,仍是Source都會調用getTransaction()方法,獲取當前Channel的事務實例。
接口與成員變量大體邏輯能夠理解以下,其中 Channel 的 API 表示這裏是 MemorChannel 的對外 API:
+-----------+ +--------------+ +---------------+ | | | | | | | Channel | | Transaction | | Configurable | | | | | | | +---+-------+ +--------------+ +---------------+ ^ | ^ ^ | | | | | | | +--------------------------------------------------------+ | | | | | | | | MemoryChannel | | | | | + | | | | | | | | MemoryTransaction | | | | | | | | Semaphore / Queue | | | | | | +--------+ | | API | | | | | | | Config Parameters +------------+ | | | | +--------------------------------------------------------+
看了上面講的,估計你們仍是會暈,由於成員變量和概念實在是太多了,因此咱們從使用入手分析。
前面提到,memory channel內部有三個隊列,分別是putList,queue,takeList。其中putList,takeList在MemoryTransaction之中。
channel之上有一把鎖,當source主動向channel放數據或者sink主動從channel取數據時,會搶鎖,誰取到鎖,誰就能夠操做channel。
每次使用時會首先調用tx.begin()開始事務,也就是獲取鎖。而後調用tx.commit()提交數據或者調用tx.rollback()取消操做。
這裏須要注意的是:Source, Sink 都是死循環,搶同一個鎖。因此就會有消費者,生產者速度不一致的狀況,因此就須要有 一個內部的 buffer,就是咱們的Queue。
這是一個死循環,source一直試圖獲取channel鎖,而後從kafka獲取數據,放入channel中,那每次放入多少個數據呢?在KafkaSource.java中,代碼是這樣的:
while (eventList.size() < batchUpperLimit && System.currentTimeMillis() < maxBatchEndTime) { }
含義就是:每次最多放batchUpperLimit或最多等待maxBatchEndTime的時間,就結束向channel放數據。
當獲取了足夠的數據,首先放入putList中,而後就會調用tx.commit()將putList的所有數據放入queue中。
也是一個死循環,sink一直試圖獲取channel鎖,而後從channel取一批數據,放入sink和takeList(僅僅用於回滾,在調用rollback時takeList的數據會回滾到queue中)。每次取多少個event呢?以HDFSEventSink爲例,代碼以下:
for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) { Event event = channel.take(); if (event == null) break; }
batchSize的大小默認是100,由hdfs.batchSize控制。
具體以下:
+---------------> ^ | | | while(1) | v +-----------+ | +----+----+ | Source | | take | Sink | | | | | | +-----+-----+ | +---------+ | | | +-------------+--+ | | Channel | | | | While(1) | | | | | buffer | | +----------------+ | | ^ | | | | put v ----------------^
此處回答了前面提到的兩個重點:
其實就是用事務保證整個流程的高可靠,其核心就在從source抽取數據到channel,從channel抽取到sink,當sink被消費後channel數據刪除的這三個環節。而這些環節在flume中被統一的用事務管理起來。能夠說,這是flume高可靠的關鍵一點。
具體涉及到的幾個點以下:
咱們下面具體走一下這個流程。
此事務發生在在Source到Channel之間,是從指定的Source中得到Event放入指定的Channel中,具體包括:
以下調用。
try { tx.begin(); //底層就是調用的doPut方法 // Source寫事件調用put方法 reqChannel.put(event); tx.commit(); } catch (Throwable t) { // 發生異常則回滾事務 tx.rollback(); if (t instanceof Error) { throw (Error) t; } else if (t instanceof ChannelException) { throw (ChannelException) t; } else { throw new ChannelException("Unable to put event on required " + "channel: " + reqChannel, t); } } finally { if (tx != null) { tx.close(); } }
下面分析doPut方法。
doPut邏輯以下:
具體代碼以下:
protected void doPut(Event event) throws InterruptedException { //增長放入事件計數器 channelCounter.incrementEventPutAttemptCount(); //estimateEventSize計算當前Event body大小 int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize); /* * offer若當即可行且不違反容量限制,則將指定的元素插入putList阻塞雙端隊列中(隊尾), * 並在成功時返回,若是當前沒有空間可用,則拋異常回滾事務 * */ if (!putList.offer(event)) { throw new ChannelException( "Put queue for MemoryTransaction of capacity " + putList.size() + " full, consider committing more frequently, " + "increasing capacity or increasing thread count"); } //記錄Event的byte值 putByteCounter += eventByteSize; }
具體以下圖,咱們暫時忽略commit與rollback:
+----------+ | Source | +---------------------------+ +-----+----+ | [MemoryChannel] | | | +---------------------+ | | | | [MemoryTransaction] | | | | | | | | | | | | | | | channelCounter | | | | | | | | | | putByteCounter | | | | | | | | | | +-----------+ | | +----------------> | putList | | | doPut | | +-----------+ | | | +---------------------+ | +---------------------------+
此事務發生在Channel到Sink之間,主要是從Channel中取出event放入Sink中,具體包括。
以下調用:
transaction = channel.getTransaction(); transaction.begin(); ...... event = channel.take(); ...... transaction.commit();
邏輯以下:
doTake具體代碼以下:
protected Event doTake() throws InterruptedException { channelCounter.incrementEventTakeAttemptCount();//將正在從channel中取出的event計數器原子的加一,即增長取出事件計數器 //若是takeList隊列沒有剩餘容量,即當前事務已經消費了最大容量的Event,拋異常 if (takeList.remainingCapacity() == 0) {//takeList隊列剩餘容量爲0 throw new ChannelException("Take list for MemoryTransaction, capacity " + takeList.size() + " full, consider committing more frequently, " + "increasing capacity, or increasing thread count"); } //嘗試獲取一個信號量獲取許可,若是能夠獲取到許可的話,證實queue隊列有空間,超時直接返回null if (!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) { return null; } Event event; synchronized (queueLock) { event = queue.poll(); //獲取並移除MemoryChannel雙端隊列表示的隊列的頭部(也就是隊列的第一個元素),隊列爲空返回null,同一時間只能有一個線程訪問,加鎖同步 } //由於信號量的保證,Channel Queue不該該返回null,出現了就不正常了 Preconditions.checkNotNull(event, "Queue.poll returned NULL despite semaphore " + "signalling existence of entry"); takeList.put(event); //將取出的event暫存到事務的takeList隊列 //計算當前Event body大小並增長取出隊列字節數計數器 /* 計算event的byte大小 */ int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize); //更新takeByteCounter大小 takeByteCounter += eventByteSize; return event; }
因而咱們把take事務加入,咱們暫時忽略commit與rollback。具體以下圖,目前兩個事務是沒有聯繫的:
+----------+ +-------+ | Source | +---------------------------------------------------------+ | Sink | +-----+----+ | [MemoryChannel] | +---+---+ | | +--------------------------------------------------+ | ^ | | | [MemoryTransaction] | | | | | | | | | | | | | | | | | | channelCounter | | | | | | | | | | | | putByteCounter takeByteCounter | | | | | | | | | | | | +-----------+ +------------+ | | doTake | +----------------> | putList | | takeList +----------------+ doPut | | +-----------+ +------+-----+ | | | | ^ | | | | | | | | +--------------------------------------------------+ | | | | | | | | | | | +---------+ poll | | | | queue | +---------+ | | +---------+ | +---------------------------------------------------------+
commit階段主要作的事情是提交事務,此代碼繁雜在於其包括了兩個方面的操做:
commit其邏輯以下:
int remainingChange = takeList.size() - putList.size();
具體以下:
protected void doCommit() throws InterruptedException { //計算改變的Event數量,即取出數量-放入數量;若是放入的多,那麼改變的Event數量將是負數 //若是takeList更小,說明該MemoryChannel放的數據比取的數據要多,因此須要判斷該MemoryChannel是否有空間來放 int remainingChange = takeList.size() - putList.size(); //takeList.size()能夠當作source,putList.size()當作sink //若是remainingChange小於0,則須要獲取Channel Queue剩餘容量的信號量 if (remainingChange < 0) { //sink的消費速度慢於source的產生速度 //利用bytesRemaining信號量判斷是否有足夠空間接收putList中的events所佔的空間 //putByteCounter是須要推到channel中的數據大小,bytesRemainingchannel是容量剩餘 //獲取putByteCounter個字節容量信號量,若是失敗說明超過字節容量限制了,回滾事務 if (!bytesRemaining.tryAcquire(putByteCounter, keepAlive, TimeUnit.SECONDS)) { //channel 數據大小容量不足,事物不能提交 throw new ChannelException("Cannot commit transaction. Byte capacity " + "allocated to store event body " + byteCapacity * byteCapacitySlotSize + "reached. Please increase heap space/byte capacity allocated to " + "the channel as the sinks may not be keeping up with the sources"); } //獲取Channel Queue的-remainingChange個信號量用於放入-remainingChange個Event,若是獲取不到,則釋放putByteCounter個字節容量信號量,並拋出異常回滾事務 //由於source速度快於sink速度,需判斷queue是否還有空間接收event if (!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) { //remainingChange若是是負數的話,說明source的生產速度,大於sink的消費速度,且這個速度大於channel所能承載的值 bytesRemaining.release(putByteCounter); throw new ChannelFullException("Space for commit to queue couldn't be acquired." + " Sinks are likely not keeping up with sources, or the buffer size is too tight"); } } int puts = putList.size(); //事務期間生產的event int takes = takeList.size(); //事務期間等待消費的event //若是上述兩個信號量都有空間的話,那麼把putList中的Event放到該MemoryChannel中的queue中。 //鎖住隊列開始,進行數據的流轉 synchronized (queueLock) {//操做Channel Queue時必定要鎖定queueLock if (puts > 0) { while (!putList.isEmpty()) { //若是有Event,則循環放入Channel Queue if (!queue.offer(putList.removeFirst())) { //若是放入Channel Queue失敗了,說明信號量控制出問題了,這種狀況不該該發生 throw new RuntimeException("Queue add failed, this shouldn't be able to happen"); } } } //以上步驟執行成功,清空事務的putList和takeList putList.clear(); takeList.clear(); } //更新queue大小控制的信號量bytesRemaining //釋放takeByteCounter個字節容量信號量 bytesRemaining.release(takeByteCounter); //重置字節計數器 takeByteCounter = 0; putByteCounter = 0; //釋放puts個queueStored信號量,這樣doTake方法就能夠獲取數據了 queueStored.release(puts); //從queueStored釋放puts個信號量 //釋放remainingChange個queueRemaining信號量 if (remainingChange > 0) { queueRemaining.release(remainingChange); } //ChannelCounter一些數據計數 if (puts > 0) { //更新成功放入Channel中的events監控指標數據 channelCounter.addToEventPutSuccessCount(puts); } if (takes > 0) { //更新成功從Channel中取出的events的數量 channelCounter.addToEventTakeSuccessCount(takes); } channelCounter.setChannelSize(queue.size()); }
此處涉及到兩個信號量:
queueStored表示Channel Queue已存儲事件容量(已存儲的事件數量),隊列取出事件時-1,放入事件成功時+N,取出失敗時-N,即Channel Queue存儲了多少事件。
queueRemaining表示Channel Queue可存儲事件容量(可存儲的事件數量),取出事件成功時+N,放入事件成功時-N。
而bytesRemaining是字節容量信號量,超出容量則回滾事務。
具體以下圖,如今總體業務已經走通:
+----------+ +-------+ | Source | +---------------------------------------------------------------+ | Sink | +-----+----+ | [MemoryChannel] | +---+---+ | | +--------------------------------------------------------+ | ^ | | | [MemoryTransaction] | | | | | | | | | | | | | | | | | | channelCounter | | | | | | | | | | | | putByteCounter takeByteCounter | | | | | | | | | | | | +-----------+ +------------+ | | doTake | +----------------> | putList | | takeList +----------------+ doPut | | +----+------+ +------+-----+ | | | | | ^ | | | | | | | | | +--------------------------------------------------------+ | | | | poll | | | | | | | | | | | doCommit +---------+ doCommit | | | +------------> | queue | +---------+ | | +---------+ | +---------------------------------------------------------------+
手機以下圖:
當一個事務失敗時,會進行回滾,即調用本方法。在回滾時,須要把takeList中暫存的事件回滾到Channel Queue,並回滾queueStored信號量。具體邏輯以下:
具體代碼以下:
protected void doRollback() { //獲取takeList的大小,而後bytesRemaining中釋放 int takes = takeList.size(); //將takeList中的Event從新放回到queue隊列中。 synchronized (queueLock) { //操做Channel Queue時必定鎖住queueLock //前置條件判斷,檢查是否有足夠容量回滾事務 Preconditions.checkState(queue.remainingCapacity() >= takeList.size(), "Not enough space in memory channel " + "queue to rollback takes. This should never happen, please report"); //回滾事務的takeList隊列到Channel Queue while (!takeList.isEmpty()) { //takeList不爲空,將其events所有放回queue //removeLast()獲取並移除此雙端隊列的最後一個元素 queue.addFirst(takeList.removeLast()); } //最後清空putList putList.clear(); } //清空了putList,因此須要把putList佔用的空間添加到bytesRemaining中 //即,釋放putByteCounter個bytesRemaining信號量 bytesRemaining.release(putByteCounter); //計數器重置 putByteCounter = 0; takeByteCounter = 0; //釋放takeList隊列大小個已存儲事件容量 queueStored.release(takes); channelCounter.setChannelSize(queue.size()); }
具體以下圖:
+----------+ +-------+ | Source | +----------------------------------------------------------------+ | Sink | +-----+----+ | [MemoryChannel] | +---+---+ | | +--------------------------------------------------------+ | ^ | | | [MemoryTransaction] | | | | | | | | | | | | | | | | | | channelCounter | | | | | | | | | | | | putByteCounter takeByteCounter | | | | | | | | | | | | +-----------+ +------------+ | |doTake | +----------------> | putList | | takeList +----------------+ doPut | | +----+--+---+ +----+---+---+ | | | | | ^ | ^ | | | | | | | | | | | +--------------------------------------------------------+ | | | | | | poll | | | | | | | | | | rollback rollback | | | | | +--------------+ +-------------+ | | | | | | | | | | | v | | | | doCommit +--+--+---+ doCommit | | | +------------> | queue | +-----------+ | | +---------+ | +----------------------------------------------------------------+
手機上如圖:
此小節回答了以下問題:
MemoryChannel 中使用鎖配合信號實現動態增減容量。
MemoryChannel會經過configure方法獲取配置文件系統,初始化MemoryChannel,其中對於配置信息的讀取有兩種方法,只在啓動時讀取一次或者動態的加載配置文件,動態讀取配置文件時若修改了Channel 的容量大小,則會調用 resizeQueue 方法進行調整,以下:
if (queue != null) { //queue不爲null,則爲動態修改配置文件時,從新指定了capacity try { resizeQueue(capacity); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } else { //初始化queue,根據指定的capacity申請雙向阻塞隊列,並初始化信號量 synchronized (queueLock) { queue = new LinkedBlockingDeque<Event>(capacity); queueRemaining = new Semaphore(capacity); queueStored = new Semaphore(0); } }
動態調整 Channel 容量主要分爲三種狀況:
新老容量相同,則直接返回;
老容量大於新容量,縮容,需先給未被佔用的空間加鎖,防止在縮容時有線程再往其寫數據,而後建立新容量的隊列,將本來隊列加入中全部的 event 添加至新隊列中;
老容量小於新容量,擴容,而後建立新容量的隊列,將本來隊列加入中全部的 event 添加至新隊列中。
具體代碼以下:
private void resizeQueue(int capacity) throws InterruptedException { int oldCapacity; //首先計算擴容前的Channel Queue的容量 //計算本來的Channel Queue的容量 synchronized (queueLock) { //老的容量=隊列現有餘額+在事務被處理了可是是未被提交的容量 oldCapacity = queue.size() + queue.remainingCapacity(); } //新容量和老容量相等,不須要調整返回 if (oldCapacity == capacity) {//若是老容量大於新容量,縮容 return; } else if (oldCapacity > capacity) { //縮容 //首先要預佔老容量-新容量的大小,以便縮容容量 //首先要預佔用未被佔用的容量,防止其餘線程進行操做 //嘗試佔用即將縮減的空間,以防被他人佔用 if (!queueRemaining.tryAcquire(oldCapacity - capacity, keepAlive, TimeUnit.SECONDS)) { //若是獲取失敗,默認是記錄日誌而後忽略 LOGGER.warn("Couldn't acquire permits to downsize the queue, resizing has been aborted"); } else { //直接縮容量 //鎖定queueLock進行縮容,先建立新capacity的雙端阻塞隊列,而後複製老Queue數據。線程安全 //不然,直接縮容,而後複製老Queue的數據,縮容時須要鎖定queueLock,由於這一系列操做要線程安全 synchronized (queueLock) { LinkedBlockingDeque<Event> newQueue = new LinkedBlockingDeque<Event>(capacity); newQueue.addAll(queue); queue = newQueue; } } } else { //擴容,加鎖,建立新newQueue,複製老queue數據 //擴容 synchronized (queueLock) { LinkedBlockingDeque<Event> newQueue = new LinkedBlockingDeque<Event>(capacity); newQueue.addAll(queue); queue = newQueue; } //增長/減小Channel Queue的新的容量 //釋放capacity - oldCapacity個許可,即就是增長這麼多可用許可 queueRemaining.release(capacity - oldCapacity); } }
回到本文最初的錯誤信息:Space for commit to queue couldn't be acquired。
這說明Flume是會出現數據相關問題的。咱們首先分析此問題。
由於「source往putList放數據,而後提交到queue中」與「sink從channel中取數據到sink和takeList,而後再從putList取數據到queue中」這兩部分是分開來,任他們自由搶鎖,因此,當前者屢次搶到鎖,後者沒有搶到鎖,同時queue的大小又過小,撐不住屢次往裏放數據,就會致使觸發這個異常。
正常狀況下,若是遇到此問題,flume會暫停source向channel放數據,等待幾秒鐘,這期間sink應該會消費channel中的數據,當source再次開始想channel放數據時channel就有足夠的空間了。
可是若是一直出現異常,就須要啓用解決方案。
解決這個問題最直接的辦法就是增大queue的大小,增大capacity和transacCapacity之間的差距,queue能撐住屢次往裏面放數據便可。
下面咱們看看Flume使用中,丟失數據的可能。
根據Flume的架構原理,採用FileChannel的Flume是不可能丟失數據的,由於其內部有完善的事務機制(ACID)。
這兩個環節都不可能丟失數據。
一旦管道中全部Flume Agent的容量之和被使用完,Flume 將再也不接受來自客戶端的數據。此時,客戶端須要緩衝數據,不然數據可能會丟失。所以,配置管道可以處理最大預期的停機時間是很是重要的。
Channel採用MemoryChannel時候,會出現丟失。
因此若是想要不丟失數據,須要採用File channel。
Memory Channel 是一個內存緩衝區,所以若是Java23 虛擬機(JVM)或機器從新啓動,任何緩衝區中的數據將丟失。另外一方面,File Channel是在磁盤上的。即便JVM 或機器從新啓動,File Channel 也不丟失數據,只要磁盤上存儲的數據仍然是起做用的和可訪問的。機器和Agent 一旦開始運行,任何存儲在FileChannel 中的數據將最終被訪問。
在Channel發送到Sink這階段,容易出現數據重複問題。
好比:若是flush到HDFS的時候,數據flush了一半以後出問題了,這意味着已經有一半的數據已經發送到HDFS上面了,如今出了問題,一樣須要調用doRollback方法來進行回滾。
回滾並無「一半」之說,它只會把整個takeList中的數據返回給channel,而後繼續進行數據的讀寫。這樣開啓下一個事務的時候就容易形成數據重複的問題。
因此,在某種程度上,flume對數據進行採集傳輸的時候,它有可能會形成數據的重複,可是其數據不丟失。
Flume 保證事件至少一次被送到它們的目的地,只有一次傾力寫數據,且不存在任何類型的故障事件只被寫一次。可是像網絡超時或部分寫入存儲系統的錯誤,可能致使事件不止被寫一次,由於Flume 將重試寫操做直到它們徹底成功。網絡超時可能表示寫操做的失敗,或者只是機器運行緩慢。若是是機器運行緩慢,當Flume 重試這將致使重複。所以,確保每一個事件都有某種形式的惟一標識符一般是一個好主意,若是須要,最終能夠用來刪除事件數據。
事件序列化器 Flume 的無數據丟失保證,Channel 和事務
Flume 1.7 源碼分析(一)源碼編譯
Flume 1.7 源碼分析(二)總體架構
Flume 1.7 源碼分析(三)程序入口
Flume 1.7 源碼分析(四)從Source寫數據到Channel
Flume 1.7 源碼分析(五)從Channel獲取數據寫入Sink
flume到底會丟數據嗎?其可靠性如何?——輕鬆搞懂Flume事務機制
Flume架構與源碼分析-MemoryChannel事務實現
flume「Space for commit to queue couldn't be acquired」異常產生分析