flume介紹及應用

 

版權聲明:本文爲yunshuxueyuan原創文章。
如需轉載請標明出處: http://www.cnblogs.com/sxt-zkys/
QQ技術交流羣:299142667html

flume的概念

1.     flume 做爲 cloudera 開發的實時日誌收集系統,受到了業界的承認與普遍應用。Flume 初始的發行版本目前被統稱爲 Flume OG(original generation),屬於 cloudera。但隨着 FLume 功能的擴展,Flume OG 代碼工程臃腫、核心組件設計不合理、核心配置不標準等缺點暴露出來,尤爲是在 Flume OG 的最後一個發行版本 0.94.0 中,日誌傳輸不穩定的現象尤其嚴重,爲了解決這些問題,2011 年 10 月 22 號,cloudera 完成了 Flume-728,對 Flume 進行了里程碑式的改動:重構核心組件、核心配置以及代碼架構,重構後的版本統稱爲 Flume NG(next generation);改動的另外一緣由是將 Flume 歸入 apache 旗下,cloudera Flume 更名爲 Apache Flume。java

2. flume的特色:node

flume是一個分佈式、可靠、和高可用的海量日誌採集、聚合和傳輸的系統。支持在日誌系統中定製各種數據發送方,用於收集數據;同時,Flume提供對數據進行簡單處理,並寫到各類數據接受方(好比文本、HDFS、Hbase等)的能力 。數據庫

  flume的數據流由事件(Event)貫穿始終。事件是Flume的基本數據單位,它攜帶日誌數據(字節數組形式)而且攜帶有頭信息,這些Event由Agent外部的Source生成,當Source捕獲事件後會進行特定的格式化,而後Source會把事件推入(單個或多個)Channel中。你能夠把Channel看做是一個緩衝區,它將保存事件直到Sink處理完該事件。Sink負責持久化日誌或者把事件推向另外一個Source。apache

3. flume的可靠性數組

    當節點出現故障時,日誌可以被傳送到其餘節點上而不會丟失。Flume提供了三種級別的可靠性保障,從強到弱依次分別爲:end-to-end(收到數據agent首先將event寫到磁盤上,當數據傳送成功後,再刪除;若是數據發送失敗,能夠從新發送。),Store on failure(這也是scribe採用的策略,當數據接收方crash時,將數據寫到本地,待恢復後,繼續發送),Besteffort(數據發送到接收方後,不會進行確認)。緩存

4. flume的可恢復性安全

仍是靠Channel。推薦使用FileChannel,事件持久化在本地文件系統裏(性能較差)。服務器

5. flume的一些核心概念架構

Agent:使用JVM 運行Flume。每臺機器運行一個agent,可是能夠在一個 agent中

       包含多個sources和sinks。

Client:生產數據,運行在一個獨立的線程。

Source:從Client收集數據,傳遞給Channel。

Sink:從Channel收集數據,運行在一個獨立線程。

Channel:鏈接 sources 和 sinks ,這個有點像一個隊列。

Events:能夠是日誌記錄、 avro 對象等。

event的概念

    介紹一下flume中event的相關概念:flume的核心是把數據從數據源(source)收集過來,在將收集到的數據送到指定的目的地(sink)。爲了保證輸送的過程必定成功,在送到目的地(sink)以前,會先緩存數據(channel),待數據真正到達目的地(sink)後,flume在刪除本身緩存的數據。

    在整個數據的傳輸的過程當中,流動的是event,即事務保證是在event級別進行的。那麼什麼是event呢?—–event將傳輸的數據進行封裝,是flume傳輸數據的基本單位,若是是文本文件,一般是一行記錄,event也是事務的基本單位。event從source,流向channel,再到sink,自己爲一個字節數組,並可攜帶headers(頭信息)信息。event表明着一個數據的最小完整單元,從外部數據源來,向外部的目的地去。

爲了方便你們理解,給出一張event的數據流向圖:

flume架構

    flume之因此這麼神奇,是源於它自身的一個設計,這個設計就是agent,agent自己是一個Java進程,運行在日誌收集節點—所謂日誌收集節點就是服務器節點。

