Kafka Connector特性能夠對源數據庫到目標數據庫的拷貝處理,能夠藉助此特性實現本身的ETL工具用於數據抽取、清洗等功能。html
其數據流程圖:java
Connector相關接口數據庫
經過繼承SourceConnector和SinkConnector實現本身的Connector,分別表示對源數據的讀取和對目標庫的寫入。
apache
Connector的主要接口:
api
Initialize this connector, using the provided ConnectorContext to notify the runtime of input configuration changes.oracleinitialize(ConnectorContext ctx)
initialize(ConnectorContext ctx, List<Map<String,String>> taskConfigs)
Initialize this connector, using the provided ConnectorContext to notify the runtime of input configuration changes and using the provided set of Task configurations.ide
Start this Connector.工具 start(Map<String,String> props)
stop()
Stop this connector.測試
taskClass()
Returns the Task implementation for this Connector.this
Returns a set of configurations for Tasks based on the current configuration, producing at most count configurations.taskConfigs(int maxTasks)
經過繼承SourceTask和SinkTask實現對應的Connector的業務邏輯任務。
其對應的接口可查看task接口api。
對實現的Connector可經過一下配置文件進行配置
connect-file-source.properties配置源系統的connector:
#connector惟一的名稱 name=local-console-source #connector的實現 connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector #connector須要建立的最大任務數 tasks.max=1 #須要處理的topic類別 topic=connect-test
connect-file-sink.properties配置目標系統的connector(和以上配置相似):
name=local-file-sink connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector tasks.max=1 file=test.sink.txt #接收topic類別列表 topics=connect-test
REST API
Kafka Connect提供rest api接口,用於管理全部的connectors信息,可經過rest api查看對應接口信息。
對於此特性0.9版本的客戶端中未找對對應的connect包(不知包不對仍是怎麼回事),所以也未能寫出測試的demo,待之後再來補上demo。