Real-time data exchange with Confluent and MySQL

In 2014, three of Kafka's main developers left LinkedIn to found a company called Confluent. As with other big-data companies, Confluent's product is named after the company: Confluent Platform. Kafka sits at its core, and it ships in three editions: Confluent Open Source, Confluent Enterprise, and Confluent Cloud.

This article does not go deeply into Confluent's background; see the official site https://www.confluent.io for details. The focus here is how to use the Confluent platform to capture MySQL data in real time.

Install the MySQL JDBC driver

wget http://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.39.tar.gz
tar xzvf mysql-connector-java-5.1.39.tar.gz
sed -i '$a export CLASSPATH=/root/mysql-connector-java-5.1.39/mysql-connector-java-5.1.39-bin.jar:$CLASSPATH' /etc/profile
source /etc/profile

Install Confluent

Download the Confluent tarball and unpack it.

cd /usr/local
tar zxvf confluent.tar.gz

Default ports of the Confluent platform components

Component                           Default Port
ZooKeeper                           2181
Apache Kafka brokers (plain text)   9092
Schema Registry REST API            8081
REST Proxy                          8082
Kafka Connect REST API              8083
Confluent Control Center            9021
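The addresses used in the commands later in this article (for example the consumer's --bootstrap-server) are built from these defaults. A small helper sketch, assuming every component runs on localhost (the component names in the dictionary are my own shorthand, not official identifiers):

```python
# Default ports from the table above (assumption: everything on localhost).
DEFAULT_PORTS = {
    "zookeeper": 2181,
    "kafka": 9092,
    "schema-registry": 8081,
    "rest-proxy": 8082,
    "kafka-connect": 8083,
    "control-center": 9021,
}

def endpoint(component, host="localhost"):
    """Build the host:port string for a component, e.g. for --bootstrap-server."""
    return "%s:%d" % (host, DEFAULT_PORTS[component])

print(endpoint("kafka"))          # localhost:9092
print(endpoint("kafka-connect"))  # localhost:8083
```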

Configuring the MySQL data source

Create a configuration file, quickstart-mysql.properties, for Confluent to load data from MySQL:

name=mysql-whitelist-timestamp-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10
connection.user=root
connection.password=root
connection.url=jdbc:mysql://192.168.248.128:3306/foodsafe?characterEncoding=utf8&useSSL=true

# Table whitelist (uncomment to capture only the listed tables)
#table.whitelist=t1

mode=timestamp+incrementing
timestamp.column.name=modified
incrementing.column.name=id

# Topic prefix: the Confluent platform creates one topic per table, named prefix + table name
topic.prefix=mysql-test-
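mode=timestamp+incrementing tells the connector to track both a timestamp column (to catch updates) and a strictly increasing id column (to disambiguate rows that share a timestamp). A sketch of the row-selection idea, as an illustration only, not the connector's actual code:

```python
# Illustration of timestamp+incrementing mode: a row is fetched when its
# (modified, id) pair is strictly greater than the last committed offset.
# The timestamp column catches updated rows; the incrementing id column
# separates rows created within the same timestamp value.
def rows_after(rows, last_ts, last_id):
    return [r for r in rows
            if r["modified"] > last_ts
            or (r["modified"] == last_ts and r["id"] > last_id)]

rows = [
    {"id": 7, "modified": 100},
    {"id": 8, "modified": 200},
    {"id": 9, "modified": 200},  # same timestamp as id 8, still detected
]
print(rows_after(rows, last_ts=200, last_id=8))  # [{'id': 9, 'modified': 200}]
```

Using only one of the two modes is also possible (mode=timestamp or mode=incrementing), but then either same-timestamp inserts or in-place updates can be missed.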


Custom query mode

If the service is started with the configuration above, the Confluent platform polls data from every table. That is not always what you want, so the platform also provides a custom query mode. A sample configuration:

#User defined connector instance name
name=mysql-whitelist-timestamp-source
#The class implementing the connector
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
#Maximum number of tasks to run for this connector instance
tasks.max=10

connection.url=jdbc:mysql://192.168.248.128:3306/foodsafe?characterEncoding=utf8&useSSL=true
connection.user=root
connection.password=root
query=SELECT f.`name`,p.price,f.create_time from foods f join price p on (f.id = p.food_id)
mode=timestamp
timestamp.column.name=timestamp

topic.prefix=mysql-joined-data


In query mode, a WHERE clause in the configured query easily causes the connector to generate invalid SQL when it appends its own predicates; prefer a JOIN instead.
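The reason is that the connector concatenates its own timestamp filter onto the configured query. A sketch of the failure mode (an illustration under that assumption, not the connector's actual code; the exact appended clause may differ by version):

```python
# Illustration: the connector appends a timestamp predicate to the configured
# query. If the user's query already ends in a WHERE clause, concatenation
# yields two WHERE keywords, i.e. invalid SQL. A JOIN keeps the query free of
# top-level WHERE, so the appended predicate attaches cleanly.
def build_polling_sql(user_query, ts_col="timestamp"):
    # Assumed shape of the appended filter (hypothetical, for illustration).
    return user_query + " WHERE `%s` > ? AND `%s` < ? ORDER BY `%s` ASC" % (
        ts_col, ts_col, ts_col)

joined = ("SELECT f.`name`, p.price, f.create_time "
          "FROM foods f JOIN price p ON (f.id = p.food_id)")
filtered = "SELECT * FROM foods WHERE price > 10"

print(build_polling_sql(joined).count("WHERE"))    # 1 -- valid
print(build_polling_sql(filtered).count("WHERE"))  # 2 -- invalid SQL
```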

1. Start ZooKeeper

ZooKeeper is a long-running service, so it is best run in the background. This step and the following ones require write access to /var/lib; if a step fails, check that the user running Confluent can write to /var/lib.

# cd /usr/local/confluent-3.2.2
# ./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties &
# Or start it as a daemon:
# sudo ./bin/zookeeper-server-start -daemon ./etc/kafka/zookeeper.properties

Stop ZooKeeper

$ ./bin/zookeeper-server-stop

2. Start Kafka

# cd /usr/local/confluent-3.2.2
# ./bin/kafka-server-start ./etc/kafka/server.properties &

Stop the Kafka service

./bin/kafka-server-stop

3. Start the Schema Registry

# cd /usr/local/confluent-3.2.2
# ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties &

Stop the Schema Registry

# ./bin/schema-registry-stop

4. Start the producer that captures MySQL data

# cd /usr/local/confluent-3.2.2
# ./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties ./etc/kafka-connect-jdbc/quickstart-mysql.properties &

5. Start a consumer to read the data

# cd /usr/local/confluent-3.2.2
# ./bin/kafka-avro-console-consumer --new-consumer --bootstrap-server localhost:9092 --topic mysql-test-t1 --from-beginning

Test SQL

DROP TABLE IF EXISTS `t1`;
CREATE TABLE `t1` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(200) DEFAULT NULL,
  `createtime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  `modified` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  KEY `id` (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8;

-- ----------------------------
-- Records of t1
-- ----------------------------
INSERT INTO `t1` VALUES ('1', 'aa', '2017-07-10 08:03:51', '2017-07-10 23:03:30');
INSERT INTO `t1` VALUES ('3', 'bb', '2017-07-10 08:03:45', '2017-07-10 23:03:34');
INSERT INTO `t1` VALUES ('4', '年內', '2017-07-10 08:05:51', '2017-07-10 23:05:45');
INSERT INTO `t1` VALUES ('5', '年內', '2017-07-10 08:44:28', '2017-07-10 23:15:45');
INSERT INTO `t1` VALUES ('6', '公共', '2017-07-18 06:05:11', '2017-07-18 21:04:58');
INSERT INTO `t1` VALUES ('7', '哈哈', '2017-07-18 19:05:04', '2017-07-18 07:32:13');
INSERT INTO `t1` VALUES ('8', '公共經濟', '2017-07-27 20:33:10', '2017-07-18 07:34:43');

Insert a new row

INSERT INTO `t1` (name, createtime, modified) VALUES ('公共經濟2', '2017-07-27 20:33:10', '2017-07-18 07:34:43');

After the new row is inserted, the consumer prints it in real time:

{"id":7,"name":{"string":"哈哈"},"createtime":1500429904000,"modified":1500388333000}
{"id":8,"name":{"string":"公共經濟"},"createtime":1501212790000,"modified":1500388483000}
{"id":9,"name":{"string":"公共經濟1"},"createtime":1501212790000,"modified":1500388483000}
{"id":10,"name":{"string":"公共經濟2"},"createtime":1501212790000,"modified":1500388483000}

Confluent still sees little use in China, and Chinese documentation for it is scarce. This article is the record of my hands-on work against the official documentation, done last July while evaluating real-time data exchange technologies.
