Kafka跨集羣遷移方案MirrorMaker原理、使用以及性能調優實踐

 

序言
Kakfa MirrorMaker是Kafka 官方提供的跨數據中心的流數據同步方案。其實現原理,其實就是經過從Source Cluster消費消息而後將消息生產到Target Cluster,即普通的消息生產和消費。用戶只要經過簡單的consumer配置和producer配置,而後啓動Mirror,就能夠實現準實時的數據同步。nginx

1. Kafka MirrorMaker基本特性
Kafka Mirror的基本特性有:算法

在Target Cluster沒有對應的Topic的時候,Kafka MirrorMaker會自動爲咱們在Target Cluster上建立一個如出一轍(Topic Name、分區數量、副本數量)如出一轍的topic。若是Target Cluster存在相同的Topic則不進行建立,而且,MirrorMaker運行Source Cluster和Target Cluster的Topic的分區數量和副本數量不一樣。
同咱們使用Kafka API建立KafkaConsumer同樣,Kafka MirrorMaker容許咱們指定多個Topic。好比,TopicA|TopicB|TopicC。在這裏,|實際上是正則匹配符,MirrorMaker也兼容使用逗號進行分隔。
多線程支持。MirrorMaker會在每個線程上建立一個Consumer對象,若是性能容許,建議多建立一些線程
多進程任意橫向擴展,前提是這些進程的consumerGroup相同。不管是多進程仍是多線程,都是由Kafka ConsumerGroup的設計帶來的任意橫向擴展性,具體的分區分派,即具體的TopicPartition會分派給Group中的哪一個Topic負責,是Kafka自動完成的,Consumer無需關心。
咱們使用Kafka MirrorMaker完成遠程的AWS(Source Cluster)上的Kafka信息同步到公司的計算集羣(Target Cluster)。因爲咱們的大數據集羣只有一個統一的出口IP,外網訪問咱們的內網服務器必須經過nginx轉發,所以爲了下降複雜度,決定使用「拉」模式而不是「推」模式,即,Kafka MirrorMaker部署在咱們內網集羣(Target Cluster),它負責從遠程的Source Cluster(AWS)的Kafka 上拉取數據,而後生產到本地的Kafka。
Kafka MirrorMaker的官方文檔一直沒有更新,所以新版Kafka爲MirrorMaker增長的一些參數、特性等在文檔上每每找不到,須要看Kafka MirrorMaker的源碼。Kafka MirrorMaker的主類位於kafka.tools.MirrorMaker,尤爲是一些參數的解析邏輯和主要的執行流程,會比較有助於咱們理解、調試和優化Kafka MirrorMaker。apache

 

這是我啓動Kakfa MirrorMaker 的命令:bootstrap

nohup ./bin/kafka-mirror-maker.sh --new.consumer --consumer.config config/mirror-consumer.properties --num.streams 40 --producer.config config/mirror-producer.properties --whitelist 'ABTestMsg|AppColdStartMsg|BackPayMsg|WebMsg|GoldOpenMsg|BoCaiMsg' &

mirror-consumer.properties配置文件以下:服務器

#新版consumer擯棄了對zookeeper的依賴,使用bootstrap.servers告訴consumer kafka server的位置
bootstrap.servers=ip-188-33-33-31.eu-central-1.compute.internal:9092,ip-188-33-33-32.eu-central-1.compute.internal:9092,ip-188-33-33-33.eu-central-1.compute.internal:9092網絡

#若是使用舊版Consumer,則使用zookeeper.connect
#zookeeper.connect=ip-188-33-33-31.eu-central-1.compute.internal:2181,ip-188-33-33-32.eu-central-1.compute.internal:2181,ip-188-33-33-33.eu-central-1.compute.internal:2181
1.compute.internal:2181
#change the default 40000 to 50000
request.timeout.ms=50000session

#hange default heartbeat interval from 3000 to 15000
heartbeat.interval.ms=30000多線程

#change default session timeout from 30000 to 40000
session.timeout.ms=40000
#consumer group id
group.id=africaBetMirrorGroupTest
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
#restrict the max poll records from 2147483647 to 200000
max.poll.records=20000
#set receive buffer from default 64kB to 512kb
receive.buffer.bytes=524288app

