Kafka數據遷移

1.概述

Kafka的使用場景很是普遍,一些實時流數據業務場景,均依賴Kafka來作數據分流。而在分佈式應用場景中,數據遷移是一個比較常見的問題。關於Kafka集羣數據如何遷移,今天筆者將爲你們詳細介紹。html

2.內容

本篇博客爲你們介紹兩種遷移場景,分別是同集羣數據遷移、跨集羣數據遷移。以下圖所示:git

2.1 同集羣遷移

同集羣之間數據遷移,好比在已有的集羣中新增了一個Broker節點,此時須要將原來集羣中已有的Topic的數據遷移部分到新的集羣中,緩解集羣壓力。github

將新的節點添加到Kafka集羣很簡單,只需爲它們分配一個惟一的Broker ID,並在新服務器上啓動Kafka。可是,這些新服務器節點不會自動分配任何數據分區,所以除非將分區移動到新增的節點,不然在建立新Topic以前新節點不會執行任何操做。所以,一般在將新服務器節點添加到Kafka集羣時,須要將一些現有數據遷移到這些新的節點。json

遷移數據的過程是手動啓動的,執行過程是徹底自動化的。在Kafka後臺服務中,Kafka將添加新服務器做爲其正在遷移的分區的Follower,並容許新增節點徹底複製該分區中的現有數據。當新服務器節點徹底複製此分區的內容並加入同步副本(ISR)時,其中一個現有副本將刪除其分區的數據。服務器

Kafka系統提供了一個分區從新分配工具(kafka-reassign-partitions.sh),該工具可用於在Broker之間遷移分區。理想狀況下,將確保全部Broker的數據和分區均勻分配。分區從新分配工具沒法自動分析Kafka羣集中的數據分佈並遷移分區以實現均勻的負載均衡。所以,管理員在操做的時候,必須弄清楚應該遷移哪些Topic或分區。負載均衡

分區從新分配工具能夠在3種互斥模式下運行:分佈式

  • --generate:在此模式下,給定Topic列表和Broker列表,該工具會生成候選從新​​分配,以將指定Topic的全部分區遷移到新Broker中。此選項僅提供了一種方便的方法,可在給定Topic和目標Broker列表的狀況下生成分區從新分配計劃。
  • --execute:在此模式下,該工具將根據用戶提供的從新分配計劃啓動分區的從新分配。 (使用--reassignment-json-file選項)。由管理員手動制定自定義從新分配計劃,也可使用--generate選項提供。
  • --verify:在此模式下,該工具將驗證最後一次--execute期間列出的全部分區的從新分配狀態。狀態能夠有成功、失敗或正在進行等狀態。

2.1.1 遷移過程實現

分區從新分配工具可用於將一些Topic從當前的Broker節點中遷移到新添加的Broker中。這在擴展示有集羣時一般頗有用,由於將整個Topic移動到新的Broker變得更容易,而不是一次移動一個分區。當執行此操做時,用戶須要提供已有的Broker節點的Topic列表,以及到新節點的Broker列表(源Broker到新Broker的映射關係)。而後,該工具在新的Broker中均勻分配給指定Topic列表的全部分區。在遷移過程當中,Topic的複製因子保持不變。工具

現有以下實例,將Topic爲ke01,ke02的全部分區從Broker1中移動到新增的Broker2和Broker3中。因爲該工具接受Topic的輸入列表做爲JSON文件,所以須要明確遷移的Topic並建立json文件,以下所示:學習

> cat topic-to-move.json
{"topics": [{"topic": "ke01"},
            {"topic": "ke02"}],
"version":1
}

準備好JSON文件,而後使用分區從新分配工具生成候選分配,命令以下:spa

> bin/kafka-reassign-partitions.sh --zookeeper dn1:2181 --topics-to-move-json-file topics-to-move.json --broker-list "1,2" --generate

執行命名以前,Topic(ke0一、ke02)的分區以下圖所示:

 

執行完成命令以後,控制檯出現以下信息:

