Kafka的集羣算法作的很先進,大大強於ActiveMQ。ActiveMQ只有主從互備的HA,負載均衡作的很差,沒有消息分片。而Kafka在HA,負載均衡和消息分片上作的很完美。算法
Kafka的消息存儲在Broker上,每種類型的消息,存在一個Topic裏。每一個Topic分紅多個分區。這些分區能夠分佈在同一個物理機,也能夠分佈在不一樣的物理機。shell
高可用json
Kafka提供一個複製因子的概念,能夠把分區複製n份,放在不一樣的節點上。這就實現了數據的高可用。而服務的高可用,和ActiveMQ相似。是在客戶端註冊多個Broker地址,其中一個失效,客戶端會自動進行失效轉移。負載均衡
消息分片spa
消息經過輪詢,或者自定義的算法寫入不一樣的分區,則實現了消息分片。消息生產者不須要關注消息最終存入Topic的哪一個分區,也不須要關注這個分區在哪臺物理機上。分片算法會自動把消息路由到正確的分區。這個算法,很是像一致性哈希。其中每一個物理機有n個分區,這n個分區是這個物理機的虛擬節點,將這些虛擬節點放入Hash環。code
負載均衡server
每一個Topic分區有n分拷貝,可是隻有一個節點負責處理消息的發送和接收,稱爲主節點。其餘節點稱爲從節點,只是被動的同步消息。一旦主節點不能提供服務,從節點會變爲主節點。而主從節點的單位是分區。也就是一臺物理機,能夠是Topic A的主節點,Topic B的從節點。經過此方式實現負載均衡和最大限度的利用系統資源,不會有專門負責備份的從節點物理機。進程
Kafka集羣的結構圖以下:資源
配置路由
Kafka集羣的配置很簡單。配置文件是Kafka路徑的config的server.properties。而Kafka的啓動腳本須要制定配置文件。
bin/kafka-server-start.sh server.properties &
爲了能夠在單機模擬集羣的狀況,能夠將server.properties文件複製多份,如:
cp config/server.properties config/server-1.properties cp config/server.properties config/server-2.properties
而後使用一樣的命令啓動Kafka就能夠了。可是須要修改一下配置,不能讓Broker ID,端口和存儲路徑衝突。打開server.properties修改如下幾行,改爲不一樣的值。
broker.id=1 port=9093 log.dir=/tmp/kafka-logs-1
另外,zookeeper.connect這個配置也頗有用。只有指向同一個Zookeeper的Kafka纔是在同一個集羣中。
zookeeper.connect=localhost:2181
最後啓動Kafka。
bin/kafka-server-start.sh server.properties & bin/kafka-server-start.sh server-1.properties & bin/kafka-server-start.sh server-2.properties &
平衡主節點
若是某個節點崩潰或死機,那麼Kafka會將死機而影響的分區中選舉新的主節點。當這個節點從新啓動以後,它的全部的分區都是做爲從節點,形成資源的浪費。這樣能夠在啓動Kafka以後執行
bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot
這樣來觸發主節點從新選舉,讓Kafka更加負載均衡。若是每次手動執行命令以爲很麻煩,能夠在配置文件中配置,讓Kafka自動平衡主節點。
auto.leader.rebalance.enable=true
在集羣中增長新物理機
只要配置zookeeper.connect爲要加入的集羣,再啓動Kafka進程,就可讓新的機器加入到Kafka集羣。可是新的機器只針對新的Topic纔會起做用,在以前就已經存在的Topic的分區,不會自動的分配到新增長的物理機中。爲了使新增長的機器能夠分擔系統壓力,必須進行消息數據遷移。Kafka提供了kafka-reassign-partitions.sh進行數據遷移。這個腳本提供3個命令:
--generate 根據給予的Topic列表和Broker列表生成遷移計劃。generate並不會真正進行消息遷移,而是將消息遷移計劃計算出來,供execute命令使用。
--execute 根據給予的消息遷移計劃進行遷移。
--verify 檢查消息是否已經遷移完成。
例子:
1. 建立須要進行遷移的Topic列表,遷移Topic foo1和foo2。
cat topics-to-move.json {"topics": [{"topic": "foo1"}, {"topic": "foo2"}], "version":1 }
2. 調用--generate生成遷移計劃,將foo1和foo2遷移到Broker ID是5和6的機器上。
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate
生成結果以下:
Current partition replica assignment {"version":1, "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]}, {"topic":"foo1","partition":0,"replicas":[3,4]}, {"topic":"foo2","partition":2,"replicas":[1,2]}, {"topic":"foo2","partition":0,"replicas":[3,4]}, {"topic":"foo1","partition":1,"replicas":[2,3]}, {"topic":"foo2","partition":1,"replicas":[2,3]}] } Proposed partition reassignment configuration {"version":1, "partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]}, {"topic":"foo1","partition":0,"replicas":[5,6]}, {"topic":"foo2","partition":2,"replicas":[5,6]}, {"topic":"foo2","partition":0,"replicas":[5,6]}, {"topic":"foo1","partition":1,"replicas":[5,6]}, {"topic":"foo2","partition":1,"replicas":[5,6]}] }
Current partition replica assignment表示當前的消息存儲情況。Proposed partition reassignment configuration表示遷移後的消息存儲情況。將遷移後的json存入一個文件(如expand-cluster-reassignment.json),供--execute命令使用。
3. 執行--execute遷移。
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
4. 能夠執行--verify查看遷移進度。
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify
結果以下:
Status of partition reassignment: Reassignment of partition [foo1,0] completed successfully Reassignment of partition [foo1,1] is in progress Reassignment of partition [foo1,2] is in progress Reassignment of partition [foo2,0] completed successfully Reassignment of partition [foo2,1] completed successfully Reassignment of partition [foo2,2] completed successfully
在集羣中刪除物理機
在Kafka 0.8.2中會提供此功能。目前可使用增長機器的方法,先將數據都遷移到其餘的機器,再刪除該節點。