生產者-消費者模型在Hudi中的應用

介紹

生產者-消費者模型用於解耦生產者與消費者,平衡二者之間的能力不平衡,該模型普遍應用於各個系統中,Hudi也使用了該模型控制對記錄的處理,即記錄會被生產者生產至隊列中,而後由消費者從隊列中消費,更具體一點,對於更新操做,生產者會將文件中老的記錄放入隊列中等待消費者消費,消費後交由HoodieMergeHandle處理;對於插入操做,生產者會將新記錄放入隊列中等待消費者消費,消費後交由HandleCreateHandle處理。java

入口

前面的文章中提到過不管是HoodieCopyOnWriteTable#handleUpdate處理更新時直接生成了一個SparkBoundedInMemoryExecutor對象,仍是HoodieCopyOnWriteTable#handleInsert處理插入時生成了一個CopyOnWriteLazyInsertIterable對象,再迭代時調用該對象的CopyOnWriteLazyInsertIterable#computeNext方法生成SparkBoundedInMemoryExecutor對象。最後二者均會調用SparkBoundedInMemoryExecutor#execute開始記錄的處理,該方法核心代碼以下緩存

public E execute() {
    try {
      ExecutorCompletionService<Boolean> producerService = startProducers();
      Future<E> future = startConsumer();
      // Wait for consumer to be done
      return future.get();
    } catch (Exception e) {
      throw new HoodieException(e);
    }
  }

該方法會啓動全部生產者和單個消費者進行處理。併發

Hudi定義了BoundedInMemoryQueueProducer接口表示生產者,其子類實現以下app

  • FunctionBasedQueueProducer,基於Function來生產記錄,在合併日誌log文件和數據parquet文件時使用,以便提供RealTimeView
  • IteratorBasedQueueProducer,基於迭代器來生產記錄,在插入更新時使用。

定義了BoundedInMemoryQueueConsumer類表示消費者,其主要子類實現以下ui

  • CopyOnWriteLazyInsertIterable$CopyOnWriteInsertHandler,主要處理CopyOnWrite表類型時的插入。
    • MergeOnReadLazyInsertIterable$MergeOnReadInsertHandler,主要處理MergeOnRead

表類型時的插入,其爲CopyOnWriteInsertHandler的子類。this

  • CopyOnWriteLazyInsertIterable$UpdateHandler,主要處理CopyOnWrite表類型時的更新。

整個生產消費相關的類繼承結構很是清晰。線程

對於生產者的啓動,startProducers方法核心代碼以下日誌

public ExecutorCompletionService<Boolean> startProducers() {
    // Latch to control when and which producer thread will close the queue
    final CountDownLatch latch = new CountDownLatch(producers.size());
    final ExecutorCompletionService<Boolean> completionService =
        new ExecutorCompletionService<Boolean>(executorService);
    producers.stream().map(producer -> {
      return completionService.submit(() -> {
        try {
          preExecute();
          producer.produce(queue);
        } catch (Exception e) {
          logger.error("error producing records", e);
          queue.markAsFailed(e);
          throw e;
        } finally {
          synchronized (latch) {
            latch.countDown();
            if (latch.getCount() == 0) {
              // Mark production as done so that consumer will be able to exit
              queue.close();
            }
          }
        }
        return true;
      });
    }).collect(Collectors.toList());
    return completionService;
  }

該方法使用CountDownLatch來協調生產者線程與消費者線程的退出動做,而後調用produce方法開始生產,對於插入更新時的IteratorBasedQueueProducer而言,其核心代碼以下code

public void produce(BoundedInMemoryQueue<I, ?> queue) throws Exception {
    ...
    while (inputIterator.hasNext()) {
      queue.insertRecord(inputIterator.next());
    }
    ...
  }

能夠看到只要迭代器還有記錄(可能爲插入時的新記錄或者更新時的舊記錄),就會往隊列中不斷寫入。orm

對於消費者的啓動,startConsumer方法的核心代碼以下

private Future<E> startConsumer() {
    return consumer.map(consumer -> {
      return executorService.submit(() -> {
        ...
        preExecute();
        try {
          E result = consumer.consume(queue);
          return result;
        } catch (Exception e) {
          queue.markAsFailed(e);
          throw e;
        }
      });
    }).orElse(CompletableFuture.completedFuture(null));
  }

消費時會先進行執行前的準備,而後開始消費,其中consume方法的核心代碼以下

