Kafka 跨集羣同步方案

該方案解決Kafka跨集羣同步、建立Kafka集羣鏡像等相關問題,主要使用Kafka內置的MirrorMaker工具實現。java

Kafka鏡像即已有Kafka集羣的副本。下圖展現如何使用MirrorMaker工具建立從源Kafka集羣(source cluster)到目標Kafka集羣(target cluster)的鏡像。該工具經過Kafka consumer從源Kafka集羣消費數據,而後經過一個內置的Kafka producer將數據從新推送到目標Kafka集羣。面試

 

 

1、如何建立鏡像正則表達式

使用MirrorMaker建立鏡像是比較簡單的,搭建好目標Kafka集羣后,只須要啓動mirror-maker程序便可。其中,一個或多個consumer配置文件、一個producer配置文件是必須的,whitelist、blacklist是可選的。在consumer的配置中指定源Kafka集羣的Zookeeper,在producer的配置中指定目標集羣的Zookeeper(或者broker.list)。算法

kafka-run-class.sh kafka.tools.MirrorMaker –consumer.config sourceCluster1Consumer.config –consumer.config sourceCluster2Consumer.config –num.streams 2 –producer.config targetClusterProducer.config –whitelist=「.*」

例如,你須要建立S集羣的鏡像,目標集羣T已經搭建好,簡單的作法以下:網絡

1. 建立consumer配置文件:sourceClusterConsumer.config架構

zk.connect=szk0:2181,szk1:2181,szk2:2181
groupid=test-mirror-consumer-group

2. 建立producer配置文件:targetClusterProducer.config負載均衡

zk.connect=tzk0:2181,tzk1:2181

3. 建立啓動腳本:start.sh異步

$KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.MirrorMaker –consumer.config sourceClusterConsumer.config –num.streams 2 –producer.config targetClusterProducer.config –whitelist=「.*」

4. 執行腳本socket

執行start.sh經過日誌信息查看運行情況,到目標Kafka集羣的log.dir中便可看到同步過來的數據。工具

2、MirrorMaker的參數說明

$KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.MirrorMaker –help

執行上面的命令就能夠看到各個參數的說明:

1. 白名單(whitelist) 黑名單(blacklist)

mirror-maker接受精確指定同步topic的白名單和黑名單。使用java標準的正則表達式,爲了方便,逗號(‘,’)被編譯爲java正則中的(‘|’)。

2. Producer timeout

爲了支持高吞吐量,你最好使用異步的內置producer,並將內置producer設置爲阻塞模式(queue.enqueueTimeout.ms=-1)。這樣能夠保證數據(messages)不會丟失。不然,異步producer默認的 enqueueTimeout是0,若是producer內部的隊列滿了,數據(messages)會被丟棄,並拋出QueueFullExceptions異常。而對於阻塞模式的producer,若是內部隊列滿了就會一直等待,從而有效的節制內置consumer的消費速度。你能夠打開producer的的trace logging,隨時查看內部隊列剩餘的量。若是producer的內部隊列長時間處於滿的狀態,這說明對於mirror-maker來講,將消息從新推到目標Kafka集羣或者將消息寫入磁盤是瓶頸。

對於kafka的producer同步異步的詳細配置請參考$KAFKA_HOME/config/producer.properties文件。關注其中的producer.type和queue.enqueueTimeout.ms這兩個字段。

3. Producer 重試次數(retries)

若是你在producer的配置中使用broker.list,你能夠設置當發佈數據失敗時候的重試次數。retry參數只在使用broker.list的時候使用,由於在重試的時候會從新選擇broker。

4. Producer 數量

經過設置—num.producers參數,可使用一個producer池來提升mirror maker的吞吐量。在接受數據(messages)的broker上的producer是隻使用單個線程來處理的。就算你有多個消費流,吞吐量也會在producer處理請求的時候被限制。

5. 消費流(consumption streams)數量

使用—num.streams能夠指定consumer的線程數。請注意,若是你啓動多個mirror maker進程,你可能須要看看其在源Kafka集羣partitions的分佈狀況。若是在每一個mirror maker進程上的消費流(consumption streams)數量太多,某些消費進程若是不擁有任何分區的消費權限會被置於空閒狀態,主要緣由在於consumer的負載均衡算法。

6. 淺迭代(Shallow iteration)與producer壓縮

咱們建議在mirror maker的consumer中開啓淺迭代(shallow iteration)。意思就是mirror maker的consumer不對已經壓縮的消息集(message-sets)進行解壓,只是直接將獲取到的消息集數據同步到producer中。

若是你開啓淺迭代(shallow iteration),那麼你必須關閉mirror maker中producer的壓縮功能,不然消息集(message-sets)會被重複壓縮。

7. Consumer 和 源Kafka集羣(source cluster)的 socket buffer sizes

鏡像常常用在跨集羣場景中,你可能但願經過一些配置選項來優化內部集羣的通訊延遲和特定硬件性能瓶頸。通常來講,你應該對mirror-maker中consumer的socket.buffersize 和源集羣broker的socket.send.buffer設定一個高的值。此外,mirror-maker中消費者(consumer)的fetch.size應該設定比socket.buffersize更高的值。注意,套接字緩衝區大小(socket buffer size)是操做系統網絡層的參數。若是你啓用trace級別的日誌,你能夠檢查實際接收的緩衝區大小(buffer size),以肯定是否調整操做系統的網絡層。

3、如何檢驗MirrorMaker運行情況

Consumer offset checker工具能夠用來檢查鏡像對源集羣的消費進度。例如:

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker –group KafkaMirror –zkconnect localhost:2181 –topic test-topic
KafkaMirror,topic1,0-0 (Group,Topic,BrokerId-PartitionId)
            Owner = KafkaMirror_jkoshy-ld-1320972386342-beb4bfc9-0
  Consumer offset = 561154288
                  = 561,154,288 (0.52G)
         Log size = 2231392259
                  = 2,231,392,259 (2.08G)
     Consumer lag = 1670237971
                  = 1,670,237,971 (1.56G)
BROKER INFO
0 -> 127.0.0.1:9092

注意,–zkconnect參數須要指定到源集羣的Zookeeper。另外,若是指定topic沒有指定,則打印當前消費者group下全部topic的信息。

歡迎工做一到五年的Java工程師朋友們加入Java架構開發:855835163

本羣提供免費的學習指導 架構資料 以及免費的解答

不懂得問題均可以在本羣提出來 以後還會有職業生涯規劃以及面試指導

同時你們能夠多多關注一下小編 你們一塊兒學習進步

相關文章
相關標籤/搜索