kafka的設計

1.動機

設計 kafka 初衷,做爲統一平臺處理大公司的實時數據。因此 必須具備以下特性:java

  • 支持海量數據
  • 高吞吐量
  • 低延遲(實時性)
  • 支持分區,分佈式
  • 容錯

 2.持久化

kafka 高度依賴 文件系統 存儲和緩存消息。經過對磁盤的順序讀寫,並藉助 OS 層面的 頁緩存(page cache),保證優於緩存在內存中或其餘結構中。linux

爲什麼使用磁盤效率仍然很高:緩存

  • 利用磁盤的順序讀寫,操做一個文件,將數據追加到文件的末尾。相比於隨機讀寫,效率很高。
  • 利用 OS 層面的頁緩存(page cache),順序讀文件能夠預讀數據到 page cache。經過自動訪問全部可用內存 以及 存儲緊湊型字節結構而非單個對象提升內存使用率。OS緩存相對於進程內的緩存,重啓後仍然可用,不須要重建。
  • 全部的操做時間複雜度都是 常量時間O(1),與數據大小無關,讀 和 寫 不會互相阻塞。

3.效率

使用磁盤效率低下主要有兩個緣由:服務器

  • 過多的小 I/O 操做:發生在客戶端和服務端之間,以及 服務端本身的持久化操做中
  • 過多的字節複製

針對 小 I/O 操做,kafka 根據 "message set" 抽象構建了一個協議,該 抽象 天然地將消息分組在一塊兒。該協議容許網絡請求將消息分組在一塊兒,並分攤網絡往返的開銷,而不是一次發送一條消息。服務器依次將消息塊一次附加到其日誌中,而消費者一次獲取大型線性塊。網絡

針對過多的字節複製,使用了由生產者、代理 和 消費者共享的標準化二進制消息格式(這樣,數據塊就能夠在它們之間不進行修改的狀況下進行傳輸)。服務器所持有的消息日誌 自己是一個文件目錄,每一個文件都由一系列 "message set" 填充。這些消息集以生產者和消費者使用的相同格式寫入磁盤。維護這種通用格式能夠優化  持久化日誌塊的 網絡傳輸。app

存儲在文件中的信息經過網絡發送給客戶,經歷的幾個路徑:負載均衡

  • 操做系統在內核空間將數據從磁盤讀取到 page cache 中。
  • 應用程序從內核空間讀取到 用戶空間緩衝區
  • 應用程序將數據寫回到內核空間的套接字緩衝區
  • 操做系統將數據從套接字緩衝區複製到 NIC 緩衝區(NIC:網絡接口控制器)。

以上產生了四個副本拷貝,2個系統調用開銷,效率低下。異步

                 

          

       

 基於 零拷貝技術:消息數據直接從 page cache 發送到網絡。linux 中使用 sendfile 完成零拷貝技術。java 中 java.nio.channels.FileChannel 的 transferTo() 方法也使用了零拷貝技術。socket

                  

kafka 經過 page cache 和 sendfile 的組合,將看不到磁盤上的任何讀取活動,由於它們將徹底從緩存中提供數據。分佈式

端到端的批量壓縮

Kafka經過遞歸消息集來支持同時壓縮多個消息而減小相同消息的冗餘。 一批消息能夠一塊兒壓縮並以此形式發送到服務器。 這批消息將以壓縮形式寫入,並將在日誌中保持壓縮,而且只能由消費者解壓縮。Kafka支持GZIP和Snappy壓縮協議。

4.生產者

4.1負載均衡

生產者將數據直接發送給分區對應的 leader。爲了實現這一點,全部的 kafka 節點要可以在 任什麼時候候應答 哪一個服務器還活着以及 topic分區的leader在哪裏的 元數據請求。

客戶端本身控制 消息發送到哪一個分區,這能夠隨機完成,實現一種隨機的負載平衡,也能夠經過一些語義分區函數完成。

4.2異步發送

啓用 kafka 生產者 的批處理,kafka 將在內存中累積數據而後一次性批量發送。能夠配置 累計不超過固定數量的消息(bach.size),等待不超過固定延遲時間(linger.ms)。

