用canal監控binlog並實現mysql定製同步數據的功能

業務背景

寫任何工具都不能脫離實際業務的背景。開始這個項目的時候是由於現有的項目中數據分佈太零碎,零零散散的分佈在好幾個數據庫中,沒有統一的數據庫來收集這些數據。這種狀況下想作一個大而全的會員中心繫統比較困難。(這邊是一個以互聯網保險爲中心的項目,保單,會員等數據很零散的儲存在好幾個項目之中,而且項目之間的數據基本上是隔離的)。html

現有的項目數據庫是在騰訊雲中儲存,雖然騰訊提供了數據同步功能,可是這樣必需要表結構相同才行,並不符合咱們的需求。因此須要自行開發。java

 

項目在這裏:https://github.com/hjx601496320/miner。mysql

需求

1:須要能靈活配置。git

2:實時數據10分鐘內但願能夠完成同步。github

3:來源數據與目標數據可能結構,字段名稱不一樣。spring

4:增刪改均可以同步。sql

技術選擇

這個任務交給了我和另一個同事來作。數據庫

同事的

同事但願能夠經過ETL工具Kettle來作,這個東西我沒有研究過,是同事本身在研究。具體過程不是很清楚,可是最後是經過在mysql中設置更新,修改,刪除的觸發器,而後在Kettle中作了一個定時任務,實現了數據同步的功能,初步測試符合需求。可是必需要在數據庫中設置觸發器,而且會有一個臨時表,這一點我我的不是很喜歡。apache

個人

我是本着能本身寫就本身寫的原則emoji 的图像结果,準備本身寫一個。剛開始使用的是定時任務比較兩個庫的數據差異,而後再同步數據。可是通過必定的數據測試後,發如今數據量大的時候,定時任務中的上一個任務沒有執行完畢,下一個任務就又開始了。這樣形成了兩邊數據不一致。最終這個方案廢棄了。json

後來經過研究,發現mysql的數據操做會記錄在binlog中,這時就有了新的方案。能夠經過逐行獲取binlog信息,通過解析數據後,同步在目標庫中。

既然有了方案,那麼就開始作吧。

開始嘗試:1

首先要打開數據庫的binlog功能,這一步比較簡單,修改mysql的配置文件:/etc/mysql/mysql.conf.d/mysqld.cnf,添加:

server-id        = 1
log_bin            = /var/log/mysql/mysql-bin.log
expire_logs_days    = 10
max_binlog_size         = 100M
binlog_format           = ROW

  

 

而後重啓mysql 就行了,具體每一個參數的意思,搜索一下就行了。這時候隨意的對某一個數據庫中的表作一下增刪改,對應的日誌就會記錄在/var/log/mysql/這個文件夾下了。咱們看一下這個文件夾裏的東西:

 

這裏的文件是沒有辦法正常查看的,須要使用mysql提供的命令來查看,命令是這個樣子的:

1:查看
mysqlbinlog mysql-bin.000002
2:指定位置查看
mysqlbinlog --start-position="120" --stop-position="332" mysql-bin.000002

  

由於咱們如今的binlog_format指定的格式是ROW(就在上面寫的,還記得嗎?),所謂binlog文件的內容沒有辦法正常查看,由於他是這個樣子的:

 

這時,咱們須要:

對輸出進行解碼
mysqlbinlog --base64-output=decode-rows -v mysql-bin.000001

  

這時候,顯示的結果就變成了:

 

雖然還不是正常的sql,可是好賴是有必定的格式了。

but本身來作解析的話仍是很麻煩,so~放棄這種操做。

繼續嘗試:2

通過再次研究後,發現數據庫中執行sql也是能夠查看binlog的。主要有以下幾條命令:

重置binlog
reset master;

查看binlog的配置
show variables like '%binlog%';

查看全部的binlog
show binary logs;

查看正在寫入的binlog
show master status;

查看指定binlog文件
show binlog events in 'mysql-bin.000001';

查看指定binlog文件,並指定位置
show binlog events in 'mysql-bin.000001' from [pos] limit [顯示多少條];

  

按照上面的命令執行結果爲:

 

發現sql仍是不能正常顯示。這裏的緣由應該是binlog_format配置的緣由。將其修改成 binlog_format=Mixed後,完美解決。通過數據庫中一通增刪改後,顯示的sql相似這樣:

