Apache Flume 1.7.0 各個模塊簡介

Flume簡介

Apache Flume是一個分佈式、可靠、高可用的日誌收集系統,支持各類各樣的數據來源,如http,log文件,jms,監聽端口數據等等,能將這些數據源的海量日誌數據進行高效收集、聚合、移動,最後存儲到指定存儲系統中,如kafka、分佈式文件系統、Solr搜索服務器等;html

Apache Flume主要有如下幾大模塊組成:算法

  1. 數據源採集(Source)
  2. 數據攔截(Interceptor)
  3. 通道選擇器(Channel Selector)
  4. 數據通道(Channel)
  5. Sink處理器(Sink Processor)
  6. Sink(Sink)
  7. 事件序列化(Serialization)

模塊組成圖以下所示:數據庫

下面將對各個模塊作個簡單的介紹,在這以前,有必要先了解一下什麼是事件?apache

在Flume中,所謂的事件指的是Flume數據流中的數據單位,包含header和body,用於存儲日誌數據,其中header是一個map結構,咱們能夠往header存放一些信息,如時間戳,appid等,以便後續對事件進行處理,body存放的是收集的日誌內容字節流,結構以下圖所示:json

數據源採集(Source)

 先看下source模塊在流程圖中所處的位置,這裏以最簡單的架構圖來做爲示例,以下圖所示:緩存

Flume source主要功能是消費傳遞給它的事件;服務器

Flume內置了各類類型的Source,用於處理各類類型的事件,以下所示,理論上Flume支持全部類型的事件,由於Flume支持自定義Source:架構

  1. Avro Source:支持Avro協議(其實是Avro RPC)
  2. Thrift Source:支持Thrift協議
  3. Exec Source:基於Unix的command在標準輸出上生產數據
  4. JMS Source:從JMS系統中讀取數據
  5. Spooling Directory Source:監控指定目錄內數據變動
  6. Twitter 1% firehose Source:經過API持續下載Twitter數據,試驗性質
  7. Netcat Source:監控某個端口,將流經端口的每個文本行數據做爲Event輸入
  8. Sequence Generator Source:序列生成器數據源,生產序列數據
  9. Syslog Sources:讀取syslog數據,產生Event,支持UDP和TCP兩種協議
  10. HTTP Source:基於HTTP POST或GET方式的數據源,支持JSON、BLOB表示形式(實際上支持任何形式,由於handle能夠自定義)
  11. Legacy Sources:兼容老的Flume OG中Source(0.9.x版本)

這裏列舉幾個比較經常使用的source,app

如Exec Source,經過它咱們能夠監聽一個日誌文件的變化,以下配置,負載均衡

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1

Avro Source,經過它,咱們能夠將兩個Flume Agent關聯起來(由於agent的source和sink都支持Avro),正是這個特性,大大提升了flume的靈活性,可用性...

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

HTTP Source,經過它,能夠接收http請求上報的數據,以下是配置示例,監聽5140端口的http請求,這裏的handle是能夠自定義的,也就是說咱們能夠接收任何類型的上報數據,如json格式、xml等等。

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1
a1.sources.r1.handler = org.example.rest.RestHandler
a1.sources.r1.handler.nickname = random props

數據攔截(Interceptor)

先看下interceptor模塊在流程圖中所處的位置,以下圖所示:

攔截器主要的功能是對事件進行過濾,修改;

Flume內置支持的攔截器以下(主要兩類:過濾和修改):

  1. Timestamp Interceptor:在事件頭中插入以毫秒爲單位的時間戳,若是在以前已經有這個時間戳,則保留原有的時間戳。
  2. Host Interceptor:
  3. Static Interceptor
  4. UUID Interceptor
  5. Morphline Interceptor
  6. Search and Replace Interceptor
  7. Regex Filtering Interceptor
  8. Regex Extractor Interceptor

固然,flume是支持自定義攔截器的,以下是一個簡單的配置示例:

#攔截器
a1.sources.r1.interceptors = i1
#a1.sources.r1.interceptors.i1.type = org.apache.flume.sw.interceptor.SignCheckInterceptor$Builder
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.RegexFilteringInterceptor$Builder
a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
a1.sources.r1.interceptors.i1.serializers = s1 s2 s3
a1.sources.r1.interceptors.i1.serializers.s1.name = one
a1.sources.r1.interceptors.i1.serializers.s2.name = two
a1.sources.r1.interceptors.i1.serializers.s3.name = three

 通道選擇器(Channel Selector)

