kafka 異步雙活方案 mirror maker2 深度解析

mirror maker2背景

一般狀況下,咱們都是使用一套kafka集羣處理業務。但有些狀況須要使用另外一套kafka集羣來進行數據同步和備份。在kafka早先版本的時候,kafka針對這種場景就有推出一個叫mirror maker的工具(mirror maker1,如下mm1即表明mirror maker1),用來同步兩個kafka集羣的數據。apache

最開始版本的mirror maker本質上就是一個消費者 + 生產者的程序。但它有諸多諸多不足,包括bootstrap

  1. 目標集羣的Topic使用默認配置建立,但一般須要手動repartition。
  2. acl和配置修改的時候不會自動同步,給多集羣管理帶來一些困難
  3. 消息會被DefaultPartitioner打散到不一樣分區,即對一個topic ,目標集羣的partition與源集羣的partition不一致。
  4. 任何配置修改,都會使得集羣變得不穩定。好比比較常見的增長topic到whitelist。
  5. 沒法讓源集羣的producer或consumer直接使用目標集羣的topic。
  6. 不保證exactly-once,可能出現重複數據到狀況
  7. mm1支持的數據備份模式較簡單,好比沒法支持active <-> active互備
  8. rebalance會致使延遲

由於存在這些問題,mirror maker難以在生產環境中使用。因此kafka2.4版本,推出一個新的mirror maker2(如下mm2即表明mirror maker2)。mirror maker2基於kafka connect工具,解決了上面說的大部分問題。api

今天主要介紹mirror maker2的設計,主要功能和部署。架構

設計和功能

總體設計

mirror maker2是基於kafka connect框架進行開發的,能夠簡單地將mirror maker2視做幾個source connector和sink connector的組合。包括:框架

  • MirrorSourceConnector, MirrorSourceTask:用來進行同步數據的connector
  • MirrorCheckpointConnector, MirrorCheckpointTask:用來同步輔助信息的connector,這裏的輔助信息主要是consumer的offset
  • MirrorHeartbeatConnector, MirrorHeartbeatTask:維持心跳的connector

不過雖然mirror maker2歲基於kafka connect框架,但它卻作了必定的改造,能夠單獨部署一個mirror maker2集羣,固然也能夠部署在kafka connect單機或kafka connect集羣環境上。這部分後面介紹部署的時候再介紹。maven

和mm1同樣,在最簡單的主從備份場景中,mm2建議部署在目標(target)集羣,即從遠端消費而後本地寫入。若是部署在源集羣端,那麼出錯的時候可能會出現丟數據的狀況。工具

其總體架構如圖:測試

image-20201123221826698

內部topic設計

mm2會在kafka生成多個內部topic ,來存儲源集羣topic相關的狀態和配置信息,以及維持心跳。主要有三個內部topic:設計

  • hearbeat topic
  • checkpoints topic
  • offset sync topic

這幾個內部topic都比較好理解,一看名字基本就知道是幹嗎用的,值得一提的是這其中checkpoints和hearbeat功能均可以經過配置關閉。下面咱們詳細介紹下這幾個topic的功能和數據格式。3d

heartbeat topic

在默認的配置中,源集羣和目標集羣都會有一個用於發送心跳的topic,consumer 客戶端經過這個 topic,一方面能夠確認當前的 connector 是否存活,另外一方面確認源集羣是否處於可用狀態。

heartbeat topic的schema以下:

  • target cluster:接收心跳集羣
  • source cluster:發送心跳的集羣
  • timestamp:時間戳

checkpoints topic

對應的connector(即MirrorCheckpointConnector)會按期向目標集羣發送checkpoint信息,主要是consumer group提交的offset ,以及相關輔助信息。

checkpoints topic 的schema以下:

  • consumer group id (String)
  • topic (String) :包含源集羣和目標集羣的 topic
  • partition (int)
  • upstream offset (int): 源集羣指定consumer group已提交的offset(latest committed offset in source cluster)
  • downstream offset (int): 目標集羣已同步的offset(latest committed offset translated to target cluster)
  • metadata (String):partition元數據
  • timestamp

mm2提供的另外一個功能,consumer切換集羣消費就是經過這個topic實現的。由於這個topic中存放了源集羣consumer group的消費offset,在某些場景(好比源集羣故障)下要切換consumer到目標集羣,就能夠經過這個topic獲取消費offset而後繼續消費。

offset sync

這個topic ,主要是在兩個集羣間同步topic partition的offset。這裏的offset並非consumer的offset,而是日誌的offset。

它的 schema 以下:

  • topic (String):topic 名
  • partition (int)
  • upstream offset (int):源集羣的 offset
  • downstream offset (int):目標集羣的 offset,和源集羣的應該保持一致

config sync

mm2會將源集羣的數據同步到目標集羣,那麼目標集羣對應的topic的讀寫權限上怎樣的呢?mm2約定了,目標集羣對應的topic(源集羣備份的那個)只有source和sink connector可以寫入。爲了實施此策略,MM2使用如下規則將 ACL 策略傳播到下游主題:

  • 若用戶對源集羣的topic有read的權限,那麼對目標集羣對應的topic也有read的權限
  • 除了mm2,別的用戶都不能寫入目標集羣對應的topic

