深刻理解Kafka必知必會(3)

Kafka中的事務是怎麼實現的?

Kafka中的事務可使應用程序將消費消息、生產消息、提交消費位移看成原子操做來處理,同時成功或失敗,即便該生產或消費會跨多個分區。linux

生產者必須提供惟一的transactionalId,啓動後請求事務協調器獲取一個PID,transactionalId與PID一一對應。算法

每次發送數據給<Topic, Partition>前,須要先向事務協調器發送AddPartitionsToTxnRequest,事務協調器會將該<Transaction, Topic, Partition>存於__transaction_state內,並將其狀態置爲BEGIN。緩存

在處理完 AddOffsetsToTxnRequest 以後,生產者還會發送 TxnOffsetCommitRequest 請求給 GroupCoordinator,從而將本次事務中包含的消費位移信息 offsets 存儲到主題 __consumer_offsets 中服務器

一旦上述數據寫入操做完成,應用程序必須調用KafkaProducer的commitTransaction方法或者abortTransaction方法以結束當前事務。不管調用 commitTransaction() 方法仍是 abortTransaction() 方法,生產者都會向 TransactionCoordinator 發送 EndTxnRequest 請求。
TransactionCoordinator 在收到 EndTxnRequest 請求後會執行以下操做:網絡

  1. 將 PREPARE_COMMIT 或 PREPARE_ABORT 消息寫入主題 __transaction_state
  2. 經過 WriteTxnMarkersRequest 請求將 COMMIT 或 ABORT 信息寫入用戶所使用的普通主題和 __consumer_offsets
  3. 將 COMPLETE_COMMIT 或 COMPLETE_ABORT 信息寫入內部主題 __transaction_state標明該事務結束

在消費端有一個參數isolation.level,設置爲「read_committed」,表示消費端應用不能夠看到還沒有提交的事務內的消息。若是生產者開啓事務並向某個分區值發送3條消息 msg一、msg2 和 msg3,在執行 commitTransaction() 或 abortTransaction() 方法前,設置爲「read_committed」的消費端應用是消費不到這些消息的,不過在 KafkaConsumer 內部會緩存這些消息,直到生產者執行 commitTransaction() 方法以後它才能將這些消息推送給消費端應用。反之,若是生產者執行了 abortTransaction() 方法,那麼 KafkaConsumer 會將這些緩存的消息丟棄而不推送給消費端應用。負載均衡

失效副本是指什麼?有那些應對措施?

正常狀況下,分區的全部副本都處於 ISR 集合中,可是不免會有異常狀況發生,從而某些副本被剝離出 ISR 集合中。在 ISR 集合以外,也就是處於同步失效或功能失效(好比副本處於非存活狀態)的副本統稱爲失效副本,失效副本對應的分區也就稱爲同步失效分區,即 under-replicated 分區。socket

Kafka 從 0.9.x 版本開始就經過惟一的 broker 端參數 replica.lag.time.max.ms 來抉擇,當 ISR 集合中的一個 follower 副本滯後 leader 副本的時間超過此參數指定的值時則斷定爲同步失敗,須要將此 follower 副本剔除出 ISR 集合。replica.lag.time.max.ms 參數的默認值爲10000。分佈式

在 0.9.x 版本以前,Kafka 中還有另外一個參數 replica.lag.max.messages(默認值爲4000),它也是用來斷定失效副本的,當一個 follower 副本滯後 leader 副本的消息數超過 replica.lag.max.messages 的大小時,則斷定它處於同步失效的狀態。它與 replica.lag.time.max.ms 參數斷定出的失效副本取並集組成一個失效副本的集合,從而進一步剝離出分區的 ISR 集合。工具

Kafka 源碼註釋中說明了通常有這幾種狀況會致使副本失效:性能

  • follower 副本進程卡住,在一段時間內根本沒有向 leader 副本發起同步請求,好比頻繁的 Full GC。
  • follower 副本進程同步過慢,在一段時間內都沒法追遇上 leader 副本,好比 I/O 開銷過大。
  • 若是經過工具增長了副本因子,那麼新增長的副本在遇上 leader 副本以前也都是處於失效狀態的。
  • 若是一個 follower 副本因爲某些緣由(好比宕機)而下線,以後又上線,在追遇上 leader 副本以前也處於失效狀態。

