go-mysql-transfer是一款MySQL數據庫實時增量同步工具。 可以監聽MySQL二進制日誌(Binlog)的變更,將變動內容造成指定格式的消息,實時發送到接收端。從而在數據庫和接收端之間造成一個高性能、低延遲的增量數據同步更新管道。mysql
特性git
1. 簡單,不依賴其它組件,一鍵部署
2. 集成多種接收端,如:Redis、MongoDB、Elasticsearch、RocketMQ、Kafka、RabbitMQ、HTTP API等,無需編寫客戶端,開箱即用
3. 內置豐富的數據解析、消息生成規則、模板語法
4. 支持Lua腳本擴展,可處理複雜邏輯
5. 集成Prometheus客戶端,支持監控告警
6. 集成Web Admin監控頁面
7. 支持高可用集羣部署
8. 數據同步失敗重試
9. 支持全量數據初始化github
github地址:go-mysql-transferredis
若是此工具對你有幫助,請在github中Star支持下sql
安裝部署下載安裝包docker
您能夠直接下載編譯好的安裝包: [點擊下載](https://github.com/wj596/go-mysql-transfer/releases)
最新版本:v1.0.4 release
也能夠自行編譯:
一、依賴Golang 1.14 及以上版本
二、設置 ' GO111MODULE=on '
三、拉取源碼 ' git clone https://github.com/wj596/go-mysql-transfer.git '
四、進入目錄,執行 ' go build '編譯數據庫
MySQL開啓Binlog、主從複製json
Linux操做系統在my.cnf文件,Windows操做系統在my.ini文件
按照下面配置開啓binlog和replaction,以下所示:app
log-bin=mysql-bin # 開啓 binlog binlog-format=ROW # 選擇 ROW 模式 server_id=1 # 配置 MySQL replaction 須要定義,不要和 go-mysql-transfer 的 slave_id 重複
運行
一、修改app.yml
二、命令行運行
Windows直接運行 go-mysql-transfer.exe
Linux執行 nohup go-mysql-transfer &ide
基於源碼構建docker鏡像
一、拉取源碼 ' git clone [https://github.com/wj596/go-mysql-transfer.git](https://github.com/wj596/go-mysql-transfer.git) '
二、修改配置文件 ' app.yml ' 中相關配置
三、構建鏡像 ' docker image build -t go-mysql-transfer -f Dockerfile . '
四、運行 ' docker run -d --name go-mysql-transfer -p 8060:8060 go-mysql-transfer:latest '
基於二進制可執行文件構建docker鏡像
一、下載編譯好的安裝包: [點擊下載](https://github.com/wj596/go-mysql-transfer/releases)
二、解壓,並修改配置文件 ' app.yml ' 中相關配置
三、構建鏡像 ' docker image build -t go-mysql-transfer -f Dockerfile . '
四、運行 ' docker run -d --name go-mysql-transfer -p 8060:8060 go-mysql-transfer:latest '
支持Redis部署模式:單機、主從(哨兵)、集羣(cluster)
具體配置以下所示:
# app.yml redis_addrs: 127.0.0.1:6379 #地址,多個用逗號分隔 #redis_group_type: cluster # 集羣類型 sentinel或者cluster #redis_master_name: mymaster # Master節點名稱,若是group_type爲sentinel則此項不能爲空,爲cluster此項無效 #redis_pass: 123456 #redis密碼 #redis_database: 0 #redis數據庫 0-16,默認0。若是group_type爲cluster此項無效同步數據爲string類型
有t_user表,以下:
配置數據同步規則,以下:
rule: - schema: eseap #數據庫名稱 table: t_user #表名稱 column_underscore_to_camel: true #列名稱下劃線轉駝峯,默認爲false value_encoder: json #值編碼 redis_structure: string # 數據類型。 支持string、hash、list、set、sortedset類型(與redis的數據類型一致) redis_key_prefix: USER_ #key的前綴 redis_key_column: USER_NAME #使用哪一個列的值做爲key,不填寫默認使用主鍵
同步到Redis的數據,以下:
同步數據爲list類型
有t_user表,以下:
配置數據同步規則,以下:
rule: - schema: eseap #數據庫名稱 table: t_user #表名稱 column_underscore_to_camel: true #列名稱下劃線轉駝峯,默認爲false value_formatter: '{{.ID}}|{{.USER_NAME}}' # 值格式化表達式,如:{{.ID}}|{{.USER_NAME}},{{.ID}}表示ID字段的值、{{.USER_NAME}}表示USER_NAME字段的值 redis_structure: list redis_key_value: user_list #key的值(固定值);當redis_structure爲hash、list、set、sortedset此值不能爲空
同步到Redis的數據,以下:
同步數據爲set類型有t_user表,以下:
配置數據同步規則,以下:
rule: - schema: eseap #數據庫名稱 table: t_user #表名稱 column_underscore_to_camel: true #列名稱下劃線轉駝峯,默認爲false value_formatter: '{{.ID}}|{{.USER_NAME}}' # 值格式化表達式,如:{{.ID}}|{{.USER_NAME}},{{.ID}}表示ID字段的值、{{.USER_NAME}}表示USER_NAME字段的值 redis_structure: set redis_key_value: user_set #key的值(固定值);當redis_structure爲hash、list、set、sortedset此值不能爲空
同步到Redis的數據,以下:
同步數據爲order set類型有t_user表,以下:
配置數據同步規則,以下:
rule: - schema: eseap #數據庫名稱 table: t_user #表名稱 column_underscore_to_camel: true #列名稱下劃線轉駝峯,默認爲false value_formatter: '{{.ID}}|{{.USER_NAME}}' # 值格式化表達式,如:{{.ID}}|{{.USER_NAME}},{{.ID}}表示ID字段的值、{{.USER_NAME}}表示USER_NAME字段的值 redis_structure: sortedset redis_key_value: users #key的值(固定值);當redis_structure爲hash、list、set、sortedset此值不能爲空 redis_sorted_set_score_column: CREATE_TIME #sortedset的score,當數據類型爲sortedset時,此項不能爲空,此項的值應爲數字類型
同步到Redis的數據,以下:
同步數據爲hash類型有t_user表,以下:
配置數據同步規則,以下:
rule: - schema: eseap #數據庫名稱 table: t_user #表名稱 column_underscore_to_camel: true #列名稱下劃線轉駝峯,默認爲false value_encoder: json #值編碼,支持json、kv-commas、v-commas redis_structure: hash redis_key_value: user_cache #key的值(固定值);當redis_structure爲hash、list、set、sortedset此值不能爲空 redis_hash_field_prefix: user_name_ #hash的field前綴,僅redis_structure爲hash時起做用 redis_hash_field_column: user_name #使用哪一個列的值做爲hash的field,僅redis_structure爲hash時起做用,不填寫默認使用主鍵
同步到Redis的數據,以下:
使用Lua腳本實現複雜數據轉換邏輯有t_user表,以下:
定義Lua腳本,以下:
-- t_user_redis.lua local json = require("json") -- 加載json模塊 local ops = require("redisOps") --加載redis操做模塊 local row = ops.rawRow() --數據庫當前變動的一行數據,table類型,key爲列名稱 local action = ops.rawAction() --當前數據庫事件,包括:insert、updare、delete local id = row["ID"] --獲取ID列的值 local userName = row["USER_NAME"] --獲取USER_NAME列的值 local key = "user_"..id -- 定義key if action == "delete" -- 刪除事件 then ops.DEL(key) ops.SREM("user_set",userName) else local password = row["PASSWORD"] --獲取USER_NAME列的值 local createTime = row["CREATE_TIME"] --獲取CREATE_TIME列的值 local result = {} -- 定義一個table result["id"] = id result["userName"] = userName result["password"] = password result["createTime"] = createTime result["source"] = "binlog" -- 數據來源 local val = json.encode(result) -- 將newTable轉爲json ops.SET(key,val) -- 對應Redis的SET命令,第一個參數爲key(支持string類型),第二個參數爲value if action == "update" -- 修改事件 then local oldRow = ops.rawOldRow() --數據庫變動以前的數據(修改以前的數據) local oldUserName = oldRow["USER_NAME"] --獲取USER_NAME列的值 ops.SREM("user_set",oldUserName) -- 刪除舊值 end ops.SADD("user_set",userName) -- 對應Redis的SADD命令,第一個參數爲key(支持string類型),第二個參數爲value end
在數據同步規則中引入腳本,以下:
rule: - schema: eseap table: t_user lua_file_path: t_user_redis.lua #lua腳本文件
同步到Redis的數據,以下:
有t_user表,以下:
定義Lua腳本,以下:
-- t_user_redis2.lua local ops = require("redisOps") --加載redis操做模塊 local row = ops.rawRow() --當前數據庫的一行數據,table類型,key爲列名稱 local action = ops.rawAction() --當前數據庫事件,包括:insert、updare、delete if action == "insert" -- 只監聽insert事件 then local key = row["USER_NAME"] --獲取USER_NAME列的值 local id = row["ID"] --獲取ID列的值 local userName = row["USER_NAME"] --獲取USER_NAME列的值 local password = row["PASSWORD"] --獲取PASSWORD列的值 local createTime = row["CREATE_TIME"] --獲取CREATE_TIME列的值 ops.HSET(key,"id",id) -- 對應Redis的HSET命令 ops.HSET(key,"userName",userName) -- 對應Redis的HSET命令 ops.HSET(key,"password",password) -- 對應Redis的HSET命令 ops.HSET(key,"createTime",createTime) -- 對應Redis的HSET命令 end
在數據同步規則中引入腳本,以下:
rule: - schema: eseap table: t_user lua_file_path: t_user_redis2.lua #lua腳本文件
同步到Redis的數據,以下: