In 2014, three of Kafka's main developers left LinkedIn to found a company called Confluent. As with other big-data companies, Confluent's product is called the Confluent Platform. Kafka is the core of this product, which comes in three editions: Confluent Open Source, Confluent Enterprise, and Confluent Cloud.
I won't go into Confluent's background here; for details, see the official site at https://www.confluent.io. This article focuses on how to use the Confluent platform to capture data from MySQL in real time.
First, download the MySQL JDBC driver and add it to the CLASSPATH:

```bash
wget http://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.39.tar.gz
tar xzvf mysql-connector-java-5.1.39.tar.gz
sed -i '$a export CLASSPATH=/root/mysql-connector-java-5.1.39/mysql-connector-java-5.1.39-bin.jar:$CLASSPATH' /etc/profile
source /etc/profile
```
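Depending on the version, Kafka Connect may not pick the driver up from $CLASSPATH. A common alternative (assuming the default tarball layout used in the steps below) is to copy the jar into the kafka-connect-jdbc share directory, where Connect loads it automatically:

```bash
# Assumed install path; adjust to your actual layout
cp /root/mysql-connector-java-5.1.39/mysql-connector-java-5.1.39-bin.jar \
   /usr/local/confluent-3.2.2/share/java/kafka-connect-jdbc/
```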
Next, download the Confluent tarball and extract it to install:
```bash
# cd /usr/local
# tar zxvf confluent.tar.gz
```
Default ports used by the Confluent platform components:
| Component | Default Port |
| --- | --- |
| ZooKeeper | 2181 |
| Apache Kafka brokers (plain text) | 9092 |
| Schema Registry REST API | 8081 |
| REST Proxy | 8082 |
| Kafka Connect REST API | 8083 |
| Confluent Control Center | 9021 |
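Once the services are started (steps 1 through 5 below), a quick way to confirm that each one is listening on its default port is a loop like this (assumes the `nc` utility is available; only the ports this tutorial actually uses are checked):

```bash
# Check the default ports once the services below are running
for port in 2181 9092 8081 8083; do
  nc -z localhost $port && echo "port $port: open" || echo "port $port: closed"
done
```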
Create a configuration file, quickstart-mysql.properties, that tells Confluent how to load data from MySQL:
```properties
name=mysql-whitelist-timestamp-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10
connection.user=root
connection.password=root
connection.url=jdbc:mysql://192.168.248.128:3306/foodsafe?characterEncoding=utf8&useSSL=true
# Table whitelist
#table.whitelist=t1
mode=timestamp+incrementing
timestamp.column.name=modified
incrementing.column.name=id
# Topic prefix: Confluent creates one topic per table, named prefix + table name
topic.prefix=mysql-test-
```
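In timestamp+incrementing mode the connector detects both updates (via the modified timestamp column) and new rows (via the auto-increment id). Conceptually, each poll runs a query shaped roughly like the sketch below; this is a simplification for illustration, not the exact SQL the connector generates:

```sql
-- Simplified sketch of a timestamp+incrementing poll against table t1
SELECT * FROM t1
WHERE (modified > ?)             -- rows updated since the last recorded offset
   OR (modified = ? AND id > ?)  -- new rows that share the last timestamp
ORDER BY modified, id ASC;
```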
Custom query mode
If the service is started with the configuration above, the Confluent platform will monitor and pull data from every table. That is not always what you want, so the platform also provides a custom query mode. A reference configuration:
```properties
# User-defined connector instance name
name=mysql-whitelist-timestamp-source
# The class implementing the connector
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
# Maximum number of tasks to run for this connector instance
tasks.max=10
connection.url=jdbc:mysql://192.168.248.128:3306/foodsafe?characterEncoding=utf8&useSSL=true
connection.user=root
connection.password=root
query=SELECT f.`name`,p.price,f.create_time from foods f join price p on (f.id = p.food_id)
mode=timestamp
# Must name a column returned by the query (here create_time)
timestamp.column.name=create_time
topic.prefix=mysql-joined-data
```
Note: in query mode, a query that contains its own WHERE clause easily leads Kafka Connect to assemble broken SQL; prefer expressing the filter through a JOIN, as the sketch below illustrates.
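The reason is that in timestamp mode the connector appends its own timestamp filter to the configured query. Roughly (a sketch of the behavior, not the connector's literal output):

```sql
-- What the connector effectively executes: the configured query
-- with its own timestamp window appended
SELECT f.`name`, p.price, f.create_time
FROM foods f JOIN price p ON (f.id = p.food_id)
WHERE create_time > ? AND create_time < ?
ORDER BY create_time ASC;

-- If the configured query already ended in its own WHERE clause,
-- appending a second one would produce invalid SQL.
```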
1. Start ZooKeeper
Because ZooKeeper is a long-running service, it is best started in the background. It also needs write access under /var/lib; if this step or any of the following ones fails with a permission error, check whether the user that installed Confluent has write permission on /var/lib.
```bash
# cd /usr/local/confluent-3.2.2
# ./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties &

# Or start it as a daemon:
# sudo ./bin/zookeeper-server-start -daemon ./etc/kafka/zookeeper.properties
```
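To confirm ZooKeeper is up, you can query its four-letter-word interface over the client port (again assumes `nc` is installed); a healthy server answers imok:

```bash
# echo ruok | nc localhost 2181
imok
```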
Stop ZooKeeper:
```bash
$ ./bin/zookeeper-server-stop
```
2. Start Kafka
```bash
# cd /usr/local/confluent-3.2.2
# ./bin/kafka-server-start ./etc/kafka/server.properties &
```
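A quick check that the broker came up is to list topics; on this Confluent release (Kafka 0.10.x) the topic tool still talks to ZooKeeper:

```bash
# ./bin/kafka-topics --zookeeper localhost:2181 --list
```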
Stop the Kafka service:
```bash
./bin/kafka-server-stop
```
3. Start Schema Registry
```bash
# cd /usr/local/confluent-3.2.2
# ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties &
```
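Schema Registry serves a REST API on port 8081; listing the registered subjects is a simple liveness check (the list is empty until a connector has produced data):

```bash
# curl http://localhost:8081/subjects
[]
```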
Stop Schema Registry:
```bash
# ./bin/schema-registry-stop
```
4. Start the producer that watches for MySQL data
```bash
# cd /usr/local/confluent-3.2.2
# ./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties \
    ./etc/kafka-connect-jdbc/quickstart-mysql.properties &
```
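Kafka Connect exposes its own REST API on port 8083, which can confirm that the connector loaded and its tasks are running:

```bash
# curl http://localhost:8083/connectors
["mysql-whitelist-timestamp-source"]
# curl http://localhost:8083/connectors/mysql-whitelist-timestamp-source/status
```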
5. Start a consumer to read the data
```bash
# cd /usr/local/confluent-3.2.2
# ./bin/kafka-avro-console-consumer --new-consumer --bootstrap-server localhost:9092 \
    --topic mysql-test-t1 --from-beginning
```
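Note that in query mode there is no table name to append, so the topic is the prefix itself; if the custom-query connector from earlier is the one running, the joined rows would be consumed with:

```bash
# ./bin/kafka-avro-console-consumer --new-consumer --bootstrap-server localhost:9092 \
    --topic mysql-joined-data --from-beginning
```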
SQL for testing:
```sql
DROP TABLE IF EXISTS `t1`;
CREATE TABLE `t1` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(200) DEFAULT NULL,
  `createtime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  `modified` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  KEY `id` (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8;

-- ----------------------------
-- Records of t1
-- ----------------------------
INSERT INTO `t1` VALUES ('1', 'aa', '2017-07-10 08:03:51', '2017-07-10 23:03:30');
INSERT INTO `t1` VALUES ('3', 'bb', '2017-07-10 08:03:45', '2017-07-10 23:03:34');
INSERT INTO `t1` VALUES ('4', '年內', '2017-07-10 08:05:51', '2017-07-10 23:05:45');
INSERT INTO `t1` VALUES ('5', '年內', '2017-07-10 08:44:28', '2017-07-10 23:15:45');
INSERT INTO `t1` VALUES ('6', '公共', '2017-07-18 06:05:11', '2017-07-18 21:04:58');
INSERT INTO `t1` VALUES ('7', '哈哈', '2017-07-18 19:05:04', '2017-07-18 07:32:13');
INSERT INTO `t1` VALUES ('8', '公共經濟', '2017-07-27 20:33:10', '2017-07-18 07:34:43');
```
Statement to insert new data:
```sql
INSERT INTO `t1` (name, createtime, modified) VALUES ('公共經濟2', '2017-07-27 20:33:10', '2017-07-18 07:34:43');
```
After the new row is inserted, the consumer prints the data in real time:
{"id":7,"name":{"string":"哈哈"},"createtime":1500429904000,"modified":1500388333000} {"id":8,"name":{"string":"公共經濟"},"createtime":1501212790000,"modified":1500388483000} {"id":9,"name":{"string":"公共經濟1"},"createtime":1501212790000,"modified":1500388483000} {"id":10,"name":{"string":"公共經濟2"},"createtime":1501212790000,"modified":1500388483000}
Confluent still seems to see very little use in China, and Chinese-language documentation is scarce. This article is a record of hands-on work I did against the official documentation in July of last year, while researching real-time data exchange technologies.