Flume 在有贊大數據的實踐

1、前言

Flume 是一個分佈式的高可靠,可擴展的數據採集服務。

Flume 在有讚的大數據業務中一直扮演着一個穩定可靠的日誌數據「搬運工」 的角色。本文主要講一下有贊大數據部門在 Flume 的應用實踐,同時也穿插着咱們對 Flume 的一些理解。mysql

2、Delivery 保證

認識 Flume 對事件投遞的可靠性保證是很是重要的,它每每是咱們是否使用 Flume 來解決問題的決定因素之一。web

消息投遞的可靠保證有三種:sql

  • At-least-once
  • At-most-once
  • Exactly-once

基本上全部工具的使用用戶都但願工具框架能保證消息 Exactly-once ,這樣就沒必要在設計實現上考慮消息的丟失或者重複的處理場景。可是事實上不多有工具和框架能作到這一點,真正能作到這一點所付出的成本每每很大,或者帶來的額外影響反而讓你以爲不值得。假設 Flume 真的作到了 Exactly-once ,那勢必下降了穩定性和吞吐量,因此 Flume 選擇的策略是 At-least-oncejson

固然這裏的 At-least-once 須要加上引號,並非說用上 Flume 的隨便哪一個組件組成一個實例,運行過程當中就能保存消息不會丟失。事實上 At-least-once 原則只是說的是 SourceChannelSink 三者之間上下投遞消息的保證。而當你選擇 MemoryChannel 時,實例若是異常掛了再重啓,在 channel 中的未被 sink 所消費的殘留數據也就丟失了,從而沒辦法保證整條鏈路的 At-least-once。網絡

Flume 的 At-least-once 保證的實現基礎是創建了自身的 Transaction 機制。Flume 的 Transaction 有4個生命週期函數,分別是 startcommitrollbackclose。 當 SourceChannel 批量投遞事件時首先調用 start 開啓事務,批量 put 完事件後經過 commit 來提交事務,若是 commit 異常則 rollback ,而後 close 事務,最後 Source 將剛纔提交的一批消息事件向源服務 ack(好比 kafka 提交新的 offset )。Sink 消費 Channel 也是相同的模式,惟一的區別就是 Sink 須要在向目標源完成寫入以後纔對事務進行 commit。兩個組件的相同作法都是隻有向下遊成功投遞了消息纔會向上遊 ack,從而保證了數據能 At-least-once 向下投遞。架構

3、datay應用場景

基於 mysql binlog 的數倉增量同步(datay 業務)是大數據這邊使用 Flume 中一個比較經典的應用場景,datay 具體業務不詳細說明,須要強調的是它對Flume的一個要求是必須保證在 nsq(消息隊列)的 binlog 消息能可靠的落地到 hdfs ,不容許一條消息的丟失,須要絕對的 At-least-once框架

Flume 模型自己是基於 At-least-once 原則來傳遞事件,因此須要須要考慮是在各類異常狀況(好比進程異常掛了)下的 At-least-once 保證。顯然 MemoryChannel 沒法知足,因此咱們用 FlieChannel 作代替。因爲公司目前是使用 nsq 做爲 binlog 的消息中轉服務,故咱們沒有辦法使用現有的 KafkaSource,因此基於公司的 nsq sdk 擴展了 NsqSource。這裏須要說明的是爲了保證 At-least-once,Source 源必須支持消息接收的 ack 機制,好比 kafka 客戶端只有認爲消費了消息後,纔對 offset 進行提交,否則就須要接受重複的消息。分佈式

因而咱們第一個版本上線了,看上去頗有保障了,即便進程異常掛了重啓也不會丟數據。ide

可能有同窗想到一個關鍵性的問題:若是某一天磁盤壞了而進程異常退出,而 FileChannel 恰好又有未被消費的事件數據,這個時候不就丟數據了嗎?雖然磁盤壞了是一個極低的機率,但這確實是一個須要考慮的問題。函數

在 Flume 現有組件中比 FlieChannel 更可靠的,可能想到的是 KafkaChannel ,kafka 能夠對消息保留多個副本,從而加強了數據的可靠性。可是咱們第二版本的方案沒有選擇它,而是直接擴展出 NsqChannel 。因而第二個版本就有了。

初次使用 Flume 的用戶每每陷入到必須搭配 Source + Channel + Sink 三個組件的固有模式,事實上咱們不必定要三個組件都使用上。另外直接 NsqChannelHDFSEventSink 的有幾個好處:

  • 每一個消息的傳遞只須要一次事務,而非兩次,性能上更佳。
  • 避免引入了新的 kafka 服務,減小了資源成本的同時保持架構上更簡單從而更穩定。

4、定製化擴展

Flume 在各個組件的擴展性支持具備很是好的設計考慮。

當沒法知足咱們的自定義需求,咱們能夠選擇合適的組件上進行擴展。下面就講講咱們擴展的一些內容。

  • NsqSource

在 Flume 定製化一個 Source 比較簡單,繼承一個已有通用實現的抽象類,實現相對幾個生命週期方法便可。這裏說明注意的是 Flume 組件的生命週期在可能被會調用屢次,好比 Flume 具備自動發現實例配置發生變化並 restart 各個組件,這種狀況須要考慮資源的正確釋放。

  • HdfsEventSink 擴展配置

