項目前期使用mysql數據庫,大約天天200w數據量,十天1500w數據量以後,讀取寫入都會很慢,並且常常鎖表,後來採用vertica數據庫,下面分享vertica數據庫使用方法,以及大批量數據快速寫入數據庫的方法:mysql
1,數據庫IDE的配置sql
windows系統下需先安裝jdk,而後才能使用IDE,能夠使用dbvis_windows-x64_9_1_9,一個數據庫統一客戶端工具,網上搜索下載。數據庫
可能會出現須要加載jar包才能使用的狀況,選擇軟件目錄\jdbc\vertica.jar包便可。c#
2,c#裏如何使用windows
下載類庫:https://my.vertica.com/download/vertica/client-drivers/工具
項目中引用:sqlserver
代碼:ui
1 VerticaConnection conn = new VerticaConnection(ConfigurationManager.AppSettings["VerticalTest"]); 2 { 3 conn.Open(); 4 VerticaTransaction txn = conn.BeginTransaction(); 5 VerticaCommand cmd = new VerticaCommand(); 6 cmd.Connection = conn; 7 cmd.Transaction = txn; 8 string deleteSourceData = @"Delete from " + strTableName + " where sampledate = " + time.ToString("yyyyMMdd") + " and city='" + city + "'"; 9 cmd.CommandText = deleteSourceData; 10 cmd.CommandTimeout = 90; 11 cmd.ExecuteNonQuery(); 12 }
3,大量數據快速批量插入vertica數據庫方法spa
sqlserver中常見插入方法:3d
insert into table1(a,b) valuse ('Cust1', 'Smith Company'),('Cust2', 'Perform Company')
vertica裏不支持這種寫法,可是支持以下:
insert into table1(a,b) select a,b,c from table2 union select 'Cust1', 'Smith Company'
這種寫法速度並不快,更快速的方法是使用copy,copy能夠從txt文件,也能夠是內存流:
1 /// <summary> 2 /// 批量插入數據到vertica數據庫 3 /// </summary> 4 /// <param name="dt">數據源</param> 5 /// <param name="strTableName">插入的目標表名</param> 6 /// <param name="time">日期(刪除數據使用)</param> 7 /// <param name="city">城市(刪除數據使用)</param> 8 public static void BulkCopy(DataTable dt, string strTableName, DateTime time,string city) 9 { 10 if (dt == null || dt.Rows.Count == 0) 11 { 12 return; 13 } 14 if (dt.Columns.Count == 0) 15 { 16 throw new Exception("The length of column cannot be zero."); 17 } 18 //從datatable中獲取列名 19 List<string> lstField = new List<string>(); 20 for (int colIndex = 0; colIndex < dt.Columns.Count; colIndex++) 21 { 22 lstField.Add(dt.Columns[colIndex].ColumnName); 23 } 24 string strFiledList = string.Format("({0})", string.Join(",", lstField.ToArray())); 25 //拼寫copy語句 26 const char RowSplit = '\n'; 27 const char ColSplit = '\t'; 28 29 string strCopyStatement = string.Format("copy {0}{1} from stdin record terminator E'{2}' delimiter E'{3}' enforcelength no commit", 30 strTableName, strFiledList, RowSplit, ColSplit); 31 //按照copy語句中的分隔符,分隔數據源 32 StringBuilder sbText = new StringBuilder(); 33 foreach (DataRow dr in dt.Rows) 34 { 35 bool bFirstField = true; 36 for (int colIndex = 0; colIndex < dt.Columns.Count; colIndex++) 37 { 38 string strVal = GetDataString(dr, colIndex); 39 if (bFirstField) 40 { 41 sbText.Append(strVal); 42 bFirstField = false; 43 } 44 else 45 { 46 sbText.AppendFormat("{0}{1}", ColSplit, strVal); 47 } 48 } 49 sbText.Append(RowSplit); 50 } 51 //數據源寫入內存流 52 string strTemp = sbText.ToString(); 53 byte[] buff = Encoding.UTF8.GetBytes(strTemp); 54 using (MemoryStream ms = new MemoryStream()) 55 { 56 ms.Write(buff, 0, buff.Length); 57 ms.Flush(); 58 ms.Position = 0; 59 //創建vertica數據庫鏈接 60 VerticaConnection conn = new VerticaConnection(ConfigurationManager.AppSettings["VerticalTest"]); 61 { 62 conn.Open(); 63 VerticaTransaction txn = conn.BeginTransaction(); 64 Vertica.Data.VerticaClient.VerticaCopyStream vcs = new VerticaCopyStream(conn, strCopyStatement); 65 //插入數據前,先刪除重複數據 66 VerticaCommand cmd = new VerticaCommand(); 67 cmd.Connection = conn; 68 cmd.Transaction = txn; 69 string deleteSourceData = @"Delete from " + strTableName + " where sampledate = " + time.ToString("yyyyMMdd") + " and city='" + city + "'"; 70 cmd.CommandText = deleteSourceData; 71 cmd.CommandTimeout = 90; 72 cmd.ExecuteNonQuery(); 73 //批量插入數據 74 vcs.Start(); 75 vcs.AddStream(ms, false); 76 vcs.Execute(); 77 78 long insertedCount = vcs.Finish(); 79 80 IList<long> lstRejected = vcs.Rejects; 81 if (lstRejected.Count > 0) 82 { 83 txn.Rollback(); 84 conn.Close(); 85 86 // Maybe need more detail info to show 87 throw new Exception("Bulk copy failure."); 88 } 89 else 90 { 91 txn.Commit(); 92 conn.Close(); 93 } 94 } 95 96 ms.Close(); 97 } 98 }