生產者-消費者
模型用於解耦生產者與消費者,平衡二者之間的能力不平衡,該模型普遍應用於各個系統中,Hudi也使用了該模型控制對記錄的處理,即記錄會被生產者生產至隊列中,而後由消費者從隊列中消費,更具體一點,對於更新操做,生產者會將文件中老的記錄放入隊列中等待消費者消費,消費後交由HoodieMergeHandle
處理;對於插入操做,生產者會將新記錄放入隊列中等待消費者消費,消費後交由HandleCreateHandle
處理。java
前面的文章中提到過不管是HoodieCopyOnWriteTable#handleUpdate
處理更新時直接生成了一個SparkBoundedInMemoryExecutor
對象,仍是HoodieCopyOnWriteTable#handleInsert
處理插入時生成了一個CopyOnWriteLazyInsertIterable
對象,再迭代時調用該對象的CopyOnWriteLazyInsertIterable#computeNext
方法生成SparkBoundedInMemoryExecutor
對象。最後二者均會調用SparkBoundedInMemoryExecutor#execute
開始記錄的處理,該方法核心代碼以下緩存
public E execute() { try { ExecutorCompletionService<Boolean> producerService = startProducers(); Future<E> future = startConsumer(); // Wait for consumer to be done return future.get(); } catch (Exception e) { throw new HoodieException(e); } }
該方法會啓動全部生產者和單個消費者進行處理。併發
Hudi定義了BoundedInMemoryQueueProducer
接口表示生產者,其子類實現以下app
Function
來生產記錄,在合併日誌log文件和數據parquet文件時使用,以便提供RealTimeView
。定義了BoundedInMemoryQueueConsumer
類表示消費者,其主要子類實現以下ui
CopyOnWrite
表類型時的插入。
MergeOnRead
表類型時的插入,其爲CopyOnWriteInsertHandler
的子類。this
CopyOnWrite
表類型時的更新。整個生產消費相關的類繼承結構很是清晰。線程
對於生產者的啓動,startProducers
方法核心代碼以下日誌
public ExecutorCompletionService<Boolean> startProducers() { // Latch to control when and which producer thread will close the queue final CountDownLatch latch = new CountDownLatch(producers.size()); final ExecutorCompletionService<Boolean> completionService = new ExecutorCompletionService<Boolean>(executorService); producers.stream().map(producer -> { return completionService.submit(() -> { try { preExecute(); producer.produce(queue); } catch (Exception e) { logger.error("error producing records", e); queue.markAsFailed(e); throw e; } finally { synchronized (latch) { latch.countDown(); if (latch.getCount() == 0) { // Mark production as done so that consumer will be able to exit queue.close(); } } } return true; }); }).collect(Collectors.toList()); return completionService; }
該方法使用CountDownLatch
來協調生產者線程與消費者線程的退出動做,而後調用produce
方法開始生產,對於插入更新時的IteratorBasedQueueProducer
而言,其核心代碼以下code
public void produce(BoundedInMemoryQueue<I, ?> queue) throws Exception { ... while (inputIterator.hasNext()) { queue.insertRecord(inputIterator.next()); } ... }
能夠看到只要迭代器還有記錄(可能爲插入時的新記錄或者更新時的舊記錄),就會往隊列中不斷寫入。orm
對於消費者的啓動,startConsumer
方法的核心代碼以下
private Future<E> startConsumer() { return consumer.map(consumer -> { return executorService.submit(() -> { ... preExecute(); try { E result = consumer.consume(queue); return result; } catch (Exception e) { queue.markAsFailed(e); throw e; } }); }).orElse(CompletableFuture.completedFuture(null)); }
消費時會先進行執行前的準備,而後開始消費,其中consume
方法的核心代碼以下
public O consume(BoundedInMemoryQueue<?, I> queue) throws Exception { Iterator<I> iterator = queue.iterator(); while (iterator.hasNext()) { consumeOneRecord(iterator.next()); } // Notifies done finish(); return getResult(); }
能夠看到只要隊列中還有記錄,就能夠獲取該記錄,而後調用不一樣BoundedInMemoryQueueConsumer
子類的consumeOneRecord
進行更新插入處理。
值得一提的是Hudi對隊列進行了流控,生產者不能無限制地將記錄寫入隊列中,隊列緩存的大小由用戶配置,隊列能放入記錄的條數由採樣的記錄大小和隊列緩存大小控制。
在生產時,會調用BoundedInMemoryQueue#insertRecord
將記錄寫入隊列,其核心代碼以下
public void insertRecord(I t) throws Exception { ... rateLimiter.acquire(); // We are retrieving insert value in the record queueing thread to offload computation // around schema validation // and record creation to it. final O payload = transformFunction.apply(t); adjustBufferSizeIfNeeded(payload); queue.put(Option.of(payload)); }
首先獲取一個許可(Semaphore
),未成功獲取會被阻塞直至成功獲取,而後獲取記錄的負載以便調整隊列,而後放入內部隊列(LinkedBlockingQueue
)中,其中adjustBufferSizeIfNeeded
方法的核心代碼以下
private void adjustBufferSizeIfNeeded(final O payload) throws InterruptedException { if (this.samplingRecordCounter.incrementAndGet() % RECORD_SAMPLING_RATE != 0) { return; } final long recordSizeInBytes = payloadSizeEstimator.sizeEstimate(payload); final long newAvgRecordSizeInBytes = Math.max(1, (avgRecordSizeInBytes * numSamples + recordSizeInBytes) / (numSamples + 1)); final int newRateLimit = (int) Math.min(RECORD_CACHING_LIMIT, Math.max(1, this.memoryLimit / newAvgRecordSizeInBytes)); // If there is any change in number of records to cache then we will either release (if it increased) or acquire // (if it decreased) to adjust rate limiting to newly computed value. if (newRateLimit > currentRateLimit) { rateLimiter.release(newRateLimit - currentRateLimit); } else if (newRateLimit < currentRateLimit) { rateLimiter.acquire(currentRateLimit - newRateLimit); } currentRateLimit = newRateLimit; avgRecordSizeInBytes = newAvgRecordSizeInBytes; numSamples++; }
首先看是否已經達到採樣頻率,而後計算新的記錄平均大小和限流速率,若是新的限流速率大於當前速率,則可釋放一些許可(供阻塞的生產者獲取後繼續生產),不然須要獲取(回收)一些許可(許可變少後生產速率天然就下降了)。該操做可根據採樣的記錄大小動態調節速率,不至於在記錄負載太大和記錄負載過小時,放入同等個數,從而起到動態調節做用。
在消費時,會調用BoundedInMemoryQueue#readNextRecord
讀取記錄,其核心代碼以下
private Option<O> readNextRecord() { ... rateLimiter.release(); Option<O> newRecord = Option.empty(); while (expectMoreRecords()) { try { throwExceptionIfFailed(); newRecord = queue.poll(RECORD_POLL_INTERVAL_SEC, TimeUnit.SECONDS); if (newRecord != null) { break; } } catch (InterruptedException e) { throw new HoodieException(e); } } ... if (newRecord != null && newRecord.isPresent()) { return newRecord; } else { // We are done reading all the records from internal iterator. this.isReadDone.set(true); return Option.empty(); } }
能夠看到首先會釋放一個許可,而後判斷是否還能夠讀取記錄(還在生產或者中止生產但隊列不爲空均可讀取),而後從內部隊列獲取記錄或返回。
上述即是生產者-消費者
在Hudi中應用的分析。
Hudi採用了生產者-消費者
模型來控制記錄的處理,與傳統多生產者-多消費者
模型不一樣的是,Hudi如今只支持多生產者-單消費者
模型,單消費者意味着Hudi暫時不支持文件的併發寫入。而對於生產消費的隊列的實現,Hudi並未僅僅只是基於LinkedBlockingQueue
,而是採用了更精細化的速率控制,保證速率會隨着記錄負載大小的變化和配置的隊列緩存大小而動態變化,這也下降了系統發生OOM的機率。