在實際工做中使用 kafka ,有時候會有相似這樣的場景。咱們須要把某些數據源的數據導入到 kafka,或者把 kafka 做爲數據源導出數據。或者兩種場景的需求都要。數據庫
這算是一種 kafka 生產者,消費者模式的特殊使用場景。它主要服務於數據管道的場景,爲此 kafka 在0.9版本開始增長了 connect 功能,這樣能夠很方便使得 kafka 能夠做爲數據管道各個數據段的大型緩衝區,有效的解耦了管道兩端的生產者和消費者。segmentfault
kafka connect包含兩個組件,source connector 和 sink connector。顧名思義,前者能夠數據源(好比數據庫,文件系統)拉取數據塞入 kafka 的 topic中。然後者則從kafka消費數據到另外一個數據源(好比Elasticsearch,Hadoop)。api
鏈接器和普通的生產者消費者模式有什麼區別呢?彷佛兩種方式均可以達到目的。可能第一次接觸connect的人都會由此疑問。在《kafka權威指南》這本書裏,做者給出了建議:bash
若是你是開發人員,你會使用 Kafka 客戶端將應用程序鏈接到
Kafka ,井修改應用程序的代碼,將數據推送到 Kafka 或者從 Kafka 讀取數據。若是要將 Kafka 鏈接到數據存儲系統,可使用 Connect,由於這些系統不是你開發的,
構建數據管道 I 10s你無能或者也不想修改它們的代碼。 Connect 能夠用於從外部數據存儲系統讀取數據, 或
者將數據推送到外部存儲系統。若是數據存儲系統提供了相應的鏈接器,那麼非開發人員
就能夠經過配置鏈接器的方式來使用 Connect。oop
若是你要鏈接的數據存儲系統沒有相應的鏈接器,那麼能夠考慮使用客戶端 API 或
Connect API 開發一個應用程序。咱們建議首選 Connect,由於它提供了一些開箱即用的
特性,好比配置管理、偏移量存儲、井行處理、錯誤處理,並且支持多種數據類型和標準
的 REST 管理 API。開發一個鏈接 Kafka 和外部數據存儲系統的小應用程序看起來很簡單,
但其實還有不少細節須要處理,好比數據類型和配置選項,這些無疑加大了開發的複雜
性一一毛onnect 處理了大部分細節,讓你能夠專一於數據的傳輸。
這個示例,咱們用kafka自帶的鏈接器進行演示,這樣咱們就省去了使用 connect api 去開發一個connect的麻煩。post
首先啓動 zk+kafka 的環境,而後啓動connect進程,spa
./bin/connect-distributed.sh config/connect-distributed.properties
而後咱們輸入下面的命令來確認下是否啓動成功。.net
接着咱們啓動一個文件數據源,code
解釋下,blog
其實就是把 echo 的內容做爲http post的數據發送過去,-d @-
表示從管道獲取數據。在這個數據中,咱們指定了鏈接器的名字 load-kafka-config
,鏈接器的類名,使用的是自帶的FileStream-Source
,須要讀取的數據源的路徑,固然還有kafka的topic。
這條命令執行完後,文件的內容就被髮送到kafka的topic上了,咱們能夠經過下面這個命令來查看下,
咱們用消費者控制檯讀取了topic上的消息。能夠發現每個payload包含上面讀取的文件的一行。
接着咱們使用一個自帶的鏈接器把topic裏的數據讀出來,而且導出到文件中保存(mydata.txt),保存的文件內容應該和前面讀取的那個配置文件的內容是同樣的。
參考:
《kafka權威指南》
關注公衆號:思無邪了嗎
csdn博客: https://blog.csdn.net/pony_ma...