今天咱們來了解一下一些高併發的業務場景如何作到數據一致性的。redis
一、有數據表:ConCurrency,數據庫
1 CREATE TABLE [dbo].[ConCurrency]( 2 [ID] [int] NOT NULL, 3 [Total] [int] NULL 4 )
二、初始值:ID=1,Total = 0多線程
三、現要求每一次客戶端請求Total + 1併發
1 static void Main(string[] args) 2 { 3 ... 4 new Thread(Run).Start(); 5 ... 6 } 7 8 public static void Run() 9 { 10 for (int i = 1; i <= 100; i++) 11 { 12 var total = DbHelper.ExecuteScalar("Select Total from ConCurrency where Id = 1", null).ToString(); 13 var value = int.Parse(total) + 1; 14 15 DbHelper.ExecuteNonQuery(string.Format("Update ConCurrency Set Total = {0} where Id = 1", value.ToString()), null); 16 Thread.Sleep(1); 17 } 18 }
2.1 按要求,正常狀況下應該輸出:100分佈式
2.2 運行結果高併發
貌似沒有問題。測試
3.1 Main改一下spa
1 static void Main(string[] args) 2 { 3 ... 4 new Thread(Run).Start(); 5 new Thread(Run).Start(); 6 ... 7 }
3.2 咱們預期應該是要輸出200線程
3.3 運行結果code
很遺憾,倒是150,形成這個結果的緣由是這樣的:T一、T2獲取Total(假設此時值爲10),T1更新一次或屢次後,T2才更新(Total:10)
這就形成以前T1提交的被覆蓋了
3.4 如何避免呢?通常作法加鎖就能夠了,如Run改爲以下
1 public static void Run() 2 { 3 for (int i = 1; i <= 100; i++) 4 { 5 lock (resource) 6 { 7 var total = DbHelper.ExecuteScalar("Select Total from ConCurrency where Id = 1", null).ToString(); 8 var value = int.Parse(total) + 1; 9 10 DbHelper.ExecuteNonQuery(string.Format("Update ConCurrency Set Total = {0} where Id = 1", value.ToString()), null); 11 } 12 13 Thread.Sleep(1); 14 } 15 }
3.5 再次運行
4.一、定義隊列
static ConcurrentQueue<int> queue = new ConcurrentQueue<int>();
1 /// <summary>生產者</summary> 2 public static void Produce() 3 { 4 for (int i = 1; i <= 100; i++) 5 { 6 queue.Enqueue(i); 7 } 8 } 9 10 /// <summary>消費者</summary> 11 public static void Consume() 12 { 13 int times; 14 while (queue.TryDequeue(out times)) 15 { 16 var total = DbHelper.ExecuteScalar("Select Total from ConCurrency where Id = 1", null).ToString(); 17 var value = int.Parse(total) + 1; 18 19 DbHelper.ExecuteNonQuery(string.Format("Update ConCurrency Set Total = {0} where Id = 1", value.ToString()), null); 20 Thread.Sleep(1); 21 } 22 }
4.2 Main改一下
1 static void Main(string[] args) 2 { 3 ... 4 new Thread(Produce).Start(); 5 new Thread(Produce).Start(); 6 Consume(); 7 ... 8 }
4.3 預期輸出200,看運行結果
4.4 集羣環境下測試,2臺機器
有問題!最後運行的那臺機器竟然是379,數據庫也是379。
這超出了咱們的預期結果,看來即使加鎖,對於高併發場景也是不能解決全部問題的
5.1 解決上邊問題能夠用分佈式隊列,這裏用的是redis隊列
1 /// <summary>生產者</summary> 2 public static void ProduceToRedis() 3 { 4 using (var client = RedisManager.GetClient()) 5 { 6 for (int i = 1; i <= 100; i++) 7 { 8 client.EnqueueItemOnList("EnqueueName", i.ToString()); 9 } 10 } 11 } 12 13 /// <summary>消費者</summary> 14 public static void ConsumeFromRedis() 15 { 16 using (var client = RedisManager.GetClient()) 17 { 18 while (client.GetListCount("EnqueueName") > 0) 19 { 20 if (client.SetValueIfNotExists("lock", "lock")) 21 { 22 var item = client.DequeueItemFromList("EnqueueName"); 23 var total = DbHelper.ExecuteScalar("Select Total from ConCurrency where Id = 1", null).ToString(); 24 var value = int.Parse(total) + 1; 25 26 DbHelper.ExecuteNonQuery(string.Format("Update ConCurrency Set Total = {0} where Id = 1", value.ToString()), null); 27 28 client.Remove("lock"); 29 } 30 31 Thread.Sleep(5); 32 } 33 } 34 }
5.2 Main也要改改
1 static void Main(string[] args) 2 { 3 ... 4 new Thread(ProduceToRedis).Start(); 5 new Thread(ProduceToRedis).Start(); 6 Thread.Sleep(1000 * 10); 7 8 ConsumeFromRedis(); 9 ... 10 }
5.3 在集羣裏再試試,2個都是400,沒有錯(由於每一個站點開了2個線程)
能夠看到數據徹底正確!