流式處理中常常會遇到Kafka與其餘系統進行數據同步或者Kafka集羣間數據遷移的情景。使用EMR Kafka Connect能夠方便快速的實現數據同步或者數據遷移。html
Kafka Connect是一種可擴展的、可靠的,用於在Kafka和其餘系統之間快速地進行流式數據傳輸的工具。例如能夠使用Kafka Connect獲取數據庫的binglog數據,將數據庫的數據遷入Kafka集羣,以同步數據庫的數據,或者對接下游的流式處理系統。同時,Kafka Connect提供的REST API接口能夠方便的進行Kafka Connect的建立和管理。
Kafka Connect分爲standalone和distributed兩種運行模式。standalone模式下,全部的worker都在一個進程中運行;相比之下,distributed模式更具擴展性和容錯性,是最經常使用的方式,也是生產環境推薦使用的模式。數據庫
本文介紹使用EMR Kafka Connect的REST API接口在Kafka集羣間進行數據遷移,使用distributed模式。apache
建立兩個EMR集羣,集羣類型爲Kafka。EMR Kafka Connect安裝在task節點上,進行數據遷移的目的Kafka集羣須要建立task節點。集羣建立好後,task節點上EMR Kafka Connect服務會默認啓動,端口號爲8083。json
注意要保證兩個集羣的網路互通,詳細的建立流程見建立集羣。api
3.1準備工做curl
EMR Kafka Connect的配置文件路徑爲/etc/ecm/kafka-conf/connect-distributed.properties。工具
在源Kafka集羣建立須要同步的topic,例如url
另外,Kafka Connect會將offsets, configs和任務狀態保存在topic中,topic名對應配置文件中的offset.storage.topic、config.storage.topic 和status.storage.topic三個配置項。默認的,Kafka Connect會自動的使用默認的partition和replication factor建立這三個topic。spa
3.2建立Kafka Connect3d
在目的Kafka集羣的task節點(例如emr-worker-3節點),使用curl命令經過json數據建立一個Kafka Connect。
json數據中,name字段表明建立的connect的名稱,此處爲connect-test;config字段須要根據實際狀況進行配置,其中的變量說明以下表
3.3查看Kafka Connect
查看全部的Kafka Connect
查看建立的connect-test的狀態
查看task的信息
3.4數據同步
在源Kafka集羣建立須要同步的數據。
3.5查看同步結果
在目的Kafka集羣消費同步的數據。
能夠看到,在源Kafka集羣發送的100000條數據已經遷移到了目的Kafka集羣。
本文介紹並演示了使用EMR kafka Connect在Kafka集羣間進行數據遷移的方法,關於Kafka Connect更詳細的使用請參考Kafka官網資料和REST API使用。
本文爲雲棲社區原創內容,未經容許不得轉載。