Kafka中的事務可使應用程序將消費消息、生產消息、提交消費位移看成原子操做來處理,同時成功或失敗,即便該生產或消費會跨多個分區。linux
生產者必須提供惟一的transactionalId,啓動後請求事務協調器獲取一個PID,transactionalId與PID一一對應。算法
每次發送數據給<Topic, Partition>前,須要先向事務協調器發送AddPartitionsToTxnRequest,事務協調器會將該<Transaction, Topic, Partition>存於__transaction_state內,並將其狀態置爲BEGIN。緩存
在處理完 AddOffsetsToTxnRequest 以後,生產者還會發送 TxnOffsetCommitRequest 請求給 GroupCoordinator,從而將本次事務中包含的消費位移信息 offsets 存儲到主題 __consumer_offsets 中服務器
一旦上述數據寫入操做完成,應用程序必須調用KafkaProducer的commitTransaction方法或者abortTransaction方法以結束當前事務。不管調用 commitTransaction() 方法仍是 abortTransaction() 方法,生產者都會向 TransactionCoordinator 發送 EndTxnRequest 請求。
TransactionCoordinator 在收到 EndTxnRequest 請求後會執行以下操做:網絡
在消費端有一個參數isolation.level,設置爲「read_committed」,表示消費端應用不能夠看到還沒有提交的事務內的消息。若是生產者開啓事務並向某個分區值發送3條消息 msg一、msg2 和 msg3,在執行 commitTransaction() 或 abortTransaction() 方法前,設置爲「read_committed」的消費端應用是消費不到這些消息的,不過在 KafkaConsumer 內部會緩存這些消息,直到生產者執行 commitTransaction() 方法以後它才能將這些消息推送給消費端應用。反之,若是生產者執行了 abortTransaction() 方法,那麼 KafkaConsumer 會將這些緩存的消息丟棄而不推送給消費端應用。負載均衡
正常狀況下,分區的全部副本都處於 ISR 集合中,可是不免會有異常狀況發生,從而某些副本被剝離出 ISR 集合中。在 ISR 集合以外,也就是處於同步失效或功能失效(好比副本處於非存活狀態)的副本統稱爲失效副本,失效副本對應的分區也就稱爲同步失效分區,即 under-replicated 分區。socket
Kafka 從 0.9.x 版本開始就經過惟一的 broker 端參數 replica.lag.time.max.ms 來抉擇,當 ISR 集合中的一個 follower 副本滯後 leader 副本的時間超過此參數指定的值時則斷定爲同步失敗,須要將此 follower 副本剔除出 ISR 集合。replica.lag.time.max.ms 參數的默認值爲10000。分佈式
在 0.9.x 版本以前,Kafka 中還有另外一個參數 replica.lag.max.messages(默認值爲4000),它也是用來斷定失效副本的,當一個 follower 副本滯後 leader 副本的消息數超過 replica.lag.max.messages 的大小時,則斷定它處於同步失效的狀態。它與 replica.lag.time.max.ms 參數斷定出的失效副本取並集組成一個失效副本的集合,從而進一步剝離出分區的 ISR 集合。工具
Kafka 源碼註釋中說明了通常有這幾種狀況會致使副本失效:性能
咱們用UnderReplicatedPartitions表明leader副本在當前Broker上且具備失效副本的分區的個數。
若是集羣中有多個Broker的UnderReplicatedPartitions保持一個大於0的穩定值時,通常暗示着集羣中有Broker已經處於下線狀態。這種狀況下,這個Broker中的分區個數與集羣中的全部UnderReplicatedPartitions(處於下線的Broker是不會上報任何指標值的)之和是相等的。一般這類問題是因爲機器硬件緣由引發的,但也有多是因爲操做系統或者JVM引發的 。
若是集羣中存在Broker的UnderReplicatedPartitions頻繁變更,或者處於一個穩定的大於0的值(這裏特指沒有Broker下線的狀況)時,通常暗示着集羣出現了性能問題,一般這類問題很難診斷,不過咱們能夠一步一步的將問題的範圍縮小,好比先嚐試肯定這個性能問題是否只存在於集羣的某個Broker中,仍是整個集羣之上。若是肯定集羣中全部的under-replicated分區都是在單個Broker上,那麼能夠看出這個Broker出現了問題,進而能夠針對這單一的Broker作專項調查,好比:操做系統、GC、網絡狀態或者磁盤狀態(好比:iowait、ioutil等指標)。
某個分區有3個副本分別位於 broker0、broker1 和 broker2 節點中,假設 broker0 上的副本1爲當前分區的 leader 副本,那麼副本2和副本3就是 follower 副本,整個消息追加的過程能夠歸納以下:
某一時刻,leader 副本的 LEO 增長至5,而且全部副本的 HW 還都爲0。
以後 follower 副本(不帶陰影的方框)向 leader 副本拉取消息,在拉取的請求中會帶有自身的 LEO 信息,這個 LEO 信息對應的是 FetchRequest 請求中的 fetch_offset。leader 副本返回給 follower 副本相應的消息,而且還帶有自身的 HW 信息,如上圖(右)所示,這個 HW 信息對應的是 FetchResponse 中的 high_watermark。
此時兩個 follower 副本各自拉取到了消息,並更新各自的 LEO 爲3和4。與此同時,follower 副本還會更新本身的 HW,更新 HW 的算法是比較當前 LEO 和 leader 副本中傳送過來的HW的值,取較小值做爲本身的 HW 值。當前兩個 follower 副本的 HW 都等於0(min(0,0) = 0)。
接下來 follower 副本再次請求拉取 leader 副本中的消息,以下圖(左)所示。
此時 leader 副本收到來自 follower 副本的 FetchRequest 請求,其中帶有 LEO 的相關信息,選取其中的最小值做爲新的 HW,即 min(15,3,4)=3。而後連同消息和 HW 一塊兒返回 FetchResponse 給 follower 副本,如上圖(右)所示。注意 leader 副本的 HW 是一個很重要的東西,由於它直接影響了分區數據對消費者的可見性。
兩個 follower 副本在收到新的消息以後更新 LEO 而且更新本身的 HW 爲3(min(LEO,3)=3)。
HW 是 High Watermark 的縮寫,俗稱高水位,它標識了一個特定的消息偏移量(offset),消費者只能拉取到這個 offset 以前的消息。
分區 ISR 集合中的每一個副本都會維護自身的 LEO,而 ISR 集合中最小的 LEO 即爲分區的 HW,對消費者而言只能消費 HW 以前的消息。
leader epoch 表明 leader 的紀元信息(epoch),初始值爲0。每當 leader 變動一次,leader epoch 的值就會加1,至關於爲 leader 增設了一個版本號。
每一個副本中還會增設一個矢量 <LeaderEpoch => StartOffset>,其中 StartOffset 表示當前 LeaderEpoch 下寫入的第一條消息的偏移量。
假設有兩個節點A和B,B是leader節點,裏面的數據如圖:
A發生重啓,以後A不是先忙着截斷日誌而是先發送OffsetsForLeaderEpochRequest請求給B,B做爲目前的leader在收到請求以後會返回當前的LEO(LogEndOffset,注意圖中LE0和LEO的不一樣),與請求對應的響應爲OffsetsForLeaderEpochResponse。若是 A 中的 LeaderEpoch(假設爲 LE_A)和 B 中的不相同,那麼 B 此時會查找 LeaderEpoch 爲 LE_A+1 對應的 StartOffset 並返回給 A
如上圖所示,A 在收到2以後發現和目前的 LEO 相同,也就不須要截斷日誌了,以此來保護數據的完整性。
再如,以後 B 發生了宕機,A 成爲新的 leader,那麼對應的 LE=0 也變成了 LE=1,對應的消息 m2 此時就獲得了保留。後續的消息均可以以 LE1 爲 LeaderEpoch 陸續追加到 A 中。這個時候A就會有兩個LE,第二LE所記錄的Offset從2開始。若是B恢復了,那麼就會從A中獲取到LE+1的Offset爲2的值返回給B。
再來看看LE如何解決數據不一致的問題:
當前 A 爲 leader,B 爲 follower,A 中有2條消息 m1 和 m2,而 B 中有1條消息 m1。假設 A 和 B 同時「掛掉」,而後 B 第一個恢復過來併成爲新的 leader。
以後 B 寫入消息 m3,並將 LEO 和 HW 更新至2,以下圖所示。注意此時的 LeaderEpoch 已經從 LE0 增至 LE1 了。
緊接着 A 也恢復過來成爲 follower 並向 B 發送 OffsetsForLeaderEpochRequest 請求,此時 A 的 LeaderEpoch 爲 LE0。B 根據 LE0 查詢到對應的 offset 爲1並返回給 A,A 就截斷日誌並刪除了消息 m2,以下圖所示。以後 A 發送 FetchRequest 至 B 請求來同步數據,最終A和B中都有兩條消息 m1 和 m3,HW 和 LEO都爲2,而且 LeaderEpoch 都爲 LE1,如此便解決了數據不一致的問題。
由於這樣有兩個明顯的缺點:
對於Kafka來講,必要性不是很高,由於在Kafka集羣中,若是存在多個副本,通過合理的配置,可讓leader副本均勻的分佈在各個broker上面,使每一個 broker 上的讀寫負載都是同樣的。
在發送延時消息的時候並非先投遞到要發送的真實主題(real_topic)中,而是先投遞到一些 Kafka 內部的主題(delay_topic)中,這些內部主題對用戶不可見,而後經過一個自定義的服務拉取這些內部主題中的消息,並將知足條件的消息再投遞到要發送的真實的主題中,消費者所訂閱的仍是真實的主題。
若是採用這種方案,那麼通常是按照不一樣的延時等級來劃分的,好比設定5s、10s、30s、1min、2min、5min、10min、20min、30min、45min、1hour、2hour這些按延時時間遞增的延時等級,延時的消息按照延時時間投遞到不一樣等級的主題中,投遞到同一主題中的消息的延時時間會被強轉爲與此主題延時等級一致的延時時間,這樣延時偏差控制在兩個延時等級的時間差範圍以內(好比延時時間爲17s的消息投遞到30s的延時主題中,以後按照延時時間爲30s進行計算,延時偏差爲13s)。雖然有必定的延時偏差,可是偏差可控,而且這樣只需增長少量的主題就能實現延時隊列的功能。
發送到內部主題(delay_topic_*)中的消息會被一個獨立的 DelayService 進程消費,這個 DelayService 進程和 Kafka broker 進程以一對一的配比進行同機部署(參考下圖),以保證服務的可用性。
針對不一樣延時級別的主題,在 DelayService 的內部都會有單獨的線程來進行消息的拉取,以及單獨的 DelayQueue(這裏用的是 JUC 中 DelayQueue)進行消息的暫存。與此同時,在 DelayService 內部還會有專門的消息發送線程來獲取 DelayQueue 的消息並轉發到真實的主題中。從消費、暫存再到轉發,線程之間都是一一對應的關係。以下圖所示,DelayService 的設計應當儘可能保持簡單,避免鎖機制產生的隱患。
爲了保障內部 DelayQueue 不會由於未處理的消息過多而致使內存的佔用過大,DelayService 會對主題中的每一個分區進行計數,當達到必定的閾值以後,就會暫停拉取該分區中的消息。
由於一個主題中通常不止一個分區,分區之間的消息並不會按照投遞時間進行排序,DelayQueue的做用是將消息按照再次投遞時間進行有序排序,這樣下游的消息發送線程就可以按照前後順序獲取最早知足投遞條件的消息。
死信能夠看做消費者不能處理收到的消息,也能夠看做消費者不想處理收到的消息,還能夠看做不符合處理要求的消息。好比消息內包含的消息內容沒法被消費者解析,爲了確保消息的可靠性而不被隨意丟棄,故將其投遞到死信隊列中,這裏的死信就能夠看做消費者不能處理的消息。再好比超過既定的重試次數以後將消息投入死信隊列,這裏就能夠將死信看做不符合處理要求的消息。
重試隊列其實能夠看做一種回退隊列,具體指消費端消費消息失敗時,爲了防止消息無端丟失而從新將消息回滾到 broker 中。與回退隊列不一樣的是,重試隊列通常分紅多個重試等級,每一個重試等級通常也會設置從新投遞延時,重試次數越多投遞延時就越大。
理解了他們的概念以後咱們就能夠爲每一個主題設置重試隊列,消息第一次消費失敗入重試隊列 Q1,Q1 的從新投遞延時爲5s,5s事後從新投遞該消息;若是消息再次消費失敗則入重試隊列 Q2,Q2 的從新投遞延時爲10s,10s事後再次投遞該消息。
而後再設置一個主題做爲死信隊列,重試越屢次從新投遞的時間就越久,而且須要設置一個上限,超過投遞次數就進入死信隊列。重試隊列與延時隊列有相同的地方,都須要設置延時級別。
消息審計是指在消息生產、存儲和消費的整個過程之間對消息個數及延遲的審計,以此來檢測是否有數據丟失、是否有數據重複、端到端的延遲又是多少等內容。
目前與消息審計有關的產品也有多個,好比 Chaperone(Uber)、Confluent Control Center、Kafka Monitor(LinkedIn),它們主要經過在消息體(value 字段)或在消息頭(headers 字段)中內嵌消息對應的時間戳 timestamp 或全局的惟一標識 ID(或者是二者兼備)來實現消息的審計功能。
內嵌 timestamp 的方式主要是設置一個審計的時間間隔 time_bucket_interval(能夠自定義設置幾秒或幾分鐘),根據這個 time_bucket_interval 和消息所屬的 timestamp 來計算相應的時間桶(time_bucket)。
內嵌 ID 的方式就更加容易理解了,對於每一條消息都會被分配一個全局惟一標識 ID。若是主題和相應的分區固定,則能夠爲每一個分區設置一個全局的 ID。當有消息發送時,首先獲取對應的 ID,而後內嵌到消息中,最後纔將它發送到 broker 中。消費者進行消費審計時,能夠判斷出哪條消息丟失、哪條消息重複。
消息軌跡指的是一條消息從生產者發出,經由 broker 存儲,再到消費者消費的整個過程當中,各個相關節點的狀態、時間、地點等數據匯聚而成的完整鏈路信息。生產者、broker、消費者這3個角色在處理消息的過程當中都會在鏈路中增長相應的信息,將這些信息匯聚、處理以後就能夠查詢任意消息的狀態,進而爲生產環境中的故障排除提供強有力的數據支持。
對消息軌跡而言,最多見的實現方式是封裝客戶端,在保證正常生產消費的同時添加相應的軌跡信息埋點邏輯。不管生產,仍是消費,在執行以後都會有相應的軌跡信息,咱們須要將這些信息保存起來。
咱們一樣能夠將軌跡信息保存到 Kafka 的某個主題中,好比下圖中的主題 trace_topic。
生產者在將消息正常發送到用戶主題 real_topic 以後(或者消費者在拉取到消息消費以後)會將軌跡信息發送到主題 trace_topic 中。
若是消費者客戶端的 isolation.level 參數配置爲「read_uncommitted」(默認),它對應的 Lag 等於HW – ConsumerOffset 的值,其中 ConsumerOffset 表示當前的消費位移。
若是這個參數配置爲「read_committed」,那麼就要引入 LSO 來進行計算了。LSO 是 LastStableOffset 的縮寫,它對應的 Lag 等於 LSO – ConsumerOffset 的值。
比較重要的 Broker 端 JMX 指標:
一個Consumer Group中能夠有多個consumer,多個consumer能夠同時消費不一樣分區的消息,大大的提升了消費者的並行消費能力。可是一個分區中的消息只能被一個Consumer Group中的一個consumer消費。
網絡傳輸上減小開銷
批量發送:
在發送消息的時候,kafka不會直接將少許數據發送出去,不然每次發送少許的數據會增長網絡傳輸頻率,下降網絡傳輸效率。kafka會先將消息緩存在內存中,當超過一個的大小或者超過必定的時間,那麼會將這些消息進行批量發送。
端到端壓縮:
固然網絡傳輸時數據量小也能夠減少網絡負載,kafaka會將這些批量的數據進行壓縮,將一批消息打包後進行壓縮,發送broker服務器後,最終這些數據仍是提供給消費者用,因此數據在服務器上仍是保持壓縮狀態,不會進行解壓,並且頻繁的壓縮和解壓也會下降性能,最終仍是以壓縮的方式傳遞到消費者的手上。
順序讀寫
kafka將消息追加到日誌文件中,利用了磁盤的順序讀寫,來提升讀寫效率。
零拷貝技術
零拷貝將文件內容從磁盤經過DMA引擎複製到內核緩衝區,並且沒有把數據複製到socket緩衝區,只是將數據位置和長度信息的描述符複製到了socket緩存區,而後直接將數據傳輸到網絡接口,最後發送。這樣大大減少了拷貝的次數,提升了效率。kafka正是調用linux系統給出的sendfile系統調用來使用零拷貝。Java中的系統調用給出的是FileChannel.transferTo接口。
Kafka 中的索引文件以稀疏索引(sparse index)的方式構造消息的索引,它並不保證每一個消息在索引文件中都有對應的索引項。每當寫入必定量(由 broker 端參數 log.index.interval.bytes 指定,默認值爲4096,即 4KB)的消息時,偏移量索引文件和時間戳索引文件分別增長一個偏移量索引項和時間戳索引項,增大或減少 log.index.interval.bytes 的值,對應地能夠增長或縮小索引項的密度。