應對措施

咱們用UnderReplicatedPartitions表明leader副本在當前Broker上且具備失效副本的分區的個數。

若是集羣中有多個Broker的UnderReplicatedPartitions保持一個大於0的穩定值時,通常暗示着集羣中有Broker已經處於下線狀態。這種狀況下,這個Broker中的分區個數與集羣中的全部UnderReplicatedPartitions(處於下線的Broker是不會上報任何指標值的)之和是相等的。一般這類問題是因爲機器硬件緣由引發的,但也有多是因爲操做系統或者JVM引發的 。

若是集羣中存在Broker的UnderReplicatedPartitions頻繁變更,或者處於一個穩定的大於0的值(這裏特指沒有Broker下線的狀況)時,通常暗示着集羣出現了性能問題,一般這類問題很難診斷,不過咱們能夠一步一步的將問題的範圍縮小,好比先嚐試肯定這個性能問題是否只存在於集羣的某個Broker中,仍是整個集羣之上。若是肯定集羣中全部的under-replicated分區都是在單個Broker上,那麼能夠看出這個Broker出現了問題,進而能夠針對這單一的Broker作專項調查,好比:操做系統、GC、網絡狀態或者磁盤狀態(好比:iowait、ioutil等指標)。

多副本下,各個副本中的HW和LEO的演變過程

某個分區有3個副本分別位於 broker0、broker1 和 broker2 節點中,假設 broker0 上的副本1爲當前分區的 leader 副本,那麼副本2和副本3就是 follower 副本,整個消息追加的過程能夠歸納以下:

  1. 生產者客戶端發送消息至 leader 副本(副本1)中。
  2. 消息被追加到 leader 副本的本地日誌,而且會更新日誌的偏移量。
  3. follower 副本(副本2和副本3)向 leader 副本請求同步數據。
  4. leader 副本所在的服務器讀取本地日誌,並更新對應拉取的 follower 副本的信息。
  5. leader 副本所在的服務器將拉取結果返回給 follower 副本。
  6. follower 副本收到 leader 副本返回的拉取結果,將消息追加到本地日誌中,並更新日誌的偏移量信息。

某一時刻,leader 副本的 LEO 增長至5,而且全部副本的 HW 還都爲0。

以後 follower 副本(不帶陰影的方框)向 leader 副本拉取消息,在拉取的請求中會帶有自身的 LEO 信息,這個 LEO 信息對應的是 FetchRequest 請求中的 fetch_offset。leader 副本返回給 follower 副本相應的消息,而且還帶有自身的 HW 信息,如上圖(右)所示,這個 HW 信息對應的是 FetchResponse 中的 high_watermark。

此時兩個 follower 副本各自拉取到了消息,並更新各自的 LEO 爲3和4。與此同時,follower 副本還會更新本身的 HW,更新 HW 的算法是比較當前 LEO 和 leader 副本中傳送過來的HW的值,取較小值做爲本身的 HW 值。當前兩個 follower 副本的 HW 都等於0(min(0,0) = 0)。

接下來 follower 副本再次請求拉取 leader 副本中的消息,以下圖(左)所示。

此時 leader 副本收到來自 follower 副本的 FetchRequest 請求,其中帶有 LEO 的相關信息,選取其中的最小值做爲新的 HW,即 min(15,3,4)=3。而後連同消息和 HW 一塊兒返回 FetchResponse 給 follower 副本,如上圖(右)所示。注意 leader 副本的 HW 是一個很重要的東西,由於它直接影響了分區數據對消費者的可見性。

兩個 follower 副本在收到新的消息以後更新 LEO 而且更新本身的 HW 爲3(min(LEO,3)=3)。

Kafka在可靠性方面作了哪些改進?(HW, LeaderEpoch)

HW

