MQ(消息隊列)是跨進程通訊的方式之一,可理解爲異步rpc,上游系統對調用結果的態度每每是重要不緊急。使用消息隊列有如下好處:業務解耦、流量削峯、靈活擴展。接下來介紹消息中間件Kafka。
Kafka是什麼?
Kafka是一個分佈式的消息引擎。具備如下特徵
node
Kafka架構總覽
算法
Topic
消息的主題、隊列,每個消息都有它的topic,Kafka經過topic對消息進行歸類。Kafka中能夠將Topic從物理上劃分紅一個或多個分區(Partition),每一個分區在物理上對應一個文件夾,以」topicName_partitionIndex」的命名方式命名,該dir包含了這個分區的全部消息(.log)和索引文件(.index),這使得Kafka的吞吐率能夠水平擴展。
Partition
每一個分區都是一個 順序的、不可變的消息隊列, 而且能夠持續的添加;分區中的消息都被分了一個序列號,稱之爲偏移量(offset),在每一個分區中此偏移量都是惟一的。
producer在發佈消息的時候,能夠爲每條消息指定Key,這樣消息被髮送到broker時,會根據分區算法把消息存儲到對應的分區中(一個分區存儲多個消息),若是分區規則設置的合理,那麼全部的消息將會被均勻的分佈到不一樣的分區中,這樣就實現了負載均衡。
緩存
Broker
Kafka server,用來存儲消息,Kafka集羣中的每個服務器都是一個Broker,消費者將從broker拉取訂閱的消息
Producer
向Kafka發送消息,生產者會根據topic分發消息。生產者也負責把消息關聯到Topic上的哪個分區。最簡單的方式從分區列表中輪流選擇。也能夠根據某種算法依照權重選擇分區。算法可由開發者定義。
Cousumer
Consermer實例能夠是獨立的進程,負責訂閱和消費消息。消費者用consumerGroup來標識本身。同一個消費組能夠併發地消費多個分區的消息,同一個partition也能夠由多個consumerGroup併發消費,可是在consumerGroup中一個partition只能由一個consumer消費
CousumerGroup
Consumer Group:同一個Consumer Group中的Consumers,Kafka將相應Topic中的每一個消息只發送給其中一個Consumer
Kafka producer 設計原理
發送消息的流程
bash
1.序列化消息&&.計算partition
根據key和value的配置對消息進行序列化,而後計算partition:
ProducerRecord對象中若是指定了partition,就使用這個partition。不然根據key和topic的partition數目取餘,若是key也沒有的話就隨機生成一個counter,使用這個counter來和partition數目取餘。這個counter每次使用的時候遞增。
2發送到batch&&喚醒Sender 線程
根據topic-partition獲取對應的batchs(Dueue<ProducerBatch>),而後將消息append到batch中.若是有batch滿了則喚醒Sender 線程。隊列的操做是加鎖執行,因此batch內消息時有序的。後續的Sender操做當前方法異步操做。
服務器
3.Sender把消息有序發到 broker(tp replia leader)
3.1 肯定tp relica leader 所在的broker
微信
public class PartitionInfo {
private final String topic;
private final int partition;
private final Node leader;
private final Node[] replicas;
private final Node[] inSyncReplicas;
private final Node[] offlineReplicas;
}
複製代碼
3.2 冪等性發送
爲實現Producer的冪等性,Kafka引入了Producer ID(即PID)和Sequence Number。對於每一個PID,該Producer發送消息的每一個<Topic, Partition>都對應一個單調遞增的Sequence Number。一樣,Broker端也會爲每一個<PID, Topic, Partition>維護一個序號,而且每Commit一條消息時將其對應序號遞增。對於接收的每條消息,若是其序號比Broker維護的序號)大一,則Broker會接受它,不然將其丟棄:
網絡
4. Sender處理broker發來的produce response
一旦broker處理完Sender的produce請求,就會發送produce response給Sender,此時producer將執行咱們爲send()設置的回調函數。至此producer的send執行完畢。
吞吐性&&延時:
架構
Sender線程和長鏈接
每初始化一個producer實例,都會初始化一個Sender實例,新增到broker的長鏈接。
代碼角度:每初始化一次KafkaProducer,都賦一個空的client
併發
public KafkaProducer(final Map<String, Object> configs) { this(configs, null, null, null, null, null, Time.SYSTEM); }
複製代碼
終端查看TCP鏈接數:
lsof -p portNum -np | grep TCP
Consumer設計原理
poll消息
app
位移管理
consumer的消息位移表明了當前group對topic-partition的消費進度,consumer宕機重啓後能夠繼續從該offset開始消費。
在kafka0.8以前,位移信息存放在zookeeper上,因爲zookeeper不適合高併發的讀寫,新版本Kafka把位移信息當成消息,發往__consumers_offsets 這個topic所在的broker,__consumers_offsets默認有50個分區。
消息的key 是groupId+topic_partition,value 是offset.
Kafka Group 狀態
重平衡reblance
當一些緣由致使consumer對partition消費再也不均勻時,kafka會自動執行reblance,使得consumer對partition的消費再次平衡。
何時發生rebalance?:
reblance過程
舉例1 consumer被檢測爲崩潰引發的reblance
好比心跳線程在timeout時間內沒和broker發送心跳,此時coordnator認爲該group應該進行reblance。接下來其餘consumer發來fetch請求後,coordnator將回復他們進行reblance通知。當consumer成員收到請求後,只有leader會根據分配策略進行分配,而後把各自的分配結果返回給coordnator。這個時候只有consumer leader返回的是實質數據,其餘返回的都爲空。收到分配方法後,consumer將會把分配策略同步給各consumer
舉例2 consumer加入引發的reblance
(上圖圖片摘自網絡)
引伸:以上reblance機制存在的問題
在大型系統中,一個topic可能對應數百個consumer實例。這些consumer陸續加入到一個空消費組將致使屢次的rebalance;此外consumer 實例啓動的時間不可控,頗有可能超出coordinator肯定的rebalance timeout(即max.poll.interval.ms),將會再次觸發rebalance,而每次rebalance的代價又至關地大,由於不少狀態都須要在rebalance前被持久化,而在rebalance後被從新初始化。
新版本改進
經過延遲進入PreparingRebalance狀態減小reblance次數
新版本新增了group.initial.rebalance.delay.ms參數。空消費組接受到成員加入請求時,不當即轉化到PreparingRebalance狀態來開啓reblance。當時間超過group.initial.rebalance.delay.ms後,再把group狀態改成PreparingRebalance(開啓reblance)。實現機制是在coordinator底層新增一個group狀態:InitialReblance。假設此時有多個consumer陸續啓動,那麼group狀態先轉化爲InitialReblance,待group.initial.rebalance.delay.ms時間後,再轉換爲PreparingRebalance(開啓reblance)
Broker設計原理
Broker 是Kafka 集羣中的節點。負責處理生產者發送過來的消息,消費者消費的請求。以及集羣節點的管理等。因爲涉及內容較多,先簡單介紹,後續專門抽出一篇文章分享
broker zk註冊
broker消息存儲
broker狀態數據
broker設計中,每臺機器都保存了相同的狀態數據。主要包括如下:
broker負載均衡
分區數量負載:各臺broker的partition數量應該均勻
partition Replica分配算法以下:
容量大小負載:每臺broker的硬盤佔用大小應該均勻
在kafka1.1以前,Kafka可以保證各臺broker上partition數量均勻,但因爲每一個partition內的消息數不一樣,可能存在不一樣硬盤之間內存佔用差別大的狀況。在Kafka1.1中增長了副本跨路徑遷移功能kafka-reassign-partitions.sh,咱們能夠結合它和監控系統,實現自動化的負載均衡
Kafka高可用
在介紹kafka高可用以前先介紹幾個概念
Isr
Kafka結合同步複製和異步複製,使用ISR(與Partition Leader保持同步的Replica列表)的方式在確保數據不丟失和吞吐率之間作了平衡。Producer只需把消息發送到Partition Leader,Leader將消息寫入本地Log。Follower則從Leader pull數據。Follower在收到該消息向Leader發送ACK。一旦Leader收到了ISR中全部Replica的ACK,該消息就被認爲已經commit了,Leader將增長HW而且向Producer發送ACK。這樣若是leader掛了,只要Isr中有一個replica存活,就不會丟數據。
Isr動態更新
Leader會跟蹤ISR,若是ISR中一個Follower宕機,或者落後太多,Leader將把它從ISR中移除。這裏所描述的「落後太多」指Follower複製的消息落後於Leader後的條數超過預約值(replica.lag.max.messages)或者Follower超過必定時間(replica.lag.time.max.ms)未向Leader發送fetch請求。
broker Nodes In Zookeeper
/brokers/topics/[topic]/partitions/[partition]/state 保存了topic-partition的leader和Isr等信息
Controller負責broker故障檢查&&故障轉移(fail/recover)
3.3 更新Leader、ISR、leader_epoch、controller_epoch:寫入/brokers/topics/[topic]/partitions/[partition]/state
Controller掛掉
每一個 broker 都會在 zookeeper 的臨時節點 "/controller" 註冊 watcher,當 controller 宕機時 "/controller" 會消失,觸發broker的watch,每一個 broker 都嘗試建立新的 controller path,只有一個競選成功並當選爲 controller。
使用Kafka如何保證冪等性
不丟消息
不重複
Kafka高性能
業務方對 Kafka producer的優化