使用Kafka Connect建立測試數據生成器

在最近的一些項目中,我使用Apache Kafka開發了一些數據管道在性能測試方面,數據生成老是會在整個活動中引入一些樣板代碼,例如建立客戶端實例,編寫控制流以發送數據,根據業務邏輯隨機化有效負載等等。html

在測試設置期間,擁有一個處理全部繁重工做的框架會很好,所以只須要回答兩個基本和基本的問題:java

  • 數據應該是什麼樣的?(架構)
  • 要生成多少數據?(體積)

有了Kafka Connect,事實證實實現自定義源鏈接器可以實現這一目標。如下是用於生成測試數據的示例屬性列表的快速概述。git

topic.name = generated.events 
poll.size = 10
poll.interval.ms = 5000
message.template = {「status」:「foo」,「direction」:「up」}
random.fields = status:foo | bar |巴茲,方向:向上|向下|向左|向右

這些屬性是不言自明的。爲了回答上述兩個基本問題:message.templaterandom.fields控制模式,而poll.sizepoll.interval.ms控制音量。github

基於這些屬性,我建立了一個名爲「kafka-connect-datagen」(或簡稱「datagen」)的自定義源鏈接器,可在GitHub上得到docker

在下一節中,我將簡要介紹一些實現細節。apache

實現自定義鏈接器

Kafka Connect源鏈接器將數據從數據存儲複製到Kafka,而接收器鏈接則相反。雖然是源鏈接器,但datagen不會連接到任何數據存儲; 它從內部生成數據。其實施的其他部分根據是標準卡夫卡鏈接開發指南:它延伸SourceConnectorSourceTask,並實現了一些生命週期方法的鉤。如下片斷縮寫自datagen。json

 
 

如代碼所示,Connector定義Task要運行的類型和要爲其設置的配置Task,同時Task是執行自定義邏輯的工做單元。二者ConnectorTask實例都在一個Worker 進程中運行匯合的文檔詳細介紹了這些概念。架構

除了實現這兩個類以外,還有一個步驟在運行演示以前:ConfigDef爲用戶定義配置列表()。以後,這些類能夠打包爲Connector插件。全面實施能夠在GitHub上找到。app

在下一節中,我將演示如何將插件與dockerized本地羣集設置一塊兒使用。框架

快速入門演示

在本快速入門示例中,咱們使用docker-compose管理全部必需的服務,如ZooKeeper,Kafka和Kafka Connect。要顯示全部這些服務,請運行docker-compose up -d,而後運行docker-compose ps以打印狀態信息,以下所示。

Name State Ports 
----------------------------------------------- --------------------
quickstart_broker_1 Up 0.0.0.0:9092->9092/tcp
quickstart_connect_1 Up 0.0.0.0:8083->8083/tcp,t ...
quickstart_kafka- connect-ui_1 Up 0.0.0.0:8001->8000/tcp
quickstart_kafka-rest-proxy_1 Up 0.0.0.0:8082->8082/tcp
quickstart_kafka-topics-ui_1 Up 0.0.0.0:8000->8000/tcp
quickstart_zookeeper_1 Up 0.0。 0.0:2181-> 2181 / tcp,...

Kafka和Kafka Connect將須要更長的時間才能開始。感謝Landoop Ltd,咱們有這些不錯的UI工具:打開http:// localhost:8000查看Kafka主題UI,http:// localhost:8001查看Kafka Connect UI。您也能夠運行docker-compose logs -f以查看日誌。

因爲一般Kafka Connect服務是最後一個完成啓動的服務,咱們能夠經過運行docker-compose logs -f connect來查看其日誌,以查看以下的正常運行指標。

INFO使用config offset -1(org.apache.kafka.connect.runtime.distributed.DistributedHerder)
啓動鏈接器和任務INFO完成啓動鏈接器和任務(org.apache.kafka.connect.runtime.distributed.DistributedHerder)

當全部服務徹底啓動時,是時候建立「datagen」鏈接器實例了。如下是用於此演示的配置示例。它基本上設置「datagen」任務,每5秒生成10條消息。每條消息都使用定義的JSON消息模板和一些隨機字段。運行如下命令以實例化Connector和Task。

curl -X POST http:// localhost:8083 / connectors \ 
-H'Content-Type:application / json'\
-H'Eccept:application / json'\
-d @ connect.source.datagen.json

如今,在Kafka主題UI中,咱們可以看到以generated.events定義的速率發佈到主題的隨機JSON消息

 

kafka-connect-datagen發佈消息

要中止生成,咱們能夠轉到Kafka Connect UI並暫停或刪除鏈接器。一樣,咱們可使用以下所示的REST API來實現相同的結果。查看此Confluent文檔以獲取更多操做。

#pause鏈接器(若是成功則爲空響應)
curl -X PUT http:// localhost:8083 / connectors / connect.source.datagen / pause

#delete鏈接器(若是成功則爲空響應)
curl -X DELETE http:// localhost:8083 / connectors / connect.source.datagen

總之,咱們可以利用Kafka Connect,這是一種現成的工具,能夠很好地與Kafka集成,以最少的樣板代碼實現隨機數據生成。自定義鏈接器插件 - kafka-connect-datagen - 具備高度可移植性,能夠進一步擴展以支持集成測試和不一樣消息格式等功能。

相關文章
相關標籤/搜索