先看下interceptor模塊在流程圖中所處的位置,以下圖所示:

通道選擇器的主要功能是對事件流進行復制和分流;

Flume內置了兩種類型的通道選擇器:

  1. 複製(Replicating Channel Selector),使用該選擇器,咱們能夠同時讓同一事件傳遞到多個channel中,最後流入多個sink;
  2. 分流(Multiplexing Channel Selector),使用該選擇器,咱們可讓特定的事件流入到特定的channel中,如不一樣項目產生的日誌事件,交由不一樣的sink處理;

以下是一個分流的配置示例:

a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4

固然,通道選擇器是支持自定義的,咱們能夠本身實現通道選擇器,並作以下配置:

a1.sources = r1
a1.channels = c1
a1.sources.r1.selector.type = org.example.MyChannelSelector

數據通道(Channel)

先看下channel模塊在流程圖中所處的位置,以下圖所示:

通道Channel的主要功能是緩存日誌事件;

Flume內置的Channel以下:

  1. Memory Channel:內存通道
  2. JDBC Channel:存儲在持久化存儲中,當前Flume Channel內置支持Derby
  3. File Channel:存儲在磁盤文件中
  4. Spillable Memory Channel:存儲在內存中和磁盤上,當內存隊列滿了,會持久化到磁盤文件(當前試驗性的,不建議生產環境使用)
  5. Pseudo Transaction Channel:測試用途

 一樣,Flume支持自定義通道;

以下是一個內存通道的配置示例:

a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000

Sink處理器

 先看下Sink處理器在流程圖中所處的位置,以下圖所示:

Sink處理器的主要功能是讓一組sink groups支持負載均衡和災難轉移功能,我以爲跟通道選擇器有點相似經過自定義的方式,我以爲是能夠實現通道選擇器的功能的;

Flume內置的sink處理器以下:

  1. load_balance:負載均衡
  2. failover:主備(災難轉移)

一樣的,也支持自定義sink處理器;

以下是一個負載均衡的例子,使用隨機選擇算法:

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random

Sink(Sink)

 先看下Sink模塊在流程圖中所處的位置,以下圖所示:

Sink的主要功能是將事件輸出到下一個agent的source或其它存儲系統如,分佈式文件系統、kafka、本地文件系統、日誌等;

Flume內置的sink以下:

  1. HDFS Sink:數據寫入HDFS
  2. Logger Sink:數據寫入日誌文件
  3. Avro Sink:數據被轉換成Avro Event,而後發送到配置的RPC端口上
  4. Thrift Sink:數據被轉換成Thrift Event,而後發送到配置的RPC端口上
  5. IRC Sink:數據在IRC上進行回放
  6. File Roll Sink:存儲數據到本地文件系統
  7. Null Sink:丟棄到全部數據
  8. HBase Sink:數據寫入HBase數據庫
  9. Morphline Solr Sink:數據發送到Solr搜索服務器(集羣)
  10. ElasticSearch Sink:數據發送到Elastic Search搜索服務器(集羣)
  11. Kite Dataset Sink:寫數據到Kite Dataset,試驗性質的

固然,flume也是支持自定義的;

咱們舉個本地文件系統的例子,配置以下便可:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume

事件序列化(Serialization)

序列化在流程圖中所處的位置與Sink同樣,這裏就不畫了,簡單地說,Sink負責將事件輸出到外部,那麼以何種形式輸出(直接文本形式仍是其它形式),須要包含哪些東西(body仍是header仍是其它內容...),就是由事件序列化來完成的;

Flume內置的事件序列化以下:

  1. Body Text Serializer:看名字就知道,直接將事件的body做爲文本形式輸出,事件header將被忽略
  2. Avro Event Serializer:Avro序列化,包含事件所有信息

Flume一樣支持自定義事件序列化,須要實現EventSerializer接口;

下面舉個Body Text Serializer的配置示例:

a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
a1.sinks.k1.sink.serializer = text
a1.sinks.k1.sink.serializer.appendNewline = false

 結語

上面對flume各個模塊,或者說組件,作了一個簡短的介紹,基本知道了Flume是個怎麼回事,接下來將對各個組件作個介紹,並開發各個組件的自定義實現。

參考資料

http://flume.apache.org/FlumeUserGuide.html

http://shiyanjun.cn/archives/915.html

相關文章
相關標籤/搜索