Kafka Agent 設計 - 可靠事件記錄不是一件簡單的事情

圖片描述

用 golang 編寫,解決事件快速同時可靠入Kafka的問題。綠色表示goroutine(多是不一樣的線程),藍色表示共享的資源。全部對藍色資源的訪問須要加鎖。磁盤隊列使用內存映射文件實現ring buffer。kafka agent啓動的時候有一個bootstrap的broker列表,同時發送的時候會與每一個相關的broker維護一個到多個鏈接。可是kafka agent不讀取也不監聽zookeeper。golang

收益

  • 業務方不須要等待kafka響應就能夠繼續幹別的去(低延遲)數據庫

  • 基於磁盤的隊列(高可靠)apache

  • 消息按partition排隊後組大包並壓縮發送(高吞吐)bootstrap

  • 由於發送到kafka以前業務側已經拿到響應並返回,kafka的request/response能夠簡單地一去一回,不用基於corelation id回調通知業務側(簡單設計)緩存

缺點是業務側沒法知道寫入的kafka offset。網絡

生產過程

業務方和 Kafka Agent 能夠在同一個進程內,也能夠經過本地unix domain socket溝通。若是是在同進程內,事件直接在業務方的 goroutine 內產生併入磁盤隊列。若是是在不一樣進程內,事件經過本地網絡請求轉發,由kafka agent代爲入隊列。入隊列成功以後有返回,業務側接到返回則認爲事件已經可靠保存,Eventually會進入到Kafka中。架構

  1. 產生事件,生成hash值。查詢 queue registry 得到topic的partition數量。根據hash值計算落到parition號。根據topic和partition找到對應的磁盤隊列。dom

  2. 若是沒有對應的磁盤隊列,則新建,並保存回 queue registry。若是沒有對應 topic 的 metadata 則須要查詢並保存回 queue registry。socket

  3. 事件寫入磁盤隊列,移動寫入offset的指向位置。若是寫入超過讀取速度,則覆蓋(丟棄掉舊的event)。ide

  4. 返回業務側,告知已經可靠寫入

異常流程

  • 沒有topic的metadata

  • 沒有對應的queue

  • topic的partition數量可能增長,須要定時刷新

  • 寫入會超過讀取的速度

  • 寫入超過讀取速度的時候,讀取可能正在進行中

發送過程

每一個隊列有一個對應的goroutine負責發送到kafka。

  1. 定時喚醒發送goroutine,從本身負責的queue裏讀取一批事件(只會對應一個topic的一個partition,由於queue是對應topic和partition建立的)。

  2. 當前goroutine裏若是沒有緩存對應的broker(parition的leader)的connection pool,則從全局鏈接池裏拿一個pool。從pool裏借一個conn。

  3. 若是pool不存在則新建pool。

  4. 發送消息到kafka broker,並同步等待其返回。而後歸還connection到pool裏。

  5. 此時消息已經可靠寫入kafka,移動讀取的offset。

異常流程

  • partition對應的broker還未知

  • broker對應的pool還未建立

  • 從broker裏取得的connection已經損壞

  • 寫入時broker告知leader已經改變

  • 移動offset的時候寫入已經溢出

Reliable Event Logging

不少時候咱們都但願可靠地記錄事件,這些事件處理大體分爲兩類

日誌監控

  • 記錄監控指標,用於告警

  • 記錄原始日誌,用於定位問題

  • 記錄原始事件,用於離線統計分析

業務事件

  • 離線計算髮放獎勵

  • 觸發一系列周邊系統的動做

日誌監控類的要求是迅速量大,能夠丟個別的日誌。業務事件類的要求是一條也不能丟,不然業務方要引入另一個隊列作離線對帳(這隊列不仍是logging麼,通常是業務本身的主數據庫來承擔)。業務事件的模式不能依靠上游來重試,由於主業務流程已經完成了,事件的寫入和接下來的處理不該該來決定業務操做自己的成敗。簡單來講就是有一些業務但願本身業務自己成功了,事件必須可靠記錄,同時基於事件的後續處理必須發生。

如今對kafka的使用方式,主要有兩種

  • kafka producer => kafka,實現方式是內存作buffer,而後批量寫入

  • log file => 日誌採集 => kafka 用日誌文件作緩衝

兩種方式都不夠完美,都沒法知足使用方的要求。日誌方式的問題是使用麻煩,並且性能開銷大。直接用producer的方式幾乎無持久化作緩衝,大部分時候就是靠內存buffer活着(除非可以忍受直接寫遠程kafka的延遲),根本沒有持久化保證可靠性。

因此事實上目前基於kafka的應用架構裏都沒法把kafka作爲一個可靠存儲來用,不是由於kafka自己不可靠,而是作到不拖慢業務的同時可靠入庫並不容易。或者講究着把一個不可靠的data pipeline看成可靠的來用。

More on this: http://www.slideshare.net/JiangjieQin/no-data-loss-pipeline-with-apache-kafka-49753844

相關文章
相關標籤/搜索