Flume 做爲 cloudera 開發的實時日誌收集系統,受到了業界的承認與普遍應用。Flume 初始的發行版本目前被統稱爲 Flume OG(original generation),屬於 cloudera。但隨着 FLume 功能的擴展,Flume OG 代碼工程臃腫、核心組件設計不合理、核心配置不標準等缺點暴露出來,尤爲是在 Flume OG 的最後一個發行版本 0.94.0 中,日誌傳輸不穩定的現象尤其嚴重,爲了解決這些問題,2011 年 10 月 22 號,cloudera 完成了 Flume-728,對 Flume 進行了里程碑式的改動:重構核心組件、核心配置以及代碼架構,重構後的版本統稱爲 Flume NG(next generation);改動的另外一緣由是將 Flume 歸入 apache 旗下,cloudera Flume 更名爲 Apache Flume。IBM 的這篇文章:《Flume NG:Flume 發展史上的第一次革命》,從基本組件以及用戶體驗的角度闡述 Flume OG 到 Flume NG 發生的革命性變化。本文就再也不贅述各類細枝末節了,不過這裏仍是簡要提下 Flume NG (1.x.x)的主要變化: html
注:本文所使用的 Flume 版本爲 flume-1.4.0-cdh4.7.0,不須要額外的安裝過程,解壓縮便可用。
java
組件 | 功能 |
---|---|
Agent | 使用JVM 運行Flume。每臺機器運行一個agent,可是能夠在一個agent中包含多個sources和sinks。 |
Client | 生產數據,運行在一個獨立的線程。 |
Source | 從Client收集數據,傳遞給Channel。 |
Sink | 從Channel收集數據,運行在一個獨立線程。 |
Channel | 鏈接 sources 和 sinks ,這個有點像一個隊列。 |
Events | 能夠是日誌記錄、 avro 對象等。 |
Flume以agent爲最小的獨立運行單位。一個agent就是一個JVM。單agent由Source、Sink和Channel三大組件構成,以下圖: node
圖一 web
Flume的數據流由事件(Event)貫穿始終。事件是Flume的基本數據單位,它攜帶日誌數據(字節數組形式)而且攜帶有頭信息,這些Event由Agent外部的Source,好比上圖中的Web Server生成。當Source捕獲事件後會進行特定的格式化,而後Source會把事件推入(單個或多個)Channel中。你能夠把Channel看做是一個緩衝區,它將保存事件直到Sink處理完該事件。Sink負責持久化日誌或者把事件推向另外一個Source。
很直白的設計,其中值得注意的是,Flume提供了大量內置的Source、Channel和Sink類型。不一樣類型的Source,Channel和Sink能夠自由組合。組合方式基於用戶設置的配置文件,很是靈活。好比:Channel能夠把事件暫存在內存裏,也能夠持久化到本地硬盤上。Sink能夠把日誌寫入HDFS, HBase,甚至是另一個Source等等。
若是你覺得Flume就這些能耐那就大錯特錯了。Flume支持用戶創建多級流,也就是說,多個agent能夠協同工做,而且支持Fan-in、Fan-out、Contextual Routing、Backup Routes。以下圖所示: shell
Flume架構總體上看就是 source-->channel-->sink 的三層架構(參見最上面的 圖一),相似生成者和消費者的架構,他們之間經過queue(channel)傳輸,解耦。 數據庫
Source:完成對日誌數據的收集,分紅 transtion 和 event 打入到channel之中。
Channel:主要提供一個隊列的功能,對source提供中的數據進行簡單的緩存。
Sink:取出Channel中的數據,進行相應的存儲文件系統,數據庫,或者提交到遠程服務器。
對現有程序改動最小的使用方式是使用是直接讀取程序原來記錄的日誌文件,基本能夠實現無縫接入,不須要對現有程序進行任何改動。
對於直接讀取文件Source, 主要有兩種方式:
apache
# example.conf: A single-node Flume configuration # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1將上述配置存爲:example.conf
而後咱們就能夠啓動 Flume 了: segmentfault
bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
PS:-Dflume.root.logger=INFO,console 僅爲 debug 使用,請勿生產環境生搬硬套,不然大量的日誌會返回到終端。。。 數組
-c/--conf 後跟配置目錄,-f/--conf-file 後跟具體的配置文件,-n/--name 指定agent的名稱
緩存
$ telnet localhost 44444 Trying 127.0.0.1... Connected to localhost.localdomain (127.0.0.1). Escape character is '^]'. Hello world! <ENTER> OKFlume 終端窗口此時會打印出以下信息,就表示成功了:
12/06/19 15:32:19 INFO source.NetcatSource: Source starting 12/06/19 15:32:19 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444] 12/06/19 15:32:34 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 77 6F 72 6C 64 21 0D Hello world!. }至此,我們的第一個 Flume Agent 算是部署成功了!
# Define a memory channel called ch1 on agent1 agent1.channels.ch1.type = memory agent1.channels.ch1.capacity = 100000 agent1.channels.ch1.transactionCapacity = 100000 agent1.channels.ch1.keep-alive = 30 # Define an Avro source called avro-source1 on agent1 and tell it # to bind to 0.0.0.0:41414. Connect it to channel ch1. #agent1.sources.avro-source1.channels = ch1 #agent1.sources.avro-source1.type = avro #agent1.sources.avro-source1.bind = 0.0.0.0 #agent1.sources.avro-source1.port = 41414 #agent1.sources.avro-source1.threads = 5 #define source monitor a file agent1.sources.avro-source1.type = exec agent1.sources.avro-source1.shell = /bin/bash -c agent1.sources.avro-source1.command = tail -n +0 -F /home/storm/tmp/id.txt agent1.sources.avro-source1.channels = ch1 agent1.sources.avro-source1.threads = 5 # Define a logger sink that simply logs all events it receives # and connect it to the other end of the same channel. agent1.sinks.log-sink1.channel = ch1 agent1.sinks.log-sink1.type = hdfs agent1.sinks.log-sink1.hdfs.path = hdfs://192.168.1.111:8020/flumeTest agent1.sinks.log-sink1.hdfs.writeFormat = Text agent1.sinks.log-sink1.hdfs.fileType = DataStream agent1.sinks.log-sink1.hdfs.rollInterval = 0 agent1.sinks.log-sink1.hdfs.rollSize = 1000000 agent1.sinks.log-sink1.hdfs.rollCount = 0 agent1.sinks.log-sink1.hdfs.batchSize = 1000 agent1.sinks.log-sink1.hdfs.txnEventMax = 1000 agent1.sinks.log-sink1.hdfs.callTimeout = 60000 agent1.sinks.log-sink1.hdfs.appendTimeout = 60000 # Finally, now that we've defined all of our components, tell # agent1 which ones we want to activate. agent1.channels = ch1 agent1.sources = avro-source1 agent1.sinks = log-sink1啓動以下命令,就能夠在 hdfs 上看到效果了。
../bin/flume-ng agent --conf ../conf/ -f flume_directHDFS.conf -n agent1 -Dflume.root.logger=INFO,console
PS:實際環境中有這樣的需求,經過在多個agent端tail日誌,發送給collector,collector再把數據收集,統一發送給HDFS存儲起來,當HDFS文件大小超過必定的大小或者超過在規定的時間間隔會生成一個文件。
Flume 實現了兩個Trigger,分別爲SizeTriger(在調用HDFS輸出流寫的同時,count該流已經寫入的大小總和,若超過必定大小,則建立新的文件和輸出流,寫入操做指向新的輸出流,同時close之前的輸出流)和TimeTriger(開啓定時器,當到達該點時,自動建立新的文件和輸出流,新的寫入重定向到該流中,同時close之前的輸出流)。
# clientMainAgent clientMainAgent.channels = c1 clientMainAgent.sources = s1 clientMainAgent.sinks = k1 k2 # clientMainAgent sinks group clientMainAgent.sinkgroups = g1 # clientMainAgent Spooling Directory Source clientMainAgent.sources.s1.type = spooldir clientMainAgent.sources.s1.spoolDir =/dsap/rawdata/ clientMainAgent.sources.s1.fileHeader = true clientMainAgent.sources.s1.deletePolicy =immediate clientMainAgent.sources.s1.batchSize =1000 clientMainAgent.sources.s1.channels =c1 clientMainAgent.sources.s1.deserializer.maxLineLength =1048576 # clientMainAgent FileChannel clientMainAgent.channels.c1.type = file clientMainAgent.channels.c1.checkpointDir = /var/flume/fchannel/spool/checkpoint clientMainAgent.channels.c1.dataDirs = /var/flume/fchannel/spool/data clientMainAgent.channels.c1.capacity = 200000000 clientMainAgent.channels.c1.keep-alive = 30 clientMainAgent.channels.c1.write-timeout = 30 clientMainAgent.channels.c1.checkpoint-timeout=600 # clientMainAgent Sinks # k1 sink clientMainAgent.sinks.k1.channel = c1 clientMainAgent.sinks.k1.type = avro # connect to CollectorMainAgent clientMainAgent.sinks.k1.hostname = flume115 clientMainAgent.sinks.k1.port = 41415 # k2 sink clientMainAgent.sinks.k2.channel = c1 clientMainAgent.sinks.k2.type = avro # connect to CollectorBackupAgent clientMainAgent.sinks.k2.hostname = flume116 clientMainAgent.sinks.k2.port = 41415 # clientMainAgent sinks group clientMainAgent.sinkgroups.g1.sinks = k1 k2 # load_balance type clientMainAgent.sinkgroups.g1.processor.type = load_balance clientMainAgent.sinkgroups.g1.processor.backoff = true clientMainAgent.sinkgroups.g1.processor.selector = random
../bin/flume-ng agent --conf ../conf/ -f flume_Consolidation.conf -n clientMainAgent -Dflume.root.logger=DEBUG,console
# collectorMainAgent collectorMainAgent.channels = c2 collectorMainAgent.sources = s2 collectorMainAgent.sinks =k1 k2 # collectorMainAgent AvroSource # collectorMainAgent.sources.s2.type = avro collectorMainAgent.sources.s2.bind = flume115 collectorMainAgent.sources.s2.port = 41415 collectorMainAgent.sources.s2.channels = c2 # collectorMainAgent FileChannel # collectorMainAgent.channels.c2.type = file collectorMainAgent.channels.c2.checkpointDir =/opt/var/flume/fchannel/spool/checkpoint collectorMainAgent.channels.c2.dataDirs = /opt/var/flume/fchannel/spool/data,/work/flume/fchannel/spool/data collectorMainAgent.channels.c2.capacity = 200000000 collectorMainAgent.channels.c2.transactionCapacity=6000 collectorMainAgent.channels.c2.checkpointInterval=60000 # collectorMainAgent hdfsSink collectorMainAgent.sinks.k2.type = hdfs collectorMainAgent.sinks.k2.channel = c2 collectorMainAgent.sinks.k2.hdfs.path = hdfs://db-cdh-cluster/flume%{dir} collectorMainAgent.sinks.k2.hdfs.filePrefix =k2_%{file} collectorMainAgent.sinks.k2.hdfs.inUsePrefix =_ collectorMainAgent.sinks.k2.hdfs.inUseSuffix =.tmp collectorMainAgent.sinks.k2.hdfs.rollSize = 0 collectorMainAgent.sinks.k2.hdfs.rollCount = 0 collectorMainAgent.sinks.k2.hdfs.rollInterval = 240 collectorMainAgent.sinks.k2.hdfs.writeFormat = Text collectorMainAgent.sinks.k2.hdfs.fileType = DataStream collectorMainAgent.sinks.k2.hdfs.batchSize = 6000 collectorMainAgent.sinks.k2.hdfs.callTimeout = 60000 collectorMainAgent.sinks.k1.type = hdfs collectorMainAgent.sinks.k1.channel = c2 collectorMainAgent.sinks.k1.hdfs.path = hdfs://db-cdh-cluster/flume%{dir} collectorMainAgent.sinks.k1.hdfs.filePrefix =k1_%{file} collectorMainAgent.sinks.k1.hdfs.inUsePrefix =_ collectorMainAgent.sinks.k1.hdfs.inUseSuffix =.tmp collectorMainAgent.sinks.k1.hdfs.rollSize = 0 collectorMainAgent.sinks.k1.hdfs.rollCount = 0 collectorMainAgent.sinks.k1.hdfs.rollInterval = 240 collectorMainAgent.sinks.k1.hdfs.writeFormat = Text collectorMainAgent.sinks.k1.hdfs.fileType = DataStream collectorMainAgent.sinks.k1.hdfs.batchSize = 6000 collectorMainAgent.sinks.k1.hdfs.callTimeout = 60000
../bin/flume-ng agent --conf ../conf/ -f flume_Consolidation.conf -n collectorMainAgent -Dflume.root.logger=DEBUG,console
flume 報錯: java.lang.OutOfMemoryError: GC overhead limit exceeded 或者: java.lang.OutOfMemoryError: Java heap space Exception in thread "SinkRunner-PollingRunner-DefaultSinkProcessor" java.lang.OutOfMemoryError: Java heap space
Flume 啓動時的最大堆內存大小默認是 20M,線上環境很容易 OOM,所以須要你在 flume-env.sh 中添加 JVM 啓動參數:
JAVA_OPTS="-Xms8192m -Xmx8192m -Xss256k -Xmn2g -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit"
而後在啓動 agent 的時候必定要帶上 -c conf 選項,不然 flume-env.sh 裏配置的環境變量不會被加載生效。
具體參見:
http://marc.info/?l=flume-user&m=138933303305433&w=2
2014-07-07 14:44:17,902 (agent-shutdown-hook) [WARN - org.apache.flume.sink.hdfs.HDFSEventSink.stop(HDFSEventSink.java:504)] Exception while closing hdfs://192.168.1.111:8020/flumeTest/FlumeData. Exception follows. java.lang.UnsupportedOperationException: This is supposed to be overridden by subclasses. at com.google.protobuf.GeneratedMessage.getUnknownFields(GeneratedMessage.java:180) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$GetFileInfoRequestProto.getSerializedSize(ClientNamenodeProtocolProtos.java:30108) at com.google.protobuf.AbstractMessageLite.toByteString(AbstractMessageLite.java:49) at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.constructRpcRequest(ProtobufRpcEngine.java:149) at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:193)
把你的 jdk7 換成 jdk6 試試。
其實上面 3.2 中已有說明,flume 的 sink 已經實現了幾種最主要的持久化觸發器:
好比按大小、按間隔時間、按消息條數等等,針對你的文件太小遲遲無法寫入 HDFS 持久化的問題,
那是由於你此時尚未知足持久化的條件,好比你的行數尚未達到配置的閾值或者大小還沒達到等等,
能夠針對上面 3.2 小節的配置微調下,例如:
agent1.sinks.log-sink1.hdfs.rollInterval = 20當遲遲沒有新日誌生成的時候,若是你想很快的 flush,那麼讓它每隔 20s flush 持久化一下,agent 會根據多個條件,優先執行知足條件的觸發器。
下面貼一些常見的持久化觸發器:
# Number of seconds to wait before rolling current file (in 600 seconds) agent.sinks.sink.hdfs.rollInterval=600 # File size to trigger roll, in bytes (256Mb) agent.sinks.sink.hdfs.rollSize = 268435456 # never roll based on number of events agent.sinks.sink.hdfs.rollCount = 0 # Timeout after which inactive files get closed (in seconds) agent.sinks.sink.hdfs.idleTimeout = 3600 agent.sinks.HDFS.hdfs.batchSize = 1000更多關於 sink 的觸發機制與參數配置請參見: http://flume.apache.org/FlumeUserGuide.html#hdfs-sink
http://stackoverflow.com/questions/20638498/flume-not-writing-to-hdfs-unless-killed
注意:對於 HDFS 來講應當竭力避免小文件問題,因此請慎重對待你配置的持久化觸發機制。
Flume的HDFSsink在數據寫入/讀出Channel時,都有Transcation的保證。當Transaction失敗時,會回滾,而後重試。但因爲HDFS不可修改文件的內容,假設有1萬行數據要寫入HDFS,而在寫入5000行時,網絡出現問題致使寫入失敗,Transaction回滾,而後重寫這10000條記錄成功,就會致使第一次寫入的5000行重複。這些問題是 HDFS 文件系統設計上的特性缺陷,並不能經過簡單的Bugfix來解決。咱們只能關閉批量寫入,單條事務保證,或者啓用監控策略,兩端對數。
Memory和exec的方式可能會有數據丟失,file 是 end to end 的可靠性保證的,可是性能較前二者要差。
end to end、store on failure 方式 ACK 確認時間設置太短(特別是高峯時間)也有可能引起數據的重複寫入。
能夠在 tail 傳的時候記錄行號,下次再傳的時候,取上次記錄的位置開始傳輸,相似:
agent1.sources.avro-source1.command = /usr/local/bin/tail -n +$(tail -n1 /home/storm/tmp/n) --max-unchanged-stats=600 -F /home/storm/tmp/id.txt | awk 'ARNGIND==1{i=$0;next}{i++; if($0~/文件已截斷/)i=0; print i >> "/home/storm/tmp/n";print $1"---"i}' /home/storm/tmp/n -須要注意以下幾點:
(1)文件被 rotation 的時候,須要同步更新你的斷點記錄「指針」,
(2)須要按文件名來追蹤文件,
(3)flume 掛掉後須要累加斷點續傳「指針」
(4)flume 掛掉後,若是剛好文件被 rotation,那麼會有丟數據的風險,
只能監控儘快拉起或者加邏輯判斷文件大小重置指針。
(5)tail 注意你的版本,請更新 coreutils 包到最新。
這裏你須要利用 Flume 提供的攔截器(Interceptor)機制來知足上述的需求了,具體請參考下面幾個連接:
(1)Flume-NG源碼閱讀之Interceptor(原創)
http://www.cnblogs.com/lxf20061900/p/3664602.html
(2)Flume-NG自定義攔截器
http://sep10.com/posts/2014/04/15/flume-interceptor/
(3)Flume-ng生產環境實踐(四)實現log格式化interceptor
http://blog.csdn.net/rjhym/article/details/8450728
(4)flume-ng如何根據源文件名輸出到HDFS文件名
(1)scribe、chukwa、kafka、flume日誌系統對比
http://www.ttlsa.com/log-system/scribe-chukwa-kafka-flume-log-system-contrast/
(2)關於Flume-ng那些事 http://www.ttlsa.com/?s=flume
關於Flume-ng那些事(三):常見架構測試 http://www.ttlsa.com/log-system/about-flume-ng-3/
(3)Flume 1.4.0 User Guide
http://archive.cloudera.com/cdh4/cdh/4/flume-ng-1.4.0-cdh4.7.0/FlumeUserGuide.html
(4)flume日誌採集 http://blog.csdn.net/sunmeng_007/article/details/9762507
(5)Flume-NG + HDFS + HIVE 日誌收集分析
(6)【Twitter Storm系列】flume-ng+Kafka+Storm+HDFS 實時系統搭建
http://blog.csdn.net/weijonathan/article/details/18301321
(7)Flume-NG + HDFS + PIG 日誌收集分析
http://hi.baidu.com/life_to_you/item/a98e2ec3367486dbef183b5e
flume 示例一收集tomcat日誌 http://my.oschina.net/88sys/blog/71529
flume-ng 多節點集羣示例 http://my.oschina.net/u/1401580/blog/204052
試用flume-ng 1.1 http://heipark.iteye.com/blog/1617995
(8)Flafka: Apache Flume Meets Apache Kafka for Event Processing
http://blog.cloudera.com/blog/2014/11/flafka-apache-flume-meets-apache-kafka-for-event-processing/
(9)Flume-ng的原理和使用
http://segmentfault.com/blog/javachen/1190000002532284
(10)基於Flume的美團日誌收集系統(一)架構和設計
http://tech.meituan.com/mt-log-system-arch.html
(11)基於Flume的美團日誌收集系統(二)改進和優化
http://tech.meituan.com/mt-log-system-optimization.html
(12)How-to: Do Real-Time Log Analytics with Apache Kafka, Cloudera Search, and Hue
(13)Real-time analytics in Apache Flume - Part 1
http://jameskinley.tumblr.com/post/57704266739/real-time-analytics-in-apache-flume-part-1