Kafka是一種分佈式的消息系統。本文基於0.9.0版本,新版kafka加入了流處理組件kafka stream,最新的官方文檔又自稱分佈式流處理平臺。前端
一個典型的kafka集羣中包含若干producer(能夠是web前端產生的page view,或者是服務器日誌,系統CPU、memory等),若干broker(Kafka支持水平擴展,通常broker數量越多,集羣吞吐率越高),若干consumer group,以及一個Zookeeper集羣。Kafka經過Zookeeper管理集羣配置,選舉leader,以及在consumer group發生變化時進行rebalance。producer使用push模式將消息發佈到broker,consumer使用pull模式從broker訂閱並消費消息。 node
對於每一個Topic,Kafka會爲其維護一個以下圖所示的分區的日誌文件
每一個partition(分區)是一個有序的、不可修改的消息組成的隊列;這些消息是被不斷的appended到這個commit log(提交日誌文件)上的。在這些patitions之中的每一個消息都會被賦予一個叫作offset的順序id編號,用來在partition之中惟一性的標示這個消息。git
Kafka集羣會保存一個時間段內全部被髮布出來的信息,不管這個消息是否已經被消費過。github
Kafka僅僅提供提供partition以內的消息的全局有序,在不一樣的partition之間不能擔保。partition的消息有序性加上能夠按照指定的key劃分消息的partition,這基本上知足了大部分應用的需求。若是你必需要實現一個全局有序的消息隊列,那麼能夠採用Topic只劃分1個partition來實現。可是這就意味着你的每一個消費組只能有惟一的一個消費者進程。web
每個consumer實例都屬於一個consumer group,每一條消息都會被全部訂閱了該topic的consumer group消費。經過group id指定consumer group。至關於同一個consumer group的消費者會瓜分全部的分區,每一個consumer會消費一個或多個分區。apache
使用high-level consumer時,同一個consumer group裏只有一個consumer能消費到該消息。服務器
由於high level不用client關心offset, 會自動的讀zookeeper中該Consumer group的last offset,至關於全部consumer都公用這個offset。當其中一個consumer消費一條消息時,offset就移動到下一條。網絡
訂閱模式:每一個Consumer都採用不一樣的group,每一條消息都會發送給全部消費者
消息隊列模式:全部的Consumer在同一個Group裏,消費者之間負載均衡session
新建topic時,經過–partitions 能夠設置分區數。能夠指定partitions數爲broker的整數倍,這樣,每一個broker會對應相同個數的partitions。架構
生產者在生產數據的時候,能夠爲每條消息指定Key,這樣消息被髮送到broker時,會根據分區規則選擇被存儲到哪個partitions中。若是分區規則設置的合理,那麼全部的消息將會被均勻的分佈到不一樣的partitions中,這樣就實現了負載均衡和水平擴展。
Kafka保證同一consumer group中只有一個consumer會消費某條消息,實際上,Kafka保證的是穩定狀態下每個consumer實例只會消費某一個或多個特定partition的數據,而某個partition的數據只會被某一個特定的consumer實例所消費。其中consumer和partition數量關係以下表所示:
consumer和partition數量關係 | 消費狀況 |
---|---|
小於 | 至少有一個consumer會消費多個partition的數據 |
相等 | 一個consumer消費一個partition的數據 |
大於 | 部分consumer沒法消費該topic下任何一條消息,浪費 |
增減consumer,broker,partition會致使rebalance,rebalance後consumer對應的partition會發生變化,在後面的實例中也能夠看到。
利用kafka中自帶的生產者和消費者例子來作個簡單的測試。具體步驟在另外一篇kakfa部署中。
kafka集羣有3個broker節點。新建一個partitions數量爲3的topic。啓動一個A終端爲生產者,啓動B、C、D、E終端爲消費者。C、D、E終端爲一個consumer group,B爲單獨的一個consumer group。
在producer終端輸入消息從1-20。能夠看到B終端會輸出1-20所有消息,圖中B所示。而C、D、E終端因爲屬於同一個Consumer Group,partitions數量等於consumer,每一個consumer消費了一個partition裏的消息。圖中爲C、D、E。
將C終端斷開,剩下B、D、E去消費消息。B終端仍是會輸出1-20所有消息,圖中爲B1所示。而D、E屬於同一個Consumer Group,且consumer數量少於partition數,能夠看到D消費了兩個partition中的數據,見圖中D1所示。
replication策略是基於partition。kafka經過建立topic時能夠經過–replication-factor配置partition副本數。配置副本以後,每一個partition都有一個惟一的leader,有0個或多個follower。
全部的讀寫操做都在leader上完成,leader批量從leader上pull數據。followers從leader消費消息來複制message,就跟普通的consumer消費消息同樣。
通常狀況下partition的數量大於等於broker的數量,而且全部partition的leader均勻分佈在broker上。
broker是否alive包含兩個條件:
leader會track「in sync」的node list。若是一個follower宕機,或者落後太多,leader將把它從」in sync」 list中移除。
一條消息只有被「in sync」 list裏的全部follower都從leader複製過去纔會被認爲已提交。這樣就避免了部分數據被寫進了leader,還沒來得及被任何follower複製就宕機了,而形成數據丟失(consumer沒法消費這些數據)
而對於producer而言,它能夠選擇是否等待消息commit,這能夠經過producer的「acks」來設置。默認爲acks=all ,這意味着leader將等待全部follower複製完消息。
leader掛掉後,怎樣在follower中選舉出新的leader?
Kafka在Zookeeper中動態維護了一個ISR(in-sync replicas) set,這個set裏的全部replica都跟上了leader,只有ISR裏的成員纔有被選爲leader的可能。
若是某一個partition的全部replica都掛了,就沒法保證數據不丟失了。這種狀況下有兩種可行的方案:
kafka採用第二種方案,能夠經過配置unclean.leader.election.enable來關閉這種方案。
kafka集羣有3個broker節點。具體部署在另外一篇kafka部署中。
作個簡單的測試,建立一個3分區的topic,不指定副本數,能夠看到默認一個副本,Partition均勻分佈在各broker。
1
2
3
4
5
6
|
[fangyeqing@xxxx kafka_2.11-0.9.0.0]$bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 3 --topic test3partitions
[fangyeqing@xxxx kafka_2.11-0.9.0.0]$bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test3partitions
Topic:test3partitions PartitionCount:3 ReplicationFactor:1 Configs:
Topic: test3partitions Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: test3partitions Partition: 1 Leader: 2 Replicas: 2 Isr: 2
Topic: test3partitions Partition: 2 Leader: 0 Replicas: 0 Isr: 0
|
建立一個3分區2副本的topic,能夠看到Replicas和Isr中有1個follower。例如Partitions:0的Leader爲broker:1,follower爲broker:2,而且2在Isr中,理論上當Leader掛掉以後,2會頂上。
1
2
3
4
5
6
|
[fangyeqing@xxxx kafka_2.11-0.9.0.0]$bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 3 --topic test3partition2replication --replication-factor 2
[fangyeqing@xxxx kafka_2.11-0.9.0.0]$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test3partition2replication
Topic:test3partition2replication PartitionCount:3 ReplicationFactor:2 Configs:
Topic: test3partition2replication Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: test3partition2replication Partition: 1 Leader: 2 Replicas: 2,0 Isr: 2,0
Topic: test3partition2replication Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1
|
建立一個3分區2副本的topic,能夠看到Replicas和Isr中有2個follower
1
2
3
4
5
6
|
[fangyeqing@xxxx kafka_2.11-0.9.0.0]$bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 3 --topic test3partition3replication --replication-factor 3
[fangyeqing@xxxx kafka_2.11-0.9.0.0]$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test3partition3replication
Topic:test3partition3replication PartitionCount:3 ReplicationFactor:3 Configs:
Topic: test3partition3replication Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Topic: test3partition3replication Partition: 1 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
Topic: test3partition3replication Partition: 2 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
|
目前有下面幾種消息確保機制:
Kafka默認保證At least once,而且容許經過設置producer異步提交來實現At most once。下面分階段分析:
當Producer向broker發送消息,由上述Replication的分析可知,一旦這條消息已經被commit,若是這個topic有多個replication(副本),某個broker掛掉也不會丟失消息。
Producer發送數據給broker的過程當中,若是遇到網絡問題而形成通訊中斷:
當Consumer從broker消費消息時,consumer若是在消費消息時crash: