分享MSSQL、MySql、Oracle的大數據批量導入方法及編程手法細節

1:MSSQL

SQL語法篇:

BULK INSERT   
   [ database_name . [ schema_name ] . | schema_name . ] [ table_name | view_name ]   
      FROM 'data_file'   
     [ WITH   
    (   
   [ [ , ] BATCHSIZE = batch_size ]   
   [ [ , ] CHECK_CONSTRAINTS ]   
   [ [ , ] CODEPAGE = { 'ACP' | 'OEM' | 'RAW' | 'code_page' } ]   
   [ [ , ] DATAFILETYPE =   
      { 'char' | 'native'| 'widechar' | 'widenative' } ]   
   [ [ , ] FIELDTERMINATOR = 'field_terminator' ]   
   [ [ , ] FIRSTROW = first_row ]   
   [ [ , ] FIRE_TRIGGERS ]   
   [ [ , ] FORMATFILE = 'format_file_path' ]   
   [ [ , ] KEEPIDENTITY ]   
   [ [ , ] KEEPNULLS ]   
   [ [ , ] KILOBYTES_PER_BATCH = kilobytes_per_batch ]   
   [ [ , ] LASTROW = last_row ]   
   [ [ , ] MAXERRORS = max_errors ]   
   [ [ , ] ORDER ( { column [ ASC | DESC ] } [ ,...n ] ) ]   
   [ [ , ] ROWS_PER_BATCH = rows_per_batch ]   
   [ [ , ] ROWTERMINATOR = 'row_terminator' ]   
   [ [ , ] TABLOCK ]   
   [ [ , ] ERRORFILE = 'file_name' ]   
    )]   

SQL示例:mysql

 bulk insert 表名  from 'D:\mydata.txt' 
with 
 (fieldterminator=',', 
 rowterminator='\n', 
 check_constraints) 
select * from 表名 

因爲C#提供了SqlBulkCopy,因此非DBA的咱們,更多會經過程序來調用:sql

C#代碼篇:

C#代碼調用示例及細節,如下代碼摘錄自CYQ.Data:數據庫

using (SqlBulkCopy sbc = new SqlBulkCopy(con, (keepID ? SqlBulkCopyOptions.KeepIdentity : SqlBulkCopyOptions.Default) | SqlBulkCopyOptions.FireTriggers, sqlTran))
                    {
                        sbc.BatchSize = 100000;
                        sbc.DestinationTableName = SqlFormat.Keyword(mdt.TableName, DalType.MsSql);
                        sbc.BulkCopyTimeout = AppConfig.DB.CommandTimeout;
                        foreach (MCellStruct column in mdt.Columns)
                        {
                            sbc.ColumnMappings.Add(column.ColumnName, column.ColumnName);
                        }
                        sbc.WriteToServer(mdt);
                    }

有5個細節:服務器

1:事務:app

若是隻是單個事務,構造函數能夠是連接字符串。ide

若是須要和外部合成一個事務(好比先刪除,再插入,這在同一個事務中)函數

就須要本身構造Connection對象和Transaction,在上下文中傳遞來處理。工具

2:插入是否引起觸發器大數據

經過SqlBulkCopyOptions.FireTriggers 引入spa

3:其它:批量數、超時時間、是否寫入主鍵ID。

可能引起的數據庫Down機的狀況:

在歷史的過程當中,我遇到過的一個大坑是:

當數據的長度過長,數據的字段太短,產生數據二進制截斷時,數據庫服務居然停掉了(也許是特例,也許不是)。

因此當心使用,盡力作好對外部數據作好數據長度驗證。

2:MySql

關於MySql的批量,這是一段悲催的往事,有幾個坑,直到今天,才發現並解決了。

SQL語法篇:

LOAD DATA [LOW_PRIORITY | CONCURRENT] [LOCAL] INFILE 'data.txt'
    [REPLACE | IGNORE]
    INTO TABLE tbl_name
    [FIELDS
        [TERMINATED BY 'string']
        [[OPTIONALLY] ENCLOSED BY 'char']
        [ESCAPED BY 'char' ]
    ]
    [LINES
        [STARTING BY 'string']
        [TERMINATED BY 'string']
    ]
    [IGNORE number LINES]
    [(col_name_or_user_var,...)]
    [SET col_name = expr,...)]

示例篇:

