C#使用Parallel處理數據同步寫入Datatable並使用BulkInsert批量導入數據庫

項目須要,幾十萬張照片須要計算出每一個照片的特徵值(調用C++編寫的DLL)。sql

業務流程:選擇照片文件夾,分別訪問照片-->調用DLL接口傳遞照片路徑-->接收處理返回值-->寫入數據庫。數據庫

前期使用的for循環來處理,幾十萬張照片處理起來差很少10個小時。速度太慢,後面改進使用Parallel來進行平行計算(調用DLL處理照片),統一寫入Datatable,而後使用BulkInsert批量把Datatable寫入數據庫,目前測試8萬張照片並行計算速度30分鐘,速度提升約30%-40%左右。服務器

代碼示例以下:async

private static SqlConnection sqlconn;
private static ConcurrentDictionary<string, int> currInts = new ConcurrentDictionary<string, int>();
private void Button1_Click(object sender, EventArgs e)
        {           
            var dirPath = "";
            using (var folderBrowser = new FolderBrowserDialog())
            {
                if (folderBrowser.ShowDialog() != DialogResult.OK) return;
                dirPath = folderBrowser.SelectedPath;
                if (!Directory.Exists(dirPath))
                {
                    MessageBox.Show(@"所選路徑不存在或無權訪問", @"錯誤", MessageBoxButtons.OK, MessageBoxIcon.Error);
                    return;
                }
            }
 
            BeginInvoke(new Action(async () =>
            {
                button1.Enabled = false;
                var sw = new Stopwatch();
                sw.Start();
 
                //檢測服務器連接
                Log.WriteLine(@"嘗試鏈接數據庫服務器");
 
                sqlconn = new SqlConnection(
                    $"Data Source={txt_serverIP.Text},{txt_ServerPort.Text};User ID={txt_User.Text};Password={txt_Pwd.Text};Initial Catalog={txt_DB.Text};Persist Security Info=False;Pooling=true;Min Pool Size=30;Max Pool Size=200;");
                if (sqlconn.State == ConnectionState.Closed)
                {
                    try
                    {
                        sqlconn.Open();
                    }
                    catch (Exception exception)
                    {
                        Log.WriteLine($@"鏈接數據庫服務器【失敗】-->{exception.Message}");
                        button1.Enabled = true;
                        return;
                    }
                }
 
                Log.WriteLine($@"鏈接數據庫服務器【成功】{Environment.NewLine}獲取未轉換圖片數據。。。");
                var ds = new DataSet();
                int.TryParse(txt_start.Text, out var start);
                int.TryParse(txt_end.Text, out var end);
                var sqlstrALL = "";
                if (start == 0 || end == 0)
                {
                    sqlstrALL = "SELECT * FROM ViewWeiZhuanHuan";
                }
                else
                {
                    sqlstrALL = $"SELECT * FROM ViewWeiZhuanHuan WHERE {txt_mastKey.Text} BETWEEN {start} AND {end}";
                }
 
                var sqlcmd = new SqlCommand(sqlstrALL, sqlconn);
                DataAdapter da = new SqlDataAdapter(sqlcmd);
                da.Fill(ds);
                if (ds.Tables.Count == 0 || ds.Tables[0].Rows.Count == 0)
                {
                    Log.WriteLine("全部圖片都已經轉換完畢。");
                    sqlconn.Close();
                    return;
                }
 
                Log.WriteLine($"{ds.Tables[0].Rows.Count}個圖片須要轉換。");
 
                var total = ds.Tables[0].Rows.Count;
                var rowkey = comboBox1.SelectedValue.ToString();
                var splitkey = txt_split.Text.Trim();
 
                #region 定義數據保存
                var dt = new DataTable();
                dt.Columns.Add("zd1", typeof(int));
                dt.Columns.Add("zd2", typeof(int));
                dt.Columns.Add("zd3", typeof(string));
                dt.Columns.Add("zd4", typeof(string));
                dt.Columns.Add("zd5", typeof(string));
                dt.Columns.Add("zd6", typeof(string));
                #endregion
 
                #region 並行執行
                currInts.TryAdd("currInts", 1);//初始化進度數字爲1
                await Task.Run(() =>
               {
                  //使用8個CPU核心來運行
                   var result = Parallel.For(0, ds.Tables[0].Rows.Count, new ParallelOptions { MaxDegreeOfParallelism = 8}, (rotIndex, state) =>
                   {
                       BeginInvoke(new Action(() =>
                       {
                           currInts.TryGetValue("currInts", out var currValue);
                           lb_process.Text = $@"{currValue}/{total}";//顯示進度
                           var nextValue = currValue + 1;
                           currInts.TryUpdate("currInts", nextValue, currValue);//加1
                       }));                      
                      
                       var fileDirPath = "";//根據選擇的文件名格式,用填寫的規則生成路徑                      
 
                       var file = new List<string>{
                            $"{dirPath}\\{fileDirPath}\\{ksno}_fp1.jpg",
                            $"{dirPath}\\{fileDirPath}\\{ksno}_fp2.jpg",
                            $"{dirPath}\\{fileDirPath}\\{ksno}_fp3.jpg"};
 
                       foreach (var zwzp in file)
                       {
                           try
                           {
                               var model = ZwHelper.zwzhAsync($"{zwzp}").Result;//調用C++轉換
                               if (model != null)
                               {
//並行計算下寫入Datatable須要鎖定才能夠,不然會提示datatable索引損壞                            
                                   lock (dt.Rows.SyncRoot)
                                   {
                                       var dr = dt.NewRow();
                                       dr["zd1"] = Convert.ToInt32(filexe);
                                       dr["zd2"] = Convert.ToInt32(ds.Tables[0].Rows[rotIndex]["zd1"]);
                                       dr["zd3"] = model.zhtz;
                                       dr["zd4"] = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss");
                                       dr["zd5"] = "";
                                       dr["zd6"] = "";
                                       dt.Rows.Add(dr);
                                   }
                                  
                               }
                               else
                               {
                                   Log.WriteLine($@"{ksno}轉換失敗");
                                   Log.Log.Error($"{ksno}轉換失敗。");
                               }
                           }
                           catch (Exception exception)
                           {
                               Log.Log.Error($"學號{ksno},圖片路徑{zwzp}轉換失敗。{exception}");
                           }
                       }
 
                   });
                   sw.Stop();
                   Log.WriteLine($"轉換耗時:{sw.ElapsedMilliseconds}毫秒");
                   Log.WriteLine($@"開始寫入數據庫,數量{dt.Rows.Count}");
 
                   #region 批量寫入
 
                   if (dt.Rows.Count ==0)
                   {
                       Log.WriteLine(@"沒有要寫入的數據。");
                       return;
                   }
                   sw.Restart();
                   var sucess = false;
                   if (SqlHelper.BulkInsert(sqlconn, txt_TableName.Text.Trim(), dt, out var err))
                   {
                       sucess = true;
                   }
                   else
                   {
                       Log.Log.Error($"寫入數據庫失敗==》{err}");
                   }
                   sw.Stop();
                   Log.WriteLine($"寫入數據庫是否成功=>{sucess},耗時{sw.ElapsedMilliseconds}毫秒");
                   #endregion
               });
                #endregion
              
              
                if (sqlconn.State == ConnectionState.Open)
                {
                    sqlconn.Close();
                }
                button1.Enabled = true;
            }));
        }

  SQL批量寫入函數函數

        /// <summary>
        /// 批量插入
        /// </summary>
        /// <param name="conn">鏈接對象</param>
        /// <param name="tableName">將泛型集合插入到本地數據庫表的表名</param>
        /// <param name="dataTable">要批量寫入的Datatable</param>
        /// <param name="err">錯誤時返回的信息</param>
        public static bool BulkInsert(SqlConnection conn, string tableName, DataTable dataTable, out string err)
        {
            err = "";
            if (dataTable == null || dataTable.Rows.Count == 0)
            {
                err = "要寫入的數據爲空";
                return false;
            }
            var tran = conn.BeginTransaction();//開啓事務 
            var bulkCopy = new SqlBulkCopy(conn, SqlBulkCopyOptions.KeepNulls, tran);
            try
            {
                if (conn.State == ConnectionState.Closed)
                {
                    conn.Open();
                }
                bulkCopy.BatchSize = 1000;
                bulkCopy.DestinationTableName = tableName;
                bulkCopy.WriteToServer(dataTable);
                tran.Commit();
                return true;
            }
            catch (Exception e)
            {
                err = e.ToString();
                tran.Rollback();
                return false;
            }
            finally
            {
                bulkCopy.Close();
                if (conn.State == ConnectionState.Open)
                {
                    conn.Close();
                }
            }
        }
相關文章
相關標籤/搜索