在最近的一些項目中,我使用Apache Kafka開發了一些數據管道。在性能測試方面,數據生成老是會在整個活動中引入一些樣板代碼,例如建立客戶端實例,編寫控制流以發送數據,根據業務邏輯隨機化有效負載等等。html
在測試設置期間,擁有一個處理全部繁重工做的框架會很好,所以只須要回答兩個基本和基本的問題:java
有了Kafka Connect,事實證實實現自定義源鏈接器可以實現這一目標。如下是用於生成測試數據的示例屬性列表的快速概述。git
topic.name = generated.events
poll.size = 10
poll.interval.ms = 5000
message.template = {「status」:「foo」,「direction」:「up」}
random.fields = status:foo | bar |巴茲,方向:向上|向下|向左|向右
這些屬性是不言自明的。爲了回答上述兩個基本問題:message.template
和random.fields
控制模式,而poll.size
和poll.interval.ms
控制音量。github
基於這些屬性,我建立了一個名爲「kafka-connect-datagen」(或簡稱「datagen」)的自定義源鏈接器,可在GitHub上得到。docker
在下一節中,我將簡要介紹一些實現細節。apache
Kafka Connect源鏈接器將數據從數據存儲複製到Kafka,而接收器鏈接則相反。雖然是源鏈接器,但datagen不會連接到任何數據存儲; 它從內部生成數據。其實施的其他部分根據是標準卡夫卡鏈接開發指南:它延伸SourceConnector
和SourceTask
,並實現了一些生命週期方法的鉤。如下片斷縮寫自datagen。json
如代碼所示,Connector
定義Task
要運行的類型和要爲其設置的配置Task
,同時Task
是執行自定義邏輯的工做單元。二者Connector
和Task
實例都在一個Worker
進程中運行。該匯合的文檔詳細介紹了這些概念。架構
除了實現這兩個類以外,還有一個步驟在運行演示以前:ConfigDef
爲用戶定義配置列表()。以後,這些類能夠打包爲Connector插件。在全面實施能夠在GitHub上找到。app
在下一節中,我將演示如何將插件與dockerized本地羣集設置一塊兒使用。框架
在本快速入門示例中,咱們使用docker-compose
管理全部必需的服務,如ZooKeeper,Kafka和Kafka Connect。要顯示全部這些服務,請運行docker-compose up -d
,而後運行docker-compose ps
以打印狀態信息,以下所示。
Name State Ports
----------------------------------------------- --------------------
quickstart_broker_1 Up 0.0.0.0:9092->9092/tcp
quickstart_connect_1 Up 0.0.0.0:8083->8083/tcp,t ...
quickstart_kafka- connect-ui_1 Up 0.0.0.0:8001->8000/tcp
quickstart_kafka-rest-proxy_1 Up 0.0.0.0:8082->8082/tcp
quickstart_kafka-topics-ui_1 Up 0.0.0.0:8000->8000/tcp
quickstart_zookeeper_1 Up 0.0。 0.0:2181-> 2181 / tcp,...
Kafka和Kafka Connect將須要更長的時間才能開始。感謝Landoop Ltd,咱們有這些不錯的UI工具:打開http:// localhost:8000查看Kafka主題UI,http:// localhost:8001查看Kafka Connect UI。您也能夠運行docker-compose logs -f
以查看日誌。
因爲一般Kafka Connect服務是最後一個完成啓動的服務,咱們能夠經過運行docker-compose logs -f connect
來查看其日誌,以查看以下的正常運行指標。
INFO使用config offset -1(org.apache.kafka.connect.runtime.distributed.DistributedHerder)
啓動鏈接器和任務INFO完成啓動鏈接器和任務(org.apache.kafka.connect.runtime.distributed.DistributedHerder)
當全部服務徹底啓動時,是時候建立「datagen」鏈接器實例了。如下是用於此演示的配置示例。它基本上設置「datagen」任務,每5秒生成10條消息。每條消息都使用定義的JSON消息模板和一些隨機字段。運行如下命令以實例化Connector和Task。
curl -X POST http:// localhost:8083 / connectors \
-H'Content-Type:application / json'\
-H'Eccept:application / json'\
-d @ connect.source.datagen.json
如今,在Kafka主題UI中,咱們可以看到以generated.events
定義的速率發佈到主題的隨機JSON消息。
kafka-connect-datagen發佈消息
要中止生成,咱們能夠轉到Kafka Connect UI並暫停或刪除鏈接器。一樣,咱們可使用以下所示的REST API來實現相同的結果。查看此Confluent文檔以獲取更多操做。
#pause鏈接器(若是成功則爲空響應)
curl -X PUT http:// localhost:8083 / connectors / connect.source.datagen / pause
#delete鏈接器(若是成功則爲空響應)
curl -X DELETE http:// localhost:8083 / connectors / connect.source.datagen
總之,咱們可以利用Kafka Connect,這是一種現成的工具,能夠很好地與Kafka集成,以最少的樣板代碼實現隨機數據生成。自定義鏈接器插件 - kafka-connect-datagen - 具備高度可移植性,能夠進一步擴展以支持集成測試和不一樣消息格式等功能。