版權聲明:本文爲yunshuxueyuan原創文章。
如需轉載請標明出處: http://www.cnblogs.com/sxt-zkys/
QQ技術交流羣:299142667html
1. flume 做爲 cloudera 開發的實時日誌收集系統,受到了業界的承認與普遍應用。Flume 初始的發行版本目前被統稱爲 Flume OG(original generation),屬於 cloudera。但隨着 FLume 功能的擴展,Flume OG 代碼工程臃腫、核心組件設計不合理、核心配置不標準等缺點暴露出來,尤爲是在 Flume OG 的最後一個發行版本 0.94.0 中,日誌傳輸不穩定的現象尤其嚴重,爲了解決這些問題,2011 年 10 月 22 號,cloudera 完成了 Flume-728,對 Flume 進行了里程碑式的改動:重構核心組件、核心配置以及代碼架構,重構後的版本統稱爲 Flume NG(next generation);改動的另外一緣由是將 Flume 歸入 apache 旗下,cloudera Flume 更名爲 Apache Flume。java
2. flume的特色:node
flume是一個分佈式、可靠、和高可用的海量日誌採集、聚合和傳輸的系統。支持在日誌系統中定製各種數據發送方,用於收集數據;同時,Flume提供對數據進行簡單處理,並寫到各類數據接受方(好比文本、HDFS、Hbase等)的能力 。數據庫
flume的數據流由事件(Event)貫穿始終。事件是Flume的基本數據單位,它攜帶日誌數據(字節數組形式)而且攜帶有頭信息,這些Event由Agent外部的Source生成,當Source捕獲事件後會進行特定的格式化,而後Source會把事件推入(單個或多個)Channel中。你能夠把Channel看做是一個緩衝區,它將保存事件直到Sink處理完該事件。Sink負責持久化日誌或者把事件推向另外一個Source。apache
3. flume的可靠性數組
當節點出現故障時,日誌可以被傳送到其餘節點上而不會丟失。Flume提供了三種級別的可靠性保障,從強到弱依次分別爲:end-to-end(收到數據agent首先將event寫到磁盤上,當數據傳送成功後,再刪除;若是數據發送失敗,能夠從新發送。),Store on failure(這也是scribe採用的策略,當數據接收方crash時,將數據寫到本地,待恢復後,繼續發送),Besteffort(數據發送到接收方後,不會進行確認)。緩存
4. flume的可恢復性安全
仍是靠Channel。推薦使用FileChannel,事件持久化在本地文件系統裏(性能較差)。服務器
5. flume的一些核心概念架構
Agent:使用JVM 運行Flume。每臺機器運行一個agent,可是能夠在一個 agent中
包含多個sources和sinks。
Client:生產數據,運行在一個獨立的線程。
Source:從Client收集數據,傳遞給Channel。
Sink:從Channel收集數據,運行在一個獨立線程。
Channel:鏈接 sources 和 sinks ,這個有點像一個隊列。
Events:能夠是日誌記錄、 avro 對象等。
介紹一下flume中event的相關概念:flume的核心是把數據從數據源(source)收集過來,在將收集到的數據送到指定的目的地(sink)。爲了保證輸送的過程必定成功,在送到目的地(sink)以前,會先緩存數據(channel),待數據真正到達目的地(sink)後,flume在刪除本身緩存的數據。
在整個數據的傳輸的過程當中,流動的是event,即事務保證是在event級別進行的。那麼什麼是event呢?—–event將傳輸的數據進行封裝,是flume傳輸數據的基本單位,若是是文本文件,一般是一行記錄,event也是事務的基本單位。event從source,流向channel,再到sink,自己爲一個字節數組,並可攜帶headers(頭信息)信息。event表明着一個數據的最小完整單元,從外部數據源來,向外部的目的地去。
爲了方便你們理解,給出一張event的數據流向圖:
flume之因此這麼神奇,是源於它自身的一個設計,這個設計就是agent,agent自己是一個Java進程,運行在日誌收集節點—所謂日誌收集節點就是服務器節點。
agent裏面包含3個核心的組件:source—->channel—–>sink,相似生產者、倉庫、消費者的架構。
source:source組件是專門用來收集數據的,能夠處理各類類型、各類格式的日誌數據,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定義。
channel:source組件把數據收集來之後,臨時存放在channel中,即channel組件在agent中是專門用來存放臨時數據的——對採集到的數據進行簡單的緩存,能夠存放在memory、jdbc、file等等。
sink:sink組件是用於把數據發送到目的地的組件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、Hbase、solr、kafaka、自定義。
Source類型:
Avro Source: 支持Avro協議(其實是Avro RPC),內置支持
Thrift Source: 支持Thrift協議,內置支持
Exec Source: 基於Unix的command在標準輸出上生產數據
JMS Source: 從JMS系統(消息、主題)中讀取數據
Spooling Directory Source: 監控指定目錄內數據變動
Twitter 1% firehose Source: 經過API持續下載Twitter數據,試驗性質
Netcat Source: 監控某個端口,將流經端口的每個文本行數據做爲Event輸入
Sequence Generator Source: 序列生成器數據源,生產序列數據
Syslog Sources: 讀取syslog數據,產生Event,支持UDP和TCP兩種協議
HTTP Source: 基於HTTP POST或GET方式的數據源,支持JSON、BLOB表示形式
Legacy Sources: 兼容老的Flume OG中Source(0.9.x版本)
Channel類型:
Memory Channel:Event數據存儲在內存中
JDBC Channel:Event數據存儲在持久化存儲中,當前Flume Channel內置支持Derby
File Channel:Event數據存儲在磁盤文件中
Spillable Memory Channel:Event數據存儲在內存中和磁盤上,當內存隊列滿了,會持
久化到磁盤文件
Pseudo Transaction Channel:測試用途
Custom Channel:自定義Channel實現
Sink類型 說明
HDFS Sink:數據寫入HDFS
Logger Sink:數據寫入日誌文件
Avro Sink:數據被轉換成Avro Event,而後發送到配置的RPC端口上
Thrift Sink:數據被轉換成Thrift Event,而後發送到配置的RPC端口上
IRC Sink:數據在IRC上進行回放
File Roll Sink:存儲數據到本地文件系統
Null Sink:丟棄到全部數據
HBase Sink:數據寫入HBase數據庫
Morphline Solr Sink:數據發送到Solr搜索服務器(集羣)
ElasticSearch Sink:數據發送到Elastic Search搜索服務器(集羣)
Kite Dataset Sink:寫數據到Kite Dataset,試驗性質的
Custom Sink:自定義Sink實現
flume的核心就是一個agent,這個agent對外有兩個進行交互的地方,一個是接受數據的輸入——source,一個是數據的輸出sink,sink負責將數據發送到外部指定的目的地。source接收到數據以後,將數據發送給channel,chanel做爲一個數據緩衝區會臨時存放這些數據,隨後sink會將channel中的數據發送到指定的地方—-例如HDFS等,注意:只有在sink將channel中的數據成功發送出去以後,channel纔會將臨時數據進行刪除,這種機制保證了數據傳輸的可靠性與安全性。
flume之因此這麼神奇—-其緣由也在於flume能夠支持多級flume的agent,即flume能夠先後相繼,例如sink能夠將數據寫到下一個agent的source中,這樣的話就能夠連成串了,能夠總體處理了。flume還支持扇入(fan-in)、扇出(fan-out)。所謂扇入就是source能夠接受多個輸入,所謂扇出就是sink能夠將數據輸出多個目的地destination中。
1. 下載源碼包,上傳到集羣的節點:
2. 解壓到指定目錄
3. 修改conf/flume.env.sh:
注意:JAVA_OPTS 配置 若是咱們傳輸文件過大 報內存溢出時 須要修改這個配置項
4. 配置環境變量
刷新profile文件:source /etc/profile
5. 驗證安裝是否成功
http://flume.apache.org/FlumeUserGuide.html#a-simple-example
配置文件simple.conf
啓動flume
flume-ng agent -n a1 -c conf -f simple.conf -Dflume.root.logger=INFO,console
安裝telnet
yum install telnet
Memory Chanel 配置
capacity:默認該通道中最大的能夠存儲的event數量是100,
trasactionCapacity:每次最大能夠source中拿到或者送到sink中的event數量也是100
keep-alive:event添加到通道中或者移出的容許時間
byte**:即event的字節量的限制,只包括eventbody
node01服務器中,配置文件
node02服務器中,安裝Flume(步驟略)
配置文件
先啓動node02的Flume
flume-ng agent -n a1 -c conf -f avro.conf -Dflume.root.logger=INFO,console
再啓動node01的Flume
flume-ng agent -n a1 -c conf -f simple.conf2 -Dflume.root.logger=INFO,console
打開telnet 測試 node02控制檯輸出結果
http://flume.apache.org/FlumeUserGuide.html#exec-source
配置文件
啓動Flume
flume-ng agent -n a1 -c conf -f exec.conf -Dflume.root.logger=INFO,console
建立空文件演示 touch flume.exec.log
循環添加數據
for i in {1..50}; do echo "$i hi flume" >> flume.exec.log ; sleep 0.1; done
http://flume.apache.org/FlumeUserGuide.html#spooling-directory-source
配置文件
啓動Flume
flume-ng agent -n a1 -c conf -f spool.conf -Dflume.root.logger=INFO,console
拷貝文件演示
mkdir logs
cp flume.exec.log logs/
http://flume.apache.org/FlumeUserGuide.html#hdfs-sink
配置文件
############################################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/logs
a1.sources.r1.fileHeader = true
# Describe the sink
***只修改上一個spool sink的配置代碼塊 a1.sinks.k1.type = logger
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=hdfs://sxt/flume/%Y-%m-%d/%H%M
##每隔60s或者文件大小超過10M的時候產生新文件
# hdfs有多少條消息時新建文件,0不基於消息個數
a1.sinks.k1.hdfs.rollCount=0
# hdfs建立多長時間新建文件,0不基於時間
a1.sinks.k1.hdfs.rollInterval=60
# hdfs多大時新建文件,0不基於文件大小
a1.sinks.k1.hdfs.rollSize=10240
# 當目前被打開的臨時文件在該參數指定的時間(秒)內,沒有任何數據寫入,則將該臨時文件關閉並重命名成目標文件
a1.sinks.k1.hdfs.idleTimeout=3
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.useLocalTimeStamp=true
## 每五分鐘生成一個目錄:
# 是否啓用時間上的」捨棄」,這裏的」捨棄」,相似於」四捨五入」,後面再介紹。若是啓用,則會影響除了%t的其餘全部時間表達式
a1.sinks.k1.hdfs.round=true
# 時間上進行「捨棄」的值;
a1.sinks.k1.hdfs.roundValue=5
# 時間上進行」捨棄」的單位,包含:second,minute,hour
a1.sinks.k1.hdfs.roundUnit=minute
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
############################################################
建立HDFS目錄
hadoop fs -mkdir /flume
啓動Flume
flume-ng agent -n a1 -c conf -f hdfs.conf -Dflume.root.logger=INFO,console
查看hdfs文件
hadoop fs -ls /flume/...
hadoop fs -get /flume/...