use `pay`; /* ApplicationName=DataGrip 2018.2.5 */ UPDATE `pay`.`p_pay_log` t SET t.`mark_0` = 'sdfsdf' WHERE t.`id` LIKE '342' ESCAPE '#'

  

如今彷佛已經能夠開始寫數據同步了,只要在啓動的時候獲取當正在使用的是哪個日誌文件,記錄binlog的位置,而後一點一點向下執行,解析sql就行了。可是在這個過程當中,我發現阿里巴巴有一款開源的軟件能夠用。就是標題上說道的:canal。看了一下網站上的介紹,簡直美滋滋。

它的文檔和代碼地址在這裏:https://github.com/alibaba/canal,你們能夠看一下。如今就準備用這個來完成我所須要的功能。

正式開始寫

首先看一下介紹,canal是須要單獨運行一個服務的,這個服務具體的配置仍是比較簡單的。它的做用我本身理解就是監控binlog,而後根據本身的須要獲取binlog中必定量的數據。這個數據是通過處理的,能夠比較方便的知道里面的具體信息。好比那些數據發生了變更,每列數據的列名是什麼,變更前和變更後的值是啥之類的。那麼開始。

1:個人想法

1):項目啓動的時候,開啓canal的連接,以及初始化一些配置。

@Bean
public CanalConnector canalConnector() {
    CanalConnector connector = CanalConnectors.newSingleConnector(
            //對應canal服務的連接
            new InetSocketAddress(canalConf.getIp(), canalConf.getPort()),
            //連接的目標,這裏對應canal服務中的配置,須要查閱文檔
            canalConf.getDestination(), 
            //不知道是什麼用戶,使用「」
            canalConf.getUser(), 
            //不知道是什麼密碼,使用「」
            canalConf.getPassword()
    );
    return connector;
}

  

2):先開啓一個線程,裏面寫一個死循環,用於從canal的服務中獲取binlog中的消息。這個消息類是:com.alibaba.otter.canal.protocol.Message。

Message message = connector.getWithoutAck(100);

connector:canal連接的實例化對象。
connector.getWithoutAck(100):從鏈接中獲取100條binlog中的數據。

  

3):取出Message中的事件集合,就是binlog中的每一條數據。將類型爲增刪改的數據取出,以後每一條數據放在一個線程中,用線程池去執行它。

List<Entry> entries = message.getEntries();

message.getEntries():從連接中獲取的數據集合,每一條表明1條binlog數據

  

4):在每個線程中,取出Entry中的數據,根據其類型拼接各類sql,並執行。

Header header = entry.getHeader();
獲取發生變化的表名稱,可能會沒有
String tableName = header.getTableName();

獲取發生變化的數據庫名稱,可能會沒有
String schemaName = header.getSchemaName();

//獲取事件類型
EventType eventType = rowChange.getEventType();
這裏咱們只是用其中的三種類型:
    EventType.DELETE 刪除
    EventType.INSERT 插入
    EventType.UPDATE 更新

//獲取發生變化的數據
RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());

//遍歷其中的數據
int rowDatasCount = rowChange.getRowDatasCount();
for (int i = 0; i < rowDatasCount; i++) {
    //每一行中的數據
    RowData rowData = rowChange.getRowDatas(i);
}

//獲取修改前的數據
List<Column> before = rowData.getBeforeColumnsList();

//獲取修改後的數據
List<Column> after = rowData.getAfterColumnsList();

Column中有一系列方法,好比是否發生修改,時候爲key,是不是null等,就不在細說了。

  

2:萬事具有,能夠開始寫了

1):這裏先寫一個線程,用於不停的從canal服務中獲取消息,而後建立新的線程並讓其處理其中的數據。代碼以下:

@Override
public void run() {
    while (true) {
        //主要用於在連接失敗後用於再次嘗試從新連接
        try {
            if (!run) {
                
                //打開連接,並設置 run=true
                startCanal();
            }
        } catch (Exception e) {

            System.err.println("鏈接失敗,嘗試從新連接。。。");
            threadSleep(3 * 1000);
        }
        System.err.println("連接成功。。。");
        //不停的從CanalConnector中獲取消息
        try {
            while (run) {
                
                //獲取必定數量的消息,這裏爲線程池數量×3
                Message message = connector.getWithoutAck(batchSize * 3);
                long id = message.getId();

                //處理獲取到的消息
                process(message);
                connector.ack(id);
            }
        } catch (Exception e) {
            System.err.println(e.getMessage());
        } finally {
            //若是發生異常,最終關閉鏈接,並設置run=false
            stopCanal();
        }
    }

}

  

