An open-source component from Hellobike. It supports syncing PostgreSQL data to Kafka or Elasticsearch.
https://github.com/hellobike/tunnel
Overall, deploying tunnel is fairly simple.
You need ZooKeeper and Kafka deployed beforehand (the demo below uses single-node ZK and Kafka).
Node layout:
192.168.2.4 runs ZK, Kafka, and PG 10 (PG listening on port 1921)
192.168.2.189 runs tunnel
Make sure logical replication is enabled on the PG side:
wal_level = 'logical'
max_replication_slots = 20
Note: changing these settings requires restarting the PG process.
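After the restart, a quick sanity check from psql:
SHOW wal_level;                -- should return 'logical'
SHOW max_replication_slots;    -- should return 20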
Then create the test database, the test tables, and the account used for replication:
CREATE DATABASE test_database;
\c test_database
create table test_1 (id int primary key , name char(40));
create table test_2 (id int primary key , name char(40));
CREATE ROLE test_rep LOGIN ENCRYPTED PASSWORD 'xxxx' REPLICATION;
GRANT CONNECT ON DATABASE test_database to test_rep;
vim pg_hba.conf and add these 2 lines:
host all test_rep 192.168.2.0/24 md5
host replication test_rep 192.168.2.0/24 md5
Then reload PG.
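Either of the following works for the reload (a full restart is not needed for pg_hba.conf changes); the data-directory path below is an assumption based on the install prefix used elsewhere in this post, so adjust it to your setup:
SELECT pg_reload_conf();                        -- from psql
pg_ctl reload -D /usr/local/pgsql-10.10/data    -- from the shell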
Over on the 192.168.2.189 machine, build tunnel:
Note: running tunnel requires Oracle JDK 1.8 to be installed beforehand.
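A quick way to confirm the right JDK is on the PATH:
java -version    # should report 1.8.x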
git clone https://github.com/hellobike/tunnel
cd tunnel
mvn clean package -Dmaven.test.skip=true
cd target
unzip AppTunnelService.zip
cd AppTunnelService
vim conf/test.yml with the following contents:
tunnel_subscribe_config:
  pg_dump_path: '/usr/local/pgsql-10.10/bin/pg_dump'
  subscribes:
  - slotName: slot_for_test
    pgConnConf:
      host: 192.168.2.4
      port: 1921
      database: test_database
      user: test_rep
      password: xxxx
    rules:
    - {table: test_1, pks: ['id'], topic: test_1_logs}
    - {table: test_2, pks: ['id'], topic: test_2_logs}
    kafkaConf:
      addrs:
      - 192.168.2.4:9092
tunnel_zookeeper_address: 192.168.2.4:2181
Start it in the foreground:
java -server -classpath conf/*:lib/* com.hellobike.base.tunnel.TunnelLauncher -u false -c cfg.properties -p 7788 # exposes Prometheus metrics on port 7788 (setting up monitoring is not the focus here, and it is simple anyway, so skipping it for now)
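Once tunnel is running, it should create the logical replication slot named in the config; you can confirm this on the PG side (a quick check, assuming psql access to 192.168.2.4):
SELECT slot_name, plugin, active FROM pg_replication_slots;
-- expect a row with slot_name = 'slot_for_test' and active = t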
Then create some data in the two tables of test_database on PG 10, and you can see the data land in Kafka (verified here via kafka-manager and kafka-eagle; screenshots not included).
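For example, statements along these lines (illustrative values, chosen to match the records shown below; run them in test_database with a user that can write to the tables):
INSERT INTO test_1 VALUES (1111, '狗蛋');
UPDATE test_1 SET name = '大狗蛋' WHERE id = 1111;   -- produces the UPDATE record below
INSERT INTO test_1 VALUES (3, 'gone soon');
DELETE FROM test_1 WHERE id = 3;                      -- produces the DELETE record below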
Pretty-printed, the data looks like this:
What an UPDATE record looks like:
{
    "dataList": [{
        "dataType": "integer",
        "name": "id",
        "value": "1111"
    }, {
        "dataType": "character",
        "name": "name",
        "value": "大狗蛋 "
    }],
    "eventType": "UPDATE",
    "lsn": 10503246616,
    "schema": "public",
    "table": "test_1"
}
What a DELETE record looks like:
{
    "dataList": [{
        "dataType": "integer",
        "name": "id",
        "value": "3"
    }],
    "eventType": "DELETE",
    "lsn": 10503247064,
    "schema": "public",
    "table": "test_1"
}
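If kafka-manager or kafka-eagle is not at hand, the stock console consumer that ships with Kafka is enough to watch the raw messages (run from the Kafka install directory on 192.168.2.4):
bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.4:9092 --topic test_1_logs --from-beginning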