SqlBulkCopy簡單封裝,讓批量插入更方便

關於 SqlServer 批量插入的方式,前段時間也有大神給出了好幾種批量插入的方式及對比測試(http://www.cnblogs.com/jiekzou/p/6145550.html),估計你們也都明白,最佳的方式就是用 SqlBulkCopy。自從LZ把Chloe.ORM開源之後,有很多園友/羣友詢問,框架怎麼批量插入數據。個人回答是不支持!最後建議他們用 SqlBulkCopy 的方式插入。在咱們公司,我對 SqlBulkCopy 封裝成了一個 Helper 方法,使得批量插入更加方便,以知足公司內部很多批量插入需求。我也在羣裏分享了給他們。由於已經有好幾位朋友諮詢過,因此,我感受應該還有不少人尚未本身的一個批量插入方法,所以,LZ今兒給你們分享下我封裝的這個批量插入方法,但願你們喜歡。html

先看看封裝後的方法定義:sql

public static class SqlConnectionExtension
{
    /// <summary>
    /// 使用 SqlBulkCopy 向 destinationTableName 表插入數據
    /// </summary>
    /// <typeparam name="TModel">必須擁有與目標表全部字段對應屬性</typeparam>
    /// <param name="conn"></param>
    /// <param name="modelList">要插入的數據</param>
    /// <param name="batchSize">SqlBulkCopy.BatchSize</param>
    /// <param name="destinationTableName">若是爲 null,則使用 TModel 名稱做爲 destinationTableName</param>
    /// <param name="bulkCopyTimeout">SqlBulkCopy.BulkCopyTimeout</param>
    /// <param name="externalTransaction">要使用的事務</param>
    public static void BulkCopy<TModel>(this SqlConnection conn, List<TModel> modelList, int batchSize, string destinationTableName = null, int? bulkCopyTimeout = null, SqlTransaction externalTransaction = null);
}

上面都有詳細解釋,相信你們一看就會明白,接下來演示下用法及效果:數據庫

先建立一個測試的 Users 表:app

  1 CREATE TABLE [dbo].[Users](
  2 [Id] [uniqueidentifier] NOT NULL,
  3 [Name] [nvarchar](100) NULL,
  4 [Gender] [int] NULL,
  5 [Age] [int] NULL,
  6 [CityId] [int] NULL,
  7 [OpTime] [datetime] NULL,
  8  CONSTRAINT [PK_Users] PRIMARY KEY CLUSTERED
  9 (
 10 [Id] ASC
 11 )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
 12 ) ON [PRIMARY]

而後定義一個與表映射的 Model,記住,因爲 SqlBulkCopy 的特性,定義的 Model 必須擁有與表全部的字段對應的屬性:框架

  1 public enum Gender
  2 {
  3     Man = 1,
  4     Woman
  5 }
  6 
  7 public class User
  8 {
  9     public Guid Id { get; set; }
 10     public string Name { get; set; }
 11     public Gender? Gender { get; set; }
 12     public int? Age { get; set; }
 13     public int? CityId { get; set; }
 14     public DateTime? OpTime { get; set; }
 15 }


製造些數據,而後就能夠直接插入了:ide

  1 List<User> usersToInsert = new List<User>();
  2 usersToInsert.Add(new User() { Id = Guid.NewGuid(), Name = "so1", Gender = Gender.Man, Age = 18, CityId = 1, OpTime = DateTime.Now });
  3 usersToInsert.Add(new User() { Id = Guid.NewGuid(), Name = "so2", Gender = Gender.Man, Age = 19, CityId = 2, OpTime = DateTime.Now });
  4 usersToInsert.Add(new User() { Id = Guid.NewGuid(), Name = "so3", Gender = Gender.Man, Age = 20, CityId = 3, OpTime = DateTime.Now });
  5 usersToInsert.Add(new User() { Id = Guid.NewGuid(), Name = "so4", Gender = Gender.Man, Age = 21, CityId = 4, OpTime = DateTime.Now });
  6 
  7 using (SqlConnection conn = new SqlConnection("Data Source = .;Initial Catalog = Chloe;Integrated Security = SSPI;"))
  8 {
  9     conn.BulkCopy(usersToInsert, 20000, "Users");
 10 }

執行插入後表數據:性能

image

很方便吧,定義好 Model,調用 BulkCopy 方法就能插入了。這個方法主要解決了兩個問題:1.免去手動構造 DataTable 和向 DataTable 填充數據,要知道,SqlBulkCopy 要求 DataTable 的列必須和表列順序一致,若是手動構造 DataTable 的話會使代碼很難維護;2.不用親自 new 出 SqlBulkCopy 對象以及手動給 SqlBulkCopy 對象設置各類值,如 DestinationTableName、BulkCopyTimeout、BatchSize 等,用封裝的方法,直接傳相應的值就行了。接下來貼乾貨,簡單介紹下實現。測試

先了解 SqlBulkCopy 的定義(部分):大數據

public sealed class SqlBulkCopy : IDisposable
{
    public SqlBulkCopy(SqlConnection connection);
    public SqlBulkCopy(string connectionString);
    public SqlBulkCopy(string connectionString, SqlBulkCopyOptions copyOptions);
    public SqlBulkCopy(SqlConnection connection, SqlBulkCopyOptions copyOptions, SqlTransaction externalTransaction);

    public int BatchSize { get; set; }
    public int BulkCopyTimeout { get; set; }
    public SqlBulkCopyColumnMappingCollection ColumnMappings { get; }
    public string DestinationTableName { get; set; }
    public bool EnableStreaming { get; set; }
    public int NotifyAfter { get; set; }
    public event SqlRowsCopiedEventHandler SqlRowsCopied;

    public void Close();
    public void WriteToServer(DataRow[] rows);
    public void WriteToServer(DataTable table);
    public void WriteToServer(IDataReader reader);
    public void WriteToServer(DataTable table, DataRowState rowState);
}

咱們只需關注 WriteToServer 方法。由於咱們的數據源不是數據庫或excel,因此咱們直接不考慮 WriteToServer(IDataReader reader)。WriteToServer(DataRow[] rows) 直接無視,很少解釋,因此咱們只需考慮用 WriteToServer(DataTable table) 就好了。開幹!ui

1、構造一個結構嚴謹的 DataTable。
因爲 SqlBulkCopy 要求 DataTable 的列必須和表列順序一致,而且不能多也不能少,因此,咱們首先要建立一個和目標表字段順序一致的 DataTable,先查出目標表的結構:

static List<SysColumn> GetTableColumns(SqlConnection sourceConn, string tableName)
{
    string sql = string.Format("select * from syscolumns inner join sysobjects on syscolumns.id=sysobjects.id where sysobjects.xtype='U' and sysobjects.name='{0}' order by syscolumns.colid asc", tableName);

    List<SysColumn> columns = new List<SysColumn>();
    using (SqlConnection conn = (SqlConnection)((ICloneable)sourceConn).Clone())
    {
        conn.Open();
        using (var reader = conn.ExecuteReader(sql))
        {
            while (reader.Read())
            {
                SysColumn column = new SysColumn();
                column.Name = reader.GetDbValue("name");
                column.ColOrder = reader.GetDbValue("colorder");

                columns.Add(column);
            }
        }
        conn.Close();
    }

    return columns;
}

獲得基本的表結構 List<SysColumn>,再建立「嚴格」的 DataTable 對象:

DataTable dt = new DataTable();

Type modelType = typeof(TModel);

List<SysColumn> columns = GetTableColumns(conn, tableName);
List<PropertyInfo> mappingProps = new List<PropertyInfo>();

var props = modelType.GetProperties();
for (int i = 0; i < columns.Count; i++)
{
    var column = columns[i];
    PropertyInfo mappingProp = props.Where(a => a.Name == column.Name).FirstOrDefault();
    if (mappingProp == null)
        throw new Exception(string.Format("model 類型 '{0}'未定義與表 '{1}' 列名爲 '{2}' 映射的屬性", modelType.FullName, tableName, column.Name));

    mappingProps.Add(mappingProp);
    Type dataType = GetUnderlyingType(mappingProp.PropertyType);
    if (dataType.IsEnum)
        dataType = typeof(int);
    dt.Columns.Add(new DataColumn(column.Name, dataType));
}

注意,構造 DataColumn 時,要給 Column 設置 DataType,及數據類型。由於若是不指定數據類型,默認是 string 類型,那樣會致使將數據發送至數據庫時會引發數據轉換,會有些許無謂的性能損耗,同時,若是不指定數據類型,導入一些數據類型時可能會失敗,好比模型屬性是 Guid 類型,導入時會出現類型轉換失敗異常。

2、利用反射,獲取屬性值,構造一行一行的 DataRow,填充 DataTable:

foreach (var model in modelList)
{
    DataRow dr = dt.NewRow();
    for (int i = 0; i < mappingProps.Count; i++)
    {
        PropertyInfo prop = mappingProps[i];
        object value = prop.GetValue(model);

        if (GetUnderlyingType(prop.PropertyType).IsEnum)
        {
            if (value != null)
                value = (int)value;
        }

        dr[i] = value ?? DBNull.Value;
    }

    dt.Rows.Add(dr);
}

3、一個完整包含數據的 DataTable 對象就建立好了,咱們就可使用 SqlBulkCopy 插入數據了:

public static void BulkCopy<TModel>(this SqlConnection conn, List<TModel> modelList, int batchSize, string destinationTableName = null, int? bulkCopyTimeout = null, SqlTransaction externalTransaction = null)
{
    bool shouldCloseConnection = false;

    if (string.IsNullOrEmpty(destinationTableName))
        destinationTableName = typeof(TModel).Name;

    DataTable dtToWrite = ToSqlBulkCopyDataTable(modelList, conn, destinationTableName);

    SqlBulkCopy sbc = null;

    try
    {
        if (externalTransaction != null)
            sbc = new SqlBulkCopy(conn, SqlBulkCopyOptions.Default, externalTransaction);
        else
            sbc = new SqlBulkCopy(conn);

        using (sbc)
        {
            sbc.BatchSize = batchSize;
            sbc.DestinationTableName = destinationTableName;

            if (bulkCopyTimeout != null)
                sbc.BulkCopyTimeout = bulkCopyTimeout.Value;

            if (conn.State != ConnectionState.Open)
            {
                shouldCloseConnection = true;
                conn.Open();
            }

            sbc.WriteToServer(dtToWrite);
        }
    }
    finally
    {
        if (shouldCloseConnection && conn.State == ConnectionState.Open)
            conn.Close();
    }
}

完事,一個批量插入的 Helper 方法就這麼產生了,最終的完整實現以下:

  1 public static class SqlConnectionExtension
  2 {
  3     /// <summary>
  4     /// 使用 SqlBulkCopy 向 destinationTableName 表插入數據
  5     /// </summary>
  6     /// <typeparam name="TModel">必須擁有與目標表全部字段對應屬性</typeparam>
  7     /// <param name="conn"></param>
  8     /// <param name="modelList">要插入的數據</param>
  9     /// <param name="batchSize">SqlBulkCopy.BatchSize</param>
 10     /// <param name="destinationTableName">若是爲 null,則使用 TModel 名稱做爲 destinationTableName</param>
 11     /// <param name="bulkCopyTimeout">SqlBulkCopy.BulkCopyTimeout</param>
 12     /// <param name="externalTransaction">要使用的事務</param>
 13     public static void BulkCopy<TModel>(this SqlConnection conn, List<TModel> modelList, int batchSize, string destinationTableName = null, int? bulkCopyTimeout = null, SqlTransaction externalTransaction = null)
 14     {
 15         bool shouldCloseConnection = false;
 16 
 17         if (string.IsNullOrEmpty(destinationTableName))
 18             destinationTableName = typeof(TModel).Name;
 19 
 20         DataTable dtToWrite = ToSqlBulkCopyDataTable(modelList, conn, destinationTableName);
 21 
 22         SqlBulkCopy sbc = null;
 23 
 24         try
 25         {
 26             if (externalTransaction != null)
 27                 sbc = new SqlBulkCopy(conn, SqlBulkCopyOptions.Default, externalTransaction);
 28             else
 29                 sbc = new SqlBulkCopy(conn);
 30 
 31             using (sbc)
 32             {
 33                 sbc.BatchSize = batchSize;
 34                 sbc.DestinationTableName = destinationTableName;
 35 
 36                 if (bulkCopyTimeout != null)
 37                     sbc.BulkCopyTimeout = bulkCopyTimeout.Value;
 38 
 39                 if (conn.State != ConnectionState.Open)
 40                 {
 41                     shouldCloseConnection = true;
 42                     conn.Open();
 43                 }
 44 
 45                 sbc.WriteToServer(dtToWrite);
 46             }
 47         }
 48         finally
 49         {
 50             if (shouldCloseConnection && conn.State == ConnectionState.Open)
 51                 conn.Close();
 52         }
 53     }
 54 
 55     public static DataTable ToSqlBulkCopyDataTable<TModel>(List<TModel> modelList, SqlConnection conn, string tableName)
 56     {
 57         DataTable dt = new DataTable();
 58 
 59         Type modelType = typeof(TModel);
 60 
 61         List<SysColumn> columns = GetTableColumns(conn, tableName);
 62         List<PropertyInfo> mappingProps = new List<PropertyInfo>();
 63 
 64         var props = modelType.GetProperties();
 65         for (int i = 0; i < columns.Count; i++)
 66         {
 67             var column = columns[i];
 68             PropertyInfo mappingProp = props.Where(a => a.Name == column.Name).FirstOrDefault();
 69             if (mappingProp == null)
 70                 throw new Exception(string.Format("model 類型 '{0}'未定義與表 '{1}' 列名爲 '{2}' 映射的屬性", modelType.FullName, tableName, column.Name));
 71 
 72             mappingProps.Add(mappingProp);
 73             Type dataType = GetUnderlyingType(mappingProp.PropertyType);
 74             if (dataType.IsEnum)
 75                 dataType = typeof(int);
 76             dt.Columns.Add(new DataColumn(column.Name, dataType));
 77         }
 78 
 79         foreach (var model in modelList)
 80         {
 81             DataRow dr = dt.NewRow();
 82             for (int i = 0; i < mappingProps.Count; i++)
 83             {
 84                 PropertyInfo prop = mappingProps[i];
 85                 object value = prop.GetValue(model);
 86 
 87                 if (GetUnderlyingType(prop.PropertyType).IsEnum)
 88                 {
 89                     if (value != null)
 90                         value = (int)value;
 91                 }
 92 
 93                 dr[i] = value ?? DBNull.Value;
 94             }
 95 
 96             dt.Rows.Add(dr);
 97         }
 98 
 99         return dt;
100     }
101     static List<SysColumn> GetTableColumns(SqlConnection sourceConn, string tableName)
102     {
103         string sql = string.Format("select * from syscolumns inner join sysobjects on syscolumns.id=sysobjects.id where sysobjects.xtype='U' and sysobjects.name='{0}' order by syscolumns.colid asc", tableName);
104 
105         List<SysColumn> columns = new List<SysColumn>();
106         using (SqlConnection conn = (SqlConnection)((ICloneable)sourceConn).Clone())
107         {
108             conn.Open();
109             using (var reader = conn.ExecuteReader(sql))
110             {
111                 while (reader.Read())
112                 {
113                     SysColumn column = new SysColumn();
114                     column.Name = reader.GetDbValue("name");
115                     column.ColOrder = reader.GetDbValue("colorder");
116 
117                     columns.Add(column);
118                 }
119             }
120             conn.Close();
121         }
122 
123         return columns;
124     }
125 
126     static Type GetUnderlyingType(Type type)
127     {
128         Type unType = Nullable.GetUnderlyingType(type); ;
129         if (unType == null)
130             unType = type;
131 
132         return unType;
133     }
134 
135     class SysColumn
136     {
137         public string Name { get; set; }
138         public int ColOrder { get; set; }
139     }
140 }
141 
View Code

代碼很少,僅僅150行,你們能夠直接拷走拿去用。其中用了反射,估計吃瓜羣衆可能不淡定了~哈哈,若是你真有大數據插入需求,這點反射消耗相對大數據插入簡直九牛一毛,微乎其微,放心好了。

最後,感謝你們閱讀至此。若是本文對您有用,還望給個愛心推薦,您的讚揚是我持續分享的動力。也歡迎廣大C#同胞入羣交流(羣號在頂部),暢談.NET復興大計。

相關文章
相關標籤/搜索