Apache Flume是一個分佈式的、可靠的、易用的系統,能夠有效地未來自不少不一樣源系統的大量日誌數據收集、彙總或者轉移到一個數據中心存儲。html
Apache Flume的做用不只限於日誌彙總,由於數據源是能夠自定義的,Flume也能夠被用於傳輸大量的事件數據,包括但不限於網絡流量數據、社交媒體產生的數據、電子郵件和幾乎全部可能的數據源。node
tar -zxvf flume-ng-1.5.0-cdh5.3.6.tar.gz -C /opt/cdh-5.3.6 vi flume-env.sh export JAVA_HOME=/opt/modules/jdk1.7.0_67
flume的數據流由事件(Event)貫穿始終。事件是Flume的基本數據單位,它攜帶日誌數據(字節數組形式)而且攜帶有頭信息,這些Event由Agent外部的Source生成,當Source捕獲事件後會進行特定的格式化,而後Source會把事件推入(單個或多個)Channel中。能夠把Channel看做是一個緩衝區,它將保存事件直到Sink處理完該事件。Sink負責持久化日誌或者把事件推向另外一個Source。web
當節點出現故障時,日誌可以被傳送到其餘節點上而不會丟失。Flume提供了三種級別的可靠性保障,從強到弱依次分別爲:end-to-end(收到數據agent首先將event寫到磁盤上,當數據傳送成功後,再刪除;若是數據發送失敗,能夠從新發送。),Store on failure(這也是scribe採用的策略,當數據接收方crash時,將數據寫到本地,待恢復後,繼續發送),Besteffort(數據發送到接收方後,不會進行確認)。shell
Agent使用JVM 運行Flume。每臺機器運行一個agent,可是能夠在一個agent中包含多個sources和sinks。數據庫
Client生產數據,運行在一個獨立的線程。apache
Source從Client收集數據,傳遞給Channel。bootstrap
Sink從Channel收集數據,運行在一個獨立線程。數組
Channel鏈接 sources 和 sinks ,這個有點像一個隊列。安全
Events能夠是日誌記錄、 avro 對象等。bash
Flume提供了大量內置的Source、Channel和Sink類型。不一樣類型的Source,Channel和Sink能夠自由組合。組合方式基於用戶設置的配置文件,很是靈活。好比:Channel能夠把事件暫存在內存裏,也能夠持久化到本地硬盤上。Sink能夠把日誌寫入HDFS, HBase,甚至是另一個Source等等
#example.conf:單節點Flume配置 #將該代理商的組件命名爲 a1.sources = r1 a1.sinks = k1 a1.channels = c1 #描述/配置源 a1.sources.r1.type = netcat a1.sources.r1.bind = hadoop.jianxin.com a1.sources.r1.port = 44444 #描述sink a1.sinks.k1.type = logger #使用緩衝內存中事件的通道 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 #將信源和信宿綁定到信道 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
啓動方式
bin / flume-ng agent --conf conf --conf -file example.conf --name a1 -Dflume.root.logger = INFO,console
#將該代理商的組件命名爲 a1.sources = r1 a1.sinks = k1 a1.channels = c1 #描述/配置源 a1.sources.r1.type = avro a1.sources.r1.bind = hadoop.jianxin.com a1.sources.r1.port = 3306 #描述sink a1.sinks.k1.type = logger #使用緩衝內存中事件的通道 a1.channels.c1.type = memory a1.channels.c1.capacity = 100 a1.channels.c1.transactionCapacity = 10 #將信源和信宿綁定到信道 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
exec source:能夠經過指定的操做對日誌進行讀取,使用exec時須要指定shell命令,對日誌進行讀取
Spooling-directory source:能夠讀取文件夾裏的日誌,使用時指定一個文件夾,能夠讀取該文件夾中的全部文件,當出現新文件時會讀取該文件並獲取數據.須要注意的是該文件夾中的文件在讀取過程當中不能修改,同時文件名也不能修改。
spoolDirectory是監控目錄,不能爲空,沒有默認值。這個source不具備監控子目錄的功能,也就是不能遞歸監控。若是須要,這須要本身去實現,http://blog.csdn.net/yangbutao/article/details/8835563 這裏有遞歸檢測的實現;
completedSuffix是文件讀取完畢後給完成文件添加的標記後綴,默認是".COMPLETED";
deletePolicy這是是否刪除讀取完畢的文件,默認是"never",就是不刪除,目前只支持"never"和「IMMEDIATE」;
fileHeader是否在event的Header中添加文件名,boolean類型, 默認false
fileHeaderKey這是event的Header中的key,value是文件名
batchSize這個是一次處理的記錄數,默認是100;
inputCharset編碼方式,默認是"UTF-8";
ignorePattern忽略符合條件的文件名
trackerDirPath被處理文件元數據的存儲目錄,默認".flumespool"
deserializerType將文件中的數據序列化成event的方式,默認是「LINE」---org.apache.flume.serialization.LineDeserializer
deserializerContext這個主要用在Deserializer中設置編碼方式outputCharset和文件每行最大長度maxLineLength。
a1.channels = c1 a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 10000 a1.channels.c1.byteCapacityBufferPercentage = 20 a1.channels.c1.byteCapacity = 800000
a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092 a1.channels.channel1.kafka.topic = channel1 a1.channels.channel1.kafka.consumer.group.id = flume-consumer
a1.sinks=k1 a1.sinks.k2.type = avro a1.sinks.k2.channel = c2 a1.sinks.k2.hostname = hadoop03 (指定的主機名或ip) a1.sinks.k2.port = 16666 (指定的端口號)
path:寫入hdfs的路徑,須要包含文件系統標識,好比:hdfs://namenode/flume/webdata/ 可使用flume提供的日期及%{host}表達式。 filePrefix: 默認值:FlumeData 寫入hdfs的文件名前綴,可使用flume提供的日期及%{host}表達式。 fileSuffix:寫入hdfs的文件名後綴,好比:.lzo .log等。 inUsePrefix:臨時文件的文件名前綴,hdfs sink會先往目標目錄中寫臨時文件,再根據相關規則重命名成最終目標文件; inUseSuffi: 默認值:.tmp 臨時文件的文件名後綴。 rollInterval: 默認值:30 hdfs sink間隔多長將臨時文件滾動成最終目標文件,單位:秒; 若是設置成0,則表示不根據時間來滾動文件; 注:滾動(roll)指的是,hdfs sink將臨時文件重命名成最終目標文件,並新打開一個臨時文件來寫入數據; rollSize 默認值:1024 當臨時文件達到該大小(單位:bytes)時,滾動成目標文件; 若是設置成0,則表示不根據臨時文件大小來滾動文件; rollCount 默認值:10 當events數據達到該數量時候,將臨時文件滾動成目標文件; 若是設置成0,則表示不根據events數據來滾動文件; idleTimeout 默認值:0 當目前被打開的臨時文件在該參數指定的時間(秒)內,沒有任何數據寫入,則將該臨時文件關閉並重命名成目標文件; batchSize 默認值:100 每一個批次刷新到HDFS上的events數量; codeC 文件壓縮格式,包括:gzip, bzip2, lzo, lzop, snappy fileType 默認值:SequenceFile 文件格式,包括:SequenceFile, DataStream,CompressedStream 當使用DataStream時候,文件不會被壓縮,不須要設置hdfs.codeC;當使用CompressedStream時候,必須設置一個正確的hdfs.codeC值; maxOpenFiles 默認值:5000 最大容許打開的HDFS文件數,當打開的文件數達到該值,最先打開的文件將會被關閉; minBlockReplicas 默認值:HDFS副本數 寫入HDFS文件塊的最小副本數。該參數會影響文件的滾動配置,通常將該參數配置成1,才能夠按照配置正確滾動文件。 writeFormat 寫sequence文件的格式。包含:Text, Writable(默認) callTimeout 默認值:10000 執行HDFS操做的超時時間(單位:毫秒); threadsPoolSize 默認值:10 hdfs sink啓動的操做HDFS的線程數。 rollTimerPoolSize 默認值:1 hdfs sink啓動的根據時間滾動文件的線程數。 kerberosPrincipal: HDFS安全認證kerberos配置; kerberosKeytab HDFS安全認證kerberos配置; proxyUser 代理用戶 round 默認值:false 是否啓用時間上的」捨棄」,這裏的」捨棄」,相似於」四捨五入」,後面再介紹。若是啓用,則會影響除了%t的其餘全部時間表達式; roundValue 默認值:1 時間上進行「捨棄」的值; roundUnit 默認值:seconds 時間上進行」捨棄」的單位,包含:second,minute,hour a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S a1.sinks.k1.hdfs.round = true a1.sinks.k1.hdfs.roundValue = 10 a1.sinks.k1.hdfs.roundUnit = minute 當時間爲2015-10-16 17:38:59時候,hdfs.path依然會被解析爲: /flume/events/20151016/17:30/00 由於設置的是捨棄10分鐘內的時間,所以,該目錄每10分鐘新生成一個。 timeZone 默認值:Local Time 時區。 useLocalTimeStamp 默認值:flase 是否使用當地時間。 closeTries:默認值:0 hdfs sink關閉文件的嘗試次數; 若是設置爲1,當一次關閉文件失敗後,hdfs sink將不會再次嘗試關閉文件,這個未關閉的文件將會一直留在那,而且是打開狀態。 設置爲0,當一次關閉失敗後,hdfs sink會繼續嘗試下一次關閉,直到成功。 retryInterval 默認值:180(秒) hdfs sink嘗試關閉文件的時間間隔,若是設置爲0,表示不嘗試,至關於於將hdfs.closeTries設置成1. Serializer 默認值:TEXT 序列化類型。其餘還有:avro_event或者是實現了EventSerializer.Builder的類名。
tableName:要寫入的HBase數據表名,不能爲空; columnFamily:數據表對應的列簇名,這個sink目前只支持一個列簇,不能爲空; batchSize:每次事務能夠處理的最大Event數量,默認是100; eventSerializerType:用來將event寫入HBase,即將event轉化爲put。默認是org.apache.flume.sink.hbase.SimpleHbaseEventSerializer,還有一個是RegexHbaseEventSerializer,即適合HBaseSink的Serializer只有這倆,不然本身定製; serializerContext:是eventSerializerType的配置信息,就是配置文件中包含「serializer.」的項; 例子: agent1.sinks = k1 agent1.sinks.k1.type = hbase agent1.sinks.k1.table = flume agent1.sinks.k1.columnFamily=fl_conf agent1.sinks.k1.serializer=org.apache.flume.sink.hbase.RegexHbaseEventSerializer
#將該代理商的組件命名爲 a1.sources = r1 a1.sinks = k1 a1.channels = c1 #描述/配置源 a1.sources.r1.type = avro a1.sources.r1.bind = hadoop.jianxin.com a1.sources.r1.port = 3306 #描述sink a1.sinks.k1.type = logger #使用緩衝內存中事件的通道 a1.channels.c1.type = memory a1.channels.c1.capacity = 100 a1.channels.c1.transactionCapacity = 10 #將信源和信宿綁定到信道 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 a2.sources=r2 a2.sinks=k2 a2.channels=c2 #描述/配置源 a2.sources.r2.type=exec a2.sources.r2.command=tail -f /opt/cdh-5.3.6/hive-0.13.1-cdh5.3.6/logs/hive.log a2.sources.r2.shell=/bin/bash -c #使用緩衝內存中事件的通道 a2.channels.c2.type=memory a1.channels.c2.capacity = 100 a1.channels.c2.transactionCapacity = 10 #描述sink a2.sinks.k2.type=hdfs a2.sinks.k2.hdfs.path=hdfs://hadoop.jianxin.com:9000/user/jianxin/flume/hive/date/%Y%m%d a2.sinks.k2.hdfs.fileType=DataStream a2.sinks.k2.hdfs.writeFormat=Text #Format for sequence file records. One of Text or Writable. #Set to Text before creating data files with Flume, #otherwise those files cannot be read by either Apache Impala (incubating) or Apache Hive. a2.sinks.k2.hdfs.batchSize=10 #number of events written to file before it is flushed to HDFS a2.sinks.k2.hdfs.useLocalTimeStamp=true #將信源和信宿綁定到信道 a2.sources.r2.channels=c2 a2.sinks.k2.channel=c2
運行小案例
#啓動的會是a2source bin/flume-ng agent --conf conf --conf-file conf/conf.file --name a2 -Dflume.root.logger=INFO,console nohup bin/flume-ng agent --conf conf --conf-file conf/conf.file --name a2 -Dflume.root.logger=INFO,console > /data/flume-1.5.0-cdh5.3.6/flume.log 2>&1 &
參考 https://blog.csdn.net/wei_hhh/article/details/77838999 flume 官網 http://flume.apache.org/FlumeUserGuide.html