將postgresql中的數據實時同步到kafka中

參考地址:https://blog.csdn.net/weixin_33985507/article/details/92460419html

參考地址:https://mp.weixin.qq.com/s/sccRf9u0MWnHMsnXjlcRGggit

1、安裝kafkacat 

kafkacat 是一個C語言編寫的 kafka 生產者、消費者程序。github

安裝kafkacat 以前,須要安裝一下依賴sql

sudo apt-get install librdkafka-dev libyajl-dev

2、重點是安裝avro-c

安裝avro-c的依賴數據庫

 

 

(1)、 其中安裝libcur時會出錯,所以先執行apache

sudo apt-get install libjansson-dev

(2)、接着安裝aptitude(若沒有安裝)json

apt install aptitude

(3)、安裝curlbootstrap

tar jxvf  curl-7.66.0.tar.bz2
cd curl-7.66.0
./configure
make
make insall

安裝完成以後將curl-7.66.0/include/curl 目錄拷貝到/usr/include目錄下面(須要包含curl 目錄)vim

sudo cp -r /home/yzh/curl-7.66.0/include/curl /usr/include

(4)、安裝zlibapp

sudo apt install zlib1g-dev

(5)、安裝snappy

sudo apt install libsnappy-dev

(6)、安裝PkgConfig

sudo apt install pkg-config

(7)、安裝liblzma

sudo apt install liblzma-dev

(8)、安裝cmake

tar zxvf cmake-3.15.3.tar.gz
cd cmake-3.15.3
./bootstrap
make
make install

cmake -version
cmake version 3.15.3
CMake suite maintained and supported by Kitware (kitware.com/cmake).

(9)、安裝avro-c

須要root用戶

tar -zvxf avro-c-1.9.1.tar.gz
cd avro-c-1.9.1/
mkdir build
cd build
cmake .. -DCMAKE_INSTALL_PREFIX=/opt/avro -DCMAKE_BUILD_TYPE=Release -DTHREADSAFE=true
make
make test
make install

導入庫文件

# vi /etc/ld.so.conf
/opt/avro/lib

# ldconfig

安裝完成以後,須要將/opt/avro(安裝時指定的路徑 )中的相關文件拷貝到/usr相關路徑下面

cp -r /opt/avro/lib/* /usr/lib
cp -r/opt/avro/include /usr/include

3、安裝libserdes

git clone https://github.com/confluentinc/libserdes

cd libserdes 
./configure
make 
sudo make install 

4、安裝kafkacat

git clone https://github.com/edenhill/kafkacat

./configure
make
sudo make install

安裝以後,須要添加環境變量

sudo vim /etc/profile

exoprt LD_LIBRARY_PATH=/usr/local/lib
export PATH=$PATH:$LD_LIBRARY_PATH

5、安裝wal2json

git clone https://github.com/eulerto/wal2json

 cd wal2json

make 
sudo make install

6、修改postgresql相關配置文件

posgresql.conf

shared_preload_libraries = 'wal2json'
wal_level = logical
max_wal_senders = 4 
max_replication_slots = 4

建立具備Replication和Login受權的用戶

CREATE ROLE <name> WITH REPLICATION PASSWORD 'password' LOGIN;

修改pg_hba.conf,使該用戶能夠遠程或本地訪問數據庫

############ REPLICATION ##############
local   replication     <name>                              trust
host    replication     <name>    127.0.0.1/32     trust host    replication     <name>    ::1/128              trust

7、測試

一、創建測試環境(建立的表必需要有主鍵

CREATE DATABASE test;

CREATE TABLE test_table (
    id char(10) NOT NULL,
    code        char(10),
    PRIMARY KEY (id)
);

二、建立slot

pg_recvlogical   -h localhost -p 5432 -U postgres -d testdb --slot test_slot --create-slot -P wal2json

三、啓動zookeeper、kafka(略)

五、啓動slot

pg_recvlogical -h localhost -p 5432 -U postgres -W  -d testdb -S test_slot(對應建立的slot) --start -f - | kafkacat -b 127.0.0.1:9092 -t testdb_topic

六、消費testdb_topic

bin/kafka-console-consumer.sh --topic testdb_topic --bootstrap-server 127.0.0.1:9092 --from-beginning 

相關文章
相關標籤/搜索