flume介紹

前言

本文經過對flume的架構,數據鏈路和數據的可靠性來分析flume的原理,並在文末提供了demo(官網搬運)。html

 

架構

flume能夠理解爲是一個ETL工具,自己是單點的,也就是隻有agent,沒有server,可是經過強大的source-channel-sink-source…機制,能夠經過在多個節點上起agent構成一個DAG,從而造成分佈式形態。參考圖1,圖2(其餘圖就不貼了,參考官網文檔),flume支持在任意節點啓動任意數量的agent,而且agent與agent之間能夠經過rpc鏈接。須要注意的是,1個source能夠下發數據到多個channel(經過`ChannelSelector`,最多見的就是廣播,或者動態路由),1個sink只能從1個channel獲取數據,可是多個sink又能夠從1個channel拉取數據(經過`SinkProcessor`)。這樣的DAG設計意圖很明顯,source生產數據一般都是比sink處理數據快的,因此channel起到數據緩衝做用,而且經過事務機制保證數據的可靠性。試想一下場景,假如數據量很大,1個source消費速度跟不上,也就是說達到了單進程的性能瓶頸,那麼能夠啓動多個agent;假如數據量通常,1個source就足夠,可是處理很複雜好比IO密集型的操做,那麼能夠經過多sink的方式從channel拉數據,也就是sink端作負載均衡,好比`LoadBalancingSinkProcessor`。flume這樣的設計很好的知足了各類場景需求。node

 

圖1apache

圖2架構

數據鏈路

參考圖3,flume的三個核心組件的數據鏈路是基於推送和拉取模式,其中Channel能夠理解爲列隊,也是實現數據可靠性的關鍵組件。好比常見的,採用FileChannel之類落盤的channel場景中,當source推送數據到channel失敗,那麼就會觸發source重試,當sink拉取數據操做失敗,那麼回通知channel回滾,直到sink操做成功才提交以前的那些數據,從而使得channel移除那些已經被成功處理的數據。負載均衡

圖3分佈式

 

flume的組件也不只僅只有source,channel,sink這三個。參考圖4,完整的數據鏈路還有ChannelProcessor和SinkeProcessor。ChannelProcessor主要功能有兩個,1.對事件進行攔截,提供修改事件的入口; 2.對channel的選擇,咱們知道1個source能夠發送數據到多個channel,可是具體發送到哪些channel呢?這裏就是選擇器決定了source發送到哪些channel。SinkProcessor的功能其實也相似,因爲一個channel的事件能夠被多個sink拉取,那麼SinkProcessor決定了sink拉取的策略,這裏flume衍生出了sinkGroup的概念,通常狀況下,1個sink對應1個線程,而sinkGroup能夠包含多個sink共用1個線程。ide

 

圖4工具

 

數據可靠性

數據可靠性是很是重要的一個特性,因此拉出來單獨作一下說明。性能

flume基於Channel和Transaction接口實現數據不丟失的特性。其中channel負責對數據持久化,維護了全部沒有被事務提交的事件,Transaction負責實現事務語義,相似jdbc的事務語法,以下大數據

Transaction tx = ch.getTransaction();
try {
  tx.begin();
  ...
  // ch.put(event) or ch.take()
  ...
  tx.commit();
 } catch (ChannelException ex) {
  tx.rollback();
   ...
 } finally {
  tx.close();
}

經過這兩個接口的保證,flume能夠實現at leaset once的語義,這是因爲sink能夠出現rpc超時等一些問題,致使誤覺得失敗致使事件被重複拉取。這個問題能夠經過對事件分配惟一id,再經過其餘大數據組件去重。

 

總結

flume提供一個靈活的設計思路,能夠經過agent組合構建出符合本身需求的DAG,有點相似storm,可是程序更加輕量。而且提供了不少開箱即用的插件,能夠說是很良心了。

 

demo

下面經過一個案例來了解flume,思路是構建基於netcat的agent,而後經過telnet進行驗證。

# 建立一個新的flume配置文件

vi example.conf

# example.conf: A single-node Flume configuration

 

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1

 

# Describe/configure the source

a1.sources.r1.type = netcat

a1.sources.r1.bind = localhost

a1.sources.r1.port = 44444

 

# Describe the sink

a1.sinks.k1.type = logger

 

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

 

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

 

# 啓動flume agent,並開啓http的度量監控,能夠經過http請求獲取相關度量數據

flume-ng agent --name a1 --conf-file example.conf -Dflume.root.logger=INFO,console -Dflume.monitoring.type=http -Dflume.monitoring.port=9999

 

# 經過telnet進行調試

telnet localhost 44444

# 發現消息,能夠看到flume agent能夠成功接收

 

參考

// flume官網文檔以及源碼

http://flume.apache.org/FlumeUserGuide.html

 

// 書乃本也~

《Flume  構建高可用、可擴展的海量日誌採集系統》

相關文章
相關標籤/搜索