Flume-NG內置計數器(監控)源碼級分析

  Flume的內置監控怎麼整?這個問題有不少人問。目前瞭解到的信息是可使用Cloudera Manager、Ganglia有圖形的監控工具,以及從瀏覽器獲取json串,或者自定義向其餘監控系統彙報信息。那監控的信息是什麼呢?就是各個組件的統計信息,好比成功接收的Event數量、成功發送的Event數量,處理的Transaction的數量等等。並且不一樣的組件有不一樣的Countor來作統計,目前直到1.5版本仍然只對三大組件:source、sink、channel進行統計分別是SourceCounter、SinkCounter、ChannelCounter,這三個計數器的統計項是固定的,就是你不能本身設置本身的統計項;另外還有ChannelProcessorCounter和SinkProcessorCounter,這兩項目前沒有設置統計項,因此是目前仍是「擺設」。另外有些同窗可能也發現了,有些內置的組件使用CounterGroup這個來統計信息,這個能夠本身隨意設置統計項,可是遺憾的是目前(1.5版本)這個能夠自定義的計數器的信息還沒法用在監控上,由於這只是一個單獨的類,並無繼承MonitoredCounterGroup這個抽象類。有些內置組件使用的是CounterGroup,因此監控時會沒有數據,不一樣的版本使用此CounterGroup的組件可能不一樣。下面咱們重點介紹:SourceCounter、SinkCounter、ChannelCounter。node

  Flume-NG的全部統計信息、監控及相關的類都在org.apache.flume.instrumentation.http、org.apache.flume.instrumentation、org.apache.flume.instrumentation.util三個包下。mysql

  上面提到了MonitoredCounterGroup,這個類是用來跟蹤內部的統計指標的,註冊組件的MBean並跟蹤和更新統計值。須要監控的組件都要繼承這個類,這個類能夠跟蹤flume內部的全部組件,可是目前只實現了3個。其中比較重要的方法有如下幾個:web

  (1)、構造方法MonitoredCounterGroup(Type type, String name, String... attrs),這個方法主要是設置組件的類型、名稱;而後將全部的attrs(這是設定的各個統計項)加入Map<String, AtomicLong> counterMap,值設定爲0;而後初始化計數器的開始時間和結束時間,都設爲0.sql

  (2)、start()方法,會先註冊計數器,而後對全部統計項的統計值設爲0;將開始時間設置爲當前時間數據庫

  (3)、register()方法,若是這個計數器還未註冊,將這個計數器的MBean進行註冊,就能夠進行跟蹤了apache

  (4)、stop()方法,會設置結束時間爲當前時間;輸出各個統計項的信息。咱們 Ctrl+C 結束進程時,最後顯示的統計信息就是來自這裏。json

  其它方法都是獲取counterMap的中信息或者更新值等,比較簡單。數組

  接下來咱們看看,三個組件中各類統計項及其含義吧:瀏覽器

  1、SourceCounter,繼承了MonitoredCounterGroup。主要統計項以下:服務器

  (1)"src.events.received",表示source接受的event個數;

  (2)"src.events.accepted",表示source處理成功的event個數,和上面的區別就是上面雖然接受了可能沒處理成功;

  (3)"src.append.received",表示調用append次數,在avrosource和thriftsource中調用;

  (4)"src.append.accepted",表示append處理成功次數;

  (5)"src.append-batch.received",表示appendBatch被調用的次數,在avrosource和thriftsource中調用;

  (6)"src.append-batch.accepted",表示appendBatch處理成功次數;

  (7)"src.open-connection.count",用在avrosource中表示打開鏈接的數量;

  通常source調用都集中在前倆。

  2、SinkCounter,繼承了MonitoredCounterGroup

  (1)"sink.connection.creation.count",這個調用的地方頗多,都表示「連接」建立的數量,好比與HBase創建連接,與avrosource創建連接以及文件的打開等;

  (2)"sink.connection.closed.count",對應於上面的stop操做、destroyConnection、close文件操做等。

  (3)"sink.connection.failed.count",表示上面所表示「連接」時異常、失敗的次數;

  (4)"sink.batch.empty",表示這個批次處理的event數量爲0的狀況;

  (5)"sink.batch.underflow",表示這個批次處理的event的數量介於0和配置的batchSize之間;

  (6)"sink.batch.complete",表示這個批次處理的event數量等於設定的batchSize;

  (7)"sink.event.drain.attempt",準備處理的event的個數;

  (8)"sink.event.drain.sucess",這個表示處理成功的event數量,與上面不一樣的是上面的是還未處理的。

  3、ChannelCounter,繼承了MonitoredCounterGroup

  (1)"channel.current.size",這個表示這個channel的當前容量;

  (2)"channel.event.put.attempt",通常指的是在channel的事務當中,source的put操做中記錄嘗試發送event的個數;

  (3)"channel.event.take.attempt",通常指的是在channel的事務中,sink的take操做記錄嘗試拿event的個數;

  (4)"channel.event.put.success",通常指的是在channel的事務中,put成功的event的數量;

  (5)"channel.event.take.success",通常指的是channel事務中,take成功的event的數量;

  (6)"channel.capacity",指的是channel的容量,在channel的start方法中設置。

  上面這些統計項都是固定的,咱們能夠根據須要增長相應項的值,能夠在監控中查看組件的變化狀況,從而掌握flume進程的運行狀況。好比能夠查看channel的容量從而瞭解到source和sink的相對處理速度,還有能夠看source或者sink每一個批次處理成功與失敗的次數,瞭解組件的運行情況等等。

  固然有些同窗可能在自定義本身的組件時,想統計一些本身的統計項,這些統計項在上面三大組件中是沒有,怎麼辦?本身定製啊,上面說了必需要繼承MonitoredCounterGroup這個抽象類,設定本身的統計項,而後將統計項設置成數組調用MonitoredCounterGroup的構造函數;而後在自定義的計數器中增長更新數值的方法。最後在自定義的組件中構造自定義的計數器,並啓用它的start方法,剩下的就是在該更新統計項數值的地方更新就能夠了。

 

  還有一個重要的內容就是監控的實現!沒錯,內置的有兩種HTTP方式(就是json串)和Ganglia,後者須要安裝Ganglia,前者很是簡單,只須要在Flume的啓動命令中加上:-Dflume.monitoring.type=http -Dflume.monitoring.port=XXXX  ,最後的XXXX是你須要設置的端口!而後你就能夠在瀏覽器上經過訪問這個Flume所在節點的IP:XXXX/metrics,不斷刷新就能夠看到最新的組件統計信息。關於Ganglia的請讀者自行組建Ganglia集羣並參考用戶指南來操做。

  若是我想本身實現一個server向其餘系統彙報信息,咋整?目前有至少兩個方法:

  1、就是上面的HTTP啊,你能夠不斷去獲取json串,本身解析出來各個統計指標,而後剩下的就是你想怎麼整就怎麼整吧。

  2、就是本身實現一個相似HTTP的server,必須實現org.apache.flume.instrumentation.MonitorService接口,這個接口只有倆方法:start和stop。這個接口繼承自Configurable接口因此擁有能夠讀取配置文件的configure(configure(Context context))方法,來獲取一些配置信息。

  以HTTP爲例(對應的類是org.apache.flume.instrumentation.http.HTTPMetricsServer),它的start方法啓動了一個jetty做爲web server,提供WEB服務。並實現了AbstractHandler的一個處理數據的類HTTPMetricsHandler,這個類的handle(String target, HttpServletRequest request, HttpServletResponse response,int dispatch)方法來設置一些WEB頁面的格式以及經過JMXPollUtil.getAllMBeans()獲取全部組件註冊的MBean構成的Map<String, Map<String, String>> metricsMap,遍歷這個metricsMap將這個metricsMap轉換成json輸出到web頁面。stop方法就是一些清理工做,這裏是關閉jetty server。很簡單吧,因此咱們徹底能夠實現一個server,在start方法中啓動一個線程每隔一秒或者本身定遍歷這個metricsMap,寫入mysql、HBase或者別的地方,你隨便。。。

  你能夠在定義的組件中調用本身的計數器,而後將計數器、監控類、自定義組件(source、sink、channel)打包放到lib下,在啓動命令後加-Dflume.monitoring.type=AAAAA -Dflume.monitoring.node=BBBB,就能夠了。注意,Dflume.monitoring.type這個好似必需要設置的,就是你本身的監控類(這裏是AAAAA),後面的無關緊要都是一些參數,你能夠自定義參數名,好比能夠設置數據庫服務器IP、端口等。

    至此,這裏介紹完了。這些都是從源碼中看出來的,還不曾實現,供大夥借鑑。

相關文章
相關標籤/搜索