Kafka Connector特性

    Kafka Connector特性能夠對源數據庫到目標數據庫的拷貝處理,能夠藉助此特性實現本身的ETL工具用於數據抽取、清洗等功能。html

其數據流程圖:java

  •     Connector相關接口數據庫

    經過繼承SourceConnector和SinkConnector實現本身的Connector,分別表示對源數據的讀取和對目標庫的寫入。apache

   Connector的主要接口:api

  initialize(ConnectorContext ctx) Initialize this connector, using the provided ConnectorContext to notify the runtime of input configuration changes.oracle

  initialize(ConnectorContext ctx, List<Map<String,String>> taskConfigs) Initialize this connector, using the provided ConnectorContext to notify the runtime of input configuration changes and using the provided set of Task configurations.ide

    start(Map<String,String> props) Start this Connector.工具

   stop() Stop this connector.測試

  taskClass() Returns the Task implementation for this Connector.this

   taskConfigs(int maxTasks) Returns a set of configurations for Tasks based on the current configuration, producing at most count configurations.


    經過繼承SourceTask和SinkTask實現對應的Connector的業務邏輯任務。

    其對應的接口可查看task接口api


   對實現的Connector可經過一下配置文件進行配置

    connect-file-source.properties配置源系統的connector:

#connector惟一的名稱
name=local-console-source
#connector的實現
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
#connector須要建立的最大任務數
tasks.max=1
#須要處理的topic類別
topic=connect-test

    connect-file-sink.properties配置目標系統的connector(和以上配置相似):   

name=local-file-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
file=test.sink.txt
#接收topic類別列表
topics=connect-test


  •     REST API

     Kafka Connect提供rest api接口,用於管理全部的connectors信息可經過rest api查看對應接口信息

        

    對於此特性0.9版本的客戶端中未找對對應的connect包(不知包不對仍是怎麼回事),所以也未能寫出測試的demo,待之後再來補上demo。

相關文章
相關標籤/搜索