#set max amount of data per partition to override default 1048576
max.partition.fetch.bytes=5248576
#consumer timeout
#consumer.timeout.ms=5000

mirror-producer.properties的配置文件以下:dom

bootstrap.servers=10.120.241.146:9092,10.120.241.82:9092,10.120.241.110:9092

# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=

# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
producer.type=sync

# specify the compression codec for all data generated: none, gzip, snappy, lz4.
# the old config values work as well: 0, 1, 2, 3 for none, gzip, snappy, lz4, respectively
compression.codec=none
# message encoder
serializer.class=kafka.serializer.DefaultEncoder

同時,我使用kafka-consumer-groups.sh循環監控消費延遲:

bin/kafka-consumer-groups.sh --bootstrap-server ip-188-33-33-31.eu-central-1.compute.internal:9092,ip-188-33-33-32.eu-central-1.compute.internal:9092,ip-188-33-33-33.eu-central-1.compute.internal:9092 --describe --group africaBetMirrorGroupTest --new-consumer

當咱們使用new KafkaConsumer進行消息消費,要想經過kafka-consumer-groups.sh獲取整個group的offset、lag延遲信息,也必須加上–new-consumer,告知kafka-consumer-groups.sh,這個group的消費者使用的是new kafka consumer,即group中全部consumer的信息保存在了Kafka上的一個名字叫作__consumer_offsets的特殊topic上,而不是保存在zookeeper上。我在使用kafka-consumer-groups.sh的時候就不知道還須要添加--new-consumer,結果我啓動了MirrorMaker之後,感受消息在消費,可是就是在zookeeper的/consumer/ids/上找不到group的任何信息。後來在stack overflow上問了別人才知道。

3. 負載不均衡緣由診斷以及問題解決
在個人另一篇博客《Kafka爲Consumer分派分區:RangeAssignor和RoundRobinAssignor》中,介紹了Kafka內置的分區分派策略:RangeAssignor和RoundRobinAssignor。因爲RangeAssignor是早期版本的Kafka的惟一的分區分派策略,所以,默認不配置的狀況下,Kafka使用RangeAssignor進行分區分派,可是,在MirrorMaker的使用場景下,RoundRobinAssignor更有利於均勻的分區分派。甚至在KAFKA-3831中有人建議直接將MirrorMaker的默認分區分派策略改成RoundRobinAssignor。那麼,它們到底有什麼區別呢?咱們先來看兩種策略下的分區分派結果。在個人實驗場景下,有6個topic:ABTestMsg|AppColdStartMsg|BackPayMsg|WebMsg|GoldOpenMsg|BoCaiMsg,每一個topic有兩個分區。因爲MirrorMaker所在的服務器性能良好,我設置--num.streams 40,即單臺MirrorMaker會用40個線程,建立40個獨立的Consumer進行消息消費,兩個MirrorMaker加起來80個線程,80個並行Consumer。因爲總共只有6 * 2=12個TopicPartition,所以最多也只有12個Consumer會被分派到分區,其他Consumer空閒。
咱們來看基於RangeAssignor分派策略,運行kafka-consumer-groups.sh觀察到的分區分派的結果:

TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
ABTestMsg 0 780000 820038 49938 africaBetMirrorGroupTest-4-cf330e66-1319-4925-9605-46545df13453/114.113.198.126 africaBetMirrorGroupTest-0
ABTestMsg 1 774988 820038 55000 africaBetMirrorGroupTest-19-c77523e7-7b87-472b-9a26-cd902888944d/114.113.198.126 africaBetMirrorGroupTest-1
AppColdStartMsg 0 774000 820039 55938 africaBetMirrorGroupTest-19-674d8ad4-39d2-40cc-ae97-f4be9c1bb154/114.113.198.126 africaBetMirrorGroupTest-0
AppColdStartMsg 1 774100 820045 56038 africaBetMirrorGroupTest-15-91c67bf8-0c1c-42ac-97f0-5369794c2d1b/114.113.198.126 africaBetMirrorGroupTest-1
BackPayMsg 0 780000 820038 49938 africaBetMirrorGroupTest-4-cf330e66-1319-4925-9605-46545df13453/114.113.198.126 africaBetMirrorGroupTest-0
BackPayMsg 1 774988 820038 55000 africaBetMirrorGroupTest-19-c77523e7-7b87-472b-9a26-cd902888944d/114.113.198.126 africaBetMirrorGroupTest-1
WebMsg 0 774000 820039 55938 africaBetMirrorGroupTest-19-674d8ad4-39d2-40cc-ae97-f4be9c1bb154/114.113.198.126 africaBetMirrorGroupTest-0
WebMsg 1 774100 820045 56038 africaBetMirrorGroupTest-15-91c67bf8-0c1c-42ac-97f0-5369794c2d1b/114.113.198.126 africaBetMirrorGroupTest-1
GoldOpenMsg 0 780000 820038 49938 africaBetMirrorGroupTest-4-cf330e66-1319-4925-9605-46545df13453/114.113.198.126 africaBetMirrorGroupTest-0
GoldOpenMsg 1 774988 820038 55000 africaBetMirrorGroupTest-19-c77523e7-7b87-472b-9a26-cd902888944d/114.113.198.126 africaBetMirrorGroupTest-1
BoCaiMsg 0 774000 820039 55938 africaBetMirrorGroupTest-19-674d8ad4-39d2-40cc-ae97-f4be9c1bb154/114.113.198.126 africaBetMirrorGroupTest-0
BoCaiMsg 1 774100 820045 56038 africaBetMirrorGroupTest-15-91c67bf8-0c1c-42ac-97f0-5369794c2d1b/114.113.198.126 africaBetMirrorGroupTest-1
- - - - - africaBetMirrorGroupTest-6-ae373364-2ae2-42b8-8a74-683557e315bf/114.113.198.126 africaBetMirrorGroupTest-6
- - - - - africaBetMirrorGroupTest-9-0e346b46-1a2c-46a2-a2da-d977402f5c5d/114.113.198.126 africaBetMirrorGroupTest-9
- - - - - africaBetMirrorGroupTest-7-f0ae9f31-33e6-4ddd-beac-236fb7cf20d5/114.113.198.126 africaBetMirrorGroupTest-7
- - - - - africaBetMirrorGroupTest-7-e2a9e905-57c1-40a6-a7f3-4aefd4f1a30a/114.113.198.126 africaBetMirrorGroupTest-7
- - - - - africaBetMirrorGroupTest-8-480a2ef5-907c-48ed-be1f-33450903ec72/114.113.198.126 africaBetMirrorGroupTest-8
- - - - - africaBetMirrorGroupTest-8-4206bc08-58a5-488a-b756-672fb4eee6e0/114.113.198.126 africaBetMirrorGroupTest-8
.....後續更多空閒consumer省略不顯示

當沒有在mirror-consumer.properties 中配置分區分派策略,即便用默認的RangeAssignor的時候,咱們發現,儘管咱們每個MirrorMaker有40個Consumer,整個Group中有80個Consumer,可是,一共6 * 2 = 12個TopicPartition居然所有彙集在2-3個Consumer上,顯然,這徹底浪費了並行特性,被分配到一個consumer上的多個TopicPartition只能串行消費。

所以,經過partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor顯式指定分區分派策略爲RoundRobinAssignor,重啓MirrorMaker,從新經過kafka-consumer-groups.sh 命令觀察分區分派和消費延遲結果:

TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
ABTestMsg 0 819079 820038 959 africaBetMirrorGroupTest-4-cf330e66-1319-4925-9605-46545df13453/114.113.198.126 africaBetMirrorGroupTest-1
ABTestMsg 1 818373 820038 1665 africaBetMirrorGroupTest-19-c77523e7-7b87-472b-9a26-cd902888944d/114.113.198.126 africaBetMirrorGroupTest-5
AppColdStartMsg 0 818700 818907 1338 africaBetMirrorGroupTest-19-674d8ad4-39d2-40cc-ae97-f4be9c1bb154/114.113.198.126 africaBetMirrorGroupTest-20
AppColdStartMsg 1 818901 820045 1132 africaBetMirrorGroupTest-15-91c67bf8-0c1c-42ac-97f0-5369794c2d1b/114.113.198.126 africaBetMirrorGroupTest-18
BackPayMsg 0 819032 820038 959 africaBetMirrorGroupTest-4-cf330e66-1319-4925-9605-46545df13453/114.113.198.126 africaBetMirrorGroupTest-5
BackPayMsg 1 818343 820038 1638 africaBetMirrorGroupTest-19-c77523e7-7b87-472b-9a26-cd902888944d/114.113.198.126 africaBetMirrorGroupTest-8
WebMsg 0 818710 818907 1328 africaBetMirrorGroupTest-19-674d8ad4-39d2-40cc-ae97-f4be9c1bb154/114.113.198.126 africaBetMirrorGroupTest-7
WebMsg 1 818921 820045 1134 africaBetMirrorGroupTest-15-91c67bf8-0c1c-42ac-97f0-5369794c2d1b/114.113.198.126 africaBetMirrorGroupTest-9
GoldOpenMsg 0 819032 820038 959 africaBetMirrorGroupTest-4-cf330e66-1319-4925-9605-46545df13453/114.113.198.126 africaBetMirrorGroupTest-12
GoldOpenMsg 1 818343 820038 1638 africaBetMirrorGroupTest-19-c77523e7-7b87-472b-9a26-cd902888944d/114.113.198.126 africaBetMirrorGroupTest-14
BoCaiMsg 0 818710 818907 1322 africaBetMirrorGroupTest-19-674d8ad4-39d2-40cc-ae97-f4be9c1bb154/114.113.198.126 africaBetMirrorGroupTest-14
BoCaiMsg 1 818921 820045 1189 africaBetMirrorGroupTest-15-91c67bf8-0c1c-42ac-97f0-5369794c2d1b/114.113.198.126 africaBetMirrorGroupTest-117
- - - - - africaBetMirrorGroupTest-6-ae373364-2ae2-42b8-8a74-683557e315bf/114.113.198.126 africaBetMirrorGroupTest-6
- - - - - africaBetMirrorGroupTest-9-0e346b46-1a2c-46a2-a2da-d977402f5c5d/114.113.198.126 africaBetMirrorGroupTest-9
- - - - - africaBetMirrorGroupTest-7-f0ae9f31-33e6-4ddd-beac-236fb7cf20d5/114.113.198.126 africaBetMirrorGroupTest-7
- - - - - africaBetMirrorGroupTest-7-e2a9e905-57c1-40a6-a7f3-4aefd4f1a30a/114.113.198.126 africaBetMirrorGroupTest-7
- - - - - africaBetMirrorGroupTest-8-480a2ef5-907c-48ed-be1f-33450903ec72/114.113.198.126 africaBetMirrorGroupTest-8
- - - - - africaBetMirrorGroupTest-8-4206bc08-58a5-488a-b756-672fb4eee6e0/114.113.198.126 africaBetMirrorGroupTest-8
.....後續更多空閒consumer省略不顯示

對比RangeAssingor,消息延遲明顯減輕,並且,12個TopicPartition被均勻分配到了不一樣的consumer上,即單個Consumer只負責一個TopicPartition的消息消費,不一樣的TopicPartition之間實現了徹底並行。
之因此出現以上不一樣,緣由在於兩個分區分派方式的策略不一樣:

RangeAssingor:先對全部Consumer進行排序,而後對Topic逐個進行分區分派。用以上Topic做爲例子:
對全部的Consumer進行排序,排序後的結果爲Consumer-0,Consumer-1,Consumer-2 ....Consumer-79
對ABTestMsg進行分區分派:
ABTestMsg-0分配給Consumer-0
ABTestMsg-1分配各Consumer-1

對AppColdStartMsg進行分區分派:
AppColdStartMsg-0分配各Consumer-0
AppColdStartMsg-1分配各Consumer-1

#後續TopicParition的分派以此類推

可見,RangeAssingor 會致使多個TopicPartition被分派在少許分區上面。
- RoundRobinAssignor:與RangeAssignor最大的區別,是再也不逐個Topic進行分區分派,而是先將Group中的全部TopicPartition平鋪展開,再一次性對他們進行一輪分區分派。

將Group中的全部TopicPartition展開,展開結果爲:

ABTestMsg-0,ABTestMsg-1,AppColdStartMsg-0,AppColdStartMsg-1,BackPayMsg-0,BackPayMsg-1,WebMsg-0,WebMsg-1,GoldOpenMsg-0,GoldOpenMsg-1,BoCaiMsg-0,BoCaiMsg-1

