使用Kafka Connect導入/導出數據

從控制檯寫入數據並將其寫回到控制檯是一個方便的起點,可是您可能要使用其餘來源的數據或將數據從Kafka導出到其餘系統。對於許多系統,可使用Kafka Connect導入或導出數據,而無需編寫自定義集成代碼。node

Kafka Connect是Kafka附帶的工具,用於將數據導入和導出到Kafka。它是運行鏈接器的可擴展工具,該 鏈接器實現用於與外部系統進行交互的自定義​​邏輯。在此快速入門中,咱們將看到如何使用簡單的鏈接器運行Kafka Connect,該鏈接器將數據從文件導入到Kafka主題,並將數據從Kafka主題導出到文件。bootstrap

首先,咱們將從建立一些種子數據開始進行測試:bash

echo -e "foo\nbar" > test.txt

接下來,咱們將啓動兩個以獨立模式運行的鏈接器,這意味着它們將在單個本地專用進程中運行。咱們提供了三個配置文件做爲參數。第一個始終是Kafka Connect流程的配置,其中包含通用配置,例如要鏈接的Kafka代理和數據的序列化格式。其他配置文件均指定要建立的鏈接器。這些文件包括惟一的鏈接器名稱,要實例化的鏈接器類,以及鏈接器所需的任何其餘配置。ide

[root@zookeep-kafka-node1 kafka_2.12-2.3.0]#  bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

如報錯能夠修改connect-standalone.properties文件 bootstrap.servers值改爲IP工具

# These are defaults. This file just demonstrates how to override some settings. bootstrap.servers=10.23.209.70:9092 # The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will # need to configure these based on the format they want their data in when loaded from or stored into Kafka

這些示例配置文件(隨Kafka一塊兒提供)使用您以前啓動的默認本地集羣配置,並建立兩個鏈接器:第一個是源鏈接器,它從輸入文件中讀取行並將每一個行生成到Kafka主題,第二個是宿鏈接器。從Kafka主題讀取消息,並在輸出文件中將它們做爲一行顯示。測試

在啓動過程當中,您將看到許多日誌消息,其中包括一些代表正在實例化鏈接器的消息。Kafka Connect流程啓動後,源鏈接器應開始從test.txt主題中讀取行並將其生成到主題connect-test,而接收器鏈接器應開始從主題中讀取消息connect-test 並將其寫入文件test.sink.txt咱們能夠經過檢查輸出文件的內容來驗證數據已經過整個管道傳遞:spa

[root@zookeep-kafka-node1 kafka_2.12-2.3.0]# more test.sink.txt foo bar

請注意,數據存儲在Kafka主題中connect-test,所以咱們也能夠運行控制檯使用者以查看該主題中的數據(或使用自定義使用者代碼進行處理):代理

[root@zookeep-kafka-node2 kafka_2.12-2.3.0]# bin/kafka-console-consumer.sh --bootstrap-server 10.23.209.71:9092 --topic connect-test --from-beginning {"schema":{"type":"string","optional":false},"payload":"foo"} {"schema":{"type":"string","optional":false},"payload":"bar"}

鏈接器繼續處理數據,所以咱們能夠將數據添加到文件中,並查看它在管道中的移動狀況:日誌

[root@zookeep-kafka-node1 kafka_2.12-2.3.0]# echo "hello world" >>test.txt

您應該看到該行出如今控制檯使用者輸出和接收器文件中。code

[root@zookeep-kafka-node2 kafka_2.12-2.3.0]# bin/kafka-console-consumer.sh --bootstrap-server 10.23.209.71:9092 --topic connect-test --from-beginning {"schema":{"type":"string","optional":false},"payload":"foo"} {"schema":{"type":"string","optional":false},"payload":"bar"} {"schema":{"type":"string","optional":false},"payload":"hello world"}
相關文章
相關標籤/搜索