一般狀況下,咱們都是使用一套kafka集羣處理業務。但有些狀況須要使用另外一套kafka集羣來進行數據同步和備份。在kafka早先版本的時候,kafka針對這種場景就有推出一個叫mirror maker的工具(mirror maker1,如下mm1即表明mirror maker1),用來同步兩個kafka集羣的數據。apache
最開始版本的mirror maker本質上就是一個消費者 + 生產者的程序。但它有諸多諸多不足,包括bootstrap
DefaultPartitioner
打散到不一樣分區,即對一個topic ,目標集羣的partition與源集羣的partition不一致。由於存在這些問題,mirror maker難以在生產環境中使用。因此kafka2.4版本,推出一個新的mirror maker2(如下mm2即表明mirror maker2)。mirror maker2基於kafka connect工具,解決了上面說的大部分問題。api
今天主要介紹mirror maker2的設計,主要功能和部署。架構
mirror maker2是基於kafka connect框架進行開發的,能夠簡單地將mirror maker2視做幾個source connector和sink connector的組合。包括:框架
不過雖然mirror maker2歲基於kafka connect框架,但它卻作了必定的改造,能夠單獨部署一個mirror maker2集羣,固然也能夠部署在kafka connect單機或kafka connect集羣環境上。這部分後面介紹部署的時候再介紹。maven
和mm1同樣,在最簡單的主從備份場景中,mm2建議部署在目標(target)集羣,即從遠端消費而後本地寫入。若是部署在源集羣端,那麼出錯的時候可能會出現丟數據的狀況。工具
其總體架構如圖:測試
mm2會在kafka生成多個內部topic ,來存儲源集羣topic相關的狀態和配置信息,以及維持心跳。主要有三個內部topic:設計
這幾個內部topic都比較好理解,一看名字基本就知道是幹嗎用的,值得一提的是這其中checkpoints和hearbeat功能均可以經過配置關閉。下面咱們詳細介紹下這幾個topic的功能和數據格式。3d
heartbeat topic
在默認的配置中,源集羣和目標集羣都會有一個用於發送心跳的topic,consumer 客戶端經過這個 topic,一方面能夠確認當前的 connector 是否存活,另外一方面確認源集羣是否處於可用狀態。
heartbeat topic的schema以下:
checkpoints topic
對應的connector(即MirrorCheckpointConnector)會按期向目標集羣發送checkpoint信息,主要是consumer group提交的offset ,以及相關輔助信息。
checkpoints topic 的schema以下:
mm2提供的另外一個功能,consumer切換集羣消費就是經過這個topic實現的。由於這個topic中存放了源集羣consumer group的消費offset,在某些場景(好比源集羣故障)下要切換consumer到目標集羣,就能夠經過這個topic獲取消費offset而後繼續消費。
offset sync
這個topic ,主要是在兩個集羣間同步topic partition的offset。這裏的offset並非consumer的offset,而是日誌的offset。
它的 schema 以下:
mm2會將源集羣的數據同步到目標集羣,那麼目標集羣對應的topic的讀寫權限上怎樣的呢?mm2約定了,目標集羣對應的topic(源集羣備份的那個)只有source和sink connector可以寫入。爲了實施此策略,MM2使用如下規則將 ACL 策略傳播到下游主題:
同時會同步topic相關配置信息
源集羣的consumer group offset ,是存儲在目標集羣的checkpoint topic中,這點咱們上面已經有說到過。要獲取這些offset信息,可使用MirrorClient#remoteConsumerOffsets
這個 api,而後就能用 consumer#seek
api 根據給出的offset消費。
這裏順便提供下大體代碼,首先maven添加依賴:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>connect-mirror</artifactId> <version>2.4.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>connect-mirror-client</artifactId> <version>2.4.0</version> </dependency>
而後獲取offset信息:
MirrorMakerConfig mmConfig = new MirrorMakerConfig(mm2.getProp()); MirrorClientConfig mmClientConfig = mmConfig.clientConfig("target-cluster"); MirrorClient mmClient = new MirrorClient(mmClientConfig); Map<TopicPartition, OffsetAndMetadata> offsetMap = mmClient.remoteConsumerOffsets("my-consumer-group", "source-cluster", Duration.ofMinutes(1));
consumer#seek
的用法就不演示了。
最後順便介紹下其餘比較基礎的功能。
源集羣和目標集羣partition保持同步
說白了就是源集羣和目標集羣的partition和消息會盡可能保持一致,固然可能會有重複消息的狀況,由於目前還不指定exactly-once,聽說後續版本會有(2.4版本之後)。
同步topic增長前綴
mm1有一個缺陷,由於mm1備份數據的時候,源集羣和目標集羣的topic名稱都是同樣的,因此可能出現兩個集羣的消息無限遞歸的狀況(就是兩個名稱相同的topic,一條消息a傳b,b再傳a,循環往復)。mm2解決這個缺陷,採用了給topic加一個前綴的方式,若是是兩個集羣相互備份,那麼有前綴的topic的消息,是不會備份的。
同步配置和acl
mm1的時候,配置信息和topic acl相關的信息是不會同步的,這會給集羣管理帶來必定的困難,mm2解決了這個問題,即源集羣的配置和acl都會自動同步到目標集羣中。
說完功能,最後再介紹下部署方式。
目前主要支持三種部署方式
mm2 相關的配置參照KIP-382,主要配置包括 source 和 target 的 broker 配置,hearbeat ,checkpoint 功能是否啓用,同步時間間隔等。
要部署mm2集羣相對比較簡單,只須要先在config/mm2.properties寫個配置文件:
# 指定兩個集羣,以及對應的host clusters = us-west, us-east us-west.bootstrap.servers = host1:9092 us-east.bootstrap.servers = host2:9092 # 指定同步備份的topic & consumer group,支持正則 topics = .* groups = .* emit.checkpoints.interval.seconds = 10 # 指定複製鏈條,能夠是雙向的 us-west->us-east.enabled = true # us-east->us-west.enabled = true # 雙向,符合條件的兩個集羣的topic會相互備份 # 能夠自定義一些配置 us-west.offset.storage.topic = mm2-offsets # 也能夠指定是否啓用哪一個鏈條的hearbeat,默認是雙向hearbeat的 us-west->us-east.emit.heartbeats.enabled = false
而後使用一條命令就能夠啓動了,./bin/connect-mirror-maker.sh mm2.properties
。啓動後用jps觀察進程,再list下topic,能夠發現多了許多個topic,這種時候應該就啓動成功了。
順便說下,若是是使用kafka connect集羣,那須要手動啓動每一個connector,相似這樣:
PUT /connectors/us-west-source/config HTTP/1.1 { "name": "us-west-source", "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector", "source.cluster.alias": "us-west", "target.cluster.alias": "us-east", "source.cluster.bootstrap.servers": "us-west-host1:9091", "topics": ".*" }
以上~