Flume NG是Cloudera提供的一個分佈式、可靠、可用的系統,它可以將不一樣數據源的海量日誌數據進行高效收集、聚合、移動,最後存儲到一箇中心化數據存儲系統中。由原來的Flume OG到如今的Flume NG,進行了架構重構,而且如今NG版本徹底不兼容原來的OG版本。通過架構重構後,Flume NG更像是一個輕量的小工具,很是簡單,容易適應各類方式日誌收集,並支持failover和負載均衡。html
Flume 使用 java 編寫,其須要運行在 Java1.6 或更高版本之上。前端
官方網站:http://flume.apache.org/java
Flume的架構主要有一下幾個核心概念:github
Event:一個數據單元,帶有一個可選的消息頭web
Flow:Event從源點到達目的點的遷移的抽象數據庫
Client:操做位於源點處的Event,將其發送到Flume Agentapache
Agent:一個獨立的Flume進程,包含組件Source、Channel、Sinkjson
Source:用來消費傳遞到該組件的Event
Channel:中轉Event的一個臨時存儲,保存有Source組件傳遞過來的Event
Sink:從Channel中讀取並移除Event,將Event傳遞到Flow Pipeline中的下一個Agent(若是有的話)
Flume 的核心是把數據從數據源收集過來,再送到目的地。爲了保證輸送必定成功,在送到目的地以前,會先緩存數據,待數據真正到達目的地後,刪除本身緩存的數據。
Flume 傳輸的數據的基本單位是 Event,若是是文本文件,一般是一行記錄,這也是事務的基本單位。Event 從 Source,流向 Channel,再到 Sink,自己爲一個 byte 數組,並可攜帶 headers 信息。Event 表明着一個數據流的最小完整單元,從外部數據源來,向外部的目的地去。
Flume 運行的核心是 Agent。它是一個完整的數據收集工具,含有三個核心組件,分別是 source、channel、sink。經過這些組件,Event 能夠從一個地方流向另外一個地方,以下圖所示。
source 能夠接收外部源發送過來的數據。不一樣的 source,能夠接受不一樣的數據格式。好比有目錄池(spooling directory)數據源,能夠監控指定文件夾中的新文件變化,若是目錄中有文件產生,就會馬上讀取其內容。
channel 是一個存儲地,接收 source 的輸出,直到有 sink 消費掉 channel 中的數據。channel 中的數據直到進入到下一個channel中或者進入終端纔會被刪除。當 sink 寫入失敗後,能夠自動重啓,不會形成數據丟失,所以很可靠。
sink 會消費 channel 中的數據,而後送給外部源或者其餘 source。如數據能夠寫入到 HDFS 或者 HBase 中。
Client端操做消費數據的來源,Flume 支持 Avro,log4j,syslog 和 http post(body爲json格式)。可讓應用程序同已有的Source直接打交道,如AvroSource,SyslogTcpSource。也能夠 寫一個 Source,以 IPC 或 RPC 的方式接入本身的應用,Avro和 Thrift 均可以(分別有 NettyAvroRpcClient 和 ThriftRpcClient 實現了 RpcClient接口),其中 Avro 是默認的 RPC 協議。具體代碼級別的 Client 端數據接入,能夠參考官方手冊。
對現有程序改動最小的使用方式是使用是直接讀取程序原來記錄的日誌文件,基本能夠實現無縫接入,不須要對現有程序進行任何改動。 對於直接讀取文件 Source,有兩種方式:
ExecSource: 以運行 Linux 命令的方式,持續的輸出最新的數據,如 tail -F 文件名
指令,在這種方式下,取的文件名必須是指定的。 ExecSource 能夠實現對日誌的實時收集,可是存在Flume不運行或者指令執行出錯時,將沒法收集到日誌數據,沒法保證日誌數據的完整性。
SpoolSource: 監測配置的目錄下新增的文件,並將文件中的數據讀取出來。須要注意兩點:拷貝到 spool 目錄下的文件不能夠再打開編輯;spool 目錄下不可包含相應的子目錄。
SpoolSource 雖然沒法實現實時的收集數據,可是可使用以分鐘的方式分割文件,趨近於實時。
若是應用沒法實現以分鐘切割日誌文件的話, 能夠兩種收集方式結合使用。 在實際使用的過程當中,能夠結合 log4j 使用,使用 log4j的時候,將 log4j 的文件分割機制設爲1分鐘一次,將文件拷貝到spool的監控目錄。
log4j 有一個 TimeRolling 的插件,能夠把 log4j 分割文件到 spool 目錄。基本實現了實時的監控。Flume 在傳完文件以後,將會修改文件的後綴,變爲 .COMPLETED(後綴也能夠在配置文件中靈活指定)。
Flume Source 支持的類型:
Source類型 | 說明 |
---|---|
Avro Source | 支持Avro協議(其實是Avro RPC),內置支持 |
Thrift Source | 支持Thrift協議,內置支持 |
Exec Source | |
JMS Source | 從JMS系統(消息、主題)中讀取數據,ActiveMQ已經測試過 |
Spooling Directory Source | 監控指定目錄內數據變動 |
Twitter 1% firehose Source | 經過API持續下載Twitter數據,試驗性質 |
Netcat Source | 監控某個端口,將流經端口的每個文本行數據做爲Event輸入 |
Sequence Generator Source | 序列生成器數據源,生產序列數據 |
Syslog Sources | 讀取syslog數據,產生Event,支持UDP和TCP兩種協議 |
HTTP Source | 基於HTTP POST或GET方式的數據源,支持JSON、BLOB表示形式 |
Legacy Sources | 兼容老的Flume OG中Source(0.9.x版本) |
當前有幾個 channel 可供選擇,分別是 Memory Channel, JDBC Channel , File Channel,Psuedo Transaction Channel。比較常見的是前三種 channel。
MemoryChannel 能夠實現高速的吞吐,可是沒法保證數據的完整性。
MemoryRecoverChannel 在官方文檔的建議上已經建義使用FileChannel來替換。
FileChannel保證數據的完整性與一致性。在具體配置FileChannel時,建議FileChannel設置的目錄和程序日誌文件保存的目錄設成不一樣的磁盤,以便提升效率。
File Channel 是一個持久化的隧道(channel),它持久化全部的事件,並將其存儲到磁盤中。所以,即便 Java 虛擬機當掉,或者操做系統崩潰或重啓,再或者事件沒有在管道中成功地傳遞到下一個代理(agent),這一切都不會形成數據丟失。Memory Channel 是一個不穩定的隧道,其緣由是因爲它在內存中存儲全部事件。若是 java 進程死掉,任何存儲在內存的事件將會丟失。另外,內存的空間收到 RAM大小的限制,而 File Channel 這方面是它的優點,只要磁盤空間足夠,它就能夠將全部事件數據存儲到磁盤上。
Flume Channel 支持的類型:
Channel類型 | 說明 |
---|---|
Memory Channel | Event數據存儲在內存中 |
JDBC Channel | Event數據存儲在持久化存儲中,當前Flume Channel內置支持Derby |
File Channel | Event數據存儲在磁盤文件中 |
Spillable Memory Channel | Event數據存儲在內存中和磁盤上,當內存隊列滿了,會持久化到磁盤文件(當前試驗性的,不建議生產環境使用) |
Pseudo Transaction Channel | 測試用途 |
Custom Channel | 自定義Channel實現 |
Sink在設置存儲數據時,能夠向文件系統、數據庫、hadoop存數據,在日誌數據較少時,能夠將數據存儲在文件系中,而且設定必定的時間間隔保存數據。在日誌數據較多時,能夠將相應的日誌數據存儲到Hadoop中,便於往後進行相應的數據分析。
Flume Sink支持的類型
Sink類型 | 說明 |
---|---|
HDFS Sink | 數據寫入HDFS |
Logger Sink | 數據寫入日誌文件 |
Avro Sink | 數據被轉換成Avro Event,而後發送到配置的RPC端口上 |
Thrift Sink | 數據被轉換成Thrift Event,而後發送到配置的RPC端口上 |
IRC Sink | 數據在IRC上進行回放 |
File Roll Sink | 存儲數據到本地文件系統 |
Null Sink | 丟棄到全部數據 |
HBase Sink | 數據寫入HBase數據庫 |
Morphline Solr Sink | 數據發送到Solr搜索服務器(集羣) |
ElasticSearch Sink | 數據發送到Elastic Search搜索服務器(集羣) |
Kite Dataset Sink | 寫數據到Kite Dataset,試驗性質的 |
Custom Sink | 自定義Sink實現 |
更多sink的內容能夠參考官方手冊。
Flume 的核心是把數據從數據源收集過來,再送到目的地。爲了保證輸送必定成功,在送到目的地以前,會先緩存數據,待數據真正到達目的地後,刪除本身緩存的數據。
Flume 使用事務性的方式保證傳送Event整個過程的可靠性。Sink 必須在 Event 被存入 Channel 後,或者,已經被傳達到下一站agent裏,又或者,已經被存入外部數據目的地以後,才能把 Event 從 Channel 中 remove 掉。這樣數據流裏的 event 不管是在一個 agent 裏仍是多個 agent 之間流轉,都能保證可靠,由於以上的事務保證了 event 會被成功存儲起來。而 Channel 的多種實如今可恢復性上有不一樣的保證。也保證了 event 不一樣程度的可靠性。好比 Flume 支持在本地保存一份文件 channel 做爲備份,而memory channel 將 event 存在內存 queue 裏,速度快,但丟失的話沒法恢復。
下面,根據官網文檔,咱們展現幾種Flow Pipeline,各自適應於什麼樣的應用場景:
多個 agent 順序鏈接:
能夠將多個Agent順序鏈接起來,將最初的數據源通過收集,存儲到最終的存儲系統中。這是最簡單的狀況,通常狀況下,應該控制這種順序鏈接的Agent的數量,由於數據流經的路徑變長了,若是不考慮failover的話,出現故障將影響整個Flow上的Agent收集服務。
多個Agent的數據匯聚到同一個Agent:
這種狀況應用的場景比較多,好比要收集Web網站的用戶行爲日誌,Web網站爲了可用性使用的負載均衡的集羣模式,每一個節點都產生用戶行爲日誌,能夠爲每一個節點都配置一個Agent來單獨收集日誌數據,而後多個Agent將數據最終匯聚到一個用來存儲數據存儲系統,如HDFS上。
多路(Multiplexing)Agent
這種模式,有兩種方式,一種是用來複制(Replication),另外一種是用來分流(Multiplexing)。Replication方式,能夠將最前端的數據源複製多份,分別傳遞到多個channel中,每一個channel接收到的數據都是相同的。
配置格式示例以下:
# List the sources, sinks and channels for the agent <Agent>.sources = <Source1> <Agent>.sinks = <Sink1> <Sink2> <Agent>.channels = <Channel1> <Channel2> # set list of channels for source (separated by space) <Agent>.sources.<Source1>.channels = <Channel1> <Channel2> # set channel for sinks <Agent>.sinks.<Sink1>.channel = <Channel1> <Agent>.sinks.<Sink2>.channel = <Channel2> <Agent>.sources.<Source1>.selector.type = replicating
上面指定了selector的type的值爲replication,其餘的配置沒有指定,使用的Replication方式,Source1會將數據分別存儲到Channel1和Channel2,這兩個channel裏面存儲的數據是相同的,而後數據被傳遞到Sink1和Sink2。
Multiplexing方式,selector能夠根據header的值來肯定數據傳遞到哪個channel,配置格式,以下所示:
# Mapping for multiplexing selector <Agent>.sources.<Source1>.selector.type = multiplexing <Agent>.sources.<Source1>.selector.header = <someHeader> <Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1> <Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2> <Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2> #... <Agent>.sources.<Source1>.selector.default = <Channel2>
上面selector的type的值爲multiplexing,同時配置selector的header信息,還配置了多個selector的mapping的值,即header的值:若是header的值爲Value一、Value2,數據從Source1路由到Channel1;若是header的值爲Value二、Value3,數據從Source1路由到Channel2。
實現load balance功能
Load balancing Sink Processor可以實現load balance功能,上圖Agent1是一個路由節點,負責將Channel暫存的Event均衡到對應的多個Sink組件上,而每一個Sink組件分別鏈接到一個獨立的Agent上,示例配置,以下所示:
a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 k3 a1.sinkgroups.g1.processor.type = load_balance a1.sinkgroups.g1.processor.backoff = true a1.sinkgroups.g1.processor.selector = round_robin a1.sinkgroups.g1.processor.selector.maxTimeOut=10000
實現failover能
Failover Sink Processor可以實現failover功能,具體流程相似load balance,可是內部處理機制與load balance徹底不一樣:Failover Sink Processor維護一個優先級Sink組件列表,只要有一個Sink組件可用,Event就被傳遞到下一個組件。若是一個Sink可以成功處理Event,則會加入到一個Pool中,不然會被移出Pool並計算失敗次數,設置一個懲罰因子,示例配置以下所示:
a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 k3 a1.sinkgroups.g1.processor.type = failover a1.sinkgroups.g1.processor.priority.k1 = 5 a1.sinkgroups.g1.processor.priority.k2 = 7 a1.sinkgroups.g1.processor.priority.k3 = 6 a1.sinkgroups.g1.processor.maxpenalty = 20000
Flume 的 rpm 安裝方式很簡單,這裏不作說明。
安裝成功以後,在 /etc/flume/conf 目錄建立f1.conf 文件,內容以下:
agent-1.channels.ch-1.type = memory agent-1.sources.avro-source1.channels = ch-1 agent-1.sources.avro-source1.type = avro agent-1.sources.avro-source1.bind = 0.0.0.0 agent-1.sources.avro-source1.port = 41414 agent-1.sources.avro-source1.threads = 5 agent-1.sinks.log-sink1.channel = ch-1 agent-1.sinks.log-sink1.type = logger agent-1.channels = ch-1 agent-1.sources = avro-source1 agent-1.sinks = log-sink1
關於 avro-source 配置說明,請參考 avro-source
接下來啓動 agent:
$ flume-ng agent -c /etc/flume-ng/conf -f /etc/flume-ng/conf/f1.conf -Dflume.root.logger=DEBUG,console -n agent-1
參數說明:
-n
指定agent名稱
-c
指定配置文件目錄
-f
指定配置文件
-Dflume.root.logger=DEBUG,console
設置日誌等級
下面能夠啓動一個 avro-client 客戶端生產數據:
$ flume-ng avro-client -c /etc/flume-ng/conf -H localhost -p 41414 -F /etc/passwd -Dflume.root.logger=DEBUG,console
在 /etc/flume/conf 目錄建立 f2.conf 文件,內容以下:
agent-1.channels = ch-1 agent-1.sources = src-1 agent-1.channels.ch-1.type = memory agent-1.sources.src-1.type = spooldir agent-1.sources.src-1.channels = ch-1 agent-1.sources.src-1.spoolDir = /root/log agent-1.sources.src-1.fileHeader = true agent-1.sinks.log-sink1.channel = ch-1 agent-1.sinks.log-sink1.type = logger agent-1.sinks = log-sink1
關於 Spooling Directory Source 配置說明,請參考 Spooling Directory Source
接下來啓動 agent:
$ flume-ng agent -c /etc/flume-ng/conf -f /etc/flume-ng/conf/f2.conf -Dflume.root.logger=DEBUG,console -n agent-1
而後,手動拷貝一個文件到 /root/log 目錄,觀察日誌輸出以及/root/log 目錄下的變化。
在 /etc/flume/conf 目錄建立 f3.conf 文件,內容以下:
agent-1.channels.ch-1.type = file agent-1.channels.ch-1.checkpointDir= /root/checkpoint agent-1.channels.ch-1.dataDirs= /root/data agent-1.sources.src-1.type = spooldir agent-1.sources.src-1.channels = ch-1 agent-1.sources.src-1.spoolDir = /root/log agent-1.sources.src-1.deletePolicy= never agent-1.sources.src-1.fileHeader = true agent-1.sources.src-1.interceptors =i1 agent-1.sources.src-1.interceptors.i1.type = timestamp agent-1.sinks.sink_hdfs.channel = ch-1 agent-1.sinks.sink_hdfs.type = hdfs agent-1.sinks.sink_hdfs.hdfs.path = hdfs://cdh1:8020/user/root/events/%Y-%m-%d agent-1.sinks.sink_hdfs.hdfs.filePrefix = logs agent-1.sinks.sink_hdfs.hdfs.inUsePrefix = . agent-1.sinks.sink_hdfs.hdfs.rollInterval = 30 agent-1.sinks.sink_hdfs.hdfs.rollSize = 0 agent-1.sinks.sink_hdfs.hdfs.rollCount = 0 agent-1.sinks.sink_hdfs.hdfs.batchSize = 1000 agent-1.sinks.sink_hdfs.hdfs.writeFormat = text agent-1.sinks.sink_hdfs.hdfs.fileType = DataStream #agent-1.sinks.sink_hdfs.hdfs.fileType = CompressedStream #agent-1.sinks.sink_hdfs.hdfs.codeC = lzop agent-1.channels = ch-1 agent-1.sources = src-1 agent-1.sinks = sink_hdfs
關於 HDFS Sink配置說明,請參考 HDFS Sink
說明:
經過 interceptors 往 header 裏添加 timestamp,這樣作,能夠在 hdfs.path 引用系統內部的時間變量或者主機的 hostname。
經過設置 hdfs.inUsePrefix
,例如設置爲 .
時,hdfs 會把該文件當作隱藏文件,以免在 mr 過程當中讀到這些臨時文件,引發一些錯誤
若是使用 lzo 壓縮,則須要手動建立 lzo 索引,能夠經過修改 HdfsSink 的代碼,經過代碼建立索引
FileChannel 的目錄最好是和 spooldir 的數據目錄處於不一樣磁盤。
關於 HBase Sink 配置說明,請參考 HBase Sink
從 github 下載源代碼並編譯:
$ git clone git@github.com:cloudera/flume-ng.git -b cdh4-1.4.0_4.7.0 $ cd flume-ng $ mvn install -DskipTests -Phadoop-2
若是提示找不到 hadoop-test 的 jar 包,則修改 pom.xml 中的版本,如改成 2.0.0-mr1-cdh4.7.0
,具體版本視你使用的分支版本而定,我這裏是 cdh4.7.0。
若是提示找不到 uanodeset-parser 的 jarb,則在 pom.xml 中添加下面倉庫:
<repository> <id>tempo-db</id> <url>http://maven.tempo-db.com/artifactory/list/twitter/ </url> <snapshots> <enabled>false</enabled> </snapshots> </repository>
參考基於Flume的美團日誌收集系統(一)架構和設計,列出一些最佳實踐:
模塊命名規則:全部的 Source 以 src 開頭,全部的 Channel 以 ch 開頭,全部的 Sink 以 sink 開頭;
模塊之間內部通訊統一使用 Avro 接口;
將日誌採集系統系統分爲三層:Agent 層,Collector 層和 Store 層,其中 Agent 層每一個機器部署一個進程,負責對單機的日誌收集工做;Collector 層部署在中心服務器上,負責接收Agent層發送的日誌,而且將日誌根據路由規則寫到相應的 Store 層中;Store 層負責提供永久或者臨時的日誌存儲服務,或者將日誌流導向其它服務器。
擴展 MemoryChannel 和 FileChannel ,提供 DualChannel 的實現,以提供高吞吐和大緩存
監控 collector HdfsSink寫數據到 hdfs 的速度、FileChannel 中擁堵的 events 數量,以及寫 hdfs 狀態(查看是否有 .tmp 文件生成)
美團對 flume 的改進代碼見 github:https://github.com/javachen/mt-flume。