在Kafka在0.8之前的版本中,是沒有Replication的,一旦某一個Broker宕機,則其上全部的Partition數據都不可被消費,這與Kafka數據持久性及Delivery Guarantee的設計目標相悖。同時Producer都不能再將數據存於這些Partition中。node
若是Producer使用同步模式則Producer會在嘗試從新發送message.send.max.retries(默認值爲3)次後拋出Exception,用戶能夠選擇中止發送後續數據也可選擇繼續選擇發送。而前者會形成數據的阻塞,後者會形成本應發往該Broker的數據的丟失。算法
若是Producer使用異步模式,則Producer會嘗試從新發送message.send.max.retries(默認值爲3)次後記錄該異常並繼續發送後續數據,這會形成數據丟失而且用戶只能經過日誌發現該問題。同時,Kafka的Producer並未對異步模式提供callback接口。服務器
因而可知,在沒有Replication的狀況下,一旦某機器宕機或者某個Broker中止工做則會形成整個系統的可用性下降。隨着集羣規模的增長,整個集羣中出現該類異常的概率大大增長,所以對於生產系統而言Replication機制的引入很是重要。網絡
引入Replication以後,同一個Partition可能會有多個Replica,而這時須要在這些Replication之間選出一個Leader,Producer和Consumer只與這個Leader交互,其它Replica做爲Follower從Leader中複製數據。session
由於須要保證同一個Partition的多個Replica之間的數據一致性(其中一個宕機後其它Replica必需要能繼續服務而且即不能形成數據重複也不能形成數據丟失)。若是沒有一個Leader,全部Replica均可同時讀/寫數據,那就須要保證多個Replica之間互相(N×N條通路)同步數據,數據的一致性和有序性很是難保證,大大增長了Replication實現的複雜性,同時也增長了出現異常的概率。而引入Leader後,只有Leader負責數據讀寫,Follower只向Leader順序Fetch數據(N條通路),系統更加簡單且高效。併發
爲了更好的作負載均衡,Kafka儘可能將全部的Partition均勻分配到整個集羣上。一個典型的部署方式是一個Topic的Partition數量大於Broker的數量。同時爲了提升Kafka的容錯能力,也須要將同一個Partition的Replica儘可能分散到不一樣的機器。實際上,若是全部的Replica都在同一個Broker上,那一旦該Broker宕機,該Partition的全部Replica都沒法工做,也就達不到HA的效果。同時,若是某個Broker宕機了,須要保證它上面的負載能夠被均勻的分配到其它倖存的全部Broker上。app
Kafka分配Replica的算法以下:負載均衡
1.將全部Broker(假設共n個Broker)和待分配的Partition排序異步
2.將第i個Partition分配到第(i mod n)個Broker上分佈式
3.將第i個Partition的第j個Replica分配到第((i + j) mode n)個Broker上
Kafka的高可靠性的保障來源於其健壯的副本(replication)策略。
Producer在發佈消息到某個Partition時,先經過ZooKeeper找到該Partition的Leader,而後不管該Topic的Replication Factor爲多少,Producer只將該消息發送到該Partition的Leader。Leader會將該消息寫入其本地Log。每一個Follower都從Leader pull數據。這種方式上,Follower存儲的數據順序與Leader保持一致。Follower在收到該消息並寫入其Log後,向Leader發送ACK。一旦Leader收到了ISR中的全部Replica的ACK,該消息就被認爲已經commit了,Leader將增長HW而且向Producer發送ACK。
爲了提升性能,每一個Follower在接收到數據後就立馬向Leader發送ACK,而非等到數據寫入Log中。所以,對於已經commit的消息,Kafka只能保證它被存於多個Replica的內存中,而不能保證它們被持久化到磁盤中,也就不能徹底保證異常發生後該條消息必定能被Consumer消費。
Consumer讀消息也是從Leader讀取,只有被commit過的消息纔會暴露給Consumer。
Kafka Replication的數據流以下圖所示:
對於Kafka而言,定義一個Broker是否「活着」包含兩個條件:
Leader會跟蹤與其保持同步的Replica列表,該列表稱爲ISR(即in-sync Replica)。若是一個Follower宕機,或者落後太多,Leader將把它從ISR中移除。這裏所描述的「落後太多」指Follower複製的消息落後於Leader後的條數超過預約值(該值可在$KAFKA_HOME/config/server.properties中經過replica.lag.max.messages配置,其默認值是4000)或者Follower超過必定時間(該值可在$KAFKA_HOME/config/server.properties中經過replica.lag.time.max.ms來配置,其默認值是10000)未向Leader發送fetch請求。
Kafka的複製機制既不是徹底的同步複製,也不是單純的異步複製。事實上,徹底同步複製要求全部能工做的Follower都複製完,這條消息纔會被認爲commit,這種複製方式極大的影響了吞吐率(高吞吐率是Kafka很是重要的一個特性)。而異步複製方式下,Follower異步的從Leader複製數據,數據只要被Leader寫入log就被認爲已經commit,這種狀況下若是Follower都複製完都落後於Leader,而若是Leader忽然宕機,則會丟失數據。而Kafka的這種使用ISR的方式則很好的均衡了確保數據不丟失以及吞吐率。Follower能夠批量的從Leader複製數據,這樣極大的提升複製性能(批量寫磁盤),極大減小了Follower與Leader的差距。
須要說明的是,Kafka只解決fail/recover,不處理「Byzantine」(「拜占庭」)問題。一條消息只有被ISR裏的全部Follower都從Leader複製過去纔會被認爲已提交。這樣就避免了部分數據被寫進了Leader,還沒來得及被任何Follower複製就宕機了,而形成數據丟失(Consumer沒法消費這些數據)。而對於Producer而言,它能夠選擇是否等待消息commit,這能夠經過request.required.acks來設置。這種機制確保了只要ISR有一個或以上的Follower,一條被commit的消息就不會丟失。
Leader選舉本質上是一個分佈式鎖,有兩種方式實現基於ZooKeeper的分佈式鎖:
一種很是經常使用的選舉leader的方式是「Majority Vote」(「少數服從多數」),但Kafka並未採用這種方式。這種模式下,若是咱們有2f+1個Replica(包含Leader和Follower),那在commit以前必須保證有f+1個Replica複製完消息,爲了保證正確選出新的Leader,fail的Replica不能超過f個。由於在剩下的任意f+1個Replica裏,至少有一個Replica包含有最新的全部消息。這種方式有個很大的優點,系統的latency只取決於最快的幾個Broker,而非最慢那個。Majority Vote也有一些劣勢,爲了保證Leader Election的正常進行,它所能容忍的fail的follower個數比較少。若是要容忍1個follower掛掉,必需要有3個以上的Replica,若是要容忍2個Follower掛掉,必需要有5個以上的Replica。也就是說,在生產環境下爲了保證較高的容錯程度,必需要有大量的Replica,而大量的Replica又會在大數據量下致使性能的急劇降低。這就是這種算法更多用在ZooKeeper這種共享集羣配置的系統中而不多在須要存儲大量數據的系統中使用的緣由。例如HDFS的HA Feature是基於majority-vote-based journal,可是它的數據存儲並無使用這種方式。
Kafka在ZooKeeper中動態維護了一個ISR(in-sync replicas),這個ISR裏的全部Replica都跟上了leader,只有ISR裏的成員纔有被選爲Leader的可能。在這種模式下,對於f+1個Replica,一個Partition能在保證不丟失已經commit的消息的前提下容忍f個Replica的失敗。在大多數使用場景中,這種模式是很是有利的。事實上,爲了容忍f個Replica的失敗,Majority Vote和ISR在commit前須要等待的Replica數量是同樣的,可是ISR須要的總的Replica的個數幾乎是Majority Vote的一半。
雖然Majority Vote與ISR相比有不需等待最慢的Broker這一優點,可是Kafka做者認爲Kafka能夠經過Producer選擇是否被commit阻塞來改善這一問題,而且節省下來的Replica和磁盤使得ISR模式仍然值得。
在ISR中至少有一個follower時,Kafka能夠確保已經commit的數據不丟失,但若是某個Partition的全部Replica都宕機了,就沒法保證數據不丟失了。這種狀況下有兩種可行的方案:
1.等待ISR中的任一個Replica「活」過來,而且選它做爲Leader
2.選擇第一個「活」過來的Replica(不必定是ISR中的)做爲Leader
這就須要在可用性和一致性當中做出一個簡單的折衷。若是必定要等待ISR中的Replica「活」過來,那不可用的時間就可能會相對較長。並且若是ISR中的全部Replica都沒法「活」過來了,或者數據都丟失了,這個Partition將永遠不可用。選擇第一個「活」過來的Replica做爲Leader,而這個Replica不是ISR中的Replica,那即便它並不保證已經包含了全部已commit的消息,它也會成爲Leader而做爲consumer的數據源(前文有說明,全部讀寫都由Leader完成)。Kafka0.8.*使用了第二種方式。根據Kafka的文檔,在之後的版本中,Kafka支持用戶經過配置選擇這兩種方式中的一種,從而根據不一樣的使用場景選擇高可用性仍是強一致性。
最簡單最直觀的方案是,全部Follower都在ZooKeeper上設置一個Watch,一旦Leader宕機,其對應的ephemeral znode會自動刪除,此時全部Follower都嘗試建立該節點,而建立成功者(ZooKeeper保證只有一個能建立成功)便是新的Leader,其它Replica即爲Follower。
可是該方法會有3個問題:
1.split-brain 這是由ZooKeeper的特性引發的,雖然ZooKeeper能保證全部Watch按順序觸發,但並不能保證同一時刻全部Replica「看」到的狀態是同樣的,這就可能形成不一樣Replica的響應不一致
2.herd effect 若是宕機的那個Broker上的Partition比較多,會形成多個Watch被觸發,形成集羣內大量的調整
3.ZooKeeper負載太重 每一個Replica都要爲此在ZooKeeper上註冊一個Watch,當集羣規模增長到幾千個Partition時ZooKeeper負載會太重。
Kafka 0.8.*的Leader Election方案解決了上述問題,它在全部broker中選出一個controller,全部Partition的Leader選舉都由controller決定。controller會將Leader的改變直接經過RPC的方式(比ZooKeeper Queue的方式更高效)通知需爲爲此做爲響應的Broker。同時controller也負責增刪Topic以及Replica的從新分配。
producer採用推(push)模式將消息發佈到broker,每條消息都被追加(append)到分區(patition)中,屬於順序寫磁盤(順序寫磁盤效率比隨機寫內存要高,保障kafka吞吐率)。
Kafka集羣有多個消息代理服務器(broker-server)組成,發佈到Kafka集羣的每條消息都有一個類別,用主題(topic)來表示。一般,不一樣應用產生不一樣類型的數據,能夠設置不一樣的主題。一個主題通常會有多個消息的訂閱者,當生產者發佈消息到某個主題時,訂閱了這個主題的消費者均可以接收到生成者寫入的新消息。
Kafka集羣爲每一個主題維護了分佈式的分區(partition)日誌文件,物理意義上能夠把主題(topic)看做進行了分區的日誌文件(partition log)。主題的每一個分區都是一個有序的、不可變的記錄序列,新的消息會不斷追加到日誌中。分區中的每條消息都會按照時間順序分配到一個單調遞增的順序編號,叫作偏移量(offset),這個偏移量可以惟一地定位當前分區中的每一條消息。
消息發送時都被髮送到一個topic,其本質就是一個目錄,而topic是由一些Partition Logs(分區日誌)組成,其組織結構以下圖所示:
下圖中的topic有3個分區,每一個分區的偏移量都從0開始,不一樣分區之間的偏移量都是獨立的,不會相互影響。
咱們能夠看到,每一個Partition中的消息都是有序的,生產的消息被不斷追加到Partition log上,其中的每個消息都被賦予了一個惟一的offset值。
發佈到Kafka主題的每條消息包括鍵值和時間戳。消息到達服務器端的指定分區後,都會分配到一個自增的偏移量。原始的消息內容和分配的偏移量以及其餘一些元數據信息最後都會存儲到分區日誌文件中。消息的鍵也能夠不用設置,這種狀況下消息會均衡地分佈到不一樣的分區。
1) 分區的緣由
(1)方便在集羣中擴展,每一個Partition能夠經過調整以適應它所在的機器,而一個topic又能夠有多個Partition組成,所以整個集羣就能夠適應任意大小的數據了;
(2)能夠提升併發,由於能夠以Partition爲單位讀寫了。
傳統消息系統在服務端保持消息的順序,若是有多個消費者消費同一個消息隊列,服務端會以消費存儲的順序依次發送給消費者。但因爲消息是異步發送給消費者的,消息到達消費者的順序多是無序的,這就意味着在並行消費時,傳統消息系統沒法很好地保證消息被順序處理。雖然咱們能夠設置一個專用的消費者只消費一個隊列,以此來解決消息順序的問題,可是這就使得消費處理沒法真正執行。
Kafka比傳統消息系統有更強的順序性保證,它使用主題的分區做爲消息處理的並行單元。Kafka以分區做爲最小的粒度,將每一個分區分配給消費者組中不一樣的並且是惟一的消費者,並確保一個分區只屬於一個消費者,即這個消費者就是這個分區的惟一讀取線程。那麼,只要分區的消息是有序的,消費者處理的消息順序就有保證。每一個主題有多個分區,不一樣的消費者處理不一樣的分區,因此Kafka不只保證了消息的有序性,也作到了消費者的負載均衡。
2)分區的原則
(1)指定了patition,則直接使用;
(2)未指定patition但指定key,經過對key的value進行hash出一個patition
(3)patition和key都未指定,使用輪詢選出一個patition。
DefaultPartitioner類 public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = nextValue(topic); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
同一個partition可能會有多個replication(對應 server.properties 配置中的 default.replication.factor=N)。沒有replication的狀況下,一旦broker 宕機,其上全部 patition 的數據都不可被消費,同時producer也不能再將數據存於其上的patition。引入replication以後,同一個partition可能會有多個replication,而這時須要在這些replication之間選出一個leader,producer和consumer只與這個leader交互,其它replication做爲follower從leader 中複製數據。
producer寫入消息流程以下:
1)producer先從zookeeper的 "/brokers/.../state"節點找到該partition的leader
2)producer將消息發送給該leader
3)leader將消息寫入本地log
4)followers從leader pull消息,寫入本地log後向leader發送ACK
5)leader收到全部ISR中的replication的ACK後,增長HW(high watermark,最後commit 的offset)並向producer發送ACK
物理上把 topic 分紅一個或多個 patition(對應 server.properties 中的 num.partitions=3 配置),每一個 patition 物理上對應一個文件夾(該文件夾存儲該 patition 的全部消息和索引文件),以下:
不管消息是否被消費,kafka 都會保留全部消息。有兩種策略能夠刪除舊數據:
一、 基於時間:log.retention.hours=168 二、 基於大小:log.retention.bytes=1073741824
須要注意的是,由於Kafka讀取特定消息的時間複雜度爲O(1),即與文件大小無關,因此這裏刪除過時文件與提升 Kafka 性能無關。
該目錄下znode只有在有相關操做時纔會存在,操做結束時會將其刪除
/admin/reassign_partitions用於將一些Partition分配到不一樣的broker集合上。對於每一個待從新分配的Partition,Kafka會在該znode上存儲其全部的Replica和相應的Broker id。該znode由管理進程建立而且一旦從新分配成功它將會被自動移除。
即/brokers/ids/[brokerId])存儲「活着」的broker信息。
topic註冊信息(/brokers/topics/[topic]),存儲該topic的全部partition的全部replica所在的broker id,第一個replica即爲preferred replica,對一個給定的partition,它在同一個broker上最多隻有一個replica,所以broker id可做爲replica id。
/controller -> int (broker id of the controller)存儲當前controller的信息
/controller_epoch -> int (epoch)直接以整數形式存儲controller epoch,而非像其它znode同樣以JSON字符串形式存儲。
kafka提供了兩套consumer API:高級Consumer API和低級API。
消息由生產者發佈到Kafka集羣后,會被消費者消費。消息的消費模型有兩種:推送模型(push)和拉取模型(pull)。
基於推送模型(push)的消息系統,由消息代理記錄消費者的消費狀態。消息代理在將消息推送到消費者後,標記這條消息爲已消費,但這種方式沒法很好地保證消息被處理。好比,消息代理把消息發送出去後,當消費進程掛掉或者因爲網絡緣由沒有收到這條消息時,就有可能形成消息丟失(由於消息代理已經把這條消息標記爲已消費了,但實際上這條消息並無被實際處理)。若是要保證消息被處理,消息代理髮送完消息後,要設置狀態爲「已發送」,只有收到消費者的確認請求後才更新爲「已消費」,這就須要消息代理中記錄全部的消費狀態,這種作法顯然是不可取的。
Kafka採用拉取模型,由消費者本身記錄消費狀態,每一個消費者互相獨立地順序讀取每一個分區的消息。以下圖所示,有兩個消費者(不一樣消費者組)拉取同一個主題的消息,消費者A的消費進度是3,消費者B的消費進度是6。消費者拉取的最大上限經過最高水位(watermark)控制,生產者最新寫入的消息若是尚未達到備份數量,對消費者是不可見的。這種由消費者控制偏移量的優勢是:消費者能夠按照任意的順序消費消息。好比,消費者能夠重置到舊的偏移量,從新處理以前已經消費過的消息;或者直接跳到最近的位置,從當前的時刻開始消費。
在一些消息系統中,消息代理會在消息被消費以後當即刪除消息。若是有不一樣類型的消費者訂閱同一個主題,消息代理可能須要冗餘地存儲同一消息;或者等全部消費者都消費完才刪除,這就須要消息代理跟蹤每一個消費者的消費狀態,這種設計很大程度上限制了消息系統的總體吞吐量和處理延遲。Kafka的作法是生產者發佈的全部消息會一致保存在Kafka集羣中,無論消息有沒有被消費。用戶能夠經過設置保留時間來清理過時的數據,好比,設置保留策略爲兩天。那麼,在消息發佈以後,它能夠被不一樣的消費者消費,在兩天以後,過時的消息就會自動清理掉。
1)高級API優勢
高級API 寫起來簡單
不須要自行去管理offset,系統經過zookeeper自行管理。
不須要管理分區,副本等狀況,.系統自動管理。
消費者斷線會自動根據上一次記錄在zookeeper中的offset去接着獲取數據(默認設置1分鐘更新一下zookeeper中存的offset)
可使用group來區分對同一個topic 的不一樣程序訪問分離開來(不一樣的group記錄不一樣的offset,這樣不一樣程序讀取同一個topic纔不會由於offset互相影響)
2)高級API缺點
不能自行控制offset(對於某些特殊需求來講)
不能細化控制如分區、副本、zk等
1)低級 API 優勢
可以讓開發者本身控制offset,想從哪裏讀取就從哪裏讀取。
自行控制鏈接分區,對分區自定義進行負載均衡
對zookeeper的依賴性下降(如:offset不必定非要靠zk存儲,自行存儲offset便可,好比存在文件或者內存中)
2)低級API缺點
太過複雜,須要自行控制offset,鏈接哪一個分區,找到分區leader 等。
消費者是以consumer group消費者組的方式工做,由一個或者多個消費者組成一個組,共同消費一個topic。每一個分區在同一時間只能由group中的一個消費者讀取,可是多個group能夠同時消費這個partition。在圖中,有一個由三個消費者組成的group,有一個消費者讀取主題中的兩個分區,另外兩個分別讀取一個分區。某個消費者讀取某個分區,也能夠叫作某個消費者是某個分區的擁有者。
在這種狀況下,消費者能夠經過水平擴展的方式同時讀取大量的消息。另外,若是一個消費者失敗了,那麼其餘的group成員會自動負載均衡讀取以前失敗的消費者讀取的分區。
consumer採用pull(拉)模式從broker中讀取數據。
push(推)模式很難適應消費速率不一樣的消費者,由於消息發送速率是由broker決定的。它的目標是儘量以最快速度傳遞消息,可是這樣很容易形成consumer來不及處理消息,典型的表現就是拒絕服務以及網絡擁塞。而pull模式則能夠根據consumer的消費能力以適當的速率消費消息。
對於Kafka而言,pull模式更合適,它可簡化broker的設計,consumer可自主控制消費消息的速率,同時consumer能夠本身控制消費方式——便可批量消費也可逐條消費,同時還能選擇不一樣的提交方式從而實現不一樣的傳輸語義。
pull模式不足之處是,若是kafka沒有數據,消費者可能會陷入循環中,一直等待數據到達。爲了不這種狀況,咱們在咱們的拉請求中有參數,容許消費者請求在等待數據到達的「長輪詢」中進行阻塞(而且可選地等待到給定的字節數,以確保大的傳輸大小)。
1)需求:測試同一個消費者組中的消費者,同一時刻只能有一個消費者消費。
2)案例實操
(1)在node2一、node22上修改/opt/module/kafka/config/consumer.properties配置文件中的group.id屬性爲任意組名。
[root@node22 config]$ vi consumer.properties
group.id=admin
(2)在node2一、node22上分別啓動消費者
[root@node21 kafka]$ bin/kafka-console-consumer.sh --zookeeper node21:2181,node22:2181,node23:2181 --topic first --consumer.config config/consumer.properties
[root@node22 kafka]$ bin/kafka-console-consumer.sh --zookeeper node21:2181,node22:2181,node23:2181 --topic first --consumer.config config/consumer.properties
(3)在node23上啓動生產者
[root@node23 kafka]$ bin/kafka-console-producer.sh --broker-list node21:9092 --topic first
>hello world
(4)查看node21和node22的接收者。
同一時刻只有一個消費者接收到消息。
建立 topic 的序列圖以下所示:
流程說明:
一、 controller 在 ZooKeeper 的 /brokers/topics 節點上註冊 watcher,當 topic 被建立,則 controller 會經過 watch 獲得該 topic 的 partition/replica 分配。 二、 controller從 /brokers/ids 讀取當前全部可用的 broker 列表,對於 set_p 中的每個 partition: 2.一、 從分配給該 partition 的全部 replica(稱爲AR)中任選一個可用的 broker 做爲新的 leader,並將AR設置爲新的 ISR 2.二、 將新的 leader 和 ISR 寫入 /brokers/topics/[topic]/partitions/[partition]/state 三、 controller 經過 RPC 向相關的 broker 發送 LeaderAndISRRequest。
刪除 topic 的序列圖以下所示:
流程說明:
一、 controller 在 zooKeeper 的 /brokers/topics 節點上註冊 watcher,當 topic 被刪除,則 controller 會經過 watch 獲得該 topic 的 partition/replica 分配。 二、 若 delete.topic.enable=false,結束;不然 controller 註冊在 /admin/delete_topics 上的 watch 被 fire,controller 經過回調向對應的 broker 發送 StopReplicaRequest。
kafka broker failover 序列圖以下所示:
流程說明:
一、 controller 在 zookeeper 的 /brokers/ids/[brokerId] 節點註冊 Watcher,當 broker 宕機時 zookeeper 會 fire watch 二、 controller 從 /brokers/ids 節點讀取可用broker 三、 controller決定set_p,該集合包含宕機 broker 上的全部 partition 四、 對 set_p 中的每個 partition 4.一、 從/brokers/topics/[topic]/partitions/[partition]/state 節點讀取 ISR 4.二、 決定新 leader 4.三、 將新 leader、ISR、controller_epoch 和 leader_epoch 等信息寫入 state 節點 五、 經過 RPC 向相關 broker 發送 leaderAndISRRequest 命令
當 controller 宕機時會觸發 controller failover。每一個 broker 都會在 zookeeper 的 "/controller" 節點註冊 watcher,當 controller 宕機時 zookeeper 中的臨時節點消失,全部存活的 broker 收到 fire 的通知,每一個 broker 都嘗試建立新的 controller path,只有一個競選成功並當選爲 controller。
當新的 controller 當選時,會觸發 KafkaController.onControllerFailover 方法,在該方法中完成以下操做:
一、 讀取並增長 Controller Epoch。 二、 在 reassignedPartitions Patch(/admin/reassign_partitions) 上註冊 watcher。 三、 在 preferredReplicaElection Path(/admin/preferred_replica_election) 上註冊 watcher。 四、 經過 partitionStateMachine 在 broker Topics Patch(/brokers/topics) 上註冊 watcher。 五、 若 delete.topic.enable=true(默認值是 false),則 partitionStateMachine 在 Delete Topic Patch(/admin/delete_topics) 上註冊 watcher。 六、 經過 replicaStateMachine在 Broker Ids Patch(/brokers/ids)上註冊Watch。 七、 初始化 ControllerContext 對象,設置當前全部 topic,「活」着的 broker 列表,全部 partition 的 leader 及 ISR等。 八、 啓動 replicaStateMachine 和 partitionStateMachine。 九、 將 brokerState 狀態設置爲 RunningAsController。 十、 將每一個 partition 的 Leadership 信息發送給全部「活」着的 broker。 十一、 若 auto.leader.rebalance.enable=true(默認值是true),則啓動 partition-rebalance 線程。 十二、 若 delete.topic.enable=true 且Delete Topic Patch(/admin/delete_topics)中有值,則刪除相應的Topic。