1. 獲取 Connect Worker 信息
curl -s http://127.0.0.1:8083/ | jq
lenmom@M1701:~/workspace/software/kafka_2.11-2.1.0/logs$ curl -s http://127.0.0.1:8083/ | jq { "version": "2.1.0", "commit": "809be928f1ae004e", "kafka_cluster_id": "NGQRxNZMSY6Q53ktQABHsQ" }
2.列出 Connect Worker 上已安裝的全部 Connector 插件 (connector plugins)
curl -s http://127.0.0.1:8083/connector-plugins | jq
lenmom@M1701:~/workspace/software/kafka_2.11-2.1.0/logs$ curl -s http://127.0.0.1:8083/connector-plugins | jq [ { "class": "io.confluent.connect.hdfs.HdfsSinkConnector", "type": "sink", "version": "5.2.1" }, { "class": "io.confluent.connect.hdfs.tools.SchemaSourceConnector", "type": "source", "version": "2.1.0" }, { "class": "io.confluent.connect.storage.tools.SchemaSourceConnector", "type": "source", "version": "2.1.0" }, { "class": "io.debezium.connector.mongodb.MongoDbConnector", "type": "source", "version": "0.9.4.Final" }, { "class": "io.debezium.connector.mysql.MySqlConnector", "type": "source", "version": "0.9.4.Final" }, { "class": "io.debezium.connector.oracle.OracleConnector", "type": "source", "version": "0.9.4.Final" }, { "class": "io.debezium.connector.postgresql.PostgresConnector", "type": "source", "version": "0.9.4.Final" }, { "class": "io.debezium.connector.sqlserver.SqlServerConnector", "type": "source", "version": "0.9.4.Final" }, { "class": "org.apache.kafka.connect.file.FileStreamSinkConnector", "type": "sink", "version": "2.1.0" }, { "class": "org.apache.kafka.connect.file.FileStreamSourceConnector", "type": "source", "version": "2.1.0" } ]
3.獲取 Connector 上 Task 以及相關配置的信息
curl -s http://127.0.0.1:8083/connectors/<Connector名字>/tasks | jq
lenmom@M1701:~/workspace/software/kafka_2.11-2.1.0/logs$ curl -s localhost:8083/connectors/inventory-connector/tasks |jq [ { "id": { "connector": "inventory-connector", "task": 0 }, "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "database.user": "root", "database.server.id": "184054", "tasks.max": "1", "database.history.kafka.bootstrap.servers": "127.0.0.1:9092", "database.history.kafka.topic": "dbhistory.inventory", "database.server.name": "127.0.0.1", "database.port": "3306", "task.class": "io.debezium.connector.mysql.MySqlConnectorTask", "database.hostname": "127.0.0.1", "database.password": "root", "name": "inventory-connector", "database.whitelist": "inventory" } } ]
4.獲取 Connector 狀態信息
curl -s http://127.0.0.1:8083/connectors/<Connector名字>/status | jq
lenmom@M1701:~/workspace/software/kafka_2.11-2.1.0/logs$ curl -s localhost:8083/connectors/inventory-connector/status |jq { "name": "inventory-connector", "connector": { "state": "RUNNING", "worker_id": "127.0.0.1:8083" }, "tasks": [ { "state": "RUNNING", "id": 0, "worker_id": "127.0.0.1:8083" } ], "type": "source" }
5.獲取 Connector 配置信息
curl -s http://127.0.0.1:8083/connectors/<Connector名字>/config | jq
lenmom@M1701:~/workspace/software/kafka_2.11-2.1.0/logs$ curl -s localhost:8083/connectors/inventory-connector/config |jq { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "database.user": "root", "database.server.id": "184054", "tasks.max": "1", "database.history.kafka.bootstrap.servers": "127.0.0.1:9092", "database.history.kafka.topic": "dbhistory.inventory", "database.server.name": "127.0.0.1", "database.port": "3306", "database.hostname": "127.0.0.1", "database.password": "root", "name": "inventory-connector", "database.whitelist": "inventory" }
6.暫停 Connector
curl -s -X PUT http://127.0.0.1:8083/connectors/<Connector名字>/pause
7.恢復 (resume) 已暫停的 Connector
curl -s -X PUT http://127.0.0.1:8083/connectors/<Connector名字>/resume
8.刪除 Connector
curl -s -X DELETE http://127.0.0.1:8083/connectors/<Connector名字>
9.建立新 Connector (以 HdfsSinkConnector 舉例)
curl -s -X POST -H "Content-Type: application/json" --data '{
"name": "hdfs-hive-sink",
"config": {
"connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
"tasks.max": "1",
"topics": "127.0.0.1.inventory.customers",
"hdfs.url": "hdfs://127.0.0.1:9000/inventory",
"flush.size": "10",
"format.class":"io.confluent.connect.hdfs.string.StringFormat",
"hive.integration": true,
"hive.database": "inventory",
"hive.metastore.uris": "thrift://127.0.0.1:9083",
"schema.compatibility": "BACKWARD"
}
}' http://127.0.0.1:8083/connectors | jq
lenmom@M1701:~/workspace/software/kafka_2.11-2.1.0/logs$ curl -H "applicaiton/json" http://127.0.0.1:8083/connectors/hdfs-hive-sink |jq % Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 100 371 100 371 0 0 61833 0 --:--:-- --:--:-- --:--:-- 74200 { "name": "hdfs-hive-sink", "config": { "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector", "format.class": "io.confluent.connect.hdfs.string.StringFormat", "flush.size": "10", "tasks.max": "1", "topics": "127.0.0.1.inventory.customers", "hdfs.url": "hdfs://127.0.0.1:9000/inventory", "name": "hdfs-hive-sink" }, "tasks": [ { "connector": "hdfs-hive-sink", "task": 0 } ], "type": "sink" }
10.更新 Connector 配置 (以FileStreamSourceConnector舉例)
curl -s -X PUT -H "Content-Type: application/json" --data '{"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","key.converter.schemas.enable":"true","file":"demo-file.txt","tasks.max":"2","value.converter.schemas.enable":"true","name":"file-stream-demo-distributed","topic":"demo-2-distributed","value.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter":"org.apache.kafka.connect.json.JsonConverter"}' http://127.0.0.1:8083/connectors/file-stream-demo-distributed/config | jq