Flume是Cloudera提供的一個高可用的、高可靠的開源分佈式海量日誌收集系統,日誌數據能夠通過Flume流向須要存儲終端目的地。這裏的日誌是一個統稱,泛指文件、操做記錄等許多數據。html
1、Flume基礎知識java
一、數據流模型apache
Flume的核心是把數據從數據源收集過來,再送到目的地。爲了保證輸送必定成功,在送到目的地以前,會先緩存數據,待數據真正到達目的地後,刪除本身緩存的數據。數組
Flume傳輸的數據的基本單位是Event,若是是文本文件,一般是一行記錄,這也是事務的基本單位。Event從Source,流向Channel,再到Sink,自己爲一個byte數組,並可攜帶headers信息。Event表明着一個數據流的最小完整單元,從外部數據源來,向外部的目的地去。緩存
二、核心組件分佈式
Flume運行的核心是Agent。它是一個完整的數據收集工具,含有三個核心組件,分別是source、channel、sink。經過這些組件,Event能夠從一個地方流向另外一個地方,如圖1-1所示,也能夠多級agent任一連接組合,如圖1-2所示。工具
圖1-1 flume數據流模型oop
圖1-2 多級agent鏈接模型ui
1) Source:專用於收集日誌,能夠處理各類類型各類格式的日誌數據,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定義等。spa
▶ Exec Source:以運行 Linux 命令的方式,持續的輸出最新的數據,如 tail -F 文件名 指令,在這種方式下,取的文件名必須是指定的。 ExecSource 能夠實現對日誌的實時收集,可是存在Flume不運行或者指令執行出錯時,將沒法收集到日誌數據,沒法保證日誌數據的完整性;
▶ Spool Source:監測配置的目錄下新增的文件,並將文件中的數據讀取出來。須要注意兩點:拷貝到 spool 目錄下的文件不能夠再打開編輯;spool 目錄下不可包含相應的子目錄;
2) Channel:專用於臨時存儲數據,能夠存放在memory、jdbc、file、自定義等。其存儲的數據只有在sink發送成功以後纔會被刪除。
▶ Memory Channel:能夠實現高速的吞吐,可是沒法保證數據的完整性。Memory Channel 是一個不穩定的隧道,其緣由是因爲它在內存中存儲全部事件。若是 java 進程死掉,任何存儲在內存的事件將會丟失。另外,內存的空間也受到RAM大小的限制,與File Channel有差異;
▶ File Channel:保證數據的完整性與一致性。在具體配置FileChannel時,建議FileChannel設置的目錄和程序日誌文件保存的目錄設成不一樣的磁盤,以便提升效率。File Channel是一個持久化的隧道(channel),它持久化全部的事件,並將其存儲到磁盤中。所以,即便 Java 虛擬機當掉,或者操做系統崩潰或重啓,再或者事件沒有在管道中成功地傳遞到下一個代理(agent),這一切都不會形成數據丟失。
3) Sink:專用於把數據發送到目的地件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、hbase、solr、自定義等。
三、可靠性
Flume的核心是把數據從數據源收集過來,再送到目的地。爲了保證輸送必定成功,在送到目的地以前,會先緩存數據,待數據真正到達目的地後,刪除本身緩存的數據。
Flume使用事務性的方式保證傳送Event整個過程的可靠性。Sink必須在Event被存入Channel 後,或者已經被傳達到下一站agent裏,又或者,已經被存入外部數據目的地以後,才能把Event從Channel中remove掉。這樣數據流裏的event不管是在一個agent裏仍是多個agent之間流轉,都能保證可靠,由於以上的事務保證了event會被成功存儲起來。而Channel的多種實如今可恢復性上有不一樣的保證。也保證了event不一樣程度的可靠性。好比Flume支持在本地保存一份文件channel做爲備份,而memory channel將event存在內存queue裏,速度快,但丟失的話沒法恢復。
2、Flume安裝與使用
一、安裝
官網(http://flume.apache.org/download.html)下載flume版本(本實驗:apache-flume-1.5.2-bin.tar.gz),解壓到/usr/local目錄下,進入flume-xx/conf目錄中,執行命令:mv flume-env.sh.properties flume-env.sh,而後配置flume-env.sh中的JAVA_HOME路徑。
二、一個示例
本示例Source來自Spooling Directory,Sink流向HDFS。監控/root/logs文件目錄下的文件,一旦有新文件,就馬上將文件內容經過agent流向HDFS的hdfs://cluster1/flume/%Y%m%d文件中(此處若是找不到cluster1,須要將hadoop的配置文件core-site.xml和hdfs-site.xml拷貝至flume的conf目錄中)。
flume目錄下新建test目錄,新建文件example,內容以下:
#定義agent名, source、channel、sink的名稱 agent1.sources = source1 agent1.channels = channel1 agent1.sinks = sink1 #具體定義source agent1.sources.source1.type = spooldir agent1.sources.source1.spoolDir = /home/logs agent1.sources.source1.fileHeader = false #定義攔截器,爲消息添加時間戳 agent1.sources.source1.interceptors = i1 agent1.sources.source1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder #具體定義channel #也能夠將channel數據放在內存(可是內存容易丟失)中,如 #agent1.channels.c1.type = memory #agent1.channels.c1.capacity = 10000 #agent1.channels.c1.transactionCapacity = 100 #此處配置爲文件中 agent1.channels.channel1.type=file #備份路徑 agent1.channels.channel1.checkpointDir=/root/flume_bak #數據保存路徑 agent1.channels.channel1.dataDirs=/root/flume_tmp #具體定義sink agent1.sinks.sink1.type = hdfs agent1.sinks.sink1.hdfs.path = hdfs://cluster1/flume/%Y%m%d agent1.sinks.sink1.hdfs.fileType = DataStream #存儲到HDFS文件名的前綴,格式爲20140116-文件名.. agent1.sinks.sink1.hdfs.filePrefix=%Y-%m-%d #不按照條數生成文件 agent1.sinks.sink1.hdfs.rollCount = 0 #HDFS上的文件達到128M時生成一個文件 agent1.sinks.sink1.hdfs.rollSize = 134217728 #HDFS上的文件每60秒去檢測 agent1.sinks.sink1.hdfs.rollInterval = 60 #組裝source、channel、sink agent1.sources.source1.channels = channel1 agent1.sinks.sink1.channel = channel1
運行該示例,進入/usr/local/flume目錄,執行命令:bin/flume-ng agent -n agent1 -c conf -f test/example -Dflume.root.logger=DEBUG,console
其中-n指定agent名稱,-c指定配置文件目錄,-f指定配置文件,-Dflume.root.logger=DEBUG,console設置日誌等級爲輸出到控制檯。