LOAD DATA LOCAL INFILE 'C:\\Users\\cyq\\AppData\\Local\\Temp\\BulkCopy.csv' INTO TABLE `BulkCopy` CHARACTER SET utf8 FIELDS TERMINATED BY '$,$' LINES TERMINATED BY '
' (`ID`,`Name`,`CreateTime`,`Sex`)

雖然MySql.Data.dll 提供了MySqlBulkLoader,可是看源碼只是生成了個Load Data 並用ADO.NET執行,

核心大坑的生成*.csv數據文件的居然沒提供,因此本身生成語句並執行就行了,不須要用它。

C#代碼篇:

如下代碼摘自CYQ.Data,是一段今天才修正好的代碼:

 private static string MDataTableToFile(MDataTable dt, bool keepID, DalType dalType)
        {
            string path = Path.GetTempPath() + dt.TableName + ".csv";
            using (StreamWriter sw = new StreamWriter(path, false, new UTF8Encoding(false)))
            {
                MCellStruct ms;
                string value;
                foreach (MDataRow row in dt.Rows)
                {
                    for (int i = 0; i < dt.Columns.Count; i++)
                    {
                        #region 設置值
                        ms = dt.Columns[i];
                        if (!keepID && ms.IsAutoIncrement)
                        {
                            continue;
                        }
                        else if (dalType == DalType.MySql && row[i].IsNull)
                        {
                            sw.Write("\\N");//Mysql用\N表示null值。
                        }
                        else
                        {
                            value = row[i].ToString();
                            if (ms.SqlType == SqlDbType.Bit)
                            {
                                int v = (value.ToLower() == "true" || value == "1") ? 1 : 0;
                                if (dalType == DalType.MySql)
                                {
                                    byte[] b = new byte[1];
                                    b[0] = (byte)v;
                                    value = System.Text.Encoding.UTF8.GetString(b);//mysql必須用字節存檔。
                                }
                                else
                                {
                                    value = v.ToString();
                                }

                            }
                            else
                            {
                                value = value.Replace("\\", "\\\\");//處理轉義符號
                            }
                            sw.Write(value);
                        }

                        if (i != dt.Columns.Count - 1)//不是最後一個就輸出
                        {
                            sw.Write(AppConst.SplitChar);
                        }
                        #endregion
                    }
                    sw.WriteLine();
                }
            }
            if (Path.DirectorySeparatorChar == '\\')
            {
                path = path.Replace(@"\", @"\\");
            }
            return path;
        }

以上代碼是產生一個csv文件,用於被調用,有兩個核心的坑,費了我很多時間:

1:Bit類型數據導不進去?

2:第1行數據自增ID被重置爲1?

這兩個問題,網上搜不到答案,放縱到今天,覺的應該解決了,而後就把它解決了。

解決的思路是這樣的:

A:先用Load Data OutFile導出一個文件,再用Load Data InFile導入文件。

一開始我用記事本打開看了一下,又順手Ctrl+S了一下,結果發現問題和個人同樣,讓我懷疑居然不支持?

直到今天,從新導出,中間不看了,直接導入,發現它居然又正常的,因而,思惟一轉:

B:把本身生成的文件和命令產生的文件,進行了十六進制比對,結果發現:

Bit類型本身生成的的數據:是0,1,在十六進制下顯示是30、31。

命令產生的數據在十六進制是00、01,查了下資料,發現MySql的Bit存檔的Bit是二進制。

因而,把0,1用字節表示,再轉字符串,再存檔,就行了。

因而這麼一段代碼產生了(網上的DataTable轉CSV代碼都是沒處理的,都不知道他們是怎麼跑的,難道都沒有定義Bit類型?):

if (ms.SqlType == SqlDbType.Bit)
{
     int v = (value.ToLower() == "true" || value == "1") ? 1 : 0;
     if (dalType == DalType.MySql)
     {
           byte[] b = new byte[1];
           b[0] = (byte)v;
           value = System.Text.Encoding.UTF8.GetString(b);//mysql必須用字節存檔。
       }
      else
       {
            value = v.ToString();
       }
}

另外關於Null值,用\N表示。

解決完第一個問題,剩下就是第二個問題了,爲何第一個行代碼的主鍵會被置爲1?

仍是比對十六進制,結果驚人的發現:

是BOM頭,讓它錯識別了第一個主鍵值,因此被忽略主鍵,用了第1個自增值1替代了。

這也解釋了爲何只要從新保存的數據都有Bug的緣由。

因而,解決的方法就是StreaWrite的時候,不生成BOM頭,怎麼處理呢?

因而就有了如下的代碼:

 using (StreamWriter sw = new StreamWriter(path, false, new UTF8Encoding(false)))
{
       ...................
}

經過New一個Encoding,並指定參數爲false,替代咱們常規的System.Text.Encoding.UTF8Encoding。

這些細節很隱祕,不說你都猜不道。。。

3:Oracle

SQL語法篇

LOAD[DATA]
[ { INFILE | INDDN } {file | * }
[STREAM | RECORD | FIXED length [BLOCKSIZE size]|
VARIABLE [length] ]
[ { BADFILE | BADDN } file ]
{DISCARDS | DISCARDMAX} integr ]
[ {INDDN | INFILE} . . . ]
[ APPEND | REPLACE | INSERT ]
[RECLENT integer]
[ { CONCATENATE integer |
CONTINUEIF { [THIS | NEXT] (start[: end])LAST }
Operator { 'string' | X 'hex' } } ]
INTO TABLE [user.]table
[APPEND | REPLACE|INSERT]
[WHEN condition [AND condition]...]
[FIELDS [delimiter] ]
(
column {
RECNUM | CONSTANT value |
SEQUENCE ( { integer | MAX |COUNT} [, increment] ) |
[POSITION ( { start [end] | * [ + integer] }
) ]
datatype
[TERMINATED [ BY ] {WHITESPACE| [X] 'character' } ]
[ [OPTIONALLY] ENCLOSE[BY] [X]'charcter']
[NULLIF condition ]
[DEFAULTIF condotion]
}
[ ,...]
)

以上配置存檔成一個CTL文件,再由如下的命令調用:

Sqlldr userid=用戶名/密碼@數據庫 control=文件名.ctl 

C#語法篇:

.NET裏大概有三種操做Oracle的手法:

1:System.Data.OracleClient (須要安裝客戶端)沒有帶批量方法(還區分x86和x64)。

2:Oracle.DataAccess  (須要安裝客戶端)帶批量方法(也區分x86和x64)。

3:Oracle.ManagedDataAccess (不須要安裝客戶端)沒帶批量方法(不區分x86和x64,但僅支持.NET 4.0或以上)

Oracle.DataAccess 帶的批量方法叫:OracleBulkCopy,因爲使用方式和SqlBulkCopy幾乎一致,就不介紹了。

若是調用程序所在的服務器安裝了Oracle客戶端,能夠進行如下方法的調用:

流程以下:

1:產生*.cvs數據文件,見MySql中的代碼,同樣用的。

2:產生*.ctl控制文件,把生成的Load Data 語句存檔成一個*.ctl文件便可。

3:用sqlidr.exe執行CTL文件,這裏悲催的一點是,不能用ADO.NET調用,只能用進程調用,因此,這個批量只能單獨使用。

調用進程的相關代碼:

 bool hasSqlLoader = false;
        private bool HasSqlLoader() //檢測是否安裝了客戶端。
        {
            hasSqlLoader = false;
            Process proc = new Process();
            proc.StartInfo.FileName = "sqlldr";
            proc.StartInfo.CreateNoWindow = true;
            proc.StartInfo.UseShellExecute = false;
            proc.StartInfo.RedirectStandardOutput = true;

            proc.OutputDataReceived += new DataReceivedEventHandler(proc_OutputDataReceived);
            proc.Start();
            proc.BeginOutputReadLine();
            proc.WaitForExit();
            return hasSqlLoader;
        }

        void proc_OutputDataReceived(object sender, DataReceivedEventArgs e)
        {
            if (!hasSqlLoader)
            {
                hasSqlLoader = e.Data.StartsWith("SQL*Loader:");
            }
        }
        //已經實現,但沒有事務,因此暫時先不引入。
        private bool ExeSqlLoader(string arg)
        {
            try
            {
                Process proc = new Process();
                proc.StartInfo.FileName = "sqlldr";
                proc.StartInfo.Arguments = arg;
                proc.Start();
                proc.WaitForExit();
                return true;
            }
            catch
            {

            }
            return false;
        }

總結:

隨着大數據的普及,數據間的批量移動必然越來頻繁的被涉及,因此無論是用SQL腳本,仍是本身寫代碼,或是用DBImport工具,都將成必備技能之一了!

鑑於此,分享一下我在這一塊費過的力和填過的坑,供大夥參考!

相關文章
相關標籤/搜索