MySQL數據庫同步工具的設計與實現

1、背景java

  在測試過程當中,對於不一樣的測試團隊,出於不一樣的測試目的,咱們可能會有多套測試環境。在產品版本迭代過程當中,根據業務需求,會對數據庫的結構進行一些修改,如:新增表、字段、索引,修改表、字段索引等操做,在一些流程不規範的公司,開發人員不按照規範操做,不及時將這些修改數據庫的 SQL 提交到 SVN/Git,當修改後的業務代碼部署到新環境時就會引發錯誤,從而影響測試效率。換個角度再說,就算流程規範的大公司,核心業務都採起分庫分表的架構,上千張表難道咱們都採用手工執行 SQL 的方式去添加和修改字段嗎?這樣固然不妥,也許會有同窗想到,咱們能夠採起使用腳本語言的方式批量更新和修改對應數據庫,這樣也是一種方式,但這種狀況的前提是執行人員很是清楚兩個數據庫的差別,若是執行人員本身也不清楚兩個數據庫之間的差別呢?可能有的同窗又想到能夠把源數據庫的結構和數據都導入到目標數據庫當中,這樣就解決了。這樣看似可行,但實際不妥。前面咱們說了,有多套測試環境,他們的做用可能不同,舉個例子:測試環境用於內部測試,聯調環境用於和外部系統的聯調,若是咱們把測試環境的數據庫結構和全部數據都導入聯調環境,那麼聯調環境原有的數據不存在了,沒法再和外部進行聯調了,因此這也不是一種好的方式。mysql

  基於以上種種緣由,一個數據庫結構同步工具貌似是一個比較好的解決方案。sql

2、實現功能數據庫

基於以上的分析,該工具須要實現如下三個功能服務器

  • 分析(diff):分析源數據庫和目標數據庫結構上的差別,在執行同步和拷貝前建議先執行分析來肯定源數據庫和目標數據庫的差別;
  • 同步(sync):只同步數據庫的結構,不一樣步數據;
  • 拷貝(copy): 對於數據沒有要求的狀況,能夠直接使用拷貝將源數據庫的數據庫結構和數據所有導入目標數據庫;

3、實現思路多線程

具體流程以下:架構

  • 對傳入指令進行解析,包括:源數據庫和目標數據庫的 IP、端口、用戶名、密碼、數據庫名以及執行動做(diff、sync、copy);
  • 分析 db,執行 SQL;
  • 分析 db 下的表,執行 SQL;
  • 分析表的字段和索引,執行 SQL;

4、分析過程框架

咱們要對數據庫的結構進行比對和分析,包括:數據庫、數據庫下面表、表中的字段和索引,那具體咱們應該如何來進行分析和比對呢?ide

既然咱們要作的是MySQL數據庫的同步工具,那麼咱們對 MySQL 數據庫就須要有深刻一點的瞭解。在MySQL中,把 INFORMATION_SCHEMA 看做是一個數據庫,確切說是信息數據庫。其中保存着關於當前 MySQL 服務器所維護的全部其餘數據庫的信息。如數據庫名,數據庫的表、表的字段與索引以及訪問權限等等。因此咱們應該關注的是 INFORMATION_SCHEMA 中的如下幾張表:工具

  • SCHEMATA:提供了當前mysql實例中全部數據庫的信息。SHOW DATABASES 的結果取之此表;
  • TABLES:提供了關於數據庫中的表的信息(包括視圖)。詳細表述了某個表屬於哪一個 schema,表類型,表引擎,建立時間等信息。SHOW TABLES FROM SCHEMANAME的結果取之此表;
  • COLUMNS:提供了表中的列信息。詳細表述了某張表的全部列以及每一個列的信息。SHOW COLUMNS FROM SCHEMANAME.TABLENAME 的結果取之此表;
  • STATISTICS:提供了關於表索引的信息。SHOW INDEX FROM SCHEMANAME.TABLENAME 的結果取之此表;

關鍵 SQL:

  • SELECT * FROM SCHEMATA WHERE SCHEMA_NAME='XXX';
  • SELECT * FROM TABLES WHERE TABLE_SCHEMA='XXX' AND TABLE_NAME='XX';
  • SELECT * FROM COLUMNS WHERE TABLE_SCHEMA='XXX' AND TABLE_NAME='XX';
  • SELECT * FROM STATISTICS WHERE TABLE_SCHEMA='XXX' AND TABLE_NAME='XX';

