關於 SqlServer 批量插入的方式,前段時間也有大神給出了好幾種批量插入的方式及對比測試(http://www.cnblogs.com/jiekzou/p/6145550.html),估計你們也都明白,最佳的方式就是用 SqlBulkCopy。自從LZ把Chloe.ORM開源之後,有很多園友/羣友詢問,框架怎麼批量插入數據。個人回答是不支持!最後建議他們用 SqlBulkCopy 的方式插入。在咱們公司,我對 SqlBulkCopy 封裝成了一個 Helper 方法,使得批量插入更加方便,以知足公司內部很多批量插入需求。我也在羣裏分享了給他們。由於已經有好幾位朋友諮詢過,因此,我感受應該還有不少人尚未本身的一個批量插入方法,所以,LZ今兒給你們分享下我封裝的這個批量插入方法,但願你們喜歡。html
先看看封裝後的方法定義:sql
public static class SqlConnectionExtension { /// <summary> /// 使用 SqlBulkCopy 向 destinationTableName 表插入數據 /// </summary> /// <typeparam name="TModel">必須擁有與目標表全部字段對應屬性</typeparam> /// <param name="conn"></param> /// <param name="modelList">要插入的數據</param> /// <param name="batchSize">SqlBulkCopy.BatchSize</param> /// <param name="destinationTableName">若是爲 null,則使用 TModel 名稱做爲 destinationTableName</param> /// <param name="bulkCopyTimeout">SqlBulkCopy.BulkCopyTimeout</param> /// <param name="externalTransaction">要使用的事務</param> public static void BulkCopy<TModel>(this SqlConnection conn, List<TModel> modelList, int batchSize, string destinationTableName = null, int? bulkCopyTimeout = null, SqlTransaction externalTransaction = null); }
上面都有詳細解釋,相信你們一看就會明白,接下來演示下用法及效果:數據庫
先建立一個測試的 Users 表:app
1 CREATE TABLE [dbo].[Users]( 2 [Id] [uniqueidentifier] NOT NULL, 3 [Name] [nvarchar](100) NULL, 4 [Gender] [int] NULL, 5 [Age] [int] NULL, 6 [CityId] [int] NULL, 7 [OpTime] [datetime] NULL, 8 CONSTRAINT [PK_Users] PRIMARY KEY CLUSTERED 9 ( 10 [Id] ASC 11 )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY] 12 ) ON [PRIMARY]
而後定義一個與表映射的 Model,記住,因爲 SqlBulkCopy 的特性,定義的 Model 必須擁有與表全部的字段對應的屬性:框架
1 public enum Gender 2 { 3 Man = 1, 4 Woman 5 } 6 7 public class User 8 { 9 public Guid Id { get; set; } 10 public string Name { get; set; } 11 public Gender? Gender { get; set; } 12 public int? Age { get; set; } 13 public int? CityId { get; set; } 14 public DateTime? OpTime { get; set; } 15 }
製造些數據,而後就能夠直接插入了:ide
1 List<User> usersToInsert = new List<User>(); 2 usersToInsert.Add(new User() { Id = Guid.NewGuid(), Name = "so1", Gender = Gender.Man, Age = 18, CityId = 1, OpTime = DateTime.Now }); 3 usersToInsert.Add(new User() { Id = Guid.NewGuid(), Name = "so2", Gender = Gender.Man, Age = 19, CityId = 2, OpTime = DateTime.Now }); 4 usersToInsert.Add(new User() { Id = Guid.NewGuid(), Name = "so3", Gender = Gender.Man, Age = 20, CityId = 3, OpTime = DateTime.Now }); 5 usersToInsert.Add(new User() { Id = Guid.NewGuid(), Name = "so4", Gender = Gender.Man, Age = 21, CityId = 4, OpTime = DateTime.Now }); 6 7 using (SqlConnection conn = new SqlConnection("Data Source = .;Initial Catalog = Chloe;Integrated Security = SSPI;")) 8 { 9 conn.BulkCopy(usersToInsert, 20000, "Users"); 10 }
執行插入後表數據:性能
很方便吧,定義好 Model,調用 BulkCopy 方法就能插入了。這個方法主要解決了兩個問題:1.免去手動構造 DataTable 和向 DataTable 填充數據,要知道,SqlBulkCopy 要求 DataTable 的列必須和表列順序一致,若是手動構造 DataTable 的話會使代碼很難維護;2.不用親自 new 出 SqlBulkCopy 對象以及手動給 SqlBulkCopy 對象設置各類值,如 DestinationTableName、BulkCopyTimeout、BatchSize 等,用封裝的方法,直接傳相應的值就行了。接下來貼乾貨,簡單介紹下實現。測試
先了解 SqlBulkCopy 的定義(部分):大數據
public sealed class SqlBulkCopy : IDisposable { public SqlBulkCopy(SqlConnection connection); public SqlBulkCopy(string connectionString); public SqlBulkCopy(string connectionString, SqlBulkCopyOptions copyOptions); public SqlBulkCopy(SqlConnection connection, SqlBulkCopyOptions copyOptions, SqlTransaction externalTransaction); public int BatchSize { get; set; } public int BulkCopyTimeout { get; set; } public SqlBulkCopyColumnMappingCollection ColumnMappings { get; } public string DestinationTableName { get; set; } public bool EnableStreaming { get; set; } public int NotifyAfter { get; set; } public event SqlRowsCopiedEventHandler SqlRowsCopied; public void Close(); public void WriteToServer(DataRow[] rows); public void WriteToServer(DataTable table); public void WriteToServer(IDataReader reader); public void WriteToServer(DataTable table, DataRowState rowState); }
咱們只需關注 WriteToServer 方法。由於咱們的數據源不是數據庫或excel,因此咱們直接不考慮 WriteToServer(IDataReader reader)。WriteToServer(DataRow[] rows) 直接無視,很少解釋,因此咱們只需考慮用 WriteToServer(DataTable table) 就好了。開幹!ui
1、構造一個結構嚴謹的 DataTable。
因爲 SqlBulkCopy 要求 DataTable 的列必須和表列順序一致,而且不能多也不能少,因此,咱們首先要建立一個和目標表字段順序一致的 DataTable,先查出目標表的結構:
static List<SysColumn> GetTableColumns(SqlConnection sourceConn, string tableName) { string sql = string.Format("select * from syscolumns inner join sysobjects on syscolumns.id=sysobjects.id where sysobjects.xtype='U' and sysobjects.name='{0}' order by syscolumns.colid asc", tableName); List<SysColumn> columns = new List<SysColumn>(); using (SqlConnection conn = (SqlConnection)((ICloneable)sourceConn).Clone()) { conn.Open(); using (var reader = conn.ExecuteReader(sql)) { while (reader.Read()) { SysColumn column = new SysColumn(); column.Name = reader.GetDbValue("name"); column.ColOrder = reader.GetDbValue("colorder"); columns.Add(column); } } conn.Close(); } return columns; }
獲得基本的表結構 List<SysColumn>,再建立「嚴格」的 DataTable 對象:
DataTable dt = new DataTable(); Type modelType = typeof(TModel); List<SysColumn> columns = GetTableColumns(conn, tableName); List<PropertyInfo> mappingProps = new List<PropertyInfo>(); var props = modelType.GetProperties(); for (int i = 0; i < columns.Count; i++) { var column = columns[i]; PropertyInfo mappingProp = props.Where(a => a.Name == column.Name).FirstOrDefault(); if (mappingProp == null) throw new Exception(string.Format("model 類型 '{0}'未定義與表 '{1}' 列名爲 '{2}' 映射的屬性", modelType.FullName, tableName, column.Name)); mappingProps.Add(mappingProp); Type dataType = GetUnderlyingType(mappingProp.PropertyType); if (dataType.IsEnum) dataType = typeof(int); dt.Columns.Add(new DataColumn(column.Name, dataType)); }
注意,構造 DataColumn 時,要給 Column 設置 DataType,及數據類型。由於若是不指定數據類型,默認是 string 類型,那樣會致使將數據發送至數據庫時會引發數據轉換,會有些許無謂的性能損耗,同時,若是不指定數據類型,導入一些數據類型時可能會失敗,好比模型屬性是 Guid 類型,導入時會出現類型轉換失敗異常。
2、利用反射,獲取屬性值,構造一行一行的 DataRow,填充 DataTable:
foreach (var model in modelList) { DataRow dr = dt.NewRow(); for (int i = 0; i < mappingProps.Count; i++) { PropertyInfo prop = mappingProps[i]; object value = prop.GetValue(model); if (GetUnderlyingType(prop.PropertyType).IsEnum) { if (value != null) value = (int)value; } dr[i] = value ?? DBNull.Value; } dt.Rows.Add(dr); }
3、一個完整包含數據的 DataTable 對象就建立好了,咱們就可使用 SqlBulkCopy 插入數據了:
public static void BulkCopy<TModel>(this SqlConnection conn, List<TModel> modelList, int batchSize, string destinationTableName = null, int? bulkCopyTimeout = null, SqlTransaction externalTransaction = null) { bool shouldCloseConnection = false; if (string.IsNullOrEmpty(destinationTableName)) destinationTableName = typeof(TModel).Name; DataTable dtToWrite = ToSqlBulkCopyDataTable(modelList, conn, destinationTableName); SqlBulkCopy sbc = null; try { if (externalTransaction != null) sbc = new SqlBulkCopy(conn, SqlBulkCopyOptions.Default, externalTransaction); else sbc = new SqlBulkCopy(conn); using (sbc) { sbc.BatchSize = batchSize; sbc.DestinationTableName = destinationTableName; if (bulkCopyTimeout != null) sbc.BulkCopyTimeout = bulkCopyTimeout.Value; if (conn.State != ConnectionState.Open) { shouldCloseConnection = true; conn.Open(); } sbc.WriteToServer(dtToWrite); } } finally { if (shouldCloseConnection && conn.State == ConnectionState.Open) conn.Close(); } }
完事,一個批量插入的 Helper 方法就這麼產生了,最終的完整實現以下:
1 public static class SqlConnectionExtension 2 { 3 /// <summary> 4 /// 使用 SqlBulkCopy 向 destinationTableName 表插入數據 5 /// </summary> 6 /// <typeparam name="TModel">必須擁有與目標表全部字段對應屬性</typeparam> 7 /// <param name="conn"></param> 8 /// <param name="modelList">要插入的數據</param> 9 /// <param name="batchSize">SqlBulkCopy.BatchSize</param> 10 /// <param name="destinationTableName">若是爲 null,則使用 TModel 名稱做爲 destinationTableName</param> 11 /// <param name="bulkCopyTimeout">SqlBulkCopy.BulkCopyTimeout</param> 12 /// <param name="externalTransaction">要使用的事務</param> 13 public static void BulkCopy<TModel>(this SqlConnection conn, List<TModel> modelList, int batchSize, string destinationTableName = null, int? bulkCopyTimeout = null, SqlTransaction externalTransaction = null) 14 { 15 bool shouldCloseConnection = false; 16 17 if (string.IsNullOrEmpty(destinationTableName)) 18 destinationTableName = typeof(TModel).Name; 19 20 DataTable dtToWrite = ToSqlBulkCopyDataTable(modelList, conn, destinationTableName); 21 22 SqlBulkCopy sbc = null; 23 24 try 25 { 26 if (externalTransaction != null) 27 sbc = new SqlBulkCopy(conn, SqlBulkCopyOptions.Default, externalTransaction); 28 else 29 sbc = new SqlBulkCopy(conn); 30 31 using (sbc) 32 { 33 sbc.BatchSize = batchSize; 34 sbc.DestinationTableName = destinationTableName; 35 36 if (bulkCopyTimeout != null) 37 sbc.BulkCopyTimeout = bulkCopyTimeout.Value; 38 39 if (conn.State != ConnectionState.Open) 40 { 41 shouldCloseConnection = true; 42 conn.Open(); 43 } 44 45 sbc.WriteToServer(dtToWrite); 46 } 47 } 48 finally 49 { 50 if (shouldCloseConnection && conn.State == ConnectionState.Open) 51 conn.Close(); 52 } 53 } 54 55 public static DataTable ToSqlBulkCopyDataTable<TModel>(List<TModel> modelList, SqlConnection conn, string tableName) 56 { 57 DataTable dt = new DataTable(); 58 59 Type modelType = typeof(TModel); 60 61 List<SysColumn> columns = GetTableColumns(conn, tableName); 62 List<PropertyInfo> mappingProps = new List<PropertyInfo>(); 63 64 var props = modelType.GetProperties(); 65 for (int i = 0; i < columns.Count; i++) 66 { 67 var column = columns[i]; 68 PropertyInfo mappingProp = props.Where(a => a.Name == column.Name).FirstOrDefault(); 69 if (mappingProp == null) 70 throw new Exception(string.Format("model 類型 '{0}'未定義與表 '{1}' 列名爲 '{2}' 映射的屬性", modelType.FullName, tableName, column.Name)); 71 72 mappingProps.Add(mappingProp); 73 Type dataType = GetUnderlyingType(mappingProp.PropertyType); 74 if (dataType.IsEnum) 75 dataType = typeof(int); 76 dt.Columns.Add(new DataColumn(column.Name, dataType)); 77 } 78 79 foreach (var model in modelList) 80 { 81 DataRow dr = dt.NewRow(); 82 for (int i = 0; i < mappingProps.Count; i++) 83 { 84 PropertyInfo prop = mappingProps[i]; 85 object value = prop.GetValue(model); 86 87 if (GetUnderlyingType(prop.PropertyType).IsEnum) 88 { 89 if (value != null) 90 value = (int)value; 91 } 92 93 dr[i] = value ?? DBNull.Value; 94 } 95 96 dt.Rows.Add(dr); 97 } 98 99 return dt; 100 } 101 static List<SysColumn> GetTableColumns(SqlConnection sourceConn, string tableName) 102 { 103 string sql = string.Format("select * from syscolumns inner join sysobjects on syscolumns.id=sysobjects.id where sysobjects.xtype='U' and sysobjects.name='{0}' order by syscolumns.colid asc", tableName); 104 105 List<SysColumn> columns = new List<SysColumn>(); 106 using (SqlConnection conn = (SqlConnection)((ICloneable)sourceConn).Clone()) 107 { 108 conn.Open(); 109 using (var reader = conn.ExecuteReader(sql)) 110 { 111 while (reader.Read()) 112 { 113 SysColumn column = new SysColumn(); 114 column.Name = reader.GetDbValue("name"); 115 column.ColOrder = reader.GetDbValue("colorder"); 116 117 columns.Add(column); 118 } 119 } 120 conn.Close(); 121 } 122 123 return columns; 124 } 125 126 static Type GetUnderlyingType(Type type) 127 { 128 Type unType = Nullable.GetUnderlyingType(type); ; 129 if (unType == null) 130 unType = type; 131 132 return unType; 133 } 134 135 class SysColumn 136 { 137 public string Name { get; set; } 138 public int ColOrder { get; set; } 139 } 140 } 141
代碼很少,僅僅150行,你們能夠直接拷走拿去用。其中用了反射,估計吃瓜羣衆可能不淡定了~哈哈,若是你真有大數據插入需求,這點反射消耗相對大數據插入簡直九牛一毛,微乎其微,放心好了。
最後,感謝你們閱讀至此。若是本文對您有用,還望給個愛心推薦,您的讚揚是我持續分享的動力。也歡迎廣大C#同胞入羣交流(羣號在頂部),暢談.NET復興大計。