kafka 安裝java
wget http://mirror.bit.edu.cn/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgznode
cp kafka_2.12-2.0.1.tgz kafka.tgzgit
sudo tar xzvf kafka.tgz --directory=/opt/java/kafka --strip 1github
啓動 kafka,須要先啓動本地的 zookeeper,注意修改配置文件中zk的鏈接地址web
/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.propertiessql
kafkacat 是一個C語言編寫的 kafka 生產者、消費者程序。安裝過程須要一些庫可能須要手動下載。apache
postgres 邏輯解碼, 程序 jsoncdcjson
jsoncdc 依賴於rust,可能須要先安裝 rust 或者能夠使用 wal2json替代bootstrap
編譯好以後本地目錄下有 jsoncdc.so 或者 wal2json.sovim
postgres 安裝解碼插件:
vim $PGDATA/postgresql.conf
shared_preload_libraries = 'jsoncdc.so'
安裝完成插件
postgres 插入數據
生產數據到本地 kafka
/opt/bin/pgsql/pg_10/bin/pg_recvlogical -d postgres -S jsoncdc --start -o pretty-print=1 -f - | ./kafkacat/kafkacat -b 127.0.0.1:9092 -t pg
Auto-selecting Producer mode (use -P or -C to override)
消費數據測試:
./kafkacat/kafkacat -b 127.0.0.1:9092 -t pg
kafka自消費實驗:
啓動zk,我這邊有zk服務器,所以不須要啓動:
bin/zookeeper-server-start.sh config/zookeeper.properties &
啓動kafka:
bin/kafka-server-start.sh config/server.properties
—在裏面修改zk鏈接信息
建立topic:
bin/kafka-topics.sh --create --zookeeper --replication-factor 1 --partitions 1 --topic test
建立一個叫作「test」的topic,它只有一個分區,一個副本。
> bin/kafka-topics.sh --create --zookeeper 10.9.5.20:4119,10.9.5.21:4119,10.9.5.22:4119,10.9.5.24:4119,10.9.5.35:4119,10.9.5.36:4119 --replication-factor 1 --partitions 1 --topic test
能夠經過list命令查看建立的topic:
> bin/kafka-topics.sh --list --zookeeper localhost:2181
test
運行producer並在控制檯中輸一些消息,這些消息將被髮送到服務端:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a messageThis is another message
ctrl+c能夠退出發送。
消費消息:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer.config config/server.properties --topic test --from-beginning
接下來想寫一個nodejs程序,將pg的變化動態回放到web中,顯示出來。