HW 是 High Watermark 的縮寫,俗稱高水位,它標識了一個特定的消息偏移量(offset),消費者只能拉取到這個 offset 以前的消息。

分區 ISR 集合中的每一個副本都會維護自身的 LEO,而 ISR 集合中最小的 LEO 即爲分區的 HW,對消費者而言只能消費 HW 以前的消息。

leader epoch

leader epoch 表明 leader 的紀元信息(epoch),初始值爲0。每當 leader 變動一次,leader epoch 的值就會加1,至關於爲 leader 增設了一個版本號。
每一個副本中還會增設一個矢量 <LeaderEpoch => StartOffset>,其中 StartOffset 表示當前 LeaderEpoch 下寫入的第一條消息的偏移量。

假設有兩個節點A和B,B是leader節點,裏面的數據如圖:

A發生重啓,以後A不是先忙着截斷日誌而是先發送OffsetsForLeaderEpochRequest請求給B,B做爲目前的leader在收到請求以後會返回當前的LEO(LogEndOffset,注意圖中LE0和LEO的不一樣),與請求對應的響應爲OffsetsForLeaderEpochResponse。若是 A 中的 LeaderEpoch(假設爲 LE_A)和 B 中的不相同,那麼 B 此時會查找 LeaderEpoch 爲 LE_A+1 對應的 StartOffset 並返回給 A

如上圖所示,A 在收到2以後發現和目前的 LEO 相同,也就不須要截斷日誌了,以此來保護數據的完整性。

再如,以後 B 發生了宕機,A 成爲新的 leader,那麼對應的 LE=0 也變成了 LE=1,對應的消息 m2 此時就獲得了保留。後續的消息均可以以 LE1 爲 LeaderEpoch 陸續追加到 A 中。這個時候A就會有兩個LE,第二LE所記錄的Offset從2開始。若是B恢復了,那麼就會從A中獲取到LE+1的Offset爲2的值返回給B。

再來看看LE如何解決數據不一致的問題:
當前 A 爲 leader,B 爲 follower,A 中有2條消息 m1 和 m2,而 B 中有1條消息 m1。假設 A 和 B 同時「掛掉」,而後 B 第一個恢復過來併成爲新的 leader。

以後 B 寫入消息 m3,並將 LEO 和 HW 更新至2,以下圖所示。注意此時的 LeaderEpoch 已經從 LE0 增至 LE1 了。

緊接着 A 也恢復過來成爲 follower 並向 B 發送 OffsetsForLeaderEpochRequest 請求,此時 A 的 LeaderEpoch 爲 LE0。B 根據 LE0 查詢到對應的 offset 爲1並返回給 A,A 就截斷日誌並刪除了消息 m2,以下圖所示。以後 A 發送 FetchRequest 至 B 請求來同步數據,最終A和B中都有兩條消息 m1 和 m3,HW 和 LEO都爲2,而且 LeaderEpoch 都爲 LE1,如此便解決了數據不一致的問題。

爲何Kafka不支持讀寫分離?

由於這樣有兩個明顯的缺點:

  1. 數據一致性問題。數據從主節點轉到從節點必然會有一個延時的時間窗口,這個時間窗口會致使主從節點之間的數據不一致。
  2. 延時問題。數據從寫入主節點到同步至從節點中的過程須要經歷網絡→主節點內存→主節點磁盤→網絡→從節點內存→從節點磁盤這幾個階段。對延時敏感的應用而言,主寫從讀的功能並不太適用。

對於Kafka來講,必要性不是很高,由於在Kafka集羣中,若是存在多個副本,通過合理的配置,可讓leader副本均勻的分佈在各個broker上面,使每一個 broker 上的讀寫負載都是同樣的。

Kafka中的延遲隊列怎麼實現

在發送延時消息的時候並非先投遞到要發送的真實主題(real_topic)中,而是先投遞到一些 Kafka 內部的主題(delay_topic)中,這些內部主題對用戶不可見,而後經過一個自定義的服務拉取這些內部主題中的消息,並將知足條件的消息再投遞到要發送的真實的主題中,消費者所訂閱的仍是真實的主題。

