MySQL數據實時增量同步到Redis


工具簡介

go-mysql-transfer是一款MySQL數據庫實時增量同步工具。 可以監聽MySQL二進制日誌(Binlog)的變更,將變動內容造成指定格式的消息,實時發送到接收端。從而在數據庫和接收端之間造成一個高性能、低延遲的增量數據同步更新管道。mysql

特性git

 1. 簡單,不依賴其它組件,一鍵部署
 2. 集成多種接收端,如:Redis、MongoDB、Elasticsearch、RocketMQ、Kafka、RabbitMQ、HTTP API等,無需編寫客戶端,開箱即用
 3. 內置豐富的數據解析、消息生成規則、模板語法
 4. 支持Lua腳本擴展,可處理複雜邏輯
 5. 集成Prometheus客戶端,支持監控告警
 6. 集成Web Admin監控頁面
 7. 支持高可用集羣部署
 8. 數據同步失敗重試
 9. 支持全量數據初始化github

github地址:go-mysql-transferredis

若是此工具對你有幫助,請在github中Star支持下sql

安裝部署

下載安裝包docker

您能夠直接下載編譯好的安裝包:  [點擊下載](https://github.com/wj596/go-mysql-transfer/releases)
最新版本:v1.0.4 release
也能夠自行編譯:
一、依賴Golang 1.14 及以上版本
二、設置 ' GO111MODULE=on '
三、拉取源碼 ' git clone https://github.com/wj596/go-mysql-transfer.git '
四、進入目錄,執行  ' go build '編譯數據庫

MySQL開啓Binlog、主從複製json

Linux操做系統在my.cnf文件,Windows操做系統在my.ini文件
按照下面配置開啓binlog和replaction,以下所示:app

log-bin=mysql-bin # 開啓 binlog
binlog-format=ROW # 選擇 ROW 模式
server_id=1 # 配置 MySQL replaction 須要定義,不要和 go-mysql-transfer 的 slave_id 重複

運行
一、修改app.yml
二、命令行運行
Windows直接運行 go-mysql-transfer.exe
Linux執行 nohup go-mysql-transfer &ide

基於源碼構建docker鏡像
一、拉取源碼 ' git clone [https://github.com/wj596/go-mysql-transfer.git](https://github.com/wj596/go-mysql-transfer.git) '
二、修改配置文件 ' app.yml ' 中相關配置
三、構建鏡像 ' docker image build -t go-mysql-transfer -f Dockerfile .  '
四、運行 ' docker run -d --name go-mysql-transfer -p 8060:8060 go-mysql-transfer:latest '

基於二進制可執行文件構建docker鏡像
一、下載編譯好的安裝包:  [點擊下載](https://github.com/wj596/go-mysql-transfer/releases)
二、解壓,並修改配置文件 ' app.yml ' 中相關配置
三、構建鏡像 ' docker image build -t go-mysql-transfer -f Dockerfile .  '
四、運行 ' docker run -d --name go-mysql-transfer -p 8060:8060 go-mysql-transfer:latest '

Redis配置

支持Redis部署模式:單機、主從(哨兵)、集羣(cluster)
具體配置以下所示:

# app.yml
redis_addrs: 127.0.0.1:6379 #地址,多個用逗號分隔
#redis_group_type: cluster   # 集羣類型 sentinel或者cluster
#redis_master_name: mymaster # Master節點名稱,若是group_type爲sentinel則此項不能爲空,爲cluster此項無效
#redis_pass: 123456 #redis密碼
#redis_database: 0  #redis數據庫 0-16,默認0。若是group_type爲cluster此項無效

同步數據爲string類型

有t_user表,以下:

配置數據同步規則,以下:

rule:
  -
    schema: eseap #數據庫名稱
    table: t_user #表名稱
    column_underscore_to_camel: true #列名稱下劃線轉駝峯,默認爲false
    value_encoder: json  #值編碼
    redis_structure: string # 數據類型。 支持string、hash、list、set、sortedset類型(與redis的數據類型一致)
    redis_key_prefix: USER_ #key的前綴
    redis_key_column: USER_NAME #使用哪一個列的值做爲key,不填寫默認使用主鍵

同步到Redis的數據,以下:

 

 

 

 

 

同步數據爲list類型

有t_user表,以下:

配置數據同步規則,以下:

rule:
  -
    schema: eseap #數據庫名稱
    table: t_user #表名稱
    column_underscore_to_camel: true #列名稱下劃線轉駝峯,默認爲false
    value_formatter: '{{.ID}}|{{.USER_NAME}}' # 值格式化表達式,如:{{.ID}}|{{.USER_NAME}},{{.ID}}表示ID字段的值、{{.USER_NAME}}表示USER_NAME字段的值
    redis_structure: list 
    redis_key_value: user_list #key的值(固定值);當redis_structure爲hash、list、set、sortedset此值不能爲空

同步到Redis的數據,以下:

 

同步數據爲set類型

有t_user表,以下:

配置數據同步規則,以下:

rule:
    -
        schema: eseap #數據庫名稱
        table: t_user #表名稱
        column_underscore_to_camel: true #列名稱下劃線轉駝峯,默認爲false
        value_formatter: '{{.ID}}|{{.USER_NAME}}' # 值格式化表達式,如:{{.ID}}|{{.USER_NAME}},{{.ID}}表示ID字段的值、{{.USER_NAME}}表示USER_NAME字段的值
        redis_structure: set
        redis_key_value: user_set #key的值(固定值);當redis_structure爲hash、list、set、sortedset此值不能爲空

同步到Redis的數據,以下:

 

同步數據爲order set類型

有t_user表,以下:

配置數據同步規則,以下:

rule:
    -
        schema: eseap #數據庫名稱
        table: t_user #表名稱
        column_underscore_to_camel: true #列名稱下劃線轉駝峯,默認爲false
        value_formatter: '{{.ID}}|{{.USER_NAME}}' # 值格式化表達式,如:{{.ID}}|{{.USER_NAME}},{{.ID}}表示ID字段的值、{{.USER_NAME}}表示USER_NAME字段的值
        redis_structure: sortedset
        redis_key_value: users #key的值(固定值);當redis_structure爲hash、list、set、sortedset此值不能爲空
        redis_sorted_set_score_column: CREATE_TIME  #sortedset的score,當數據類型爲sortedset時,此項不能爲空,此項的值應爲數字類型

同步到Redis的數據,以下:

 

 

 

同步數據爲hash類型

有t_user表,以下:

配置數據同步規則,以下:

rule:
    -
        schema: eseap #數據庫名稱
        table: t_user #表名稱
        column_underscore_to_camel: true #列名稱下劃線轉駝峯,默認爲false
        value_encoder: json  #值編碼,支持json、kv-commas、v-commas
        redis_structure: hash 
        redis_key_value: user_cache #key的值(固定值);當redis_structure爲hash、list、set、sortedset此值不能爲空
        redis_hash_field_prefix: user_name_ #hash的field前綴,僅redis_structure爲hash時起做用
        redis_hash_field_column: user_name #使用哪一個列的值做爲hash的field,僅redis_structure爲hash時起做用,不填寫默認使用主鍵

同步到Redis的數據,以下:

 

使用Lua腳本實現複雜數據轉換邏輯

示例一,同時插入多條記錄

有t_user表,以下:

定義Lua腳本,以下:

-- t_user_redis.lua
 
local json = require("json")   -- 加載json模塊
local ops = require("redisOps") --加載redis操做模塊
 
local row = ops.rawRow()  --數據庫當前變動的一行數據,table類型,key爲列名稱
local action = ops.rawAction()  --當前數據庫事件,包括:insert、updare、delete
 
local id = row["ID"] --獲取ID列的值
local userName = row["USER_NAME"] --獲取USER_NAME列的值
local key = "user_"..id -- 定義key
 
if action == "delete" -- 刪除事件
then
	ops.DEL(key) 
	ops.SREM("user_set",userName) 
else 
	local password = row["PASSWORD"] --獲取USER_NAME列的值
	local createTime = row["CREATE_TIME"] --獲取CREATE_TIME列的值
	local result = {}  -- 定義一個table
	result["id"] = id
	result["userName"] = userName
	result["password"] = password
	result["createTime"] = createTime
	result["source"] = "binlog" -- 數據來源
	local val = json.encode(result) -- 將newTable轉爲json
	ops.SET(key,val) -- 對應Redis的SET命令,第一個參數爲key(支持string類型),第二個參數爲value
	
	if action == "update" -- 修改事件
	then
		local oldRow = ops.rawOldRow()  --數據庫變動以前的數據(修改以前的數據)
		local oldUserName = oldRow["USER_NAME"] --獲取USER_NAME列的值
		ops.SREM("user_set",oldUserName) -- 刪除舊值
	end
	
	ops.SADD("user_set",userName) -- 對應Redis的SADD命令,第一個參數爲key(支持string類型),第二個參數爲value
end

在數據同步規則中引入腳本,以下:

rule:
    -
        schema: eseap
        table: t_user
        lua_file_path: t_user_redis.lua   #lua腳本文件

同步到Redis的數據,以下:

 

 

 

 

 

示例二,將talbe中的一行映射成一個HASH

有t_user表,以下:

定義Lua腳本,以下:

-- t_user_redis2.lua
 
local ops = require("redisOps") --加載redis操做模塊
 
local row = ops.rawRow()  --當前數據庫的一行數據,table類型,key爲列名稱
local action = ops.rawAction()  --當前數據庫事件,包括:insert、updare、delete
 
if action == "insert" -- 只監聽insert事件
then
    local key = row["USER_NAME"] --獲取USER_NAME列的值
   
    local id = row["ID"] --獲取ID列的值
    local userName = row["USER_NAME"] --獲取USER_NAME列的值
    local password = row["PASSWORD"] --獲取PASSWORD列的值
    local createTime = row["CREATE_TIME"] --獲取CREATE_TIME列的值
    
    ops.HSET(key,"id",id) -- 對應Redis的HSET命令
    ops.HSET(key,"userName",userName) -- 對應Redis的HSET命令
    ops.HSET(key,"password",password) -- 對應Redis的HSET命令
    ops.HSET(key,"createTime",createTime) -- 對應Redis的HSET命令
end

在數據同步規則中引入腳本,以下:

rule:
    -
        schema: eseap
        table: t_user
        lua_file_path: t_user_redis2.lua   #lua腳本文件

同步到Redis的數據,以下:

相關文章
相關標籤/搜索