5、代碼實現

這裏只對 sync(同步)作簡單介紹:

@Slf4j
public class App {
  /**
   * java -jar xxxx.jar src dst action
   *
   * src: host:port:username:passwd
   *
   * dst: host:port:username:passwd
   *
   * action: sync(同步)|diff(比對)|copy(複製)
   *
   * eg. java -jar day09-1.0.0.jar 127.0.0.1:3366:root:123456 127.0.0.1:3377:root:123456 sync
   *
   * @param args src dst action
   */
  public static void main(String[] args) {
    log.info("db schema sync start, args={}", Arrays.toString(args));
    start(args);
  }

  public static void start(String[] args){

    // 1.校驗,參數個數,類型,格式不對校驗
    calibration(args);

    // 2.解析,將args 解析成 SyncActionDTO
    SchemaActionDTO actionDTO = parse(args);
    // System.out.println(actionDTO);

    // 3.執行同步/比對/複製
    SchemaHander.doAction(actionDTO);
  }
}
App 入口類

入口類包含三個步驟:校驗參數、解析參數、執行操做(doAction)

public class SchemaHander {

  public static void  doAction(SchemaActionDTO actionDTO){
    ConnectDTO src = actionDTO.getSrc();
    ConnectDTO dst = actionDTO.getDst();
    Action action = actionDTO.getAction();

    if (Action.SYNC.equals(action)){
      SyncHander.doSync(src,dst);
    }else if (Action.DIFF.equals(action)){
      DiffHander.doDiff(src,dst);
    }else if (Action.COPY.equals(action)){
      CopyHander.doCopy(src,dst);
    }else {
      throw new IllegalStateException("do not supprt this action");
    }
  }
}
SchemaHander

根據接收到的指令的第三個參數從而作對應的操做(diff、sync、copy)

public class SyncHander {

  /**
   * 分析src和dst兩個數據庫實例
   * @param src
   * @param dst
   */
  public static void doSync(ConnectDTO src,ConnectDTO dst){

    // 1.解析src和dst中的db差別,相同的數據庫名和不一樣的數據庫名
    Pair<Set<String>, Set<String>> dbPair = parseDb(src, dst);
    System.out.println("dbPair = " + dbPair);

    // 2.src有,dst無
    DbHander.copyDb(src, dst, dbPair.getLeft());

    // 3.src有,dst有
    DbHander.diffDb(src, dst, dbPair.getRight());

  }
}
SyncHander

解析源數據庫和目標數據庫的差別,相同的數據庫和不一樣的數據庫(不一樣的指的是src中有二dst中沒有)。

src 中有而 dst 中沒有的數據庫,直接在 dst 中建立數據庫、表和索引。

src 中和 dst 中都有的數據庫,則進一步分析該數據庫中的表的狀況。

public class DbHander {
  /**
   * 分析db,src有,dst有
   * @param src
   * @param dst
   * @param target
   */
  public static void diffDb(ConnectDTO src, ConnectDTO dst, Set<String> target){

    for (String db : target) {

      // 解析src和dst中的同名的數據庫的差別,返回該數據庫中表的差別,相同的表名和不一樣的表名
      Pair<Set<String>, Set<String>> tablePair = parseTable(src, dst, db);

      // 複製差別表
      TableHandler.copyTable(src, dst, db, tablePair.getLeft());

      // 對比相同表
      TableHandler.diffTable(src, dst, db, tablePair.getRight());
      }
  }
}
DbHander

套路和分析數據庫同樣

src 中有而 dst 中沒有的表,直接在 dst 中建立。

src 中和 dst 中都有的表,則進一步分析該表的全部字段和字段屬性。

public class TableHandler {
/**
   * 分析相同表的字段和索引
   * @param src 源
   * @param dst 目標
   * @param db  數據庫
   * @param targetTables 分析的目標表
   */
  public static void diffTable(ConnectDTO src,
      ConnectDTO dst,
      String db,
      Set<String> targetTables) {

    for (String table : targetTables) {

      // 1.分析差別字段
      Pair<Set<String>, Set<String>> columnPair = parseColumn(src, dst, db, table);

      // 2.複製src有,dst無
      ColumnHandler.copyColumn(src, dst, db, table, columnPair.getLeft());

      // 3.分析src有,dst有
      ColumnHandler.diffColumn(src, dst, db, table, columnPair.getRight());

      // 1.分析差別索引
      Pair<Set<String>, Set<String>> indexPair = parseIndex(src, dst, db, table);

      // 2.複製src有,dst無
      IndexHander.copyIndex(src, dst, db, table, indexPair.getLeft());

      // 3.分析src有,dst有
      IndexHander.diffIndex(src, dst, db, table, indexPair.getRight());
    }
  }
}
TableHandler