agent裏面包含3個核心的組件:source—->channel—–>sink,相似生產者、倉庫、消費者的架構。

    source:source組件是專門用來收集數據的,能夠處理各類類型、各類格式的日誌數據,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定義。

    channel:source組件把數據收集來之後,臨時存放在channel中,即channel組件在agent中是專門用來存放臨時數據的——對採集到的數據進行簡單的緩存,能夠存放在memory、jdbc、file等等。

    sink:sink組件是用於把數據發送到目的地的組件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、Hbase、solr、kafaka、自定義。

flume source

Source類型:

Avro Source: 支持Avro協議(其實是Avro RPC),內置支持

Thrift Source: 支持Thrift協議,內置支持

Exec Source: 基於Unix的command在標準輸出上生產數據

JMS Source: 從JMS系統(消息、主題)中讀取數據

Spooling Directory Source: 監控指定目錄內數據變動

Twitter 1% firehose Source: 經過API持續下載Twitter數據,試驗性質

Netcat Source: 監控某個端口,將流經端口的每個文本行數據做爲Event輸入

Sequence Generator Source: 序列生成器數據源,生產序列數據

Syslog Sources: 讀取syslog數據,產生Event,支持UDP和TCP兩種協議

HTTP Source: 基於HTTP POST或GET方式的數據源,支持JSON、BLOB表示形式

Legacy Sources: 兼容老的Flume OG中Source(0.9.x版本)

flume channel

Channel類型:

Memory Channel:Event數據存儲在內存中

JDBC Channel:Event數據存儲在持久化存儲中,當前Flume Channel內置支持Derby

File Channel:Event數據存儲在磁盤文件中

Spillable Memory Channel:Event數據存儲在內存中和磁盤上,當內存隊列滿了,會持

                           久化到磁盤文件

Pseudo Transaction Channel:測試用途

Custom Channel:自定義Channel實現

flume sink

Sink類型 說明

HDFS Sink:數據寫入HDFS

Logger Sink:數據寫入日誌文件

Avro Sink:數據被轉換成Avro Event,而後發送到配置的RPC端口上

Thrift Sink:數據被轉換成Thrift Event,而後發送到配置的RPC端口上

IRC Sink:數據在IRC上進行回放

File Roll Sink:存儲數據到本地文件系統

Null Sink:丟棄到全部數據

HBase Sink:數據寫入HBase數據庫

Morphline Solr Sink:數據發送到Solr搜索服務器(集羣)

ElasticSearch Sink:數據發送到Elastic Search搜索服務器(集羣)

Kite Dataset Sink:寫數據到Kite Dataset,試驗性質的

Custom Sink:自定義Sink實現

flume運行機制

    flume的核心就是一個agent,這個agent對外有兩個進行交互的地方,一個是接受數據的輸入——source,一個是數據的輸出sink,sink負責將數據發送到外部指定的目的地。source接收到數據以後,將數據發送給channel,chanel做爲一個數據緩衝區會臨時存放這些數據,隨後sink會將channel中的數據發送到指定的地方—-例如HDFS等,注意:只有在sink將channel中的數據成功發送出去以後,channel纔會將臨時數據進行刪除,這種機制保證了數據傳輸的可靠性與安全性。

flume的廣義用法

flume之因此這麼神奇—-其緣由也在於flume能夠支持多級flume的agent,即flume能夠先後相繼,例如sink能夠將數據寫到下一個agent的source中,這樣的話就能夠連成串了,能夠總體處理了。flume還支持扇入(fan-in)、扇出(fan-out)。所謂扇入就是source能夠接受多個輸入,所謂扇出就是sink能夠將數據輸出多個目的地destination中。

 

flume安裝

1. 下載源碼包,上傳到集羣的節點:

2. 解壓到指定目錄

3. 修改conf/flume.env.sh:

注意:JAVA_OPTS 配置  若是咱們傳輸文件過大 報內存溢出時 須要修改這個配置項

4. 配置環境變量

刷新profile文件:source /etc/profile

5. 驗證安裝是否成功

flume應用

案例1

http://flume.apache.org/FlumeUserGuide.html#a-simple-example

配置文件simple.conf

# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1

 

啓動flume

flume-ng agent -n a1 -c conf -f simple.conf -Dflume.root.logger=INFO,console

安裝telnet

yum install telnet

 

Memory Chanel 配置

  capacity:默認該通道中最大的能夠存儲的event數量是100,

  trasactionCapacity:每次最大能夠source中拿到或者送到sink中的event數量也是100

  keep-alive:event添加到通道中或者移出的容許時間

  byte**:即event的字節量的限制,只包括eventbody