void process(Message message) {
    List<Entry> entries = message.getEntries();
    if (entries.size() <= 0) {
        return;
    }
    log.info("process message.entries.size:{}", entries.size());
    for (Entry entry : entries) {
        Header header = entry.getHeader();
        String tableName = header.getTableName();
        String schemaName = header.getSchemaName();

        //這裏判斷是否能夠取出數據庫名稱和表名稱,若是不行,跳過循環
        if (StringUtils.isAllBlank(tableName, schemaName)) {
            continue;
        }

        //建立新的線程,並執行
        jobList.stream()
                .filter(job -> job.isMatches(tableName, schemaName))
                .forEach(job -> executorService.execute(job.newTask(entry)));
    }
}

  

這裏的jobList是我本身定義List<Job>,代碼以下:

package com.hebaibai.miner.job;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.core.JdbcTemplate;

import static com.alibaba.otter.canal.protocol.CanalEntry.Entry;

@Slf4j
@Data
public abstract class Job {


    /**
     * 數據庫連接
     */
    protected JdbcTemplate jdbcTemplate;

    /**
     * 額外配置
     */
    protected JSONObject prop;

    /**
     * 校驗目標是否爲合適的數據庫和表
     *
     * @param table
     * @param database
     * @return
     */
    abstract public boolean isMatches(String table, String database);

    /**
     * 實例化一個Runnable
     *
     * @param entry
     * @return
     */
    abstract public Runnable newTask(final Entry entry);


    /**
     * 獲取RowChange
     *
     * @param entry
     * @return
     */
    protected CanalEntry.RowChange getRowChange(Entry entry) {
        try {
            return CanalEntry.RowChange.parseFrom(entry.getStoreValue());
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
        return null;
    }

}

  

jobList裏面放的是Job的實現類。

3:寫一個Job的實現類,並用於同步表,並轉換字段名稱。

由於需求中要求兩個同步的數據中可能字段名稱不一致,因此我寫了一個josn用來配置兩個表的字段對應關係:

別的配置
。。。
"prop": {
//來源數據庫
  "database": "pay",
//來源表
  "table": "p_pay_msg",
//目標表(目標庫在其餘地方配置)
  "target": "member",
//字段對應關係
//key  :來源表的字段名
//value:目標表的字段名
  "mapping": {
    "id": "id",
    "mch_code": "mCode",
    "send_type": "mName",
    "order_id": "phone",
    "created_time": "create_time",
    "creator": "remark"
  }
}
。。。
別的配置

  

下面是所有的代碼,主要作的就是取出變更的數據,按照對應的字段名從新拼裝sql,而後執行就行了,很少解釋。

package com.hebaibai.miner.job;

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

import static com.alibaba.otter.canal.protocol.CanalEntry.*;

/**
 * 單表同步,表的字段名稱能夠不一樣,類型須要一致
 * 表中須要有id字段
 */
@SuppressWarnings("ALL")
@Slf4j
public class TableSyncJob extends Job {


    /**
     * 用於校驗是否適用於當前的配置
     *
     * @param table
     * @param database
     * @return
     */
    @Override
    public boolean isMatches(String table, String database) {
        return prop.getString("database").equals(database) &&
                prop.getString("table").equals(table);
    }

    /**
     * 返回一個新的Runnable
     *
     * @param entry
     * @return
     */
    @Override
    public Runnable newTask(final Entry entry) {
        return () -> {
            RowChange rowChange = super.getRowChange(entry);
            if (rowChange == null) {
                return;
            }
            EventType eventType = rowChange.getEventType();
            int rowDatasCount = rowChange.getRowDatasCount();
            for (int i = 0; i < rowDatasCount; i++) {
                RowData rowData = rowChange.getRowDatas(i);
                if (eventType == EventType.DELETE) {
                    delete(rowData.getBeforeColumnsList());
                }
                if (eventType == EventType.INSERT) {
                    insert(rowData.getAfterColumnsList());
                }
                if (eventType == EventType.UPDATE) {
                    update(rowData.getBeforeColumnsList(), rowData.getAfterColumnsList());
                }
            }
        };
    }