若是採用這種方案,那麼通常是按照不一樣的延時等級來劃分的,好比設定5s、10s、30s、1min、2min、5min、10min、20min、30min、45min、1hour、2hour這些按延時時間遞增的延時等級,延時的消息按照延時時間投遞到不一樣等級的主題中,投遞到同一主題中的消息的延時時間會被強轉爲與此主題延時等級一致的延時時間,這樣延時偏差控制在兩個延時等級的時間差範圍以內(好比延時時間爲17s的消息投遞到30s的延時主題中,以後按照延時時間爲30s進行計算,延時偏差爲13s)。雖然有必定的延時偏差,可是偏差可控,而且這樣只需增長少量的主題就能實現延時隊列的功能。

發送到內部主題(delay_topic_*)中的消息會被一個獨立的 DelayService 進程消費,這個 DelayService 進程和 Kafka broker 進程以一對一的配比進行同機部署(參考下圖),以保證服務的可用性。

針對不一樣延時級別的主題,在 DelayService 的內部都會有單獨的線程來進行消息的拉取,以及單獨的 DelayQueue(這裏用的是 JUC 中 DelayQueue)進行消息的暫存。與此同時,在 DelayService 內部還會有專門的消息發送線程來獲取 DelayQueue 的消息並轉發到真實的主題中。從消費、暫存再到轉發,線程之間都是一一對應的關係。以下圖所示,DelayService 的設計應當儘可能保持簡單,避免鎖機制產生的隱患。

爲了保障內部 DelayQueue 不會由於未處理的消息過多而致使內存的佔用過大,DelayService 會對主題中的每一個分區進行計數,當達到必定的閾值以後,就會暫停拉取該分區中的消息。

由於一個主題中通常不止一個分區,分區之間的消息並不會按照投遞時間進行排序,DelayQueue的做用是將消息按照再次投遞時間進行有序排序,這樣下游的消息發送線程就可以按照前後順序獲取最早知足投遞條件的消息。

Kafka中怎麼實現死信隊列和重試隊列?

死信能夠看做消費者不能處理收到的消息,也能夠看做消費者不想處理收到的消息,還能夠看做不符合處理要求的消息。好比消息內包含的消息內容沒法被消費者解析,爲了確保消息的可靠性而不被隨意丟棄,故將其投遞到死信隊列中,這裏的死信就能夠看做消費者不能處理的消息。再好比超過既定的重試次數以後將消息投入死信隊列,這裏就能夠將死信看做不符合處理要求的消息。

重試隊列其實能夠看做一種回退隊列,具體指消費端消費消息失敗時,爲了防止消息無端丟失而從新將消息回滾到 broker 中。與回退隊列不一樣的是,重試隊列通常分紅多個重試等級,每一個重試等級通常也會設置從新投遞延時,重試次數越多投遞延時就越大。

理解了他們的概念以後咱們就能夠爲每一個主題設置重試隊列,消息第一次消費失敗入重試隊列 Q1,Q1 的從新投遞延時爲5s,5s事後從新投遞該消息;若是消息再次消費失敗則入重試隊列 Q2,Q2 的從新投遞延時爲10s,10s事後再次投遞該消息。

而後再設置一個主題做爲死信隊列,重試越屢次從新投遞的時間就越久,而且須要設置一個上限,超過投遞次數就進入死信隊列。重試隊列與延時隊列有相同的地方,都須要設置延時級別。

Kafka中怎麼作消息審計?

消息審計是指在消息生產、存儲和消費的整個過程之間對消息個數及延遲的審計,以此來檢測是否有數據丟失、是否有數據重複、端到端的延遲又是多少等內容。

目前與消息審計有關的產品也有多個,好比 Chaperone(Uber)、Confluent Control Center、Kafka Monitor(LinkedIn),它們主要經過在消息體(value 字段)或在消息頭(headers 字段)中內嵌消息對應的時間戳 timestamp 或全局的惟一標識 ID(或者是二者兼備)來實現消息的審計功能。

