mysql數據同步到mariadb ColumnStore,如何進行全量同步和增量同步?

MariaDB ColumnStore專爲大數據擴展而設計,可處理PB級數據,線性可伸縮性和出色的性能,並能對分析查詢進行實時響應。 它利用列式存儲,壓縮,即時投影以及水平和垂直分區的I / O優點在分析大型數據集時提供了出色的性能。(這是官方的介紹,https://mariadb.com/kb/en/mariadb-columnstore/mysql

本人自研發了一個mysql -> mariadb columnStore同步工具,同時支持全量同步和增量同步模式,github地址:https://github.com/yutianyong125/mcs_etlgit

接下來演示一下同步過程github


下載該項目sql

git clone https://github.com/yutianyong125/mcs_etl.git
cd mcs_etl

啓動mysql,方便起見,這裏使用docker,僅測試,不考慮數據掛載問題,這裏我把secure-file-priv配置的目錄掛載出來,是爲了導入數據的時候用到,另須要指定使用自定義的mysql配置文件docker

vim /tmp/conf.d/my.cnf

# 保存爲如下配置

[mysqld]
log_bin = mysql_bin # 開啓binlog日誌並指定binlog日誌命名前綴
binlog_format=ROW # 指定binlog格式
server_id = 1 # 指定master server_id
secure_file_priv=/tmp/mysql-files # SELECT INTO OUTFILE 語句須要開啓這個配置
mkdir /tmp/mysql-files # 先建立好存放導出數據的文件夾

docker run --rm -d --name mysql -v /tmp/conf.d:/etc/mysql/conf.d -v /tmp/mysql-files:/tmp/mysql-files -e MYSQL_ROOT_PASSWORD=123456 -p 3306:3306 mysql:5.7 --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci --secure-file-priv=/tmp/mysql-files

使用存儲過程添加測試數據數據庫

-- 建立test庫
create database test;
use test;

-- 建立user表

CREATE TABLE user(
  id INT NOT NULL AUTO_INCREMENT,
  uname VARCHAR(20) NOT NULL,
  sex VARCHAR(5) NOT NULL,
  score INT NOT NULL,
  copy_id INT NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=INNODB CHARSET=utf8;

-- 存儲過程插入10萬條數據

DROP PROCEDURE IF EXISTS add_user;  
DELIMITER //
    create PROCEDURE add_user(in num INT)
    BEGIN
        DECLARE rowid INT DEFAULT 0;
        DECLARE firstname CHAR(1);
        DECLARE name1 CHAR(1);
        DECLARE name2 CHAR(1);
        DECLARE sex CHAR(1);
        DECLARE score CHAR(2);
        DECLARE uname CHAR(3);
        WHILE rowid < num DO
        SET firstname = SUBSTRING('趙錢孫李周吳鄭王林楊柳劉孫陳江阮侯鄒高彭徐',FLOOR(1+21*RAND()),1); 
        SET name1 = SUBSTRING('一二三四五六七八九十甲乙丙丁靜景京晶名明銘敏閔民軍君俊駿天田甜兲恬益依成城誠立莉力黎勵',ROUND(1+43*RAND()),1); 
        SET name2 = SUBSTRING('一二三四五六七八九十甲乙丙丁靜景京晶名明銘敏閔民軍君俊駿天田甜兲恬益依成城誠立莉力黎勵',ROUND(1+43*RAND()),1); 
        SET sex=FLOOR(0 + (RAND() * 2));
        SET score= FLOOR(40 + (RAND() *60));
        SET rowid = rowid + 1;
        SET uname = CONCAT(firstname,name1,name2);
        insert INTO user (uname,sex,score,copy_id) VALUES (uname,sex,score,rowid);  
        END WHILE;
    END //
DELIMITER ;

call add_user(100000);

-- 建立test1庫

create database test1;
use test1;

-- 建立表t1

CREATE TABLE `t` (
  `id` int(11) NOT NULL,
  `a` int(11) DEFAULT NULL,
  `b` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`),
  KEY `a` (`a`),
  KEY `b` (`b`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- 存儲過程插入數據

delimiter //
create procedure idata()
begin
  declare i int;
  set i=1;
  while(i<=100000)do
    insert into t values(i, i, i);
    set i=i+1;
  end while;
end //
delimiter ;
call idata();

啓動mariadb columnStorevim

docker run --rm -d --name mcs -eMARIADB_ROOT_PASSWORD=123456 -p 3307:3306 mariadb/columnstore:1.2.3

修改項目配置文件, conf/etl.toml數組

# 增量ETL配置段
[IncrementEtl]
  StartFile = "" # 開始同步的binlog文件
  StartPosition = 0 # 開始同步的binlog位置
  ServerId = 100 # 同步slave的serverId,只要不與master serverId相同便可

# 全量ETL配置段
[FullEtl]
  # mysql數據導出存放文件夾
  OutFileDir = "/tmp/mysql-files/"

# 全量ETL規則,Schema: 數據庫, Tables: table數組
# eg: 導出整個庫的表 tables=["*"],導出特定的表 tables=["test1", "test2"]
[[Rule]]
  Schema = "test"
  Tables = ["user"]
[[Rule]]
  Schema = "test1"
  Tables = ["*"]

# mysql 數據庫配置
[Source]
  Host = "127.0.0.1"
  Port = 3306
  User = "root"
  Pwd = "123456"

# mariadb columnStore 數據庫配置
[Target]
  Host = "127.0.0.1"
  Port = 3307
  User = "root"
  Pwd = "123456"

執行二進制文件,指定全量同步模式異步

./mcs_etl -model full

成功輸出以下,由於是多協程異步執行任務的,因此總耗時可能與etl時長最大的表相差無幾工具

同步`test1`.`t` 耗時 2.025725559s
同步`test`.`user` 耗時 4.358782152s
fullEtl 耗時 4.373473788s

增量方式進行同步步驟以下:

mysql運行 show master status 查看當前binlog日誌文件和Position,修改 conf/etl.toml 配置文件中的IncrementEtl配置段

# 增量ETL配置段
[IncrementEtl]
  StartFile = "mysql_bin.000004" # 開始同步的binlog文件
  StartPosition = 154 # 開始同步的binlog位置
  ServerId = 100 # 同步slave的serverId,只要不與master serverId相同便可

執行二進制文件,指定增量同步模式,此時會進入循環監聽等待消費狀態

./mcs_etl -model increment

接下來在mysql作一些增刪改操做

update `test`.`user` set sex = 0 where id = 1;

insert into `test1`.`t` values (100001, 100001, 100001);

delete from `test1`.`t` where id = 100001;

控制檯輸出日誌以下

binlog恢復的語句:
update `test`.`user` set `id` = '1',`uname` = '鄒二君',`sex` = '0',`score` = '59',`copy_id` = '1' where `id` = '1' and `uname` = '鄒二君' and `sex` = '1' and `score` = '59' and `copy_id` = '1'
==>
兼容處理轉換後的語句:
update `test`.`user` set `id` = '1',`uname` = '鄒二君',`sex` = '0',`score` = '59',`copy_id` = '1' where `id` = '1' and `uname` = '鄒二君' and `sex` = '1' and `score` = '59' and `copy_id` = '1'

執行結果:
執行成功

binlog恢復的語句:
insert into `test1`.`t` (`id`,`a`,`b`) values ('100001','100001','100001')
==>
兼容處理轉換後的語句:
insert into `test1`.`t` (`id`,`a`,`b`) values ('100001','100001','100001')

執行結果:
執行成功

binlog恢復的語句:
delete from `test1`.`t` where `id` = '100001' and `a` = '100001' and `b` = '100001'
==>
兼容處理轉換後的語句:
delete from `test1`.`t` where `id` = '100001' and `a` = '100001' and `b` = '100001'

執行結果:
執行成功
相關文章
相關標籤/搜索