flume 1.8.0 開發基礎

本文由雲+社區發表

做者:皮皮熊git

概述

Apache Flume是一個用於高效地從大量異構數據源收集、聚合、傳輸到一個集中式數據存儲的分佈式、高可靠、高可用的系統。後端

Apache Flume是Apache基金會的頂級項目。如今有兩個代碼版本線能夠獲取:0.9.x和1.x。本文檔對應的是1.x版本。app

數據流模型

Event是流經flume agent的最小數據單元。一個Event(由Event接口實現)從source流向channel,再到sink。Event包含了一個payload(byte array)和可選的header(string attributes)。一個flume agent就是一個jvm下的進程:控制着Events從一個外部的源頭到一個外部的目的地。框架

Source消費着具備特殊格式的Events(這些Event傳遞到Source經過像Web server這樣外在的數據源)。例如AvroSource能夠被用於接收Avro的Events,從本客戶端或者其餘運行中的flume客戶端。當一個Source接收到一個Event,它會把它插入到一個或者多個Channel裏。Channel會被動地存儲這些Event直到它們被一個Sink消費到。Flume中一種Channel是FileChannel,其使用文件系統來做爲後端存儲。Sink須要負責任地將一個Event從Channel中移除,並將其放入像hdfs同樣的外部存儲系統(例如HDFSEventSink),或者轉發到傳輸中下一個節點的source中。Source和Sink在agent中異步地交互Channel中的Event。異步

可靠性

Event是存儲在Flume agent的Channel裏。Sink的責任就是傳輸Event到下一個agent或者最終的存儲系統(像hdfs)。Sink只有當Event寫入下一個agent的Channel 或者 存儲到最終的系統時纔會從channel裏面刪掉Event。這就是Flume如何在單跳消息傳輸中提供端到端的可靠性。Flume提供了一個事務性的方法來修復可靠傳輸中的Event。Source和Sink包含了Event的存儲和重試(經過由channel提供的事務)。jvm

構建Flume

獲取源碼

經過gitmaven

編譯/測試 Flume

Flume使用maven來build。你能夠經過標準的maven命令行來編譯Flume。分佈式

  1. 僅編譯:mvn clean compile
  2. 編譯且運行單元測試:mvn clean test
  3. 運行獨立的測試:mvn clean test -Dtest=<Test1>,<Test2>,... -DfailIfNoTests=false
  4. 打包:mvn clean install
  5. 打包(忽略單元測試):mvn clean install -DskipTests

注意:Flume build須要在path中有Google Protocol Buffers編譯器。ide

更新Protocol Buffer版本

File channel依賴Protocol Buffer。當你想更新Protocol Buffer版本時,你須要以下更新使用到Protocol Buffer的data access類:單元測試

  1. 本機安裝你想要的PB版本
  2. 更新pom.xml中PB的版本
  3. 生成flume中新的PB data access類:cd flume-ng-channels/flume-file-channel; mvn -P compile-proto clean package -DskipTests
  4. 在全部生成文件中加上Apache license(若是缺了的話)
  5. rebuild及測試Flume:cd ../..; mvn clean install

開發自定義部分

client

Client在Event產生時運轉,並將他們傳遞到Flume的agent。Client一般運行在應用消費數據的進程空間中。Flume目前支持Avro, log4j, syslog, 以及 Http POST (with a JSON body)方式從外部數據源傳輸數據。同時ExecSource支持將本地進程的輸出做爲Flume的輸入。

可能已有的方案是不夠的。本案例中你可使用自定義的方法來向flume發送數據。這裏有兩種方法來實現。第一:寫一個自定義的客戶端來和flume已有的source交互,像AvroSource 或者 SyslogTcpSource。此時Client須要將數據轉換成這些Source能理解的message。另一個方案:寫一個自定義的Flume Source,經過IPC或者RPC,直接地和已有的client應用通訊(須要將client的數據轉換成Flume的Event)。注意這些存儲在flume agent channel中的事件,必須以Flume Event形式存在。

Client SDK

儘管Flume包含了一系列內置的,用於接收數據的方法(即Source),人們經常想直接地經過flume和自定義的程序進行通訊。Flume SDK 就是這樣一個lib,它能夠經過RPC直接地鏈接到Flume,而且發送到Flume的數據流。

RPC客戶端接口

一個RPC客戶端接口的實現,包含了支持Flume的RPC方法。用戶的程序能夠簡單地調用Flume SDK客戶端的append(Event)或者appendBatch(List<Event>)接口來發送數據,而不用考慮消息交互的細節。用戶能夠經過使用諸如SimpleEvent類,或者使用EventBuilder的 靜態helper方法withBody(),便捷地實現直接提供事件接口所需的事件ARG。

Transaction(事務)接口

Transaction接口是Flume可靠性的基礎。全部主要組件(即source,sink和channel)必須使用Flume Transaction。

Transaction在channel的實現中實現。每一個source和sink鏈接到channel時必需要獲得一個channnel的對象。Source使用channnelprocessor來管理transaction。sink明確地經過他們配置的channel來管理transaction。存儲一個事件(把他們放入channnel中)或者抽取一個事件(從channnel中取出)在一個激活的transaction中完成。例如:

