Flume 在有讚的大數據業務中一直扮演着一個穩定可靠的日誌數據「搬運工」 的角色。本文主要講一下有贊大數據部門在 Flume 的應用實踐,同時也穿插着咱們對 Flume 的一些理解。mysql
認識 Flume 對事件投遞的可靠性保證是很是重要的,它每每是咱們是否使用 Flume 來解決問題的決定因素之一。web
消息投遞的可靠保證有三種:sql
基本上全部工具的使用用戶都但願工具框架能保證消息 Exactly-once ,這樣就沒必要在設計實現上考慮消息的丟失或者重複的處理場景。可是事實上不多有工具和框架能作到這一點,真正能作到這一點所付出的成本每每很大,或者帶來的額外影響反而讓你以爲不值得。假設 Flume 真的作到了 Exactly-once ,那勢必下降了穩定性和吞吐量,因此 Flume 選擇的策略是 At-least-once 。json
固然這裏的 At-least-once 須要加上引號,並非說用上 Flume 的隨便哪一個組件組成一個實例,運行過程當中就能保存消息不會丟失。事實上 At-least-once 原則只是說的是 Source
、 Channel
和 Sink
三者之間上下投遞消息的保證。而當你選擇 MemoryChannel
時,實例若是異常掛了再重啓,在 channel 中的未被 sink 所消費的殘留數據也就丟失了,從而沒辦法保證整條鏈路的 At-least-once。網絡
Flume 的 At-least-once 保證的實現基礎是創建了自身的 Transaction
機制。Flume 的 Transaction
有4個生命週期函數,分別是 start
、 commit
、rollback
和 close
。 當 Source
往 Channel
批量投遞事件時首先調用 start
開啓事務,批量 put 完事件後經過 commit 來提交事務,若是 commit
異常則 rollback
,而後 close
事務,最後 Source
將剛纔提交的一批消息事件向源服務 ack(好比 kafka 提交新的 offset )。Sink
消費 Channel
也是相同的模式,惟一的區別就是 Sink 須要在向目標源完成寫入以後纔對事務進行 commit
。兩個組件的相同作法都是隻有向下遊成功投遞了消息纔會向上遊 ack
,從而保證了數據能 At-least-once 向下投遞。架構
基於 mysql binlog
的數倉增量同步(datay
業務)是大數據這邊使用 Flume 中一個比較經典的應用場景,datay 具體業務不詳細說明,須要強調的是它對Flume的一個要求是必須保證在 nsq
(消息隊列)的 binlog
消息能可靠的落地到 hdfs
,不容許一條消息的丟失,須要絕對的 At-least-once。框架
Flume 模型自己是基於 At-least-once 原則來傳遞事件,因此須要須要考慮是在各類異常狀況(好比進程異常掛了)下的 At-least-once 保證。顯然 MemoryChannel
沒法知足,因此咱們用 FlieChannel
作代替。因爲公司目前是使用 nsq 做爲 binlog 的消息中轉服務,故咱們沒有辦法使用現有的 KafkaSource
,因此基於公司的 nsq sdk 擴展了 NsqSource
。這裏須要說明的是爲了保證 At-least-once,Source
源必須支持消息接收的 ack 機制,好比 kafka 客戶端只有認爲消費了消息後,纔對 offset 進行提交,否則就須要接受重複的消息。分佈式
因而咱們第一個版本上線了,看上去頗有保障了,即便進程異常掛了重啓也不會丟數據。ide
可能有同窗想到一個關鍵性的問題:若是某一天磁盤壞了而進程異常退出,而 FileChannel
恰好又有未被消費的事件數據,這個時候不就丟數據了嗎?雖然磁盤壞了是一個極低的機率,但這確實是一個須要考慮的問題。函數
在 Flume 現有組件中比 FlieChannel
更可靠的,可能想到的是 KafkaChannel
,kafka 能夠對消息保留多個副本,從而加強了數據的可靠性。可是咱們第二版本的方案沒有選擇它,而是直接擴展出 NsqChannel 。因而第二個版本就有了。
Source
+
Channel
+
Sink
三個組件的固有模式,事實上咱們不必定要三個組件都使用上。另外直接
NsqChannel
到
HDFSEventSink
的有幾個好處:
Flume 在各個組件的擴展性支持具備很是好的設計考慮。
當沒法知足咱們的自定義需求,咱們能夠選擇合適的組件上進行擴展。下面就講講咱們擴展的一些內容。
NsqSource
。在 Flume 定製化一個 Source
比較簡單,繼承一個已有通用實現的抽象類,實現相對幾個生命週期方法便可。這裏說明注意的是 Flume 組件的生命週期在可能被會調用屢次,好比 Flume 具備自動發現實例配置發生變化並 restart
各個組件,這種狀況須要考慮資源的正確釋放。
HdfsEventSink
擴展配置。它自己就具備 role file 功能,好比當 Sink 是按小時生成文件,有這一個小時的第一個事件建立新的文件,而後通過固定的 role 配置時間(好比一小時)關閉文件。這裏存在的問題就是若是源平時的數據量不大,好比8點這個小時的第一個事件是在8點25分來臨,那就是說須要9點25才能關閉這個文件。因爲沒有關閉的tmp文件會被離線數據任務的計算引擎所忽略,在小時級的數據離線任務就沒辦法獲得實時的數據。而咱們作的改造就是 roll file 基於整點時間,而不是第一次事件的時間,好比固定的05分關閉上一次小時的文件,而離線任務調度時間設置在每小時的05分以後就能解決這個問題。最終的效果給下圖:
MetricsReportServer
。當咱們須要收集 Flume 實例運行時的各個組件 counter metric ,就須要開啓 MonitorService
服務。自定義了一個按期發生 http 請求彙報 metric 到一個集中的 web 服務。原生的 HTTPMetricsServer
也是基於 http 服務,區別在於它將 Flume 做爲 http 服務端,而咱們將不少實例部署在一臺機器上,端口分配成了比較頭疼的問題。
當咱們收集到如下的 counter metric 時,就能夠利用它來實現一些監控報警。
{
"identity":"olap_offline_daily_olap_druid_test_timezone_4@49",
"startTime":1544287799839,
"reportCount":4933,
"metrics":{
"SINK.olap_offline_daily_olap_druid_test_timezone_4_snk":{
"ConnectionCreatedCount":"9",
"ConnectionClosedCount":"8",
"Type":"SINK",
"BatchCompleteCount":"6335",
"BatchEmptyCount":"2",
"EventDrainAttemptCount":"686278",
"StartTime":"1544287799837",
"EventDrainSuccessCount":"686267",
"BatchUnderflowCount":"5269",
"StopTime":"0",
"ConnectionFailedCount":"48460"
},
"SOURCE.olap_offline_daily_olap_druid_test_timezone_4_src":{
"KafkaEventGetTimer":"26344146",
"AppendBatchAcceptedCount":"0",
"EventAcceptedCount":"686278",
"AppendReceivedCount":"0",
"StartTime":"1544287800219",
"AppendBatchReceivedCount":"0",
"KafkaCommitTimer":"14295",
"EventReceivedCount":"15882278",
"Type":"SOURCE",
"OpenConnectionCount":"0",
"AppendAcceptedCount":"0",
"KafkaEmptyCount":"0",
"StopTime":"0"
},
"CHANNEL.olap_offline_daily_olap_druid_test_timezone_4_cha":{
"ChannelCapacity":"10000",
"ChannelFillPercentage":"0.11",
"Type":"CHANNEL",
"ChannelSize":"11",
"EventTakeSuccessCount":"686267",
"StartTime":"1544287799332",
"EventTakeAttemptCount":"715780",
"EventPutAttemptCount":"15882278",
"EventPutSuccessCount":"686278",
"StopTime":"0"
}
}
}
複製代碼
HdfsEventSink
的時候不能使用系統時間來計算文件目錄,而是應該基於消息內容中的某個時間戳字段。這個能夠經過擴展 Interceptor
來解決。 Interceptor
用於在 Source
投遞事件給 Channel
前的一個攔截處理,通常都是用來對事件豐富 header
信息。強烈不建議在 Source
中直接處理,實現一個 Interceptor
能夠知足其餘 Source
相似需求的複用性。Flume 實例進行性能調優最多見的配置是事務 batch
和 Channel Capacity
。
Source
對 Channel
進行 put 或者 Sink
對 Channel
進行 take 都是經過開啓事務來操做,因此調大兩個組件的 batch 配置能夠下降 cpu 消耗,減小網絡 IO 等待等。Channel
的 capacity 大小直接影響着 source 和 sink 兩端的事件生產和消費。capacity 越大,吞吐量越好,可是其餘因素制約着不能設置的很大。好比 MemoryChannel
,直接表現着對內存的消耗,以及進程異常退出所丟失的事件數量。不一樣的 Channel
須要不一樣的考慮,最終 trade-off 是不免的。Flume 是一個很是穩定的服務,這一點在咱們生產環境中獲得充分驗證。 同時它的模型設計也很是清晰易懂,每一種組件類型都有不少現成的實現,同時特考慮到各個擴展點,因此咱們很容易找到或者定製化咱們所須要的數據管道的解決方案。
隨着用戶愈來愈多,須要有一個統一的平臺來集中管理全部的 Flume 實例。 有如下幾點好處:
固然這一步咱們也纔剛啓動,但願它將來的價值變得愈來愈大。