    /**
     * 修改後的數據
     *
     * @param after
     */
    private void insert(List<Column> after) {
        //找到改動的數據
        List<Column> collect = after.stream().filter(column -> column.getUpdated() || column.getIsKey()).collect(Collectors.toList());
        //根據表映射關係拼裝更新sql
        JSONObject mapping = prop.getJSONObject("mapping");
        String target = prop.getString("target");
        List<String> columnNames = new ArrayList<>();
        List<String> columnValues = new ArrayList<>();
        for (int i = 0; i < collect.size(); i++) {
            Column column = collect.get(i);
            if (!mapping.containsKey(column.getName())) {
                continue;
            }
            String name = mapping.getString(column.getName());
            columnNames.add(name);
            if (column.getIsNull()) {
                columnValues.add("null");
            } else {
                columnValues.add("'" + column.getValue() + "'");
            }
        }
        StringBuilder sql = new StringBuilder();
        sql.append("REPLACE INTO ").append(target).append("( ")
                .append(StringUtils.join(columnNames, ", "))
                .append(") VALUES ( ")
                .append(StringUtils.join(columnValues, ", "))
                .append(");");
        String sqlStr = sql.toString();
        log.debug(sqlStr);
        jdbcTemplate.execute(sqlStr);
    }

    /**
     * 更新數據
     *
     * @param before 原始數據
     * @param after  更新後的數據
     */
    private void update(List<Column> before, List<Column> after) {
        //找到改動的數據
        List<Column> updataCols = after.stream().filter(column -> column.getUpdated()).collect(Collectors.toList());
        //找到以前的數據中的keys
        List<Column> keyCols = before.stream().filter(column -> column.getIsKey()).collect(Collectors.toList());
        //沒有key,執行更新替換
        if (keyCols.size() == 0) {
            return;
        }
        //根據表映射關係拼裝更新sql
        JSONObject mapping = prop.getJSONObject("mapping");
        String target = prop.getString("target");
        //待更新數據
        List<String> updatas = new ArrayList<>();
        for (int i = 0; i < updataCols.size(); i++) {
            Column updataCol = updataCols.get(i);
            if (!mapping.containsKey(updataCol.getName())) {
                continue;
            }
            String name = mapping.getString(updataCol.getName());
            if (updataCol.getIsNull()) {
                updatas.add("`" + name + "` = null");
            } else {
                updatas.add("`" + name + "` = '" + updataCol.getValue() + "'");
            }
        }
        //若是沒有要修改的數據,返回
        if (updatas.size() == 0) {
            return;
        }
        //keys
        List<String> keys = new ArrayList<>();
        for (Column keyCol : keyCols) {
            String name = mapping.getString(keyCol.getName());
            keys.add("`" + name + "` = '" + keyCol.getValue() + "'");
        }
        StringBuilder sql = new StringBuilder();
        sql.append("UPDATE ").append(target).append(" SET ");
        sql.append(StringUtils.join(updatas, ", "));
        sql.append(" WHERE ");
        sql.append(StringUtils.join(keys, "AND "));
        String sqlStr = sql.toString();
        log.debug(sqlStr);
        jdbcTemplate.execute(sqlStr);
    }

    /**
     * 刪除數據
     *
     * @param before
     */
    private void delete(List<Column> before) {
        //找到改動的數據
        List<Column> keyCols = before.stream().filter(column -> column.getIsKey()).collect(Collectors.toList());
        if (keyCols.size() == 0) {
            return;
        }
        //根據表映射關係拼裝更新sql
        JSONObject mapping = prop.getJSONObject("mapping");
        String target = prop.getString("target");
        StringBuilder sql = new StringBuilder();
        sql.append("DELETE FROM `").append(target).append("` WHERE ");
        List<String> where = new ArrayList<>();
        for (Column column : keyCols) {
            String name = mapping.getString(column.getName());
            where.add(name + " = '" + column.getValue() + "' ");
        }
        sql.append(StringUtils.join(where, "and "));
        String sqlStr = sql.toString();
        log.debug(sqlStr);
        jdbcTemplate.execute(sqlStr);
    }
}

項目在這裏:https://github.com/hjx601496320/miner

 

原文連接:http://www.javashuo.com/article/p-cjoptupm-kb.html 

相關文章
相關標籤/搜索