Kafka
體系架構
- Producers
- Brokers
- Consumers
- Zookeeper Cluster
- manage kafka cluster config
- select leader
- rebalance consumer group
存儲機制
Topic
Partition
- 一個 Topic 有多個 partition
- 每一個 partition 爲每一個 consumer group 維護了一個 邏輯offset
- 一個 partition 在存儲中對應一個目錄,分紅多個 segments
- 能夠對 partition 作備份
- partition 內的消息是有序的
Segment
- 每一個 segments 對應一個 .log 文件和一個 .index 文件,在磁盤中順序存儲消息(比隨機寫內存效率要高)
- .index 和 .log 文件的命名方式是以邏輯 offset 命名的,第一個 00000000.log(20位) 00000000.index,後面的多是 00123123.log 00123123.index
- 分割是由配置決定的,分割的時間 或者 分割的大小
- .index 至關於一個索引文件,文件每行存放了一個局部 offset 和它在 .log 文件的偏移量,offset 是稀疏的,並不是連貫的,用以減小 .index 存儲
- .log 文件有本身的格式,會記錄一些元素,以及各個元素的偏移量,因此只要找到偏移量,就能遍歷下面的條數,找到對應的數據,二分查找(這個不太肯定)
- 總結起來,根據 offset 查找對應的 .index,而後對 .index 進行二分查找,肯定偏移量,而後從 .log 文件中根據偏移量查找每行數據,直到找到 offset 對應的那一條數據
高可靠 Tips:這種順序存儲機制保證了快速讀寫(順序存儲,索引),負載均衡(partition),快速過時刪除(segment),以及容災備份(replica)算法
Kafka 沒有一個緩存機制,每次都要訪問文件嗎?
緩存
複製和同步
- HW HighWatermark
- 每一個 partition(包括 partition 副本) 都會有一個 HW
- 這個 HW 決定了能讀取的最大偏移量
- LEO LogEndOffset
- 每一個 partition 也都會有一個 LEO
- 這個是真正的消息記錄的截止位置
- ISR In-Sync-Replicas
- 每一個 partition leader 維護了一個 ISR 列表,即副本同步隊列,保存了 partition follower
- 若是 follower 過慢,則可能會被從列表刪除
- replica.lag.time.max.ms
- replica.lag.max.messages
- OSR Out-Sync-Replicas
- AR Assigned-Replicas
kafka 的複製機制不是徹底同步的,也不是單純的異步複製安全
- 同步複製下降了吞吐率
- 異步複製可能會丟失數據
- ISR 能夠很好的均衡上述兩點
ISR 的信息都會反饋到 zookeeper 上,有兩個地方會維護這個信息網絡
- broker controller
- 負責管理 partition 和 replica 狀態
- 從新分配 partition
- LeaderSelector 選舉新的 partition leader,ISR,leader_epoch,controller_epoch
- 把相關消息推送給全部 replica
- partition leader
ISR 包括了 partition leader 自身架構
replication=3 表示 算主有3個負載均衡
數據可靠性和持久性
ack
- 1 leader 確認則可直接發送下一條數據
- 0 不用等 leader 確認
- -1 ISR 全部 follower 確認
- 當配置了 min.insync.replicas 這個參數,會發揮其功效,就是 至少這個數的 ISR 中的 follower 肯定後纔算提交成功,不然返回異常
擔憂切換 leader 時數據丟失,由於 leader 是否會選擇最新的,而不是隨機選的異步
在 -1 的狀況下:async
- kafka 同步,replication.factor >= 2 && min.insync.replicas >= 2,這種狀況不會丟失數據
- 若是 kafka broker 宕機, ISR 中的 follower 沒有所有同步,而返回了異常,這時候若是選擇了已經同步的 follower,會形成數據重複
恢復後同步
- 要確保一致性
- 不會從 LEO 開始
- 會從 HW 開始
- 由於可能 LEO 可能沒有同步完就 down 掉了,因此從 LEO 開始會多數據,形成不一致
leader 選舉
- 不是少數服從多數,raft 這種是,zk 是,這種方式須要大量的副本
- 大量的副本會在大數據量下致使性能的急劇降低
- 不多在須要大量數據的系統中使用
- 常見的選舉算法
- Zab
- Raft
- Paxos
- PacificA
- Viewstamped Replication
容錯處理
若是某一個partition的全部replica都掛了,就沒法保證數據不丟失了。這種狀況下有兩種可行的方案:性能
- 等待ISR中任意一個replica「活」過來,而且選它做爲leader
- 選擇第一個「活」過來的replica(並不必定是在ISR中)做爲leader
默認會採用第二種測試
對於 某個 broker down 掉,可能致使服務不可用(可讀不可寫,ack=-1,replicas>1),這時候須要調整 min.insync.replicas = 1
Procduer 發送方式
-
producer.type=sync
-
producer.type=async
若是 producer 網絡出現問題,沒有收到 ack,也會重試,因此會出現 at least once;若是 consumer 設置了自動提交,那麼在 producer 沒出問題的前提下,是 exactly once。若是手動提交,在消費結束後提交,就是 at least once,若是在以前,就是 at most once,由於可能消費失敗。因此爲了保證 exactly once,須要消費後手動提交,並加入去重機制。
總結
要保證數據寫入到Kafka是安全的,高可靠的,須要以下的配置:
- topic的配置:replication.factor>=3,即副本數至少是3個;2<=min.insync.replicas<=replication.factor
- broker的配置:leader的選舉條件unclean.leader.election.enable=false(ISR中選取Leader)
- producer的配置:request.required.acks=-1(all),producer.type=sync
測試表現:
- 當acks=-1時,Kafka發送端的TPS受限於topic的副本數量(ISR中),副本越多TPS越低;
- acks=0時,TPS最高,其次爲1,最差爲-1,即TPS:acks_0 > acks_1 > acks_-1;
- min.insync.replicas參數不影響TPS;
- partition的不一樣會影響TPS,隨着partition的個數的增加TPS會有所增加,但並非一直成正比關係,到達必定臨界值時,partition數量的增長反而會使TPS略微下降;