對全部的Consumer進行排序,排序後的結果爲Consumer-0,Consumer-1,Consumer-2 ,Consumer-79。

開始講平鋪的TopicPartition進行分區分派

ABTestMsg-0分配給Consumer-0
ABTestMsg-1分配給Consumer-1
AppColdStartMsg-0分配給Consumer-2
AppColdStartMsg-1分配給Consumer-3
BackPayMsg-0分配給Consumer-4
BackPayMsg-1分配給Consumer-5


#後續TopicParition的分派以此類推

因而可知,RoundRobinAssignor平鋪式的分區分派算法是讓咱們的Kafka MirrorMaker可以無重疊地將TopicParition分派給Consumer的緣由。

4. 自己網絡帶寬限制問題
網絡帶寬自己也會限制Kafka Mirror的吞吐量。進行壓測的時候,我分別在咱們的在線環境和測試環境分別運行Kafka MirrorMaker,均選擇兩臺服務器運行MirrorMaker,可是在線環境是實體機環境,單臺機器經過SCP方式拷貝Source Cluster上的大文件,平均吞吐量是600KB-1.5MB之間,可是測試環境的機器是同一個host主機上的多臺虛擬機,SCP吞吐量是100KB如下。通過測試,測試環境消息積壓會逐漸增多,在線環境持續積壓,可是積壓一直保持穩定。這種穩定積壓是因爲每次poll()的間隙新產生的消息量,屬於正常現象。

5. 適當配置單次poll的消息總量和單次poll()的消息大小
經過Kafka MirrorMaker運行時指定的consumer配置文件(在個人環境中爲$MIRROR_HOME/config/mirror-consumer.properties)來配置consumer。其中,經過如下配置,能夠控制單次poll()的消息體量(數量和整體大小)
max.poll.records:單次poll()操做最多消費的消息總量,這裏說的poll是單個consumer而言的。不管過大太小,都會發生問題:

若是設置得太小,則消息傳輸率下降,大量的頭信息會佔用較大的網絡帶寬;-
若是設置得過大,則會產生一個很是難以判斷緣由同時又會影響整個group中全部消息的消費的重要問題:rebalance。看過kafka代碼的話能夠看到,每次poll()請求都會順帶向遠程server發送心跳信息,遠程GroupCoordinator會根據這個心跳信息判斷consumer的活性。若是超過指定時間(heartbeat.interval.ms)沒有收到對應Consumer的心跳,則GroupCoordinator會斷定這個Server已經掛掉,所以將這個Consumer負責的partition分派給其它Consumer,即觸發rebalance。rebalance操做的影響範圍是整個Group,即Group中全部的Consumer所有暫停消費直到Rebalance完成。並且,TopicPartition越長,這個過程會越長。其實,一個正常消費的環境,應該是任什麼時候候都不該該發生rebalance的(一個新的Consumer的正常加入也會引發Rebalance,這種狀況除外)。雖然Kafka自己是很是穩定的,可是仍是應該儘可能避免rebalance的發生。在某些極端狀況下觸發一些bug,rebalance可能永遠停不下來了。。。若是單次max.poll.records消費太多消息,這些消息produce到Target Cluster的時間可能會較長,從而可能觸發Rebalance。
6. 惡劣網絡環境下增長超時時間配置
在不穩定的網絡環境下,應該增長部分超時時間配置,如request.timeout.ms或者session.timeout.ms,一方面能夠避免頻繁的超時致使大量沒必要要的重試操做,同時,經過增長如上文所講,經過增長heartbeat.interval.ms時間,能夠避免沒必要要的rebalance操做。固然,在網絡環境良好的狀況下,上述配置能夠適當減少以增長Kafka Server對MirrorMaker出現異常狀況下的更加及時的響應。

總之,Kafka MirrorMaker做爲跨數據中心的Kafka數據同步方案,絕對沒法容許數據丟失以及數據的傳輸速度低於生產速度致使數據越積累越多。所以,惟有進行充分的壓測和精準的性能調優,才能綜合網絡環境、服務器性能,將Kafka MirrorMaker的性能發揮到最大。

相關文章
相關標籤/搜索