src 中有而 dst 中沒有的字段和索引,直接在 dst 中建立。

src 中和 dst 中都有的字段和索引,則進一步分析。

須要注意的是索引,因爲索引分爲普通索引、惟一索引、主鍵索引和組合索引幾種類型,因此在生成修改 SQL 時會比較複雜。

public class ColumnHandler {
長度、是否可爲空、默認值、註釋
   * @param src 源數據庫實例
   * @param dst 目標數據庫實例
   * @param db  數據庫
   * @param table 表
   * @param targetColumns 分析的目標列
   */
  public static void diffColumn(ConnectDTO src,
      ConnectDTO dst,
      String db,
      String table,
      Set<String> targetColumns) {

    for (String column : targetColumns) {

      String queryColumnInfoSql = String.format(
          "select * from COLUMNS where TABLE_SCHEMA='%s' and TABLE_NAME='%s' AND COLUMN_NAME='%s'",
          db, table, column);

      // 1.取出src中的column的幾個咱們關注的屬性,COLUMN_TYPE,COLUMN_COMMENT,IS_NULLABLE,COLUMN_DEFAULT
      Set<ColumnInfoDTO> srcColumnSet = JdbcUtils
          .read(src, ConnectConsts.INFO_SCHEMA_DB_NAME, queryColumnInfoSql)
          .stream()
          .map(entity -> ColumnInfoDTO.builder()
              .columnComment(entity.get("COLUMN_COMMENT").toString())
              .columnDefault(StringUtils.defaultString(String.valueOf(entity.get("COLUMN_DEFAULT")), ""))
              .columnType(entity.get("COLUMN_TYPE").toString())
              .isNullable(entity.get("IS_NULLABLE").toString())
              .build()).collect(Collectors.toSet());

      // 2.取出dst中的column的幾個咱們關注的屬性,COLUMN_TYPE,COLUMN_COMMENT,IS_NULLABLE,COLUMN_DEFAULT
      Set<ColumnInfoDTO> dstColumnSet = JdbcUtils
          .read(dst, ConnectConsts.INFO_SCHEMA_DB_NAME, queryColumnInfoSql)
          .stream()
          .map(entity -> ColumnInfoDTO.builder()
              .columnComment(entity.get("COLUMN_COMMENT").toString())
              .columnDefault(StringUtils.defaultString(String.valueOf(entity.get("COLUMN_DEFAULT")), ""))
              .columnType(entity.get("COLUMN_TYPE").toString())
              .isNullable(entity.get("IS_NULLABLE").toString())
              .build()).collect(Collectors.toSet());

      // 3.逐個去對比,若是不同,就生成修改SQL,若是同樣,就什麼都不作
      // 3.1 這個differenceColumn是須要去修改到dst中的
      Set<ColumnInfoDTO> differenceColumn = Sets.difference(srcColumnSet, dstColumnSet)
          .immutableCopy();

      for (ColumnInfoDTO infoDTO : differenceColumn) {
        String sql = String.format("alter table %s modify column %s %s %s %s comment '%s'",
            table,
            column,
            infoDTO.getColumnType(),
            isNullableSet(infoDTO.getIsNullable()),
            isDefaultSet(infoDTO.getColumnDefault()),
            infoDTO.getColumnComment()
        );
        JdbcUtils.write(dst, db, sql);
      }
    }
  }
}
ColumnHandler
public class IndexHander {
/**
   * 分析相同表相同索引的屬性,並修改dst中索引的屬性
   * 屬性包括索引類型,是否惟一,單索引仍是組合索引
   * @param src 源
   * @param dst 目標
   * @param db  db
   * @param table 表
   * @param targeIndexs 分析的目標索引
   */
  public static void diffIndex(ConnectDTO src,
    ConnectDTO dst,
    String db,
    String table,
    Set<String> targeIndexs){

    for (String index : targeIndexs) {
      String queryIndexInfoSql = String.format(
              "select * from STATISTICS where TABLE_SCHEMA='%s' and TABLE_NAME='%s' and INDEX_NAME='%s'",
              db, table, index);

      // 查出該index信息的返回結果,若是是組合索引,一個索引名對應多條記錄
      List<Map<String, Object>> entities = JdbcUtils
              .read(src, ConnectConsts.INFO_SCHEMA_DB_NAME, queryIndexInfoSql);

      // 1.取出src中的index的幾個咱們關注的屬性,COLUMN_NAME,NON_UNIQUE,SEQ_IN_INDEX
      Set<IndexInfoDTO> srcIndexSet = JdbcUtils
              .read(src, ConnectConsts.INFO_SCHEMA_DB_NAME, queryIndexInfoSql)
              .stream()
              .map(entity-> IndexInfoDTO.builder()
                  .columnName(entity.get("COLUMN_NAME").toString())
                  .nonUnique(entity.get("NON_UNIQUE").toString())
                  .seqInIndex(entity.get("SEQ_IN_INDEX").toString())
              .build()).collect(Collectors.toSet());

      // 2.取出dst中的index的幾個咱們關注的屬性,COLUMN_NAME,NON_UNIQUE,SEQ_IN_INDEX
      Set<IndexInfoDTO> dstIndexSet = JdbcUtils
              .read(dst, ConnectConsts.INFO_SCHEMA_DB_NAME, queryIndexInfoSql)
              .stream()
              .map(entity-> IndexInfoDTO.builder()
                      .columnName(entity.get("COLUMN_NAME").toString())
                      .nonUnique(entity.get("NON_UNIQUE").toString())
                      .seqInIndex(entity.get("SEQ_IN_INDEX").toString())
                      .build()).collect(Collectors.toSet());

      // 對比,找出名稱同樣,可是屬性不同的索引。組合索引的比對有問題
      Set<IndexInfoDTO> differenctIndex = Sets.difference(srcIndexSet, dstIndexSet).immutableCopy();

      System.out.println("differenctIndex.size() = " + differenctIndex.size());

      for (IndexInfoDTO infoDTO : differenctIndex) {
        System.out.println("infoDTO = " + infoDTO);
      }

      String sql = null;
      // 單列索引
      if (differenctIndex.size() == 1) {

        // 先刪除dst中的索引
        deleteIndex(dst,db,table,index);

        // 再在dst中建立索引
        for (IndexInfoDTO indexDTO : differenctIndex) {
          if ("PRIMARY".equals(index)) {
            sql = String.format("ALTER TABLE %s ADD PRIMARY KEY(%s);", table, indexDTO.getColumnName());
          }else {
            sql = String.format("ALTER TABLE %s ADD %s %s(%s)",
                    table,
                    isNonUnique(indexDTO.getNonUnique()),
                    index,
                    indexDTO.getColumnName());
          }
          JdbcUtils.write(dst,db,sql);
          }
      // 組合索引
      }else if (differenctIndex.size() > 1) {
        // 先刪除dst中的索引
        deleteIndex(dst,db,table,index);

        // 再在dst中建立索引
        String[] arrs = getPair(entities).getLeft();
        String nonUnique = getPair(entities).getRight();

        if ("PRIMARY".equals(index)){
          String baseSql = "alter table %s add primary key(";
          String formatSql = formatSql(arrs, baseSql);
          sql = String.format(formatSql, table);
        }else {
          String baseSql = "alter table %s add %s %s(";
          String formatSql = formatSql(arrs, baseSql);
          sql = String.format(formatSql, table, isNonUnique(nonUnique), index);
        }
        JdbcUtils.write(dst,db,sql);
      }
    }
  }
}
IndexHander

這裏是分析字段和索引的過程。

以上全部代碼,複製數據庫、表、字段、索引的代碼都沒有貼出來,你們能夠本身來實現。

另外,最後索引的分析有一個 bug,但願你們能夠發現。

6、問題

上面,咱們基本實現了這個工具的框架,可是還存在一些問題:

Connection

  • 使用鏈接池,而且基於鏈接信息作了一個Map<ConnectDTO,DruidDatasource>

SQL 執行

  • 使用批量執行SQL;

多線程執行

  • 任務分割去從線程池中申請線程,而後去執行;
相關文章
相關標籤/搜索