該工具生成一個候選分配,將全部分區從Topic ke01,ke02移動到Broker1和Broker2。需求注意的是,此時分區移動還沒有開始,它只是告訴你當前的分配和建議。保存當前分配,以防你想要回滾它。新的賦值應保存在JSON文件(例如expand-cluster-reassignment.json)中,以使用--execute選項執行。JSON文件以下:

{"version":1,"partitions":[{"topic":"ke02","partition":0,"replicas":[2]},{"topic":"ke02","partition":1,"replicas":[1]},{"topic":"ke02","partition":2,"replicas":[2]},{"topic":"ke01","partition":0,"replicas":[2]},{"topic":"ke01","partition":1,"replicas":[1]},{"topic":"ke01","partition":2,"replicas":[2]}]}

執行命令以下所示:

> ./kafka-reassign-partitions.sh --zookeeper dn1:2181 --reassignment-json-file expand-cluster-reassignment.json --execute

最後,--verify選項可與該工具一塊兒使用,以檢查分區從新分配的狀態。須要注意的是,相同的expand-cluster-reassignment.json(與--execute選項一塊兒使用)應與--verify選項一塊兒使用,執行命令以下:

> ./kafka-reassign-partitions.sh --zookeeper dn1:2181 --reassignment-json-file expand-cluster-reassignment.json --verify

執行結果以下圖所示:

 

 同時,咱們能夠經過Kafka Eagle工具來查看Topic的分區狀況。

2.2 跨集羣遷移

這裏跨集羣遷移,咱們指的是在Kafka多個集羣之間複製數據「鏡像」的過程,以免與單個集羣中的節點之間發生的複製混淆。 Kafka附帶了一個用於在Kafka集羣之間鏡像數據的工具。該工具從源集羣使用並生成到目標集羣。這種鏡像的一個常見用例是在另外一個數據中心提供副本。

另外,你能夠運行許多此類鏡像進程以提升吞吐量和容錯(若是一個進程終止,其餘進程將佔用額外負載)。將從源集羣中的Topic讀取數據,並將其寫入目標集羣中具備相同名稱的主題。事實上,「鏡像」數據只不過是一個Kafka將消費者和生產者聯繫在了一塊兒。

源集羣和目標集羣是徹底獨立的實體,它們能夠具備不一樣數量的分區,而且偏移量將不相同。出於這個緣由,鏡像集羣並非真正意圖做爲容錯機制(由於消費者的位置會有所不一樣);爲此,建議使用正常的集羣內複製。可是,鏡像進程將保留並使用消息Key進行分區,所以能夠按Key保留順序。

下面是一個跨集羣的單Topic實例,命令以下:

> ./kafka-mirror-maker.sh --consumer.config consumer.properties --producer.config producer.properties --whitelist ke03

須要注意的是,consumer.properties文件配置源Kafka集羣Broker地址,producer.properties文件配置目標Kafka集羣地址。若是須要遷移多個Topic,可使用 --whitelist 'A|B',若是須要遷移全部的Topic,可使用 --whitelist '*'。

3.結果預覽

執行跨集羣遷移命令後,目標集羣中使用Kafka Eagle中查看Topic Size大小看是否與源集羣的Topic Size大小相等,或者使用SQL語句,驗證是否有數據遷移過來,結果以下圖所示:

 

4.總結

跨集羣遷移數據的本質是,Kafka啓動了消費者讀取源集羣數據,並將消費後的數據寫入到目標集羣,在遷移的過程當中,能夠啓動多個實例,提供遷出的吞吐量。

5.結束語

這篇博客就和你們分享到這裏,若是你們在研究學習的過程中有什麼問題,能夠加羣進行討論或發送郵件給我,我會盡我所能爲您解答,與君共勉!

另外,博主出書了《Kafka並不難學》,喜歡的朋友或同窗, 能夠在公告欄那裏點擊購買連接購買博主的書進行學習,在此感謝你們的支持。

相關文章
相關標籤/搜索