Channel ch = new MemoryChannel();
Transaction txn = ch.getTransaction();
txn.begin();
try {
  // This try clause includes whatever Channel operations you want to do

  Event eventToStage = EventBuilder.withBody("Hello Flume!",
                       Charset.forName("UTF-8"));
  ch.put(eventToStage);
  // Event takenEvent = ch.take();
  // ...
  txn.commit();
} catch (Throwable t) {
  txn.rollback();

  // Log exception, handle individual exceptions as needed

  // re-throw all Errors
  if (t instanceof Error) {
    throw (Error)t;
  }
} finally {
  txn.close();
}

在這裏,咱們從channel獲取transaction。在begin()返回後,Transaction如今處於活動/打開狀態,而後將Event放入Channel中。若是put成功,則提交併關閉Transaction。

Sink

Sink的目的就是從Channel中提取事件並將其轉發到傳輸中的下一個Flume Agent或將它們存儲在外部存儲庫中。根據Flume屬性文件中的配置,接收器只與一個通道關聯。每一個已配置的Sink都有一個SinkRunner實例,當Flume框架調用SinkRunner.start()時,會建立一個新線程來驅動Sink(使用SinkRunner.PollingRunner做爲線程的Runnable),該線程管理Sink的生命週期。Sink須要實現start()和stop()方法做爲LifecycleAware接口的一部分。

  • Sink.start()方法應初始化Sink並將其置於可將事件轉發到其下一個目標的狀態。
  • Sink.process()應該執行從Channel提取Event並轉發它的核心處理過程。
  • Sink.stop()方法應該進行必要的清理(例如釋放資源)。

Sink實現還須要實現Configurable接口來處理本身的配置設置。例如:

public class MySink extends AbstractSink implements Configurable {
  private String myProp;

  @Override
  public void configure(Context context) {
    String myProp = context.getString("myProp", "defaultValue");

    // Process the myProp value (e.g. validation)

    // Store myProp for later retrieval by process() method
    this.myProp = myProp;
  }

  @Override
  public void start() {
    // Initialize the connection to the external repository (e.g. HDFS) that
    // this Sink will forward Events to ..
  }

  @Override
  public void stop () {
    // Disconnect from the external respository and do any
    // additional cleanup (e.g. releasing resources or nulling-out
    // field values) ..
  }

  @Override
  public Status process() throws EventDeliveryException {
    Status status = null;

    // Start transaction
    Channel ch = getChannel();
    Transaction txn = ch.getTransaction();
    txn.begin();
    try {
      // This try clause includes whatever Channel operations you want to do

      Event event = ch.take();

      // Send the Event to the external repository.
      // storeSomeData(e);

      txn.commit();
      status = Status.READY;
    } catch (Throwable t) {
      txn.rollback();

      // Log exception, handle individual exceptions as needed

      status = Status.BACKOFF;

      // re-throw all Errors
      if (t instanceof Error) {
        throw (Error)t;
      }
    }
    return status;
  }
}

Source

Source的目的是從外部客戶端接收數據並將其存儲到已配置的Channels中。Source能夠獲取其本身的ChannelProcessor的實例來處理在Channel本地事務中提交的串行事件。在exception的狀況下,須要Channels傳播異常,則全部Channels將回滾其事務,但先前在其餘Channel上處理的事件將保持提交。

與SinkRunner.PollingRunner Runnable相似,有一個PollingRunner Runnable,它在Flume框架調用PollableSourceRunner.start()時建立的線程上執行。每一個配置的PollableSource都與本身運行PollingRunner的線程相關聯。該線程管理PollableSource的生命週期,例如啓動和中止。

  • PollableSource必須實現LifecycleAware接口中聲明的start()和stop()方法。
  • PollableSource的運行器調用Source的process()方法。 process()方法應檢查新數據並將其做爲Flume事件存儲到Channel中。

注意,實際上有兩種類型的Source:已經提到過PollableSource,另外一個是EventDrivenSource。與PollableSource不一樣,EventDrivenSource必須有本身的回調機制,捕獲新數據並將其存儲到Channel中。EventDrivenSources並不像PollableSources那樣由它們本身的線程驅動。下面是一個自定義PollableSource的示例:

public class MySource extends AbstractSource implements Configurable, PollableSource {
  private String myProp;

  @Override
  public void configure(Context context) {
    String myProp = context.getString("myProp", "defaultValue");

    // Process the myProp value (e.g. validation, convert to another type, ...)

    // Store myProp for later retrieval by process() method
    this.myProp = myProp;
  }

  @Override
  public void start() {
    // Initialize the connection to the external client
  }

  @Override
  public void stop () {
    // Disconnect from external client and do any additional cleanup
    // (e.g. releasing resources or nulling-out field values) ..
  }

  @Override
  public Status process() throws EventDeliveryException {
    Status status = null;

    try {
      // This try clause includes whatever Channel/Event operations you want to do

      // Receive new data
      Event e = getSomeData();

      // Store the Event into this Source's associated Channel(s)
      getChannelProcessor().processEvent(e);

      status = Status.READY;
    } catch (Throwable t) {
      // Log exception, handle individual exceptions as needed

      status = Status.BACKOFF;

      // re-throw all Errors
      if (t instanceof Error) {
        throw (Error)t;
      }
    } finally {
      txn.close();
    }
    return status;
  }
}

參考自(Flume 1.8.0 Developer Guide)

flume 1.8.0 文檔完整翻譯可見 https://blog.csdn.net/u013128262

此文已由騰訊雲+社區在各渠道發佈

獲取更多新鮮技術乾貨,能夠關注咱們騰訊雲技術社區-雲加社區官方號及知乎機構號

相關文章
相關標籤/搜索