Kafka 是一個分佈式消息隊列,具備高性能、持久化、多副本備份、橫向擴展能力。生產者往隊列裏寫消息,消費者從隊列裏取消息進行業務邏輯。通常在架構設計中起到解耦、削峯、異步處理的做用。正則表達式
Kafka 對外使用 Topic 的概念,生產者往 Topic 裏寫消息,消費者從中讀消息。算法
爲了作到水平擴展,一個 Topic 實際是由多個 Partition 組成的,遇到瓶頸時,能夠經過增長 Partition 的數量來進行橫向擴容。單個 Parition 內是保證消息有序。緩存
每新寫一條消息,Kafka 就是在對應的文件 append 寫,因此性能很是高。網絡
Kafka 的整體數據流是這樣的:架構
大概用法就是,Producers 往 Brokers 裏面的指定 Topic 中寫消息,Consumers 從 Brokers 裏面拉取指定 Topic 的消息,而後進行業務處理。app
圖中有兩個 Topic,Topic0 有兩個 Partition,Topic1 有一個 Partition,三副本備份。異步
能夠看到 Consumer Gourp1 中的 Consumer2 沒有分到 Partition 處理,這是有可能出現的,下面會講到。分佈式
關於 Broker、Topics、Partitions 的一些元信息用 ZK 來存,監控和路由啥的也都會用到 ZK。ide
生產性能
基本流程是這樣的:
建立一條記錄,記錄中一個要指定對應的 Topic 和 Value,Key 和 Partition 可選。
先序列化,而後按照 Topic 和 Partition,放進對應的發送隊列中。Kafka Produce 都是批量請求,會積攢一批,而後一塊兒發送,不是調 send() 就馬上進行網絡發包。
若是 Partition 沒填,那麼狀況會是這樣的:
Key 有填。按照 Key 進行哈希,相同 Key 去一個 Partition。(若是擴展了 Partition 的數量那麼就不能保證了)
Key 沒填。Round-Robin 來選 Partition。
這些要發往同一個 Partition 的請求按照配置,攢一波,而後由一個單獨的線程一次性發過去。
API
有 High Level API,替咱們把不少事情都幹了,Offset,路由啥都替咱們幹了,用起來很簡單。
還有 Simple API,Offset 啥的都是要咱們本身記錄。(注:消息消費的時候,首先要知道去哪消費,這就是路由,消費完以後,要記錄消費單哪,就是 Offset)
Partition
當存在多副本的狀況下,會盡可能把多個副本,分配到不一樣的 Broker 上。
Kafka 會爲 Partition 選出一個 Leader,以後全部該 Partition 的請求,實際操做的都是 Leader,而後再同步到其餘的 Follower。
當一個 Broker 歇菜後,全部 Leader 在該 Broker 上的 Partition 都會從新選舉,選出一個 Leader。(這裏不像分佈式文件存儲系統那樣會自動進行復制保持副本數)
而後這裏就涉及兩個細節:
怎麼分配 Partition
怎麼選 Leader
關於 Partition 的分配,還有 Leader 的選舉,總得有個執行者。在 Kafka 中,這個執行者就叫 Controller。
Kafka 使用 ZK 在 Broker 中選出一個 Controller,用於 Partition 分配和 Leader 選舉。
Partition 的分配:
將全部 Broker(假設共 n 個 Broker)和待分配的 Partition 排序。
將第 i 個 Partition 分配到第(i mod n)個 Broker 上 (這個就是 Leader)。
將第 i 個 Partition 的第 j 個 Replica 分配到第((i + j) mode n)個 Broker 上。
Leader 容災
Controller 會在 ZK 的 /brokers/ids 節點上註冊 Watch,一旦有 Broker 宕機,它就能知道。
當 Broker 宕機後,Controller 就會給受到影響的 Partition 選出新 Leader。
Controller 從 ZK 的 /brokers/topics/[topic]/partitions/[partition]/state 中,讀取對應 Partition 的 ISR(in-sync replica 已同步的副本)列表,選一個出來作 Leader。
選出 Leader 後,更新 ZK,而後發送 LeaderAndISRRequest 給受影響的 Broker,讓它們知道改變這事。
爲何這裏不是使用 ZK 通知,而是直接給 Broker 發送 RPC 請求,個人理解多是這樣作 ZK 有性能問題吧。
若是 ISR 列表是空,那麼會根據配置,隨便選一個 Replica 作 Leader,或者乾脆這個 Partition 就是歇菜。
若是 ISR 列表的有機器,可是也歇菜了,那麼還能夠等 ISR 的機器活過來。
多副本同步
這裏的策略,服務端這邊的處理是 Follower 從 Leader 批量拉取數據來同步。可是具體的可靠性,是由生產者來決定的。
生產者生產消息的時候,經過 request.required.acks 參數來設置數據的可靠性。
在 Acks=-1 的時候,若是 ISR 少於 min.insync.replicas 指定的數目,那麼就會返回不可用。
這裏 ISR 列表中的機器是會變化的,根據配置 replica.lag.time.max.ms,多久沒同步,就會從 ISR 列表中剔除。
之前還有根據落後多少條消息就踢出 ISR,在 1.0 版本後就去掉了,由於這個值很難取,在高峯的時候很容易出現節點不斷的進出 ISR 列表。
從 ISA 中選出 Leader 後,Follower 會把本身日誌中上一個高水位後面的記錄去掉,而後去和 Leader 拿新的數據。
由於新的 Leader 選出來後,Follower 上面的數據,可能比新 Leader 多,因此要截取。
這裏高水位的意思,對於 Partition 和 Leader,就是全部 ISR 中都有的最新一條記錄。消費者最多隻能讀到高水位。
從 Leader 的角度來講高水位的更新會延遲一輪,例如寫入了一條新消息,ISR 中的 Broker 都 Fetch 到了,可是 ISR 中的 Broker 只有在下一輪的 Fetch 中才能告訴 Leader。
也正是因爲這個高水位延遲一輪,在一些狀況下,Kafka 會出現丟數據和主備數據不一致的狀況,0.11 開始,使用 Leader Epoch 來代替高水位。
思考:當 Acks=-1 時
是 Follwers 都來 Fetch 就返回成功,仍是等 Follwers 第二輪 Fetch?
Leader 已經寫入本地,可是 ISR 中有些機器失敗,那麼怎麼處理呢?
消費
訂閱 Topic 是以一個消費組來訂閱的,一個消費組裏面能夠有多個消費者。同一個消費組中的兩個消費者,不會同時消費一個 Partition。
換句話來講,就是一個 Partition,只能被消費組裏的一個消費者消費,可是能夠同時被多個消費組消費。
所以,若是消費組內的消費者若是比 Partition 多的話,那麼就會有個別消費者一直空閒。
API
訂閱 Topic 時,能夠用正則表達式,若是有新 Topic 匹配上,那能自動訂閱上。
Offset 的保存
一個消費組消費 Partition,須要保存 Offset 記錄消費到哪,之前保存在 ZK 中,因爲 ZK 的寫性能很差,之前的解決方法都是 Consumer 每隔一分鐘上報一次。
這裏 ZK 的性能嚴重影響了消費的速度,並且很容易出現重複消費。在 0.10 版本後,Kafka 把這個 Offset 的保存,從 ZK 總剝離,保存在一個名叫 consumeroffsets topic 的 Topic 中。
寫進消息的 Key 由 Groupid、Topic、Partition 組成,Value 是偏移量 Offset。Topic 配置的清理策略是 Compact。老是保留最新的 Key,其他刪掉。
通常狀況下,每一個 Key 的 Offset 都是緩存在內存中,查詢的時候不用遍歷 Partition,若是沒有緩存,第一次就會遍歷 Partition 創建緩存,而後查詢返回。
肯定 Consumer Group 位移信息寫入 consumers_offsets 的哪一個 Partition,具體計算公式:
__consumers_offsets partition =
Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)
//groupMetadataTopicPartitionCount由offsets.topic.num.partitions指定,默認是50個分區。
思考:若是正在跑的服務,修改了 offsets.topic.num.partitions,那麼 Offset 的保存是否是就亂套了?
分配 Partition—Reblance
生產過程當中 Broker 要分配 Partition,消費過程這裏,也要分配 Partition 給消費者。
相似 Broker 中選了一個 Controller 出來,消費也要從 Broker 中選一個 Coordinator,用於分配 Partition。
下面從頂向下,分別闡述一下:
怎麼選 Coordinator
交互流程
Reblance 的流程
①選 Coordinator:看 Offset 保存在那個 Partition;該 Partition Leader 所在的 Broker 就是被選定的 Coordinator。
這裏咱們能夠看到,Consumer Group 的 Coordinator,和保存 Consumer Group Offset 的 Partition Leader 是同一臺機器。
②交互流程:把 Coordinator 選出來以後,就是要分配了。整個流程是這樣的:
Consumer 啓動、或者 Coordinator 宕機了,Consumer 會任意請求一個 Broker,發送 ConsumerMetadataRequest 請求。
Broker 會按照上面說的方法,選出這個 Consumer 對應 Coordinator 的地址。
Consumer 發送 Heartbeat 請求給 Coordinator,返回 IllegalGeneration 的話,就說明 Consumer 的信息是舊的了,須要從新加入進來,進行 Reblance。
返回成功,那麼 Consumer 就從上次分配的 Partition 中繼續執行。
③Reblance 流程:
Consumer 給 Coordinator 發送 JoinGroupRequest 請求。
這時其餘 Consumer 發 Heartbeat 請求過來時,Coordinator 會告訴他們,要 Reblance 了。
其餘 Consumer 發送 JoinGroupRequest 請求。
全部記錄在冊的 Consumer 都發了 JoinGroupRequest 請求以後,Coordinator 就會在這裏 Consumer 中隨便選一個 Leader。
而後回 JoinGroupRespone,這會告訴 Consumer 你是 Follower 仍是 Leader,對於 Leader,還會把 Follower 的信息帶給它,讓它根據這些信息去分配 Partition。
Consumer 向 Coordinator 發送 SyncGroupRequest,其中 Leader 的 SyncGroupRequest 會包含分配的狀況。
Coordinator 回包,把分配的狀況告訴 Consumer,包括 Leader。
當 Partition 或者消費者的數量發生變化時,都得進行 Reblance。
列舉一下會 Reblance 的狀況:
增長 Partition
增長消費者
消費者主動關閉
消費者宕機了
Coordinator 本身也宕機了
消息投遞語義
Kafka 支持 3 種消息投遞語義:
At most once:最多一次,消息可能會丟失,但不會重複。
At least once:最少一次,消息不會丟失,可能會重複。
Exactly once:只且一次,消息不丟失不重複,只且消費一次(0.11 中實現,僅限於下游也是 Kafka)
在業務中,經常都是使用 At least once 的模型,若是須要可重入的話,每每是業務本身實現。
At least once
先獲取數據,再進行業務處理,業務處理成功後 Commit Offset:
生產者生產消息異常,消息是否成功寫入不肯定,重作,可能寫入重複的消息。
消費者處理消息,業務處理成功後,更新 Offset 失敗,消費者重啓的話,會重複消費。
At most once
先獲取數據,再 Commit Offset,最後進行業務處理:
生產者生產消息異常,無論,生產下一個消息,消息就丟了。
消費者處理消息,先更新 Offset,再作業務處理,作業務處理失敗,消費者重啓,消息就丟了。
Exactly once
思路是這樣的,首先要保證消息不丟,再去保證不重複。因此盯着 At least once 的緣由來搞。
首先想出來的:
生產者重作致使重複寫入消息:生產保證冪等性。
消費者重複消費:消滅重複消費,或者業務接口保證冪等性重複消費也沒問題。
因爲業務接口是否冪等,不是 Kafka 能保證的,因此 Kafka 這裏提供的 Exactly once 是有限制的,消費者的下游也必須是 Kafka。
因此如下討論的,沒特殊說明,消費者的下游系統都是 Kafka(注:使用 Kafka Conector,它對部分系統作了適配,實現了 Exactly once)。生產者冪等性好作,沒啥問題。
解決重複消費有兩個方法:
下游系統保證冪等性,重複消費也不會致使多條記錄。
把 Commit Offset 和業務處理綁定成一個事務。
原本 Exactly once 實現第 1 點就 OK 了。可是在一些使用場景下,咱們的數據源多是多個 Topic,處理後輸出到多個 Topic,這時咱們會但願輸出時要麼所有成功,要麼所有失敗。這就須要實現事務性。
既然要作事務,那麼幹脆把重複消費的問題從根源上解決,把 Commit Offset 和輸出到其餘 Topic 綁定成一個事務。
生產冪等性
思路是這樣的,爲每一個 Producer 分配一個 Pid,做爲該 Producer 的惟一標識。
Producer 會爲每個維護一個單調遞增的 Seq。相似的,Broker 也會爲每一個記錄下最新的 Seq。
當 req_seq == broker_seq+1 時,Broker 纔會接受該消息,由於:
消息的 Seq 比 Broker 的 Seq 大超過期,說明中間有數據還沒寫入,即亂序了。
消息的 Seq 不比 Broker 的 Seq 小,那麼說明該消息已被保存。
事務性/原子性廣播
場景是這樣的:
先從多個源 Topic 中獲取數據。
作業務處理,寫到下游的多個目的 Topic。
更新多個源 Topic 的 Offset。
其中第 二、3 點做爲一個事務,要麼全成功,要麼全失敗。這裏得益於 Offset 其實是用特殊的 Topic 去保存,這兩點都歸一爲寫多個 Topic 的事務性處理。
基本思路是這樣的:
引入 Tid(transaction id),和 Pid 不一樣,這個 ID 是應用程序提供的,用於標識事務,和 Producer 是誰並不要緊。
就是任何 Producer 均可以使用這個 Tid 去作事務,這樣進行到一半就死掉的事務,能夠由另外一個 Producer 去恢復。
同時爲了記錄事務的狀態,相似對 Offset 的處理,引入 Transaction Coordinator 用於記錄 Transaction Log。
在集羣中會有多個 Transaction Coordinator,每一個 Tid 對應惟一一個 Transaction Coordinator。
注:Transaction Log 刪除策略是 Compact,已完成的事務會標記成 Null,Compact 後不保留。
作事務時,先標記開啓事務,寫入數據,所有成功就在 Transaction Log 中記錄爲 Prepare Commit 狀態,不然寫入 Prepare Abort 的狀態。
以後再去給每一個相關的 Partition 寫入一條 Marker(Commit 或者 Abort)消息,標記這個事務的 Message 能夠被讀取或已經廢棄。成功後在 Transaction Log記錄下 Commit/Abort 狀態,至此事務結束。
數據流:
首先使用 Tid 請求任意一個 Broker(代碼中寫的是負載最小的 Broker),找到對應的 Transaction Coordinator。
請求 Transaction Coordinator 獲取到對應的 Pid,和 Pid 對應的 Epoch,這個 Epoch 用於防止僵死進程復活致使消息錯亂。
當消息的 Epoch 比當前維護的 Epoch 小時,拒絕掉。Tid 和 Pid 有一一對應的關係,這樣對於同一個 Tid 會返回相同的 Pid。
Client 先請求 Transaction Coordinator 記錄的事務狀態,初始狀態是 Begin,若是是該事務中第一個到達的,同時會對事務進行計時。
Client 輸出數據到相關的 Partition 中;Client 再請求 Transaction Coordinator 記錄 Offset 的事務狀態;Client 發送 Offset Commit 到對應 Offset Partition。
Client 發送 Commit 請求,Transaction Coordinator 記錄 Prepare Commit/Abort,而後發送 Marker 給相關的 Partition。
所有成功後,記錄 Commit/Abort 的狀態,最後這個記錄不須要等待其餘 Replica 的 ACK,由於 Prepare 不丟就能保證最終的正確性了。
這裏 Prepare 的狀態主要是用於事務恢復,例如給相關的 Partition 發送控制消息,沒發完就宕機了,備機起來後,Producer 發送請求獲取 Pid 時,會把未完成的事務接着完成。
當 Partition 中寫入 Commit 的 Marker 後,相關的消息就可被讀取。因此 Kafka 事務在 Prepare Commit 到 Commit 這個時間段內,消息是逐漸可見的,而不是同一時刻可見。
消費事務
前面都是從生產的角度看待事務。還須要從消費的角度去考慮一些問題。
消費時,Partition 中會存在一些消息處於未 Commit 狀態,即業務方應該看不到的消息,須要過濾這些消息不讓業務看到,Kafka 選擇在消費者進程中進行過來,而不是在 Broker 中過濾,主要考慮的仍是性能。
Kafka 高性能的一個關鍵點是 Zero Copy,若是須要在 Broker 中過濾,那麼勢必須要讀取消息內容到內存,就會失去 Zero Copy 的特性。
文件組織
Kafka 的數據,其實是以文件的形式存儲在文件系統的。Topic 下有 Partition,Partition 下有 Segment,Segment 是實際的一個個文件,Topic 和 Partition 都是抽象概念。
在目錄 /partitionid}/ 下,存儲着實際的 Log 文件(即 Segment),還有對應的索引文件。
每一個 Segment 文件大小相等,文件名以這個 Segment 中最小的 Offset 命名,文件擴展名是 .log。Segment 對應的索引的文件名字同樣,擴展名是 .index。
有兩個 Index 文件:
一個是 Offset Index 用於按 Offset 去查 Message。
一個是 Time Index 用於按照時間去查,其實這裏能夠優化合到一塊兒,下面只說 Offset Index。
整體的組織是這樣的:
爲了減小索引文件的大小,下降空間使用,方便直接加載進內存中,這裏的索引使用稀疏矩陣,不會每個 Message 都記錄下具體位置,而是每隔必定的字節數,再創建一條索引。
索引包含兩部分:
BaseOffset:意思是這條索引對應 Segment 文件中的第幾條 Message。這樣作方便使用數值壓縮算法來節省空間。例如 Kafka 使用的是 Varint。
Position:在 Segment 中的絕對位置。
查找 Offset 對應的記錄時,會先用二分法,找出對應的 Offset 在哪一個 Segment 中,而後使用索引,在定位出 Offset 在 Segment 中的大概位置,再遍歷查找 Message。
經常使用配置項
Broker 配置
Topic 配置
關於日誌清理,默認當前正在寫的日誌,是怎麼也不會清理掉的。
還有 0.10 以前的版本,時間看的是日誌文件的 Mtime,但這個值是不許確的,有可能文件被 Touch 一下,Mtime 就變了。所以從 0.10 版本開始,改成使用該文件最新一條消息的時間來判斷。
按大小清理這裏也要注意,Kafka 在定時任務中嘗試比較當前日誌量總大小是否超過閾值至少一個日誌段的大小。若是超過可是沒超過一個日誌段,那麼就不會刪除。