本文由雲+社區發表做者:皮皮熊git
Apache Flume是一個用於高效地從大量異構數據源收集、聚合、傳輸到一個集中式數據存儲的分佈式、高可靠、高可用的系統。後端
Apache Flume是Apache基金會的頂級項目。如今有兩個代碼版本線能夠獲取:0.9.x和1.x。本文檔對應的是1.x版本。app
Event是流經flume agent的最小數據單元。一個Event(由Event接口實現)從source流向channel,再到sink。Event包含了一個payload(byte array)和可選的header(string attributes)。一個flume agent就是一個jvm下的進程:控制着Events從一個外部的源頭到一個外部的目的地。框架
Source消費着具備特殊格式的Events(這些Event傳遞到Source經過像Web server這樣外在的數據源)。例如AvroSource能夠被用於接收Avro的Events,從本客戶端或者其餘運行中的flume客戶端。當一個Source接收到一個Event,它會把它插入到一個或者多個Channel裏。Channel會被動地存儲這些Event直到它們被一個Sink消費到。Flume中一種Channel是FileChannel,其使用文件系統來做爲後端存儲。Sink須要負責任地將一個Event從Channel中移除,並將其放入像hdfs同樣的外部存儲系統(例如HDFSEventSink),或者轉發到傳輸中下一個節點的source中。Source和Sink在agent中異步地交互Channel中的Event。異步
Event是存儲在Flume agent的Channel裏。Sink的責任就是傳輸Event到下一個agent或者最終的存儲系統(像hdfs)。Sink只有當Event寫入下一個agent的Channel 或者 存儲到最終的系統時纔會從channel裏面刪掉Event。這就是Flume如何在單跳消息傳輸中提供端到端的可靠性。Flume提供了一個事務性的方法來修復可靠傳輸中的Event。Source和Sink包含了Event的存儲和重試(經過由channel提供的事務)。jvm
經過gitmaven
Flume使用maven來build。你能夠經過標準的maven命令行來編譯Flume。分佈式
注意:Flume build須要在path中有Google Protocol Buffers編譯器。ide
File channel依賴Protocol Buffer。當你想更新Protocol Buffer版本時,你須要以下更新使用到Protocol Buffer的data access類:單元測試
Client在Event產生時運轉,並將他們傳遞到Flume的agent。Client一般運行在應用消費數據的進程空間中。Flume目前支持Avro, log4j, syslog, 以及 Http POST (with a JSON body)方式從外部數據源傳輸數據。同時ExecSource支持將本地進程的輸出做爲Flume的輸入。
可能已有的方案是不夠的。本案例中你可使用自定義的方法來向flume發送數據。這裏有兩種方法來實現。第一:寫一個自定義的客戶端來和flume已有的source交互,像AvroSource 或者 SyslogTcpSource。此時Client須要將數據轉換成這些Source能理解的message。另一個方案:寫一個自定義的Flume Source,經過IPC或者RPC,直接地和已有的client應用通訊(須要將client的數據轉換成Flume的Event)。注意這些存儲在flume agent channel中的事件,必須以Flume Event形式存在。
儘管Flume包含了一系列內置的,用於接收數據的方法(即Source),人們經常想直接地經過flume和自定義的程序進行通訊。Flume SDK 就是這樣一個lib,它能夠經過RPC直接地鏈接到Flume,而且發送到Flume的數據流。
一個RPC客戶端接口的實現,包含了支持Flume的RPC方法。用戶的程序能夠簡單地調用Flume SDK客戶端的append(Event)或者appendBatch(List<Event>)接口來發送數據,而不用考慮消息交互的細節。用戶能夠經過使用諸如SimpleEvent類,或者使用EventBuilder的 靜態helper方法withBody(),便捷地實現直接提供事件接口所需的事件ARG。
Transaction接口是Flume可靠性的基礎。全部主要組件(即source,sink和channel)必須使用Flume Transaction。
Transaction在channel的實現中實現。每一個source和sink鏈接到channel時必需要獲得一個channnel的對象。Source使用channnelprocessor來管理transaction。sink明確地經過他們配置的channel來管理transaction。存儲一個事件(把他們放入channnel中)或者抽取一個事件(從channnel中取出)在一個激活的transaction中完成。例如:
Channel ch = new MemoryChannel(); Transaction txn = ch.getTransaction(); txn.begin(); try { // This try clause includes whatever Channel operations you want to do Event eventToStage = EventBuilder.withBody("Hello Flume!", Charset.forName("UTF-8")); ch.put(eventToStage); // Event takenEvent = ch.take(); // ... txn.commit(); } catch (Throwable t) { txn.rollback(); // Log exception, handle individual exceptions as needed // re-throw all Errors if (t instanceof Error) { throw (Error)t; } } finally { txn.close(); }
在這裏,咱們從channel獲取transaction。在begin()返回後,Transaction如今處於活動/打開狀態,而後將Event放入Channel中。若是put成功,則提交併關閉Transaction。
Sink的目的就是從Channel中提取事件並將其轉發到傳輸中的下一個Flume Agent或將它們存儲在外部存儲庫中。根據Flume屬性文件中的配置,接收器只與一個通道關聯。每一個已配置的Sink都有一個SinkRunner實例,當Flume框架調用SinkRunner.start()時,會建立一個新線程來驅動Sink(使用SinkRunner.PollingRunner做爲線程的Runnable),該線程管理Sink的生命週期。Sink須要實現start()和stop()方法做爲LifecycleAware接口的一部分。
Sink實現還須要實現Configurable接口來處理本身的配置設置。例如:
public class MySink extends AbstractSink implements Configurable { private String myProp; @Override public void configure(Context context) { String myProp = context.getString("myProp", "defaultValue"); // Process the myProp value (e.g. validation) // Store myProp for later retrieval by process() method this.myProp = myProp; } @Override public void start() { // Initialize the connection to the external repository (e.g. HDFS) that // this Sink will forward Events to .. } @Override public void stop () { // Disconnect from the external respository and do any // additional cleanup (e.g. releasing resources or nulling-out // field values) .. } @Override public Status process() throws EventDeliveryException { Status status = null; // Start transaction Channel ch = getChannel(); Transaction txn = ch.getTransaction(); txn.begin(); try { // This try clause includes whatever Channel operations you want to do Event event = ch.take(); // Send the Event to the external repository. // storeSomeData(e); txn.commit(); status = Status.READY; } catch (Throwable t) { txn.rollback(); // Log exception, handle individual exceptions as needed status = Status.BACKOFF; // re-throw all Errors if (t instanceof Error) { throw (Error)t; } } return status; } }
Source的目的是從外部客戶端接收數據並將其存儲到已配置的Channels中。Source能夠獲取其本身的ChannelProcessor的實例來處理在Channel本地事務中提交的串行事件。在exception的狀況下,須要Channels傳播異常,則全部Channels將回滾其事務,但先前在其餘Channel上處理的事件將保持提交。
與SinkRunner.PollingRunner Runnable相似,有一個PollingRunner Runnable,它在Flume框架調用PollableSourceRunner.start()時建立的線程上執行。每一個配置的PollableSource都與本身運行PollingRunner的線程相關聯。該線程管理PollableSource的生命週期,例如啓動和中止。
注意,實際上有兩種類型的Source:已經提到過PollableSource,另外一個是EventDrivenSource。與PollableSource不一樣,EventDrivenSource必須有本身的回調機制,捕獲新數據並將其存儲到Channel中。EventDrivenSources並不像PollableSources那樣由它們本身的線程驅動。下面是一個自定義PollableSource的示例:
public class MySource extends AbstractSource implements Configurable, PollableSource { private String myProp; @Override public void configure(Context context) { String myProp = context.getString("myProp", "defaultValue"); // Process the myProp value (e.g. validation, convert to another type, ...) // Store myProp for later retrieval by process() method this.myProp = myProp; } @Override public void start() { // Initialize the connection to the external client } @Override public void stop () { // Disconnect from external client and do any additional cleanup // (e.g. releasing resources or nulling-out field values) .. } @Override public Status process() throws EventDeliveryException { Status status = null; try { // This try clause includes whatever Channel/Event operations you want to do // Receive new data Event e = getSomeData(); // Store the Event into this Source's associated Channel(s) getChannelProcessor().processEvent(e); status = Status.READY; } catch (Throwable t) { // Log exception, handle individual exceptions as needed status = Status.BACKOFF; // re-throw all Errors if (t instanceof Error) { throw (Error)t; } } finally { txn.close(); } return status; } }
參考自(Flume 1.8.0 Developer Guide)
flume 1.8.0 文檔完整翻譯可見 https://blog.csdn.net/u013128262
此文已由騰訊雲+社區在各渠道發佈
獲取更多新鮮技術乾貨,能夠關注咱們騰訊雲技術社區-雲加社區官方號及知乎機構號