ClickHouse 實戰筆記 第01期:Kafka 數據同步到 ClickHouse

ClickHouse 實戰筆記 第01期:Kafka 數據同步到 ClickHouse

馬聽 悅專欄
ClickHouse 實戰筆記 第01期:Kafka 數據同步到 ClickHousehtml

做者簡介

馬聽,多年 DBA 實戰經驗,對 MySQL、 Redis、ClickHouse 等數據庫有必定了解,專欄《一線數據庫工程師帶你深刻理解 MySQL》、《Redis 運維實戰》做者。java

從這一節開始,將經過幾期跟各位分享個人一些 ClickHouse 實戰筆記。
這一期首先聊聊 Kafka 數據同步到 ClickHouse 的其中一個方案:經過 Kafka 引擎方式同步,下面進入實際操做過程(環境:CentOS7.4):linux

1 Kafka 基礎環境搭建

由於主要是爲了測試數據同步,所以 Kafka 只簡單安裝了單機版本。數據庫

1.1 安裝 JDK

cd /usr/src

在這裏[https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html]選擇合適的 JDK 版本,並下載。apache

tar zxvf jdk-8u261-linux-x64.tar.gz
mv jdk1.8.0_261/ java

編輯 /etc/profilebootstrap

vim /etc/profile

加入如下內容:vim

JAVA_HOME=/usr/src/java
PATH=$PATH:$JAVA_HOME/bin
export JAVA_HOME PATH

執行markdown

source /etc/profile

1.2 安裝 kafka

cd /usr/src

在這裏[http://archive.apache.org/dist/kafka/2.0.0/]選擇合適的 kafka 版本,並下載。oracle

tar zxvf kafka_2.11-2.0.0.tgz
mv kafka_2.11-2.0.0 kafka

1.3 啓動 zk

cd /usr/src/kafka
./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

1.4 啓動 kafka

nohup ./bin/kafka-server-start.sh config/server.properties &

1.5 建立 topics

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

1.6 查看 topics

./bin/kafka-topics.sh --list --zookeeper localhost:2181

ClickHouse 實戰筆記 第01期:Kafka 數據同步到 ClickHouse

1.7 產生消息

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

執行完上面命令後,會出現下面的窗口:
ClickHouse 實戰筆記 第01期:Kafka 數據同步到 ClickHouse
而後在 > 後面輸入須要產生的消息,以下:
ClickHouse 實戰筆記 第01期:Kafka 數據同步到 ClickHouse運維

1.8 消費消息

另外開一個鏈接窗口,執行:

cd /usr/src/kafka/
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

ClickHouse 實戰筆記 第01期:Kafka 數據同步到 ClickHouse
能夠看到在 1.7 步驟生成的消息。

2 安裝 ClickHouse

ClickHouse 單機版安裝參考:https://clickhouse.tech/docs/zh/getting-started/install/

3 建立消費表

在 ClickHouse 上建立 kafka 消費表
登陸 ClickHouse

clickhouse-client

進行建庫建表操做:

create database kafka_data;

use kafka_data;

create table kafka_queue(id UInt32,code String,name String)engine =Kafka() settings kafka_broker_list = 'localhost:9092',kafka_topic_list='test',kafka_group_name='group1',kafka_format='JSONEachRow',kafka_skip_broken_messages=100;

注:

  • kafka_broker_list:kafka 的鏈接地址和端口。
  • kafka_topic_list:kafka 的 topic 名。
  • kafka_group_name:kafka 的組名。
  • kafka_format:表示用於解析消息的數據格式,消息發送端必須按此格式發送消息。
  • kafka_skip_broken_messages:當解析數據出現錯誤時,運行跳過失敗的數據行數。

4 建立存儲表

由於 Kafka 消費表不能直接做爲結果表使用。Kafka 消費表只是用來消費Kafka數據,沒有真正的存儲全部數據,只要查詢一次,數據就會清空。所以須要在 ClickHouse 中建立存儲表保存數據。
在 ClickHouse 上建立存儲表:

create table kafka_table(id UInt32,code String,name String) engine=MergeTree() order by id

5 建立數據同步視圖

建立 view 把 kafka 消費表消費到的數據導入 ClickHouse 存儲表:

create materializedview consumer to kafka_table as select id,code,name from kafka_queue

6 測試數據同步

/usr/src/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

輸入:

{"id":2,"code":"two","name":"aa"}

確認 ClickHouse 存儲表是否能正常獲取到數據

select * from kafka_table

7 其餘維護操做

中止數據同步,能夠刪除視圖

drop table consumer

或者卸載

detach table consumer

卸載以後,若是想再次恢復,可使用:

attach materialized view consumer to kafka_table(id UInt32,code String,name String)as select id,code,name from kafka_queue

8 存在的問題

經過 Kafka 引擎進行數據同步的方式儘管很方便,可是在實戰過程當中發現,Kafka 吐出來的數據不必定會是 {"id":2,"code":"two","name":"aa"} 這類格式,這種狀況能夠考慮使用另一種方案:藉助 Flume 實現 Kafka 到 CH 的同步,這個方案將在後續的文章中進行介紹。

相關文章
相關標籤/搜索