內嵌 timestamp 的方式主要是設置一個審計的時間間隔 time_bucket_interval(能夠自定義設置幾秒或幾分鐘),根據這個 time_bucket_interval 和消息所屬的 timestamp 來計算相應的時間桶(time_bucket)。

內嵌 ID 的方式就更加容易理解了,對於每一條消息都會被分配一個全局惟一標識 ID。若是主題和相應的分區固定,則能夠爲每一個分區設置一個全局的 ID。當有消息發送時,首先獲取對應的 ID,而後內嵌到消息中,最後纔將它發送到 broker 中。消費者進行消費審計時,能夠判斷出哪條消息丟失、哪條消息重複。

Kafka中怎麼作消息軌跡?

消息軌跡指的是一條消息從生產者發出,經由 broker 存儲,再到消費者消費的整個過程當中,各個相關節點的狀態、時間、地點等數據匯聚而成的完整鏈路信息。生產者、broker、消費者這3個角色在處理消息的過程當中都會在鏈路中增長相應的信息,將這些信息匯聚、處理以後就能夠查詢任意消息的狀態,進而爲生產環境中的故障排除提供強有力的數據支持。

對消息軌跡而言,最多見的實現方式是封裝客戶端,在保證正常生產消費的同時添加相應的軌跡信息埋點邏輯。不管生產,仍是消費,在執行以後都會有相應的軌跡信息,咱們須要將這些信息保存起來。

咱們一樣能夠將軌跡信息保存到 Kafka 的某個主題中,好比下圖中的主題 trace_topic。


生產者在將消息正常發送到用戶主題 real_topic 以後(或者消費者在拉取到消息消費以後)會將軌跡信息發送到主題 trace_topic 中。

怎麼計算Lag?(注意read_uncommitted和read_committed狀態下的不一樣)

若是消費者客戶端的 isolation.level 參數配置爲「read_uncommitted」(默認),它對應的 Lag 等於HW – ConsumerOffset 的值,其中 ConsumerOffset 表示當前的消費位移。

若是這個參數配置爲「read_committed」,那麼就要引入 LSO 來進行計算了。LSO 是 LastStableOffset 的縮寫,它對應的 Lag 等於 LSO – ConsumerOffset 的值。

  • 首先經過 DescribeGroupsRequest 請求獲取當前消費組的元數據信息,固然在這以前還會經過 FindCoordinatorRequest 請求查找消費組對應的 GroupCoordinator。
  • 接着經過 OffsetFetchRequest 請求獲取消費位移 ConsumerOffset。
  • 而後經過 KafkaConsumer 的 endOffsets(Collection partitions)方法(對應於 ListOffsetRequest 請求)獲取 HW(LSO)的值。
  • 最後經過 HW 與 ConsumerOffset 相減獲得分區的 Lag,要得到主題的整體 Lag 只需對旗下的各個分區累加便可。

Kafka有哪些指標須要着重關注?

比較重要的 Broker 端 JMX 指標:

  • BytesIn/BytesOut:即 Broker 端每秒入站和出站字節數。你要確保這組值不要接近你的網絡帶寬,不然這一般都表示網卡已被「打滿」,很容易出現網絡丟包的情形。
  • NetworkProcessorAvgIdlePercent:即網絡線程池線程平均的空閒比例。一般來講,你應該確保這個 JMX 值長期大於 30%。若是小於這個值,就代表你的網絡線程池很是繁忙,你須要經過增長網絡線程數或將負載轉移給其餘服務器的方式,來給該 Broker 減負。
  • RequestHandlerAvgIdlePercent:即 I/O 線程池線程平均的空閒比例。一樣地,若是該值長期小於 30%,你須要調整 I/O 線程池的數量,或者減小 Broker 端的負載。
  • UnderReplicatedPartitions:即未充分備份的分區數。所謂未充分備份,是指並不是全部的 Follower 副本都和 Leader 副本保持同步。一旦出現了這種狀況,一般都代表該分區有可能會出現數據丟失。所以,這是一個很是重要的 JMX 指標。
  • ISRShrink/ISRExpand:即 ISR 收縮和擴容的頻次指標。若是你的環境中出現 ISR 中副本頻繁進出的情形,那麼這組值必定是很高的。這時,你要診斷下副本頻繁進出 ISR 的緣由,並採起適當的措施。
  • ActiveControllerCount:即當前處於激活狀態的控制器的數量。正常狀況下,Controller 所在 Broker 上的這個 JMX 指標值應該是 1,其餘 Broker 上的這個值是 0。若是你發現存在多臺 Broker 上該值都是 1 的狀況,必定要趕快處理,處理方式主要是查看網絡連通性。這種狀況一般代表集羣出現了腦裂。腦裂問題是很是嚴重的分佈式故障,Kafka 目前依託 ZooKeeper 來防止腦裂。但一旦出現腦裂,Kafka 是沒法保證正常工做的。