案例二、兩個flume作集羣

 

node01服務器中,配置文件

# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = node1 a1.sources.r1.port = 44444 # Describe the sink # a1.sinks.k1.type = logger a1.sinks.k1.type = avro a1.sinks.k1.hostname = node2 a1.sinks.k1.port = 60000 # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1

node02服務器中,安裝Flume(步驟略)

配置文件

# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = avro a1.sources.r1.bind = node2 a1.sources.r1.port = 60000 # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1

先啓動node02的Flume

flume-ng agent  -n a1 -c conf -f avro.conf -Dflume.root.logger=INFO,console

 

再啓動node01的Flume

flume-ng agent  -n a1 -c conf -f simple.conf2 -Dflume.root.logger=INFO,console

 

打開telnet 測試  node02控制檯輸出結果

案例三、Exec Source

http://flume.apache.org/FlumeUserGuide.html#exec-source

配置文件

a1.sources = r1

a1.sinks = k1

a1.channels = c1



# Describe/configure the source

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /home/flume.exec.log



# Describe the sink

a1.sinks.k1.type = logger



# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100



# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

啓動Flume

flume-ng agent -n a1 -c conf -f exec.conf -Dflume.root.logger=INFO,console

 

建立空文件演示 touch flume.exec.log

循環添加數據

for i in {1..50}; do echo "$i hi flume" >> flume.exec.log ; sleep 0.1; done

 

案例四、Spooling Directory Source

http://flume.apache.org/FlumeUserGuide.html#spooling-directory-source

配置文件

a1.sources = r1

a1.sinks = k1

a1.channels = c1



# Describe/configure the source

a1.sources.r1.type = spooldir

a1.sources.r1.spoolDir = /home/logs

a1.sources.r1.fileHeader = true # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1

啓動Flume

flume-ng agent -n a1 -c conf -f spool.conf -Dflume.root.logger=INFO,console

拷貝文件演示

mkdir logs

cp flume.exec.log logs/

 

 

案例五、hdfs sink

http://flume.apache.org/FlumeUserGuide.html#hdfs-sink

 

配置文件

############################################################

a1.sources = r1

a1.sinks = k1

a1.channels = c1

 

# Describe/configure the source

a1.sources.r1.type = spooldir

a1.sources.r1.spoolDir = /home/logs

a1.sources.r1.fileHeader = true

 

# Describe the sink

***只修改上一個spool sink的配置代碼塊 a1.sinks.k1.type = logger

a1.sinks.k1.type=hdfs

a1.sinks.k1.hdfs.path=hdfs://sxt/flume/%Y-%m-%d/%H%M

 

##每隔60s或者文件大小超過10M的時候產生新文件

# hdfs有多少條消息時新建文件,0不基於消息個數

a1.sinks.k1.hdfs.rollCount=0

# hdfs建立多長時間新建文件,0不基於時間

a1.sinks.k1.hdfs.rollInterval=60

# hdfs多大時新建文件,0不基於文件大小

a1.sinks.k1.hdfs.rollSize=10240

# 當目前被打開的臨時文件在該參數指定的時間(秒)內,沒有任何數據寫入,則將該臨時文件關閉並重命名成目標文件

a1.sinks.k1.hdfs.idleTimeout=3

 

a1.sinks.k1.hdfs.fileType=DataStream

a1.sinks.k1.hdfs.useLocalTimeStamp=true

 

## 每五分鐘生成一個目錄:

# 是否啓用時間上的」捨棄」,這裏的」捨棄」,相似於」四捨五入」,後面再介紹。若是啓用,則會影響除了%t的其餘全部時間表達式

a1.sinks.k1.hdfs.round=true

# 時間上進行「捨棄」的值;

a1.sinks.k1.hdfs.roundValue=5

# 時間上進行」捨棄」的單位,包含:second,minute,hour

a1.sinks.k1.hdfs.roundUnit=minute

 

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

 

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

############################################################

建立HDFS目錄

hadoop fs -mkdir /flume

 

啓動Flume

flume-ng agent -n a1 -c conf -f hdfs.conf -Dflume.root.logger=INFO,console

 

查看hdfs文件

hadoop fs -ls /flume/...

hadoop fs -get /flume/...

相關文章
相關標籤/搜索