Kafka Connect Details 詳解

Kafka Connect Details 詳解

原文地址:http://3gods.com/2017/08/18/Kafka-Connect-Details.html。

概覽

Kafka Connect是在0.9之後加入的功能,主要是用來將其餘系統的數據導入到Kafka,而後再將Kafka中的數據導出到另外的系統。
能夠用來作實時數據同步的ETL,數據實時分析處理等。html

主要有2種模式:Standalone(單機模式)和Distribute(分佈式模式)。
單機主要用來開發,測試,分佈式的用於生產環境。apache

啓動和配置

Standalone 單機模式

啓動命令: bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties …]
這麼多的配置,搞毛線啊。說真的,官網的這個,當時我真沒看懂。下面解釋下:
執行單機啓動腳本connect-standalone.sh,將connect-standalone.properties屬性文件傳遞進去做爲Worker的配置,
另外的配置就是屬於Connector的配置,會被所有傳遞給SouceConnector或者SinkConnector。json

bootstrap.servers:kafka集羣地址,例如:10.33.2.1:9092,10.33.2.9:9092,10.33.1.13:9092bootstrap

key.converter:用來轉換寫入或者讀出kafka中消息的key的,例如:org.apache.kafka.connect.json.JsonConverter。
效果是對指定了key是id=1000,轉換成{「id」 : 1000},也可使用Avro的格式。我作的項目使用的maxwell,發現使用Json轉換後,
字段的雙引號全沒了,冒號變成等號,變成了這種鬼東西{id=1000}。後面直接改爲使用String的轉換器:org.apache.kafka.connect.storage.StringConverter。併發

value.converter:同上,只是用來轉換消息的value的,也就是傳輸的具體數據。分佈式

offset.storage.file.filename:默認是:/tmp/connect.offsets。
這個配置要注意,單機模式是須要本身持久化offset的。 Kafka Connect會用這裏配置的文件保存offset。
並且針對producer和consumer(也就是source和sink)須要單獨分別配置:
producer.offset.storage.file.filename=/temp/source-offset
consuer.offset.storage.file.filename=/temp/sink-offset
可是,我單機好像歷來沒成功過,每次重啓,都是重頭消費。學習

Distribute 分佈式模式

啓動命令:
bin/connect-distributed.sh config/connect-distributed.properties
分佈式模式下,connector類及其配置都是經過Rest API接口提交給kafka的。
但不須要配置保存offset的文件,由於分佈式下,都是將offsets,configs和status保存到topics中的。
而後由Worker決定如何存儲配置,分配工做,存儲offsets和task的狀態信息。
切記,爲了程序的高可用,這3個topics最好手動建立。 
具體命令,請看另一篇博客:Kafka命令 。測試

group.id:也就是connect-cluster的group id,這個不能和consumer的goup id衝突。
config.storage.topic:用來保存connector和task的配置的。 單分片,多副本,壓實類型(compacted) 的topic。
由於非壓實的topic在必定配置,觸發條件下,會刪除!!!
這裏的多副本是爲了配置一直均可用,建議數量等於Kafka Brokers的數量。單分片,應該是剛開始啓動,初始化的時候, 只有一個線程消費。spa

offset.storage.topic:用來保存offset的,既有source connect的,也有sink connect的offset。多分片,多副本,壓實的topic。
status.storage.topic:用來存儲task狀態的,多分片,多副本,壓實的topic。
這兩個多分片,多副本配置和通常的topic相同就好了,好比咱們是3個副本,5個partition。插件

上面的3個topic,你均可以用console-consumer進行消費看看,特別是status.storage.topic很是有用,
由於分佈式模式下,task由於一些配置,異常關掉,只會顯示xxxTask closed,可是不會顯示異常信息。
而從status.storage.topic中消費出來的消息能夠看到具體異常信息。

Connector的配置

name:connector的惟一名字
connector.class:用來鏈接Kafka集羣的類名,也就是你繼承SourceConnector或SinkConnector的實現類,
也就是Connector程序的入口,儘可能使用全量路徑名。
tasks.max:task的數量,一個task就是一個線程。task數量設置要小於等於分片partition的數量,多了併發度沒法提升。
key.converter:覆蓋掉傳遞給Worker的消息的key轉換類,也就是connect-stadalone.properties
和connect-distirbute.properties中key.converter。
value.converter:同上。
topics:要消費的topic列表,對於sink connector才須要配置。

Transformations 轉換器

用來將消息進行修改,轉換,以及路由的。能夠將多個組合起來,做爲一個轉換鏈。
我的建議是,這些代碼直接在connector中寫,除非能夠部署上去,多個系統公用。
還有一方面是,黑盒的類太多,出問題了後不知道是哪裏出問題了,並且也不能本地debug。
再就是着他媽的Transformations配置也太多了,學習成本好高啊。這塊的詳細內容沒看,詳情請看官網。

REST API

這塊就是用來查詢Connector和Task的狀態,主要用於Connector集羣的監控。
GET /connectors - 查詢全部connectors
POST /connectors - 提交一個connector。好比是JSON格式,例子:

{ "name": "dis-maxwell-sink", "config": { "name" : "maxwell-sink-song", "connector.class" : "com.cimc.maxwell.sink.MySqlSinkConnector", "tasks.max": 1, "topics": "estation.db_ez.t_parcel,estation.db_ez.t_box", } }

GET /connectors/{name} - 查詢指定connector信息的
GET /connectors/{name}/config - 查詢指定connector配置的
PUT /connectors/{name}/config - 更新指定connector配置的
GET /connectors/{name}/status - 查詢指定connector狀態的
GET /connectors/{name}/tasks - 查詢指定connector的全部tasks
GET /connectors/{name}/tasks/{taskid}/status - 查詢指定connector的指定task的狀態的,taskid通常是0,1,2之類
PUT /connectors/{name}/pause - 暫停指定connector的,慎用,好比由於系統更新升級,想停掉source connector拉取消息
PUT /connectors/{name}/resume - 恢復上面暫停的connector的
POST /connectors/{name}/restart - 重啓一個connector(connector由於一些緣由掛掉了,好比被強行殺死,通常不是異常形成)
POST /connectors/{name}/tasks/{taskId}/restart - 重啓一個指定的task的
DELETE /connectors/{name} - 刪除一個connector
GET /connector-plugins - 獲取全部已安裝的connector插件
PUT /connector-plugins/{connector-type}/config/validate - 校驗connector的配置的屬性類型。

Kafka Connect 開發詳解

詳見個人另一篇博客:Kafka Connect 開發詳解 。

Kafka Connect VS Producer Consumer

其實Kafka Connect的本質就是將Kafka Client包裝了一層,並對開發者提供統一的實現接口。 
Source Connector對應Producer,Sink Connector對應Consumer。

Kafka Connect的優勢

1.對開發者提供了統一的實現接口
2.開發,部署和管理都很是方便,統一 
3.使用分佈式模式進行水平擴展,毫無壓力
4.在分佈式模式下能夠經過Rest Api提交和管理Connectors
5.對offset自動管理,只須要很簡單的配置,而不像Consumer中須要開發者處理
6.流式/批式處理的支持

第三方資源

這是已經獲得支持的組件,不須要作額外的開發: https://www.confluent.io/product/connectors/
括號中的Source表示將數據從其餘系統導入Kafka,Sink表示將數據從Kafka導出到其餘系統。
其餘的我沒看,可是JDBC的實現比較的坑爹,是經過primary key(如id)和時間戳(如updateTime)字段,
來判斷數據是否更新,這樣的話應用範圍很是受侷限。

參考

Kafka Documentation

相關文章
相關標籤/搜索