public O consume(BoundedInMemoryQueue<?, I> queue) throws Exception {
    Iterator<I> iterator = queue.iterator();

    while (iterator.hasNext()) {
      consumeOneRecord(iterator.next());
    }

    // Notifies done
    finish();

    return getResult();
  }

能夠看到只要隊列中還有記錄,就能夠獲取該記錄,而後調用不一樣BoundedInMemoryQueueConsumer子類的consumeOneRecord進行更新插入處理。

值得一提的是Hudi對隊列進行了流控,生產者不能無限制地將記錄寫入隊列中,隊列緩存的大小由用戶配置,隊列能放入記錄的條數由採樣的記錄大小和隊列緩存大小控制。

在生產時,會調用BoundedInMemoryQueue#insertRecord將記錄寫入隊列,其核心代碼以下

public void insertRecord(I t) throws Exception {
    ...
    rateLimiter.acquire();
    // We are retrieving insert value in the record queueing thread to offload computation
    // around schema validation
    // and record creation to it.
    final O payload = transformFunction.apply(t);
    adjustBufferSizeIfNeeded(payload);
    queue.put(Option.of(payload));
  }

首先獲取一個許可(Semaphore),未成功獲取會被阻塞直至成功獲取,而後獲取記錄的負載以便調整隊列,而後放入內部隊列(LinkedBlockingQueue)中,其中adjustBufferSizeIfNeeded方法的核心代碼以下

private void adjustBufferSizeIfNeeded(final O payload) throws InterruptedException {
    if (this.samplingRecordCounter.incrementAndGet() % RECORD_SAMPLING_RATE != 0) {
      return;
    }

    final long recordSizeInBytes = payloadSizeEstimator.sizeEstimate(payload);
    final long newAvgRecordSizeInBytes =
        Math.max(1, (avgRecordSizeInBytes * numSamples + recordSizeInBytes) / (numSamples + 1));
    final int newRateLimit =
        (int) Math.min(RECORD_CACHING_LIMIT, Math.max(1, this.memoryLimit / newAvgRecordSizeInBytes));

    // If there is any change in number of records to cache then we will either release (if it increased) or acquire
    // (if it decreased) to adjust rate limiting to newly computed value.
    if (newRateLimit > currentRateLimit) {
      rateLimiter.release(newRateLimit - currentRateLimit);
    } else if (newRateLimit < currentRateLimit) {
      rateLimiter.acquire(currentRateLimit - newRateLimit);
    }
    currentRateLimit = newRateLimit;
    avgRecordSizeInBytes = newAvgRecordSizeInBytes;
    numSamples++;
  }

首先看是否已經達到採樣頻率,而後計算新的記錄平均大小和限流速率,若是新的限流速率大於當前速率,則可釋放一些許可(供阻塞的生產者獲取後繼續生產),不然須要獲取(回收)一些許可(許可變少後生產速率天然就下降了)。該操做可根據採樣的記錄大小動態調節速率,不至於在記錄負載太大和記錄負載過小時,放入同等個數,從而起到動態調節做用。

在消費時,會調用BoundedInMemoryQueue#readNextRecord讀取記錄,其核心代碼以下

private Option<O> readNextRecord() {
    ...
    rateLimiter.release();
    Option<O> newRecord = Option.empty();
    while (expectMoreRecords()) {
      try {
        throwExceptionIfFailed();
        newRecord = queue.poll(RECORD_POLL_INTERVAL_SEC, TimeUnit.SECONDS);
        if (newRecord != null) {
          break;
        }
      } catch (InterruptedException e) {
        throw new HoodieException(e);
      }
    }
    ...

    if (newRecord != null && newRecord.isPresent()) {
      return newRecord;
    } else {
      // We are done reading all the records from internal iterator.
      this.isReadDone.set(true);
      return Option.empty();
    }
  }

能夠看到首先會釋放一個許可,而後判斷是否還能夠讀取記錄(還在生產或者中止生產但隊列不爲空均可讀取),而後從內部隊列獲取記錄或返回。

上述即是生產者-消費者在Hudi中應用的分析。

總結

Hudi採用了生產者-消費者模型來控制記錄的處理,與傳統多生產者-多消費者模型不一樣的是,Hudi如今只支持多生產者-單消費者模型,單消費者意味着Hudi暫時不支持文件的併發寫入。而對於生產消費的隊列的實現,Hudi並未僅僅只是基於LinkedBlockingQueue,而是採用了更精細化的速率控制,保證速率會隨着記錄負載大小的變化和配置的隊列緩存大小而動態變化,這也下降了系統發生OOM的機率。

相關文章
相關標籤/搜索