它自己就具備 role file 功能,好比當 Sink 是按小時生成文件,有這一個小時的第一個事件建立新的文件,而後通過固定的 role 配置時間(好比一小時)關閉文件。這裏存在的問題就是若是源平時的數據量不大,好比8點這個小時的第一個事件是在8點25分來臨,那就是說須要9點25才能關閉這個文件。因爲沒有關閉的tmp文件會被離線數據任務的計算引擎所忽略,在小時級的數據離線任務就沒辦法獲得實時的數據。而咱們作的改造就是 roll file 基於整點時間,而不是第一次事件的時間,好比固定的05分關閉上一次小時的文件,而離線任務調度時間設置在每小時的05分以後就能解決這個問題。最終的效果給下圖:

  • MetricsReportServer

當咱們須要收集 Flume 實例運行時的各個組件 counter metric ,就須要開啓 MonitorService 服務。自定義了一個按期發生 http 請求彙報 metric 到一個集中的 web 服務。原生的 HTTPMetricsServer 也是基於 http 服務,區別在於它將 Flume 做爲 http 服務端,而咱們將不少實例部署在一臺機器上,端口分配成了比較頭疼的問題。

當咱們收集到如下的 counter metric 時,就能夠利用它來實現一些監控報警。

{
    "identity":"olap_offline_daily_olap_druid_test_timezone_4@49",
    "startTime":1544287799839,
    "reportCount":4933,
    "metrics":{
        "SINK.olap_offline_daily_olap_druid_test_timezone_4_snk":{
            "ConnectionCreatedCount":"9",
            "ConnectionClosedCount":"8",
            "Type":"SINK",
            "BatchCompleteCount":"6335",
            "BatchEmptyCount":"2",
            "EventDrainAttemptCount":"686278",
            "StartTime":"1544287799837",
            "EventDrainSuccessCount":"686267",
            "BatchUnderflowCount":"5269",
            "StopTime":"0",
            "ConnectionFailedCount":"48460"
        },
        "SOURCE.olap_offline_daily_olap_druid_test_timezone_4_src":{
            "KafkaEventGetTimer":"26344146",
            "AppendBatchAcceptedCount":"0",
            "EventAcceptedCount":"686278",
            "AppendReceivedCount":"0",
            "StartTime":"1544287800219",
            "AppendBatchReceivedCount":"0",
            "KafkaCommitTimer":"14295",
            "EventReceivedCount":"15882278",
            "Type":"SOURCE",
            "OpenConnectionCount":"0",
            "AppendAcceptedCount":"0",
            "KafkaEmptyCount":"0",
            "StopTime":"0"
        },
        "CHANNEL.olap_offline_daily_olap_druid_test_timezone_4_cha":{
            "ChannelCapacity":"10000",
            "ChannelFillPercentage":"0.11",
            "Type":"CHANNEL",
            "ChannelSize":"11",
            "EventTakeSuccessCount":"686267",
            "StartTime":"1544287799332",
            "EventTakeAttemptCount":"715780",
            "EventPutAttemptCount":"15882278",
            "EventPutSuccessCount":"686278",
            "StopTime":"0"
        }
    }
}



複製代碼
  • 事件時間戳攔截。有一些 hdfs sink 業務對消息事件的時間比較敏感,同一小時的數據必須放在同一個目錄裏,這就要求使用 HdfsEventSink 的時候不能使用系統時間來計算文件目錄,而是應該基於消息內容中的某個時間戳字段。這個能夠經過擴展 Interceptor 來解決。 Interceptor 用於在 Source 投遞事件給 Channel 前的一個攔截處理,通常都是用來對事件豐富 header 信息。強烈不建議在 Source 中直接處理,實現一個 Interceptor 能夠知足其餘 Source 相似需求的複用性。

5、性能調優

Flume 實例進行性能調優最多見的配置是事務 batchChannel Capacity

  • 事務 batch 指的是合理設置 batch 配置,能夠明顯的改善實例的吞吐量。上面已經講到 SourceChannel 進行 put 或者 SinkChannel 進行 take 都是經過開啓事務來操做,因此調大兩個組件的 batch 配置能夠下降 cpu 消耗,減小網絡 IO 等待等。
  • Channel 的 capacity 大小直接影響着 source 和 sink 兩端的事件生產和消費。capacity 越大,吞吐量越好,可是其餘因素制約着不能設置的很大。好比 MemoryChannel ,直接表現着對內存的消耗,以及進程異常退出所丟失的事件數量。不一樣的 Channel 須要不一樣的考慮,最終 trade-off 是不免的。

6、總結和展望

Flume 是一個很是穩定的服務,這一點在咱們生產環境中獲得充分驗證。 同時它的模型設計也很是清晰易懂,每一種組件類型都有不少現成的實現,同時特考慮到各個擴展點,因此咱們很容易找到或者定製化咱們所須要的數據管道的解決方案。

隨着用戶愈來愈多,須要有一個統一的平臺來集中管理全部的 Flume 實例。 有如下幾點好處:

  • 下降用戶對 Flume 的成本。儘量在不太瞭解 Flume 的狀況下就能夠享受到它帶來的價值。
  • 有效對機器的資源進行合理協調使用。
  • 完善的監控讓 FLume 運行的更加穩定可靠。

固然這一步咱們也纔剛啓動,但願它將來的價值變得愈來愈大。

相關文章
相關標籤/搜索