Kafka的那些設計讓它有如此高的性能?

  1. 分區
    kafka是個分佈式集羣的系統,整個系統能夠包含多個broker,也就是多個服務器實例。每一個主題topic會有多個分區,kafka將分區均勻地分配到整個集羣中,當生產者向對應主題傳遞消息,消息經過負載均衡機制傳遞到不一樣的分區以減輕單個服務器實例的壓力。

一個Consumer Group中能夠有多個consumer,多個consumer能夠同時消費不一樣分區的消息,大大的提升了消費者的並行消費能力。可是一個分區中的消息只能被一個Consumer Group中的一個consumer消費。

  1. 網絡傳輸上減小開銷
    批量發送:
    在發送消息的時候,kafka不會直接將少許數據發送出去,不然每次發送少許的數據會增長網絡傳輸頻率,下降網絡傳輸效率。kafka會先將消息緩存在內存中,當超過一個的大小或者超過必定的時間,那麼會將這些消息進行批量發送。
    端到端壓縮:
    固然網絡傳輸時數據量小也能夠減少網絡負載,kafaka會將這些批量的數據進行壓縮,將一批消息打包後進行壓縮,發送broker服務器後,最終這些數據仍是提供給消費者用,因此數據在服務器上仍是保持壓縮狀態,不會進行解壓,並且頻繁的壓縮和解壓也會下降性能,最終仍是以壓縮的方式傳遞到消費者的手上。

  2. 順序讀寫
    kafka將消息追加到日誌文件中,利用了磁盤的順序讀寫,來提升讀寫效率。

  3. 零拷貝技術

零拷貝將文件內容從磁盤經過DMA引擎複製到內核緩衝區,並且沒有把數據複製到socket緩衝區,只是將數據位置和長度信息的描述符複製到了socket緩存區,而後直接將數據傳輸到網絡接口,最後發送。這樣大大減少了拷貝的次數,提升了效率。kafka正是調用linux系統給出的sendfile系統調用來使用零拷貝。Java中的系統調用給出的是FileChannel.transferTo接口。

  1. 優秀的文件存儲機制
    若是分區規則設置得合理,那麼全部的消息能夠均勻地分佈到不一樣的分區中,這樣就能夠實現水平擴展。不考慮多副本的狀況,一個分區對應一個日誌(Log)。爲了防止 Log 過大,Kafka 又引入了日誌分段(LogSegment)的概念,將 Log 切分爲多個 LogSegment,至關於一個巨型文件被平均分配爲多個相對較小的文件,這樣也便於消息的維護和清理。

Kafka 中的索引文件以稀疏索引(sparse index)的方式構造消息的索引,它並不保證每一個消息在索引文件中都有對應的索引項。每當寫入必定量(由 broker 端參數 log.index.interval.bytes 指定,默認值爲4096,即 4KB)的消息時,偏移量索引文件和時間戳索引文件分別增長一個偏移量索引項和時間戳索引項,增大或減少 log.index.interval.bytes 的值,對應地能夠增長或縮小索引項的密度。

相關文章
相關標籤/搜索