首先Kafka是一個分佈式消息隊列中間件,Apache頂級項目,https://kafka.apache.org/ 高性能、持久化、多副本備份、橫向擴展。算法
生產者Producer往隊列裏發送消息,消費者Consumer從隊列裏消費消息,而後進行業務邏輯。應用場景主要有:解耦、削峯(緩衝)、異步處理、排隊、分佈式事務控制等等。apache
Kafka Data Flow 消息流轉圖緩存
上圖中,消息生產者Producers往Brokers裏面的指定Topic中寫消息,消息消費者Consumers從Brokers裏面消費指定Topic的消息,而後進行業務處理。架構
在實際的部署架構中,Broker、Topic、Partition這些元數據保存在ZooKeeper中,Kafka的監控、消息路由(分區)由ZooKeeper控制。0.8版本的OffSet也由ZooKeeper控制。異步
1、消息生產/發送過程分佈式
Kafka建立Message、發送時要指定對應的Topic和Value(消息體),Key(分區鍵)和Partition(分區)是可選參數。 性能
調用Producer的Send()方法後,消息先進行序列化(消息序列化器可自定義實現:例如:Protobuf),而後按照Topic和Partition,臨時放到內存中指定的發送隊列中。達到閾值後,而後批量發送。fetch
發送時,當Partition沒設置時,若是設置了Key-分區鍵(例如:單據類型),按照Key進行Hash取模,保證相同的Key發送到指定的分區Partition。若是未設置分區鍵Key,使用Round-Robin輪詢隨機選分區Partition。優化
2、分區Partition的高可用和選舉機制ui
分區有副本的概念,保證消息不丟失。當存在多副本的狀況下,會盡可能把多個副本,分配到不一樣的broker上。
Kafka會爲Partition選出一個Leader Broker(經過ZooKeeper),以後全部該Partition的請求,實際操做的都是Leader,而後再同步到其餘的Follower。
當一個Kafka Broker宕機後,全部Leader在該Broker上的Partition都會從新選舉,在剩餘的Follower中選出一個Leader,繼續提供服務。
正如上面所講:Kafka使用ZooKeeper在多個Broker中選出一個Controller,用於Partition分配和Leader選舉。如下是Partition的分配機制:
Controller會在ZooKeeper的/brokers/ids節點上註冊Watch,一旦有broker宕機,它就能知道。
當Broker宕機後,Controller就會給受到影響的Partition選出新Leader。
Controller從ZooKeeper的/brokers/topics/[topic]/partitions/[partition]/state中,讀取對應Partition的ISR(in-sync replica已同步的副本)列表,選一個出來作Leader。
選出Leader後,更新ZooKeeper的存儲,而後發送LeaderAndISRRequest給受影響的Broker進行通知。
若是ISR列表是空,那麼會根據配置,隨便選一個replica作Leader,或者乾脆這個partition就是宕機了。
若是ISR列表的有機器,可是也宕機了,那麼還能夠等ISR的機器活過來。
多副本同步:
服務端這邊的處理是Follower從Leader批量拉取數據來同步。可是具體的可靠性,是由生產者來決定的。
生產者生產消息的時候,經過request.required.acks參數來設置數據的可靠性。
在acks=-1的時候,若是ISR少於min.insync.replicas指定的數目,那麼就會返回不可用。
這裏ISR列表中的機器是會變化的,根據配置replica.lag.time.max.ms,多久沒同步,就會從ISR列表中剔除。之前還有根據落後多少條消息就踢出ISR,在1.0版本後就去掉了,由於這個值很難取,在高峯的時候很容易出現節點不斷的進出ISR列表。
從ISA中選出leader後,follower會從把本身日誌中上一個高水位後面的記錄去掉,而後去和leader拿新的數據。由於新的leader選出來後,follower上面的數據,可能比新leader多,因此要截取。這裏高水位的意思,對於partition和leader,就是全部ISR中都有的最新一條記錄。消費者最多隻能讀到高水位;
從leader的角度來講高水位的更新會延遲一輪,例如寫入了一條新消息,ISR中的broker都fetch到了,可是ISR中的broker只有在下一輪的fetch中才能告訴leader。
也正是因爲這個高水位延遲一輪,在一些狀況下,kafka會出現丟數據和主備數據不一致的狀況,0.11開始,使用leader epoch來代替高水位。
交互流程
4、消息投遞語義
kafka支持3種消息投遞語義,
At most once:最多一次,消息可能會丟失,但不會重複
At least once:最少一次,消息不會丟失,可能會重複
Exactly once:只且一次,消息不丟失不重複,只且消費一次(0.11中實現,僅限於下游也是kafka)
At least once:(業務中使用比較多)
先獲取數據,再進行業務處理,業務處理成功後commit offset。
At most once:
先獲取數據,再commit offset,最後進行業務處理。
Exactly once:
首先要保證消息不丟,再去保證不重複。因此盯着At least once的緣由來搞。
業務處理的冪等性很是重要。Kafka控制不了,須要業務來實現。好比所判斷消息是否已經處理。
解決重複消費有兩個方法:
生產的冪等性:
爲每一個producer分配一個pid,做爲該producer的惟一標識。producer會爲每個<topic,partition>維護一個單調遞增的seq。相似的,broker也會爲每一個<pid,topic,partition>記錄下最新的seq。當req_seq == broker_seq+1時,broker纔會接受該消息。由於:
消息的seq不比broker的seq小,那麼說明該消息已被保存。
場景是這樣的:
其中第二、3點做爲一個事務,要麼全成功,要麼全失敗。這裏得益與offset其實是用特殊的topic去保存,這兩點都歸一爲寫多個topic的事務性處理。
引入tid(transaction id),和pid不一樣,這個id是應用程序提供的,用於標識事務,和producer是誰並不要緊。就是任何producer均可以使用這個tid去作事務,這樣進行到一半就死掉的事務,能夠由另外一個producer去恢復。
同時爲了記錄事務的狀態,相似對offset的處理,引入transaction coordinator用於記錄transaction log。在集羣中會有多個transaction coordinator,每一個tid對應惟一一個transaction coordinator。
注:transaction log刪除策略是compact,已完成的事務會標記成null,compact後不保留。
啓動事務時,先標記開啓事務,寫入數據,所有成功就在transaction log中記錄爲prepare commit狀態,不然寫入prepare abort的狀態。以後再去給每一個相關的partition寫入一條marker(commit或者abort)消息,標記這個事務的message能夠被讀取或已經廢棄。成功後 在transaction log記錄下commit/abort狀態,至此事務結束。
總體的數據流是這樣的:
當partition中寫入commit的marker後,相關的消息就可被讀取。因此kafka事務在prepare commit到commit這個時間段內,消息是逐漸可見的,而不是同一時刻可見。
消息消費事務
在目錄/${topicName}-{$partitionid}/下,存儲着實際的log文件(即segment),還有對應的索引文件。
每一個segment文件大小相等,文件名以這個segment中最小的offset命名,文件擴展名是.log;segment對應的索引的文件名字同樣,擴展名是.index。有兩個index文件,一個是offset index用於按offset去查message,一個是time index用於按照時間去查,其實這裏能夠優化合到一塊兒,下面只說offset index。整體的組織是這樣的:
爲了減小索引文件的大小,下降空間使用,方便直接加載進內存中,這裏的索引使用稀疏矩陣,不會每個message都記錄下具體位置,而是每隔必定的字節數,再創建一條索引。 索引包含兩部分,分別是baseOffset,還有position。
baseOffset:意思是這條索引對應segment文件中的第幾條message。這樣作方便使用數值壓縮算法來節省空間。例如kafka使用的是varint。
position:在segment中的絕對位置。
查找offset對應的記錄時,會先用二分法,找出對應的offset在哪一個segment中,而後使用索引,在定位出offset在segment中的大概位置,再遍歷查找message。