原文發表在:http://blog.javachen.com/2014/07/22/flume-ng.htmlhtml
Flume NG是Cloudera提供的一個分佈式、可靠、可用的系統,它可以將不一樣數據源的海量日誌數據進行高效收集、聚合、移動,最後存儲到一箇中心化數據存儲系統中。由原來的Flume OG到如今的Flume NG,進行了架構重構,而且如今NG版本徹底不兼容原來的OG版本。通過架構重構後,Flume NG更像是一個輕量的小工具,很是簡單,容易適應各類方式日誌收集,並支持failover和負載均衡。前端
Flume 使用 java 編寫,其須要運行在 Java1.6 或更高版本之上。java
Flume的架構主要有一下幾個核心概念:node
Flume 的核心是把數據從數據源收集過來,再送到目的地。爲了保證輸送必定成功,在送到目的地以前,會先緩存數據,待數據真正到達目的地後,刪除本身緩存的數據。git
Flume 傳輸的數據的基本單位是 Event,若是是文本文件,一般是一行記錄,這也是事務的基本單位。Event 從 Source,流向 Channel,再到 Sink,自己爲一個 byte 數組,並可攜帶 headers 信息。Event 表明着一個數據流的最小完整單元,從外部數據源來,向外部的目的地去。github
Flume 運行的核心是 Agent。它是一個完整的數據收集工具,含有三個核心組件,分別是 source、channel、sink。經過這些組件,Event 能夠從一個地方流向另外一個地方,以下圖所示。數據庫
Client端操做消費數據的來源,Flume 支持 Avro,log4j,syslog 和 http post(body爲json格式)。可讓應用程序同已有的Source直接打交道,如AvroSource,SyslogTcpSource。也能夠 寫一個 Source,以 IPC 或 RPC 的方式接入本身的應用,Avro和 Thrift 均可以(分別有 NettyAvroRpcClient 和 ThriftRpcClient 實現了 RpcClient接口),其中 Avro 是默認的 RPC 協議。具體代碼級別的 Client 端數據接入,能夠參考官方手冊。apache
對現有程序改動最小的使用方式是使用是直接讀取程序原來記錄的日誌文件,基本能夠實現無縫接入,不須要對現有程序進行任何改動。
對於直接讀取文件 Source,有兩種方式:json
tail -F 文件名
指令,在這種方式下,取的文件名必須是指定的。 ExecSource 能夠實現對日誌的實時收集,可是存在Flume不運行或者指令執行出錯時,將沒法收集到日誌數據,沒法保證日誌數據的完整性。SpoolSource 雖然沒法實現實時的收集數據,可是可使用以分鐘的方式分割文件,趨近於實時。數組
若是應用沒法實現以分鐘切割日誌文件的話, 能夠兩種收集方式結合使用。 在實際使用的過程當中,能夠結合 log4j 使用,使用 log4j的時候,將 log4j 的文件分割機制設爲1分鐘一次,將文件拷貝到spool的監控目錄。
log4j 有一個 TimeRolling 的插件,能夠把 log4j 分割文件到 spool 目錄。基本實現了實時的監控。Flume 在傳完文件以後,將會修改文件的後綴,變爲 .COMPLETED(後綴也能夠在配置文件中靈活指定)。
Flume Source 支持的類型:
Source類型 | 說明 |
---|---|
Avro Source | 支持Avro協議(其實是Avro RPC),內置支持 |
Thrift Source | 支持Thrift協議,內置支持 |
Exec Source | 基於Unix的command在標準輸出上生產數據 | |
JMS Source | 從JMS系統(消息、主題)中讀取數據,ActiveMQ已經測試過 |
Spooling Directory Source | 監控指定目錄內數據變動 |
Twitter 1% firehose Source | 經過API持續下載Twitter數據,試驗性質 |
Netcat Source | 監控某個端口,將流經端口的每個文本行數據做爲Event輸入 |
Sequence Generator Source | 序列生成器數據源,生產序列數據 |
Syslog Sources | 讀取syslog數據,產生Event,支持UDP和TCP兩種協議 |
HTTP Source | 基於HTTP POST或GET方式的數據源,支持JSON、BLOB表示形式 |
Legacy Sources | 兼容老的Flume OG中Source(0.9.x版本) |
當前有幾個 channel 可供選擇,分別是 Memory Channel, JDBC Channel , File Channel,Psuedo Transaction Channel。比較常見的是前三種 channel。
File Channel 是一個持久化的隧道(channel),它持久化全部的事件,並將其存儲到磁盤中。所以,即便 Java 虛擬機當掉,或者操做系統崩潰或重啓,再或者事件沒有在管道中成功地傳遞到下一個代理(agent),這一切都不會形成數據丟失。Memory Channel 是一個不穩定的隧道,其緣由是因爲它在內存中存儲全部事件。若是 java 進程死掉,任何存儲在內存的事件將會丟失。另外,內存的空間收到 RAM大小的限制,而 File Channel 這方面是它的優點,只要磁盤空間足夠,它就能夠將全部事件數據存儲到磁盤上。
Flume Channel 支持的類型:
Channel類型 | 說明 |
---|---|
Memory Channel | Event數據存儲在內存中 |
JDBC Channel | Event數據存儲在持久化存儲中,當前Flume Channel內置支持Derby |
File Channel | Event數據存儲在磁盤文件中 |
Spillable Memory Channel | Event數據存儲在內存中和磁盤上,當內存隊列滿了,會持久化到磁盤文件(當前試驗性的,不建議生產環境使用) |
Pseudo Transaction Channel | 測試用途 |
Custom Channel | 自定義Channel實現 |
Sink在設置存儲數據時,能夠向文件系統、數據庫、hadoop存數據,在日誌數據較少時,能夠將數據存儲在文件系中,而且設定必定的時間間隔保存數據。在日誌數據較多時,能夠將相應的日誌數據存儲到Hadoop中,便於往後進行相應的數據分析。
Flume Sink支持的類型
Sink類型 | 說明 |
---|---|
HDFS Sink | 數據寫入HDFS |
Logger Sink | 數據寫入日誌文件 |
Avro Sink | 數據被轉換成Avro Event,而後發送到配置的RPC端口上 |
Thrift Sink | 數據被轉換成Thrift Event,而後發送到配置的RPC端口上 |
IRC Sink | 數據在IRC上進行回放 |
File Roll Sink | 存儲數據到本地文件系統 |
Null Sink | 丟棄到全部數據 |
HBase Sink | 數據寫入HBase數據庫 |
Morphline Solr Sink | 數據發送到Solr搜索服務器(集羣) |
ElasticSearch Sink | 數據發送到Elastic Search搜索服務器(集羣) |
Kite Dataset Sink | 寫數據到Kite Dataset,試驗性質的 |
Custom Sink | 自定義Sink實現 |
更多sink的內容能夠參考官方手冊。
Flume 的核心是把數據從數據源收集過來,再送到目的地。爲了保證輸送必定成功,在送到目的地以前,會先緩存數據,待數據真正到達目的地後,刪除本身緩存的數據。
Flume 使用事務性的方式保證傳送Event整個過程的可靠性。Sink 必須在 Event 被存入 Channel 後,或者,已經被傳達到下一站agent裏,又或者,已經被存入外部數據目的地以後,才能把 Event 從 Channel 中 remove 掉。這樣數據流裏的 event 不管是在一個 agent 裏仍是多個 agent 之間流轉,都能保證可靠,由於以上的事務保證了 event 會被成功存儲起來。而 Channel 的多種實如今可恢復性上有不一樣的保證。也保證了 event 不一樣程度的可靠性。好比 Flume 支持在本地保存一份文件 channel 做爲備份,而memory channel 將 event 存在內存 queue 裏,速度快,但丟失的話沒法恢復。
下面,根據官網文檔,咱們展現幾種Flow Pipeline,各自適應於什麼樣的應用場景:
能夠將多個Agent順序鏈接起來,將最初的數據源通過收集,存儲到最終的存儲系統中。這是最簡單的狀況,通常狀況下,應該控制這種順序鏈接的Agent的數量,由於數據流經的路徑變長了,若是不考慮failover的話,出現故障將影響整個Flow上的Agent收集服務。
這種狀況應用的場景比較多,好比要收集Web網站的用戶行爲日誌,Web網站爲了可用性使用的負載均衡的集羣模式,每一個節點都產生用戶行爲日誌,能夠爲每一個節點都配置一個Agent來單獨收集日誌數據,而後多個Agent將數據最終匯聚到一個用來存儲數據存儲系統,如HDFS上。
這種模式,有兩種方式,一種是用來複制(Replication),另外一種是用來分流(Multiplexing)。Replication方式,能夠將最前端的數據源複製多份,分別傳遞到多個channel中,每一個channel接收到的數據都是相同的。
配置格式示例以下:
properties# List the sources, sinks and channels for the agent <Agent>.sources = <Source1> <Agent>.sinks = <Sink1> <Sink2> <Agent>.channels = <Channel1> <Channel2> # set list of channels for source (separated by space) <Agent>.sources.<Source1>.channels = <Channel1> <Channel2> # set channel for sinks <Agent>.sinks.<Sink1>.channel = <Channel1> <Agent>.sinks.<Sink2>.channel = <Channel2> <Agent>.sources.<Source1>.selector.type = replicating
上面指定了selector的type的值爲replication,其餘的配置沒有指定,使用的Replication方式,Source1會將數據分別存儲到Channel1和Channel2,這兩個channel裏面存儲的數據是相同的,而後數據被傳遞到Sink1和Sink2。
Multiplexing方式,selector能夠根據header的值來肯定數據傳遞到哪個channel,配置格式,以下所示:
properties# Mapping for multiplexing selector <Agent>.sources.<Source1>.selector.type = multiplexing <Agent>.sources.<Source1>.selector.header = <someHeader> <Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1> <Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2> <Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2> #... <Agent>.sources.<Source1>.selector.default = <Channel2>
上面selector的type的值爲multiplexing,同時配置selector的header信息,還配置了多個selector的mapping的值,即header的值:若是header的值爲Value一、Value2,數據從Source1路由到Channel1;若是header的值爲Value二、Value3,數據從Source1路由到Channel2。
Load balancing Sink Processor可以實現load balance功能,上圖Agent1是一個路由節點,負責將Channel暫存的Event均衡到對應的多個Sink組件上,而每一個Sink組件分別鏈接到一個獨立的Agent上,示例配置,以下所示:
propertiesa1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 k3 a1.sinkgroups.g1.processor.type = load_balance a1.sinkgroups.g1.processor.backoff = true a1.sinkgroups.g1.processor.selector = round_robin a1.sinkgroups.g1.processor.selector.maxTimeOut=10000
Failover Sink Processor可以實現failover功能,具體流程相似load balance,可是內部處理機制與load balance徹底不一樣:Failover Sink Processor維護一個優先級Sink組件列表,只要有一個Sink組件可用,Event就被傳遞到下一個組件。若是一個Sink可以成功處理Event,則會加入到一個Pool中,不然會被移出Pool並計算失敗次數,設置一個懲罰因子,示例配置以下所示:
propertiesa1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 k3 a1.sinkgroups.g1.processor.type = failover a1.sinkgroups.g1.processor.priority.k1 = 5 a1.sinkgroups.g1.processor.priority.k2 = 7 a1.sinkgroups.g1.processor.priority.k3 = 6 a1.sinkgroups.g1.processor.maxpenalty = 20000
Flume 的 rpm 安裝方式很簡單,這裏不作說明。
安裝成功以後,在 /etc/flume/conf 目錄建立f1.conf 文件,內容以下:
propertiesagent-1.channels.ch-1.type = memory agent-1.sources.avro-source1.channels = ch-1 agent-1.sources.avro-source1.type = avro agent-1.sources.avro-source1.bind = 0.0.0.0 agent-1.sources.avro-source1.port = 41414 agent-1.sources.avro-source1.threads = 5 agent-1.sinks.log-sink1.channel = ch-1 agent-1.sinks.log-sink1.type = logger agent-1.channels = ch-1 agent-1.sources = avro-source1 agent-1.sinks = log-sink1
關於 avro-source 配置說明,請參考 avro-source
接下來啓動 agent:
bash$ flume-ng agent -c /etc/flume-ng/conf -f /etc/flume-ng/conf/f1.conf -Dflume.root.logger=DEBUG,console -n agent-1
參數說明:
-n
指定agent名稱-c
指定配置文件目錄-f
指定配置文件-Dflume.root.logger=DEBUG,console
設置日誌等級下面能夠啓動一個 avro-client 客戶端生產數據:
bash$ flume-ng avro-client -c /etc/flume-ng/conf -H localhost -p 41414 -F /etc/passwd -Dflume.root.logger=DEBUG,console
在 /etc/flume/conf 目錄建立 f2.conf 文件,內容以下:
propertiesagent-1.channels = ch-1 agent-1.sources = src-1 agent-1.channels.ch-1.type = memory agent-1.sources.src-1.type = spooldir agent-1.sources.src-1.channels = ch-1 agent-1.sources.src-1.spoolDir = /root/log agent-1.sources.src-1.fileHeader = true agent-1.sinks.log-sink1.channel = ch-1 agent-1.sinks.log-sink1.type = logger agent-1.sinks = log-sink1
關於 Spooling Directory Source 配置說明,請參考 Spooling Directory Source
接下來啓動 agent:
bash$ flume-ng agent -c /etc/flume-ng/conf -f /etc/flume-ng/conf/f2.conf -Dflume.root.logger=DEBUG,console -n agent-1
而後,手動拷貝一個文件到 /root/log 目錄,觀察日誌輸出以及/root/log 目錄下的變化。
在 /etc/flume/conf 目錄建立 f3.conf 文件,內容以下:
propertiesagent-1.channels.ch-1.type = file agent-1.channels.ch-1.checkpointDir= /root/checkpoint agent-1.channels.ch-1.dataDirs= /root/data agent-1.sources.src-1.type = spooldir agent-1.sources.src-1.channels = ch-1 agent-1.sources.src-1.spoolDir = /root/log agent-1.sources.src-1.deletePolicy= never agent-1.sources.src-1.fileHeader = true agent-1.sources.src-1.interceptors =i1 agent-1.sources.src-1.interceptors.i1.type = timestamp agent-1.sinks.sink_hdfs.channel = ch-1 agent-1.sinks.sink_hdfs.type = hdfs agent-1.sinks.sink_hdfs.hdfs.path = hdfs://cdh1:8020/user/root/events/%Y-%m-%d agent-1.sinks.sink_hdfs.hdfs.filePrefix = logs agent-1.sinks.sink_hdfs.hdfs.inUsePrefix = . agent-1.sinks.sink_hdfs.hdfs.rollInterval = 30 agent-1.sinks.sink_hdfs.hdfs.rollSize = 0 agent-1.sinks.sink_hdfs.hdfs.rollCount = 0 agent-1.sinks.sink_hdfs.hdfs.batchSize = 1000 agent-1.sinks.sink_hdfs.hdfs.writeFormat = text agent-1.sinks.sink_hdfs.hdfs.fileType = DataStream #agent-1.sinks.sink_hdfs.hdfs.fileType = CompressedStream #agent-1.sinks.sink_hdfs.hdfs.codeC = lzop agent-1.channels = ch-1 agent-1.sources = src-1 agent-1.sinks = sink_hdfs
關於 HDFS Sink配置說明,請參考 HDFS Sink
說明:
hdfs.inUsePrefix
,例如設置爲 .
時,hdfs 會把該文件當作隱藏文件,以免在 mr 過程當中讀到這些臨時文件,引發一些錯誤關於 HBase Sink 配置說明,請參考 HBase Sink
從 github 下載源代碼並編譯:
bash$ git clone git@github.com:cloudera/flume-ng.git -b cdh4-1.4.0_4.7.0 $ cd flume-ng $ mvn install -DskipTests -Phadoop-2
若是提示找不到 hadoop-test 的 jar 包,則修改 pom.xml 中的版本,如改成 2.0.0-mr1-cdh4.7.0
,具體版本視你使用的分支版本而定,我這裏是 cdh4.7.0。
若是提示找不到 uanodeset-parser 的 jarb,則在 pom.xml 中添加下面倉庫:
xml<repository> <id>tempo-db</id> <url>http://maven.tempo-db.com/artifactory/list/twitter/ </url> <snapshots> <enabled>false</enabled> </snapshots> </repository>
參考基於Flume的美團日誌收集系統(一)架構和設計,列出一些最佳實踐:
美團對 flume 的改進代碼見 github:https://github.com/javachen/mt-flume。