閒來無事,編寫一個數據遷移小工具

1、前言

  生命不息,折騰不止。近期公司有數據遷移的計劃,從Sqlserver遷移到mysql,雖然說網上有不少數據遷移方案,但閒着也是閒着,就本身整一個,權當作是練練手了html

2、解決思路

  整個遷移過程相似於ETL,將數據歷來源端通過抽取(extract)、轉換(transform)、加載(load)至目的端。讀取並轉換sqlserver庫數據,將數據解析爲csv文件,載入文件到mysql。流程以下:node

  1. 抽取、轉換
    此過程主要是處理源數據庫與目標數據庫表字段的映射關係,爲了保證程序的通用性,經過配置文件映射字段關係,解析配置文件並生成數據庫腳本
  2. 加載
    數據遷移的時候最好不要用INSERT語句插入批量插入,這樣數據量稍稍大一點就很慢。sqlserver可經過SqlBulkCopy將DataTable對象快速插入到數據庫,而後mysql並無這東西,查閱資料後發現mysql可經過MySqlBulkLoader將csv文件快速導入到數據庫。經測試遷移10K條數據MySqlBulkLoader可在1S內處理完,速度仍是至關不錯的

  

3、實現

  1. 配置文件
    db_caption.xml(數據庫),主要用來存儲表描述文件名,若待遷移的表不存在外鍵關係即遷移時不用考慮前後順序,此配置文件能夠不要。其中maxClients參數指的是異步遷移時,最大併發數。
    <?xml version="1.0" encoding="utf-8" ?>
    <root>
      <maxClients value="3"></maxClients>
      <tables>
        <table filename="t_drawtemplate.xml" caption="抽獎模板"></table>
        <table filename="t_drawprize.xml" caption="抽獎獎品"></table>
        <table filename="t_drawrecord.xml" caption="抽獎記錄"></table>
        <table filename="t_drawwinner.xml" caption="中獎記錄"></table>
      </tables>
    </root>

    t_table.xml(表),主要用來描述待遷移表信息及字段描述mysql

    <?xml version="1.0" encoding="utf-8" ?>
    <root>
      <![CDATA[抽獎記錄]]>
      <!--是否分頁,默認不分頁就好啦,false_不分頁-->
      <isPaging value="true"></isPaging>
      <pageSize value="10000"></pageSize>
    
      <!--mssql數據庫表主鍵-->
      <primaryKey value="DrawRecordId"></primaryKey>
      <!--mssql數據庫表名-->
      <msTable value="DrawRecord"></msTable>
      <!--mysql數據庫表名-->
      <myTable value="t_drawrecord"></myTable>
      <!--篩選條件,無特殊狀況爲空便可-->
      <filter value="1=1"></filter>
      <!--字段映射-->
      <fields>
        <field msName ="DrawRecordId" myName="id"></field>
        <field msName ="FK_MemberId" myName="user_id"></field>
        <field msName ="Remark" myName="remark"></field>
        <field msName ="DataStatus" myName="data_status"></field>
        <field msName ="DrawTime" myName="drawTime"></field>
        <!--須要調整字段示例-->
        <field msName ="CASE WHEN DrawWinnerId >0 THEN DrawWinnerId END" myName="drawwinner_id"></field>
      </fields>
      <!--遷移完成後,數據修復腳本,主要用來修正日期類型爲0000-00-00 00:00:00問題-->
      <fixSql></fixSql>
    </root>
  2. 建立xml文件映射對象並重寫ToString方法,將對象解析爲sql
    db_caption.xml映射對象
     1 /// <summary>
     2 /// 數據庫描述類(db_caption)
     3 /// </summary>
     4 internal class DBCaptionModel
     5 {
     6     public DBCaptionModel()
     7     {
     8         this.Tables = new List<TableModel>();
     9     }
    10 
    11     /// <summary>
    12     /// 最大鏈接數
    13     /// </summary>
    14     public int MaxClients { get; set; }
    15 
    16     /// <summary>
    17     /// 表集合
    18     /// </summary>
    19     public IList<TableModel> Tables { get; private set; }
    20 }
    21 
    22 internal class TableModel
    23 {
    24     /// <summary>
    25     /// 表xml文件名
    26     /// </summary>
    27     public string FileName { get; set; }
    28 
    29     /// <summary>
    30     /// 描述
    31     /// </summary>
    32     public string Caption { get; set; }
    33 
    34     /// <summary>
    35     /// 是否已同步
    36     /// </summary>
    37     public bool IsSync { get; set; }
    38 }
    t_table.xml映射對象
      1 /// <summary>
      2 /// 表描述類
      3 /// </summary>
      4 internal class TableCaptionModel
      5 {
      6     public TableCaptionModel()
      7     {
      8         this.Fields = new List<FieldModel>();
      9     }
     10 
     11     /// <summary>
     12     /// 是否分頁
     13     /// </summary>
     14     public bool IsPaging { get; set; }
     15 
     16     /// <summary>
     17     /// 分頁大小
     18     /// </summary>
     19     public int PageSize { get; set; }
     20 
     21     /// <summary>
     22     /// 源數據表表名
     23     /// </summary>
     24     public string SourceTableName { get; set; }
     25 
     26     /// <summary>
     27     /// 目標數據表表名
     28     /// </summary>
     29     public string TargetTableName { get; set; }
     30 
     31     /// <summary>
     32     /// 源數據表主鍵
     33     /// </summary>
     34     public string PrimaryKey { get; set; }
     35 
     36     /// <summary>
     37     /// 過濾條件
     38     /// </summary>
     39     public string Filter { get; set; }
     40 
     41     /// <summary>
     42     /// 字段集合
     43     /// </summary>
     44     public List<FieldModel> Fields { get; set; }
     45 
     46     /// <summary>
     47     /// 數據遷移完成後,數據修復腳本
     48     /// </summary>
     49     public string FixSql { get; set; }
     50 
     51     /// <summary>
     52     /// ToString
     53     /// </summary>
     54     /// <returns>sql</returns>
     55     public override string ToString()
     56     {
     57         string sql = GetBaseSql();
     58         string filter = GetFilterSql();
     59         if (!string.IsNullOrWhiteSpace(filter))
     60         {
     61             sql += " WHERE " + filter;
     62         }
     63 
     64         sql += " ORDER BY " + this.PrimaryKey;
     65         return sql;
     66     }
     67 
     68     /// <summary>
     69     /// 獲取基礎查詢Sql
     70     /// </summary>
     71     /// <![CDATA[SELECT SourceField AS TargetField,...... FROM table]]>
     72     /// <returns></returns>
     73     private string GetBaseSql()
     74     {
     75         StringBuilder sb = new StringBuilder("SELECT");
     76 
     77         foreach (var item in this.Fields)
     78         {
     79             sb.AppendFormat(" {0},", item.ToString());
     80         }
     81 
     82         sb = sb.Remove(sb.Length - 1, 1);
     83 
     84         sb.Append(" FROM ");
     85         sb.Append(this.SourceTableName);
     86         return sb.ToString();
     87     }
     88 
     89     /// <summary>
     90     /// 獲取sql查詢條件
     91     /// </summary>
     92     /// <![CDATA[filter || PrimaryKey NOT IN (SELECT PrimaryKey FORM table WHERE filter)]]>
     93     /// <returns></returns>
     94     private string GetFilterSql()
     95     {
     96         if (!this.IsPaging)
     97         {
     98             return this.Filter;
     99         }
    100 
    101         StringBuilder sb = new StringBuilder();
    102         sb.AppendFormat("SELECT ROW_NUMBER() OVER(ORDER BY {0}) RowNo,{0} FROM {1}", this.PrimaryKey, this.SourceTableName);
    103 
    104         if (!string.IsNullOrWhiteSpace(this.Filter))
    105         {
    106             sb.Append(" WHERE " + this.Filter);
    107         }
    108 
    109         sb.Insert(0, string.Format("SELECT {0} FROM (", this.PrimaryKey));
    110         sb.AppendFormat(") T WHERE RowNo BETWEEN @StartIndex AND @EndIndex");
    111 
    112         return string.Format("{0} IN ({1})", this.PrimaryKey, sb.ToString());
    113     }
    114 }
    115 
    116 /// <summary>
    117 /// 字段類
    118 /// </summary>
    119 internal class FieldModel
    120 {
    121     /// <summary>
    122     /// 源字段名
    123     /// </summary>
    124     public string SourceFieldName { get; set; }
    125 
    126     /// <summary>
    127     /// 目標字段名
    128     /// </summary>
    129     public string TargetFieldName { get; set; }
    130 
    131     /// <summary>
    132     /// ToString
    133     /// </summary>
    134     /// <returns>'SourceFieldName' AS 'TargetFieldName'" </returns>
    135     public override string ToString()
    136     {
    137         if (this.SourceFieldName.IndexOfAny(new char[] { ' ', '(' }) < 0)
    138         {
    139             //非表達式
    140             return string.Format("[{0}] AS '{1}'", SourceFieldName, TargetFieldName);
    141         }
    142         else
    143         {
    144             return string.Format("{0} AS '{1}'", SourceFieldName, TargetFieldName);
    145         }
    146     }
    147 }
  3. 解析XML文件
    XML解析可經過XmlSerializer直接反序列化爲對象,此處只是爲了溫習XML解析方式,故採用此方法
     1 /// <summary>
     2 /// 載入數據庫描述xml
     3 /// </summary>
     4 /// <returns></returns>
     5 private static DBCaptionModel LoadDBCaption()
     6 {
     7     DBCaptionModel model = new DBCaptionModel();
     8 
     9     XmlDocument doc = new XmlDocument();
    10     doc.Load(CONN_XML_PATH + "db_caption.xml");
    11 
    12     XmlNode root = doc.SelectSingleNode("root");
    13     //獲取最大鏈接數
    14     model.MaxClients = root.SelectSingleNode("maxClients").GetAttribute<int>("value");
    15 
    16     //獲取表描述
    17     XmlNodeList tables = root.SelectSingleNode("tables").SelectNodes("table");
    18     foreach (XmlNode node in tables)
    19     {
    20         model.Tables.Add(new TableModel
    21         {
    22             FileName = node.GetAttribute("filename"),
    23             Caption = node.GetAttribute("caption")
    24         });
    25     }
    26 
    27     return model;
    28 }
    29 
    30 /// <summary>
    31 /// 載入表描述xml
    32 /// </summary>
    33 /// <param name="fileName">表描敘xml文件名</param>
    34 /// <returns></returns>
    35 private static TableCaptionModel LoadTableCaption(string fileName)
    36 {
    37     XmlDocument doc = new XmlDocument();
    38     doc.Load(CONN_XML_PATH + fileName);
    39 
    40     TableCaptionModel model = new TableCaptionModel();
    41 
    42     XmlNode root = doc.SelectSingleNode("root");
    43     model.IsPaging = root.SelectSingleNode("isPaging").GetAttribute<bool>("value");
    44     if (model.IsPaging)
    45     {
    46         model.PageSize = root.SelectSingleNode("pageSize").GetAttribute<int>("value");
    47     }
    48     model.SourceTableName = root.SelectSingleNode("msTable").GetAttribute("value");
    49     model.TargetTableName = root.SelectSingleNode("myTable").GetAttribute("value");
    50     model.PrimaryKey = root.SelectSingleNode("primaryKey").GetAttribute("value");
    51     model.FixSql = root.SelectSingleNode("fixSql").GetAttribute("value");
    52 
    53     XmlNodeList fields = root.SelectSingleNode("fields").SelectNodes("field");
    54 
    55     foreach (XmlNode field in fields)
    56     {
    57         model.Fields.Add(new FieldModel
    58         {
    59             SourceFieldName = field.GetAttribute("msName"),
    60             TargetFieldName = field.GetAttribute("myName")
    61         });
    62     }
    63 
    64     return model;
    65 }

    Node.GetAttribute擴展方法,簡化讀取Node屬性代碼sql

     1 public static class XmlNodeExtension
     2 {
     3     /// <summary>
     4     /// 獲取節點屬性
     5     /// </summary>
     6     /// <param name="node">當前節點</param>
     7     /// <param name="attrName">屬性名稱</param>
     8     /// <returns></returns>
     9     public static string GetAttribute(this XmlNode node, string attrName)
    10     {
    11         if (node == null)
    12         {
    13             return null;
    14         }
    15         return ((XmlElement)node).GetAttribute(attrName);
    16     }
    17 
    18     /// <summary>
    19     /// 獲取節點屬性
    20     /// </summary>
    21     /// <param name="node">當前節點</param>
    22     /// <param name="attrName">屬性名稱</param>
    23     /// <returns></returns>
    24     public static T GetAttribute<T>(this XmlNode node, string attrName) where T : struct
    25     {
    26         if (node == null)
    27         {
    28             return default(T);
    29         }
    30         string value = GetAttribute(node, attrName);
    31         return (T)Convert.ChangeType(value, typeof(T));
    32     }
    33 }
  4. 實現數據遷移幫助方法
    FileHelper,將DataTable解析爲CSV文件
     1 public class FileHelper
     2 {
     3     /// <summary>
     4     /// 將DataTable寫入CSV
     5     /// </summary>
     6     /// <param name="dataTable"></param>
     7     /// <param name="fileFullPath"></param>
     8     public static void WriteDataTableToCSVFile(DataTable dataTable, string fileFullPath)
     9     {
    10         WriteDataTableToCSVFile(dataTable, fileFullPath, Encoding.UTF8);
    11     }
    12 
    13     /// <summary>
    14     /// 將DataTable寫入CSV
    15     /// </summary>
    16     /// <param name="dataTable"></param>
    17     /// <param name="fileFullPath"></param>
    18     /// <param name="codeType"></param>
    19     public static void WriteDataTableToCSVFile(DataTable dataTable, string fileFullPath, Encoding codeType)
    20     {
    21         using (Stream stream = new FileStream(fileFullPath, FileMode.Create, FileAccess.Write))
    22         using (StreamWriter swriter = new StreamWriter(stream, codeType))
    23         {
    24             try
    25             {
    26                 int num = dataTable.Columns.Count;
    27                 string[] arr = new string[num];
    28 
    29                 //寫標題
    30                 for (int i = 0; i < num; i++)
    31                 {
    32                     arr[i] = dataTable.Columns[i].ColumnName;
    33                 }
    34                 WriteArrayToCSVFile(swriter, arr);
    35 
    36                 //寫數據
    37                 foreach (DataRow item in dataTable.Rows)
    38                 {
    39                     for (int i = 0; i < num; i++)
    40                     {
    41                         arr[i] = Convert.IsDBNull(item[i]) ? "" : item[i].ToString();
    42                     }
    43                     WriteArrayToCSVFile(swriter, arr);
    44                 }
    45             }
    46             catch (Exception ex)
    47             {
    48                 throw new IOException(ex.Message);
    49             }
    50         }
    51     }
    52 
    53     /// <summary>
    54     /// 將數據寫入CSV文件
    55     /// </summary>
    56     /// <param name="swriter"></param>
    57     /// <param name="arr"></param>
    58     private static void WriteArrayToCSVFile(StreamWriter swriter, string[] arr)
    59     {
    60         for (int i = 0; i < arr.Length; i++)
    61         {
    62             if (!string.IsNullOrWhiteSpace(arr[i]))
    63             {
    64                 swriter.Write(arr[i]);
    65             }
    66 
    67             if (i < arr.Length - 1)
    68             {
    69                 swriter.Write("|||");
    70             }
    71         }
    72         swriter.Write(swriter.NewLine);
    73     }
    74 }

    MysqlHelper,導入VCS文件到Mysql數據庫數據庫

      1 public class MySqlDBHelper
      2 {
      3     private static readonly string tmpBasePath = AppDomain.CurrentDomain.BaseDirectory;
      4     private static readonly string tmpCSVFilePattern = "Temp\\{0}.csv";   //0表示文件名稱
      5 
      6     /// <summary>
      7     /// DB鏈接字符串
      8     /// </summary>
      9     public static string DBConnectionString
     10     {
     11         get
     12         {
     13             return ConfigHelper.GetConfigString("SQLConnStr_Mysql");
     14         }
     15     }
     16 
     17     public static int ExecNonQuery(string sqlText, CommandType cmdType, string[] paramNames, object[] paramValues)
     18     {
     19         int result = 0;
     20         using (MySqlConnection mySqlCon = new MySqlConnection(DBConnectionString))
     21         {
     22             MySqlCommand mySqlCmd = new MySqlCommand(sqlText, mySqlCon);
     23             mySqlCmd.CommandType = cmdType;
     24             try
     25             {
     26                 fillParameters(mySqlCmd, paramNames, paramValues);
     27                 mySqlCon.Open();
     28                 result = mySqlCmd.ExecuteNonQuery();
     29             }
     30             catch (MySqlException mse)
     31             {
     32                 throw mse;
     33             }
     34         }
     35         return 0;
     36     }
     37 
     38     public static int ExecuteNonQuery(string sqlText)
     39     {
     40         return ExecNonQuery(sqlText, CommandType.Text, null, null);
     41     }
     42 
     43     public static bool BulkInsert(DataTable dataTable)
     44     {
     45         bool result = false;
     46         if (dataTable != null && dataTable.Rows.Count > 0)
     47         {
     48             using (MySqlConnection mySqlCon = new MySqlConnection(DBConnectionString))
     49             {
     50                 mySqlCon.Open();
     51                 MySqlTransaction sqlTran = mySqlCon.BeginTransaction(IsolationLevel.ReadCommitted);
     52                 MySqlBulkLoader sqlBulkCopy = new MySqlBulkLoader(mySqlCon);
     53                 sqlBulkCopy.Timeout = 60;
     54 
     55                 result = BulkInsert(sqlBulkCopy, dataTable, sqlTran);
     56             }
     57         }
     58         return result;
     59     }
     60 
     61     public static bool BulkInsert<T, T1>(T sqlBulkCopy, DataTable dataTable, T1 sqlTrasaction)
     62     {
     63         bool result = false;
     64         string tmpCsvPath = tmpBasePath + string.Format(tmpCSVFilePattern, dataTable.TableName + DateTime.Now.Ticks.ToString());
     65         string tmpFolder = tmpCsvPath.Remove(tmpCsvPath.LastIndexOf("\\"));
     66 
     67         if (!Directory.Exists(tmpFolder))
     68             Directory.CreateDirectory(tmpFolder);
     69 
     70         FileHelper.WriteDataTableToCSVFile(dataTable, tmpCsvPath);   //Write to csv File
     71 
     72         MySqlBulkLoader sqlBC = (MySqlBulkLoader)Convert.ChangeType(sqlBulkCopy, typeof(MySqlBulkLoader));
     73         MySqlTransaction sqlTran = (MySqlTransaction)Convert.ChangeType(sqlTrasaction, typeof(MySqlTransaction));
     74         try
     75         {
     76             sqlBC.TableName = dataTable.TableName;
     77             sqlBC.FieldTerminator = "|||";
     78             sqlBC.LineTerminator = "\r\n";
     79             sqlBC.FileName = tmpCsvPath;
     80             sqlBC.NumberOfLinesToSkip = 1;
     81 
     82             //Mapping Destination Field of Database Table
     83             for (int i = 0; i < dataTable.Columns.Count; i++)
     84             {
     85                 sqlBC.Columns.Add(dataTable.Columns[i].ColumnName);
     86             }
     87             //Write DataTable
     88             sqlBC.Load();
     89 
     90             sqlTran.Commit();
     91             result = true;
     92         }
     93         catch (MySqlException mse)
     94         {
     95             result = false;
     96             sqlTran.Rollback();
     97             throw mse;
     98         }
     99         finally
    100         {
    101             //T、T1給默認值爲Null, 由系統調用GC
    102             sqlBC = null;
    103             sqlBulkCopy = default(T);
    104             sqlTrasaction = default(T1);
    105             File.Delete(tmpCsvPath);
    106         }
    107         return result;
    108     }
    109 
    110     private static void fillParameters(MySqlCommand mySqlCmd, string[] paramNames, object[] paramValues)
    111     {
    112         if (paramNames == null || paramNames.Length == 0)
    113             return;
    114         if (paramValues == null || paramValues.Length == 0)
    115             return;
    116 
    117         if (paramNames.Length != paramValues.Length)
    118             throw new ArgumentException("The Name Count of parameters does not match its Value Count! ");
    119 
    120         string name;
    121         object value;
    122         for (int i = 0; i < paramNames.Length; i++)
    123         {
    124             name = paramNames[i];
    125             value = paramValues[i];
    126             if (value != null)
    127                 mySqlCmd.Parameters.AddWithValue(name, value);
    128             else
    129                 mySqlCmd.Parameters.AddWithValue(name, DBNull.Value);
    130         }
    131     }
    132 }
  5. 數據遷移
      1 private static void Main(string[] args)
      2 {
      3     DBCaptionModel tablesCaption = LoadDBCaption();
      4 
      5     Stopwatch watch = new Stopwatch();
      6     watch.Start();
      7 
      8     try
      9     {
     10         foreach (var item in tablesCaption.Tables)
     11         {
     12             int total = DataMigration(item);
     13         }
     14         //異步
     15         //DataMigrationAsync(tablesCaption, 0, 0);
     16         //Console.ReadKey();
     17     }
     18     catch (Exception ex)
     19     {
     20         Console.WriteLine("遷移失敗");
     21         Console.WriteLine(ex.StackTrace);
     22     }
     23 
     24     Console.WriteLine("總耗時:" + watch.ElapsedMilliseconds);
     25 }
     26 
     27 /// <summary>
     28 /// 同步遷移
     29 /// </summary>
     30 /// <param name="model">表描述</param>
     31 /// <returns>遷移記錄數</returns>
     32 private static int DataMigration(TableModel model)
     33 {
     34     Console.WriteLine(string.Format("【{0}】遷移開始", model.Caption));
     35     Stopwatch watch = new Stopwatch();
     36     watch.Start();
     37 
     38     TableCaptionModel tableCaption = LoadTableCaption(model.FileName);
     39 
     40     string sql = tableCaption.ToString();
     41     Console.WriteLine(sql);
     42 
     43     SqlParameter[] parms =
     44     {
     45         new SqlParameter("@StartIndex", SqlDbType.Int, 4),
     46         new SqlParameter("@EndIndex", SqlDbType.Int, 4)
     47     };
     48 
     49     int total = 0;
     50 
     51     if (tableCaption.IsPaging)
     52     {
     53         //分頁
     54         int pageNo = 0;
     55         while (true)
     56         {
     57             Console.WriteLine(string.Format("【{0}】當前分頁:{1}", model.Caption, pageNo));
     58 
     59             parms[0].Value = pageNo * tableCaption.PageSize + 1;
     60             parms[1].Value = (pageNo + 1) * tableCaption.PageSize;
     61             int num = DataMigration(sql, parms, tableCaption.TargetTableName);
     62             total += num;
     63             if (num < tableCaption.PageSize)
     64             {
     65                 break;
     66             }
     67             pageNo++;
     68         }
     69     }
     70     else
     71     {
     72         //不分頁
     73         total = DataMigration(sql, parms, tableCaption.TargetTableName);
     74     }
     75 
     76     //修復數據
     77     if (FixData(tableCaption) >= 0)
     78     {
     79         Console.WriteLine(string.Format("【{0}】數據修復完成", model.Caption));
     80     }
     81 
     82     Console.WriteLine(string.Format("【{0}】遷移結束,耗時:{1},記錄數:{2}\r\n", model.Caption, watch.ElapsedMilliseconds, total));
     83     return total;
     84 }
     85 
     86 /// <summary>
     87 /// 數據遷移
     88 /// </summary>
     89 /// <param name="sql"></param>
     90 /// <param name="parms"></param>
     91 /// <param name="tableName"></param>
     92 /// <returns></returns>
     93 private static int DataMigration(string sql, SqlParameter[] parms, string tableName)
     94 {
     95     DataTable dt = MsSqlDBHelper.ExecSql(sql, parms).Tables[0];
     96     dt.TableName = tableName;
     97     MySqlDBHelper.BulkInsert(dt);
     98     return dt.Rows.Count;
     99 }
    100 
    101 /// <summary>
    102 /// 修復數據
    103 /// </summary>
    104 /// <param name="model"></param>
    105 /// <returns></returns>
    106 private static int FixData(TableCaptionModel model)
    107 {
    108     if (!string.IsNullOrWhiteSpace(model.FixSql))
    109     {
    110         return MySqlDBHelper.ExecuteNonQuery(model.FixSql.Replace("@MyTable", model.TargetTableName));
    111     }
    112     return -1;
    113 }
  6. 遷移結果示例
  7. 數據遷移失敗清空數據庫腳本
     1 -- 清空數據庫
     2 DELIMITER// 
     3 CREATE PROCEDURE sp_clear(IN dbname VARCHAR(128))
     4 BEGIN
     5     -- 接收動態腳本
     6     DECLARE v_sql VARCHAR(256);
     7     
     8     -- 定義遊標遍歷時,做爲判斷是否遍歷徹底部記錄的標記
     9     DECLARE no_more_items INT DEFAULT 0;
    10     
    11     -- 定義遊標
    12     DECLARE c_result CURSOR FOR SELECT CONCAT('TRUNCATE TABLE ',dbname,'.',TABLE_NAME,';') FROM information_schema.TABLES WHERE TABLE_SCHEMA = dbname AND TABLE_TYPE ='BASE TABLE';
    13         
    14     -- 聲明當遊標遍歷徹底部記錄後將標誌變量置成某個值
    15     DECLARE CONTINUE HANDLER FOR NOT FOUND SET no_more_items = 1;
    16     
    17 
    18     -- 禁用外鍵,外鍵會致使TRUNCATE TABLE語句執行失敗,另:在SET FOREIGN_KEY_CHECKS以後聲明變量會報錯,暫不知緣由
    19     SET FOREIGN_KEY_CHECKS = 0;    
    20     
    21     -- 打開遊標
    22     OPEN c_result;
    23     -- 循環開始
    24     REPEAT                     
    25         FETCH c_result INTO v_sql;
    26         SET @v_sql=v_sql;
    27         SELECT @v_sql;
    28 
    29         -- 執行動態腳本
    30         -- 預處理須要執行的動態SQL,其中stmt是一個變量
    31         PREPARE stmt FROM @v_sql;
    32         -- 執SQL語句
    33         EXECUTE stmt;
    34         -- 釋放掉預處理段
    35         DEALLOCATE PREPARE stmt;
    36      
    37     -- 循環結束
    38     UNTIL no_more_items END REPEAT;
    39     -- 關閉遊標
    40     CLOSE c_result;
    41     
    42     -- 恢復外鍵
    43     SET FOREIGN_KEY_CHECKS = 1; 
    44 END//
    45 DELIMITER ;

4、最後

  特別提醒:生成CSV文件時必定要生成列名,不然會致使第一條記錄主鍵數據異常,寫demo時被這個問題坑了很久

  參考連接:Mysql快速導入數據併發

5、補充

  以前的數據遷移中,對於Sqlserver中的null遷移到mysql會變成當前字段類型的默認值,例如:int默認爲值0、DateTime類型爲0000-00-00 00:00:00。app

  原修復方案:數據遷移完成後,執行數據修復腳本fixSql,將默認值更新爲null異步

  優化方法:將DBNULL類型解析爲CSV文件時,解析成\N,CSV載入數據庫時\N會轉換爲NULLide

相關文章
相關標籤/搜索