5.消費者

5.1拉 VS 推送

消費者主動拉取消息缺點:若是 broker 沒有數據,消費者會輪詢,忙等待直到數據到達。kafka 能夠在拉請求中設置一些參數,容許使用者請求在「長輪詢」中阻塞,等待數據到達(也能夠選擇等待,直到給定的字節數可用,以確保傳輸大小很大)

消費者被動推送消息缺點:很難適應消費速率不一樣的消費者,消息發送速率是由 broker 決定的,broker 是儘量快的將消息發送出去,這樣會形成消費者來不及處理消息,典型的表現就是 網絡阻塞 和 拒絕服務。

5.2消費者的定位

topic 被分爲一組有序的分區,每一個分區在任何給定的時間都由每一個訂閱消費者組中的一個消費者消費。這意味着消費者在每一個分區中的位置只是一個整數,這個整數表明了即將要消費的消息的偏移量。這樣作的好處是能夠返回到舊的偏移量進行消費。

5.3離線數據加載

可伸縮持久性容許消費者只按期使用,例如批量數據加載,按期將數據批量加載到離線系統(如Hadoop或關係數據倉庫)中。

6.消息傳遞語義

很明顯,消息傳遞保證可以提供多種可能:

  • 最多一次:消息可能丟失,可是毫不會重發
  • 至少一次:消息毫不會丟失,可是可能會重發
  • 正好一次:每條消息被傳遞一次

kafka 的消息傳遞語義:

一旦發佈的消息已提交到日誌,只要副本分區寫入了此消息的一個broker仍然"活着」,它就不會丟失。

0.11.0.0 版本以前,若是一個生產者沒有收到消息提交的響應,那麼生產者只能從新發送該消息。這就保證了至少一次的傳遞語義。若是上一次的請求其實是成功的,那麼消息就會再次寫到日誌中,形成重複消費。

0.11.0.0 版本以後,kafka 生產者支持冪等傳遞,保證從新發送不會致使日誌中有重複記錄。爲了實現這一點,broker 爲 每個生產者 分配一個 ID,使用生產者隨每條消息一塊兒發送的序列號來消除重複的消息。

同時也是從 0.11.0.0 版本以後,生產者支持使用事務類語義將消息發送到多個 topic 分區的能力:即,要麼全部消息都已成功寫入,要麼都未成功寫入。這方面的主要用例是在Kafka topic 之間進行一次處理。

固然,不是全部的使用場景都須要如此嚴謹的保障,對於延遲敏感的,咱們容許生產者指定它想要的耐用性水平。如生產者能夠指定它獲取需等待10毫秒量級上的響應。生產者也能夠指定異步發送,或只等待leader(不須要副本的響應)有響應。

從消費者的角度描述語義:

  • 讀取到消息,在日誌中保存位置,最後處理消息。這種順序 若是消費者在保存位置以後,處理消息以前崩潰,數據會丟失,屬於 最多一次的語義。
  • 讀取消息,處理消息,在日誌中保存位置。這種順序,若是消費者在處理消息以後,日誌中保存位置以前崩潰,數據會被屢次處理,屬於至少一次的語義。在多數狀況下,消息都有一個主鍵,因此更新是冪等的(一次執行和屢次執行的影響相同)。

kafka 默認是保證「至少一次」傳遞,並容許用戶經過禁止生產者重試和處理一批消息前提交它的偏移量來實現 「最多一次」傳遞。而「正好一次」傳遞須要與目標存儲系統合做,但kafka提供了偏移量,因此實現這個很簡單。

7.副本

kafka 在各個服務器上備份 每一個 topic 的 partition (經過 replication factor 設置副本數)。當集羣中的某個服務器發生故障時,自動轉移到這些副本,以便在故障時,消息仍然可用。

kafka 的默認 副本因子爲 1,即不建立副本。副本因子是指副本的總數,包括 leader 。

