Apache Flume是一個分佈式、可靠、高可用的日誌收集系統,支持各類各樣的數據來源,如http,log文件,jms,監聽端口數據等等,能將這些數據源的海量日誌數據進行高效收集、聚合、移動,最後存儲到指定存儲系統中,如kafka、分佈式文件系統、Solr搜索服務器等;html
Apache Flume主要有如下幾大模塊組成:算法
模塊組成圖以下所示:數據庫
下面將對各個模塊作個簡單的介紹,在這以前,有必要先了解一下什麼是事件?apache
在Flume中,所謂的事件指的是Flume數據流中的數據單位,包含header和body,用於存儲日誌數據,其中header是一個map結構,咱們能夠往header存放一些信息,如時間戳,appid等,以便後續對事件進行處理,body存放的是收集的日誌內容字節流,結構以下圖所示:json
先看下source模塊在流程圖中所處的位置,這裏以最簡單的架構圖來做爲示例,以下圖所示:緩存
Flume source主要功能是消費傳遞給它的事件;服務器
Flume內置了各類類型的Source,用於處理各類類型的事件,以下所示,理論上Flume支持全部類型的事件,由於Flume支持自定義Source:架構
這裏列舉幾個比較經常使用的source,app
如Exec Source,經過它咱們能夠監聽一個日誌文件的變化,以下配置,負載均衡
a1.sources = r1 a1.channels = c1 a1.sources.r1.type = exec a1.sources.r1.command = tail -F /var/log/secure a1.sources.r1.channels = c1
Avro Source,經過它,咱們能夠將兩個Flume Agent關聯起來(由於agent的source和sink都支持Avro),正是這個特性,大大提升了flume的靈活性,可用性...
a1.sources = r1 a1.channels = c1 a1.sources.r1.type = avro a1.sources.r1.channels = c1 a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 4141
HTTP Source,經過它,能夠接收http請求上報的數據,以下是配置示例,監聽5140端口的http請求,這裏的handle是能夠自定義的,也就是說咱們能夠接收任何類型的上報數據,如json格式、xml等等。
a1.sources = r1 a1.channels = c1 a1.sources.r1.type = http a1.sources.r1.port = 5140 a1.sources.r1.channels = c1 a1.sources.r1.handler = org.example.rest.RestHandler a1.sources.r1.handler.nickname = random props
先看下interceptor模塊在流程圖中所處的位置,以下圖所示:
攔截器主要的功能是對事件進行過濾,修改;
Flume內置支持的攔截器以下(主要兩類:過濾和修改):
固然,flume是支持自定義攔截器的,以下是一個簡單的配置示例:
#攔截器 a1.sources.r1.interceptors = i1 #a1.sources.r1.interceptors.i1.type = org.apache.flume.sw.interceptor.SignCheckInterceptor$Builder a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.RegexFilteringInterceptor$Builder a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d) a1.sources.r1.interceptors.i1.serializers = s1 s2 s3 a1.sources.r1.interceptors.i1.serializers.s1.name = one a1.sources.r1.interceptors.i1.serializers.s2.name = two a1.sources.r1.interceptors.i1.serializers.s3.name = three
先看下interceptor模塊在流程圖中所處的位置,以下圖所示:
通道選擇器的主要功能是對事件流進行復制和分流;
Flume內置了兩種類型的通道選擇器:
以下是一個分流的配置示例:
a1.sources = r1 a1.channels = c1 c2 c3 c4 a1.sources.r1.selector.type = multiplexing a1.sources.r1.selector.header = state a1.sources.r1.selector.mapping.CZ = c1 a1.sources.r1.selector.mapping.US = c2 c3 a1.sources.r1.selector.default = c4
固然,通道選擇器是支持自定義的,咱們能夠本身實現通道選擇器,並作以下配置:
a1.sources = r1 a1.channels = c1 a1.sources.r1.selector.type = org.example.MyChannelSelector
先看下channel模塊在流程圖中所處的位置,以下圖所示:
通道Channel的主要功能是緩存日誌事件;
Flume內置的Channel以下:
一樣,Flume支持自定義通道;
以下是一個內存通道的配置示例:
a1.channels = c1 a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 10000 a1.channels.c1.byteCapacityBufferPercentage = 20 a1.channels.c1.byteCapacity = 800000
先看下Sink處理器在流程圖中所處的位置,以下圖所示:
Sink處理器的主要功能是讓一組sink groups支持負載均衡和災難轉移功能,我以爲跟通道選擇器有點相似經過自定義的方式,我以爲是能夠實現通道選擇器的功能的;
Flume內置的sink處理器以下:
一樣的,也支持自定義sink處理器;
以下是一個負載均衡的例子,使用隨機選擇算法:
a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 a1.sinkgroups.g1.processor.type = load_balance a1.sinkgroups.g1.processor.backoff = true a1.sinkgroups.g1.processor.selector = random
先看下Sink模塊在流程圖中所處的位置,以下圖所示:
Sink的主要功能是將事件輸出到下一個agent的source或其它存儲系統如,分佈式文件系統、kafka、本地文件系統、日誌等;
Flume內置的sink以下:
固然,flume也是支持自定義的;
咱們舉個本地文件系統的例子,配置以下便可:
a1.channels = c1 a1.sinks = k1 a1.sinks.k1.type = file_roll a1.sinks.k1.channel = c1 a1.sinks.k1.sink.directory = /var/log/flume
序列化在流程圖中所處的位置與Sink同樣,這裏就不畫了,簡單地說,Sink負責將事件輸出到外部,那麼以何種形式輸出(直接文本形式仍是其它形式),須要包含哪些東西(body仍是header仍是其它內容...),就是由事件序列化來完成的;
Flume內置的事件序列化以下:
Flume一樣支持自定義事件序列化,須要實現EventSerializer接口;
下面舉個Body Text Serializer的配置示例:
a1.sinks = k1 a1.sinks.k1.type = file_roll a1.sinks.k1.channel = c1 a1.sinks.k1.sink.directory = /var/log/flume a1.sinks.k1.sink.serializer = text a1.sinks.k1.sink.serializer.appendNewline = false
上面對flume各個模塊,或者說組件,作了一個簡短的介紹,基本知道了Flume是個怎麼回事,接下來將對各個組件作個介紹,並開發各個組件的自定義實現。