同時會同步topic相關配置信息

acl

consumer切換集羣

源集羣的consumer group offset ,是存儲在目標集羣的checkpoint topic中,這點咱們上面已經有說到過。要獲取這些offset信息,可使用MirrorClient#remoteConsumerOffsets這個 api,而後就能用 consumer#seek api 根據給出的offset消費。

這裏順便提供下大體代碼,首先maven添加依賴:

<dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>connect-mirror</artifactId>
      <version>2.4.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>connect-mirror-client</artifactId>
      <version>2.4.0</version>
    </dependency>

而後獲取offset信息:

MirrorMakerConfig mmConfig = new MirrorMakerConfig(mm2.getProp());
        MirrorClientConfig mmClientConfig = mmConfig.clientConfig("target-cluster");
        MirrorClient mmClient = new MirrorClient(mmClientConfig);

        Map<TopicPartition, OffsetAndMetadata> offsetMap =
                mmClient.remoteConsumerOffsets("my-consumer-group", "source-cluster", Duration.ofMinutes(1));

consumer#seek的用法就不演示了。

其餘功能

最後順便介紹下其餘比較基礎的功能。

源集羣和目標集羣partition保持同步

  • 消息的分區和排序,源集羣和目標集羣都會保持同樣
  • 目標集羣的分區數與源集羣分區保持同樣
  • 目標集羣只會有一個topic與源集羣topic對應
  • 目標集羣只會有一個分區與源集羣的分區對應
  • 目標集羣的partition i對應源集羣partition i

說白了就是源集羣和目標集羣的partition和消息會盡可能保持一致,固然可能會有重複消息的狀況,由於目前還不指定exactly-once,聽說後續版本會有(2.4版本之後)。

同步topic增長前綴
mm1有一個缺陷,由於mm1備份數據的時候,源集羣和目標集羣的topic名稱都是同樣的,因此可能出現兩個集羣的消息無限遞歸的狀況(就是兩個名稱相同的topic,一條消息a傳b,b再傳a,循環往復)。mm2解決這個缺陷,採用了給topic加一個前綴的方式,若是是兩個集羣相互備份,那麼有前綴的topic的消息,是不會備份的。

同步配置和acl
mm1的時候,配置信息和topic acl相關的信息是不會同步的,這會給集羣管理帶來必定的困難,mm2解決了這個問題,即源集羣的配置和acl都會自動同步到目標集羣中。

說完功能,最後再介紹下部署方式。

部署方式

目前主要支持三種部署方式

  • mm2專用集羣部署:無需依賴kafka connect,mm2已經提供了一個driver能夠單獨部署mm2集羣,僅需一條命令就能夠啓動:./bin/connect-mirror-maker.sh mm2.properties
  • 依賴kafka connect集羣部署:須要先啓動kafka connect集羣模式,而後手動啓動每一個mm2相關的connector,相對比較繁瑣。適合已經有kafka connect集羣的場景。
  • 依賴kafka connect單機部署:須要在配置文件中配置好各個connector,而後啓動Kafka connect單機服務。不過這種方式便捷性不如mm2專用集羣模式,穩定性不如kafka connect 集羣模式,適合測試環境下部署。

mm2 相關的配置參照KIP-382,主要配置包括 source 和 target 的 broker 配置,hearbeat ,checkpoint 功能是否啓用,同步時間間隔等。

mm2獨立集羣部署

要部署mm2集羣相對比較簡單,只須要先在config/mm2.properties寫個配置文件:

# 指定兩個集羣,以及對應的host
clusters = us-west, us-east
us-west.bootstrap.servers = host1:9092
us-east.bootstrap.servers = host2:9092

# 指定同步備份的topic & consumer group,支持正則
topics = .*
groups = .*
emit.checkpoints.interval.seconds = 10

# 指定複製鏈條,能夠是雙向的
us-west->us-east.enabled = true
# us-east->us-west.enabled = true  # 雙向,符合條件的兩個集羣的topic會相互備份

# 能夠自定義一些配置
us-west.offset.storage.topic = mm2-offsets
# 也能夠指定是否啓用哪一個鏈條的hearbeat,默認是雙向hearbeat的
us-west->us-east.emit.heartbeats.enabled = false

而後使用一條命令就能夠啓動了,./bin/connect-mirror-maker.sh mm2.properties。啓動後用jps觀察進程,再list下topic,能夠發現多了許多個topic,這種時候應該就啓動成功了。

順便說下,若是是使用kafka connect集羣,那須要手動啓動每一個connector,相似這樣:

PUT /connectors/us-west-source/config HTTP/1.1
 
{
    "name": "us-west-source",
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "source.cluster.alias": "us-west",
    "target.cluster.alias": "us-east",
    "source.cluster.bootstrap.servers": "us-west-host1:9091",
    "topics": ".*"
}

以上~

相關文章
相關標籤/搜索