副本以 topic 的 partition 爲單位。在非故障的狀況下,kafka 中的每一個 partition 都有一個 leader,0 個或者多個 follower。全部的讀 和寫都指向 分區的 leader。一般,分區數 多於 broker 的數量,leader 均勻的分佈在 broker 上。follower 的日誌與 leader 的日誌相同,即相同的 偏移量 offset 和 消息順序 。(固然,有可能在某個時間點,leader 上比 follower 多幾條還未同步的消息)。

kafka 節點存活的2個條件:

  • 一個節點必須能維持與 zookeeper 的會話(經過 zookeeper 的心跳機制)。
  • 若是該節點是  slave,它必須複製 leader 的寫數據,而且不能落後太多。

若是節點 死掉,卡主,或者落後太多,leader 將 從 同步副本 ISR (In Sync Replicas)中移除該節點。落後多少是由  replica.lag.max.messages 控制,卡主多久算卡主是由  replica.lag.time.max.ms 控制。

kafka 動態維護一組同步 leader 數據的副本(ISR),只有這個組中的成員纔有資格當選 leader。在全部同步副本都收到寫操做以前,不會認爲已提交對Kafka分區的寫操做。這組 ISR 保存在 zookeeper 中,正由於如此,在ISR中的任何副本都有資格當選leader。對於 f+1 個 副本的 kafka, topic 能夠容忍f失敗而不會丟失已提交的消息。

若是全部的節點都死掉,有兩種能夠實現的方式:

  • 等待 ISR 列表中的節點活過來,而且選擇該節點做爲 leader.
  • 選擇第一個活過來的節點(無論它在 ISR 列表中)做爲 leader.

從 0.11.0.0 開始 kafka 默認選擇第一種策略,等待一致性的副本;能夠經過配置 unclean.leader.election.enable 爲 true 來選用第二種策略。這兩種策略是  可用性 和一致性的權衡,須要根據實際業務來決定。

可用性 和 耐久性保證

當寫消息到 kafka 時,生產者能夠 配置 須要 leader 收到的確認數 來肯定是否完成請求,經過 配置 acks 知足多種狀況:

  • acks = 0 :生產者不會等待服務器的任何確認,消息記錄將被馬上添加到  socket 緩衝區並視爲已發送。這種狀況沒法確保服務器已經接收到消息記錄,重試的配置也不會生效。每一個記錄返回的偏移量始終被設置爲 1.
  • acks = 1 :服務器端的 leader 寫入消息到本地日誌就當即響應生產者,而不等待 follower 應答。這種狀況,若是在服務器響應生產者以後,複製到 follower 以前掛掉 就會丟失數據。
  • acks = all(-1):服務器端的 leader 會等待 ISR 中全部副本同步響應來確認消息記錄。這保證了只要 ISR 中還有一個副本存活就不會丟失記錄,也能夠設置爲 -1;

提供兩種 topic 級別的配置 來確保 持久性 而非 可用性。

  • unclean.leader.election.enable 設爲 false,(默認即爲 false)即 全部的副本都不可用時,分區纔不可用。只有當 ISR 中的節點 活過來 分區才能可用。
  • 指定 一個最小的 ISR 數量值,經過 min.insync.replicas 來配置,只有當 ISR 中的數量 超過最小值,分區纔會接受寫入操做,以此來防止僅寫入單個副本然後副本不可用而致使的消息的丟失。該設置僅在 acks = all 並保證至少有這麼多同步副本確認消息時生效。

副本管理

上面關於複製日誌的討論實際上只涉及了一個日誌,例如 一個 topic 的partition,然而,kafka 集羣管理着成百上千個這樣的分區。經過 round-robin 的方式平衡 集羣中的分區,避免 大部分的分區分佈在少許的及誒單上,一樣,平衡 leader,使在分區份額上的每一個節點都是 leader。

kafka 選擇 其中一個 broker 做爲 controller(到 zookeeper 上註冊,先到先得)。該 controller 檢測 broker 級別的故障,並負責更改 故障 broker 上受影響的 分區的 leader。這樣就能夠批量處理 leader 的變動。若是 controller 故障,其餘存活的 broker 將會成爲新的 controller(一樣須要到 zookeeper 上註冊)。

相關文章
相關標籤/搜索