第二十節: 深刻理解併發機制以及解決方案(鎖機制、EF自有機制、隊列模式等)

一. 理解併發機制html

1. 什麼是併發,併發與多線程有什麼關係?nginx

①. 先從廣義上來講,或者從實際場景上來講.數據庫

  高併發一般是海量用戶同時訪問(好比:12306買票、淘寶的雙十一搶購),若是把一個用戶看作一個線程的話那麼併發能夠理解成多線程同時訪問,高併發即海量線程同時訪問。安全

      (ps:咱們在這裏模擬高併發能夠for循環多個線程便可)服務器

②.從代碼或數據的層次上來講.多線程

  多個線程同時在一條相同的數據上執行多個數據庫操做。架構

2. 從代碼層次上來講,給併發分類。併發

①.積極併發(樂觀併發、樂觀鎖):不管什麼時候從數據庫請求數據,數據都會被讀取並保存到應用內存中。數據庫級別沒有放置任何顯式鎖。數據操做會按照數據層接收到的前後順序來執行。負載均衡

 積極併發本質就是容許衝突發生,而後在代碼自己採起一種合理的方式去解決這個併發衝突,常見的方式有:框架

a.忽略衝突強制更新:數據庫會保存最後一次更新操做(以更新爲例),會損失不少用戶的更新操做。

b.部分更新:容許全部的更改,可是不容許更新完整的行,只有特定用戶擁有的列更新了。這就意味着,若是兩個用戶更新相同的記錄但卻不一樣的列,那麼這兩個更新都會成功,並且來自這兩個用戶的更改都是可見的。(EF默認實現不了這種狀況)

c.詢問用戶:當一個用戶嘗試更新一個記錄時,可是該記錄自從他讀取以後已經被別人修改了,這時應用程序就會警告該用戶該數據已經被某人更改了,而後詢問他是否仍然要重寫該數據仍是首先檢查已經更新的數據。(EF能夠實現這種狀況,在後面詳細介紹)

d.拒絕修改:當一個用戶嘗試更新一個記錄時,可是該記錄自從他讀取以後已經被別人修改了,此時告訴該用戶不容許更新該數據,由於數據已經被某人更新了。

(EF能夠實現這種狀況,在後面詳細介紹)

②.消極併發(悲觀併發、悲觀鎖):不管什麼時候從數據庫請求數據,數據都會被讀取,而後該數據上就會加鎖,所以沒有人能訪問該數據。這會下降併發出現問題的機會,缺點是加鎖是一個昂貴的操做,會下降整個應用程序的性能。

 消極併發的本質就是永遠不讓衝突發生,一般的處理凡是是隻讀鎖和更新鎖。

a. 當把只讀鎖放到記錄上時,應用程序只能讀取該記錄。若是應用程序要更新該記錄,它必須獲取到該記錄上的更新鎖。若是記錄上加了只讀鎖,那麼該記錄仍然可以被想要只讀鎖的請求使用。然而,若是須要更新鎖,該請求必須等到全部的只讀鎖釋放。一樣,若是記錄上加了更新鎖,那麼其餘的請求不能再在這個記錄上加鎖,該請求必須等到已存在的更新鎖釋放才能加鎖。

總結,這裏咱們能夠簡單理解把併發業務部分用一個鎖(如:lock,實質是數據庫鎖,後面章節單獨介紹)鎖住,使其同時只容許一個線程訪問便可。

b. 加鎖會帶來不少弊端:

 (1):應用程序必須管理每一個操做正在獲取的全部鎖;

 (2):加鎖機制的內存需求會下降應用性能

 (3):多個請求互相等待須要的鎖,會增長死鎖的可能性。

總結:儘可能不要使用消極併發,EF默認是不支持消極併發的!!!

注意:EF默認就是積極併發,固然EF也能夠配置成消極併發。

二. 併發機制的解決方案

1. 從架構的角度去解決(大層次 如:12306買票)

  nginx負載均衡、數據庫讀寫分離、多個業務服務器、多個數據庫服務器、NoSQL, 使用隊列來處理業務,將高併發的業務依次放到隊列中,而後按照先進先出的原則, 逐個處理(隊列的處理能夠採用 Redis、RabbitMq等等)

  (PS:在後面的框架篇章裏詳細介紹該方案)

2. 從代碼的角度去解決(在服務器能承載壓力的狀況下,併發訪問同一條數據)

  實際的業務場景:如進銷存類的項目,涉及到同一個物品的出庫、入庫、庫存,咱們都知道庫存在數據庫裏對應了一條記錄,入庫要查出如今庫存的數量,而後加上入庫的數量,假設兩個線程同時入庫,假設查詢出來的庫存數量相同,可是更新庫存數量在數據庫層次上是有前後,最終就保留了後更新的數據,顯然是不正確的,應該保留的是兩次入庫的數量和。

(該案例的實質:多個線程同時在一條相同的數據上執行多個數據庫操做)

事先準備一張數據庫表:

解決方案一:(最經常使用的方式)

  給入庫和出庫操做加一個鎖,使其同時只容許一個線程訪問,這樣即便兩個線程同時訪問,但在代碼層次上,因爲鎖的緣由,仍是有先有後的,這樣就保證了入庫操做的線程惟一性,固然庫存量就不會出錯了.

總結:該方案能夠說是適合處理小範圍的併發且鎖內的業務執行不是很複雜。假設一萬線程同時入庫,每次入庫要等2s,那麼這一萬個線程執行完成須要的總時間很是多,顯然不適合。

    (這種方式的實質就是給核心業務加了個lock鎖,這裏就不作測試了)

 

解決方案二:EF處理積極併發帶來的衝突

1. 配置準備

  (1). 針對DBFirst模式,能夠給相應的表額外加一列RowVersion,數據庫中爲timestamp類型,對應的類中爲byte[]類型,而且在Edmx模型上給該字段的併發模式設置爲fixed(默認爲None),這樣該表中全部字段都監控併發。

若是不想監視全部列(在不添加RowVersion的狀況下),只需在Edmx模型是給特定的字段的併發模式設置爲fixed,這樣只有被設置的字段被監測併發。

  測試結果: (DBFirst模式下的併發測試)

  事先在UserInfor1表中插入一條id、userName、userSex、userAge均爲1的數據(清空數據)。

測試狀況1:

  在不設置RowVersion併發模式爲Fixed的狀況下,兩個線程修改不一樣字段(修改同一個字段一個道理),後執行的線程的結果覆蓋前面的線程結果.

  發現測試結果爲:1,1,男,1 ; 顯然db1線程修改的結果被db2線程給覆蓋了. (修改同一個字段一個道理)

 1             {
 2                 //1.建立兩個EF上下文,模擬表明兩個線程
 3                 var db1 = new ConcurrentTestDBEntities();
 4                 var db2 = new ConcurrentTestDBEntities();
 5 
 6                 UserInfor1 user1 = db1.UserInfor1.Find("1");
 7                 UserInfor1 user2 = db2.UserInfor1.Find("1");
 8 
 9                 //2. 執行修改操做
10                 //(db1的線程先執行完修改操做,並保存)
11                 user1.userName = "ypf";
12                 db1.Entry(user1).State = EntityState.Modified;
13                 db1.SaveChanges();
14 
15                 //(db2的線程在db1線程修改完成後,執行修改操做)
16                 try
17                 {
18                     user2.userSex = "";
19                     db2.Entry(user2).State = EntityState.Modified;
20                     db2.SaveChanges();
21 
22                     Console.WriteLine("測試成功");
23                 }
24                 catch (Exception)
25                 {
26                     Console.WriteLine("測試失敗");
27                 }
28             }
View Code

測試狀況2:

  設置RowVersion併發模式爲Fixed的狀況下,兩個線程修改不一樣字段(修改同一個字段一個道理),若是該條數據已經被修改,利用DbUpdateConcurrencyException能夠捕獲異常,進行積極併發的衝突處理。測試結果以下:

  a.RefreshMode.ClientWins: 1,1,男,1

  b.RefreshMode.StoreWins: 1,ypf,1,1

  c.ex.Entries.Single().Reload(); 1,ypf,1,1

 1             {
 2                 //1.建立兩個EF上下文,模擬表明兩個線程
 3                 var db1 = new ConcurrentTestDBEntities();
 4                 var db2 = new ConcurrentTestDBEntities();
 5 
 6                 UserInfor1 user1 = db1.UserInfor1.Find("1");
 7                 UserInfor1 user2 = db2.UserInfor1.Find("1");
 8 
 9                 //2. 執行修改操做
10                 //(db1的線程先執行完修改操做,並保存)
11                 user1.userName = "ypf";
12                 db1.Entry(user1).State = EntityState.Modified;
13                 db1.SaveChanges();
14 
15                 //(db2的線程在db1線程修改完成後,執行修改操做)
16                 try
17                 {
18                     user2.userSex = "";
19                     db2.Entry(user2).State = EntityState.Modified;
20                     db2.SaveChanges();
21 
22                     Console.WriteLine("測試成功");
23                 }
24                 catch (DbUpdateConcurrencyException ex)
25                 {
26                     Console.WriteLine("測試失敗:" + ex.Message);
27 
28                     //1. 保留上下文中的現有數據(即最新,最後一次輸入)
29                     //var oc = ((IObjectContextAdapter)db2).ObjectContext;
30                     //oc.Refresh(RefreshMode.ClientWins, user2);
31                     //oc.SaveChanges();
32 
33                     //2. 保留原始數據(即數據源中的數據代替當前上下文中的數據)
34                     //var oc = ((IObjectContextAdapter)db2).ObjectContext;
35                     //oc.Refresh(RefreshMode.StoreWins, user2);
36                     //oc.SaveChanges();
37 
38                     //3. 保留原始數據(而Reload處理也就是StoreWins,意味着放棄當前內存中的實體,從新到數據庫中加載當前實體)
39                     ex.Entries.Single().Reload();
40                     db2.SaveChanges();
41                 }
42             }

測試狀況3:

  在不設置RowVersion併發模式爲Fixed的狀況下(也不須要RowVersion這個字段),單獨設置userName字段的併發模式爲Fixed,兩個線程同時修改該字段,利用DbUpdateConcurrencyException能夠捕獲異常,進行積極併發的衝突處理,但若是是兩個線程同時修改userName之外的字段,將不能捕獲異常,將走EF默認的處理方式,後執行的覆蓋先執行的。

  a.RefreshMode.ClientWins: 1,ypf2,1,1

  b.RefreshMode.StoreWins: 1,ypf,1,1

  c.ex.Entries.Single().Reload(); 1,ypf,1,1

 1             {
 2                 //1.建立兩個EF上下文,模擬表明兩個線程
 3                 var db1 = new ConcurrentTestDBEntities();
 4                 var db2 = new ConcurrentTestDBEntities();
 5 
 6                 UserInfor1 user1 = db1.UserInfor1.Find("1");
 7                 UserInfor1 user2 = db2.UserInfor1.Find("1");
 8 
 9                 //2. 執行修改操做
10                 //(db1的線程先執行完修改操做,並保存)
11                 user1.userName = "ypf";
12                 db1.Entry(user1).State = EntityState.Modified;
13                 db1.SaveChanges();
14 
15                 //(db2的線程在db1線程修改完成後,執行修改操做)
16                 try
17                 {
18                     user2.userName = "ypf2";
19                     db2.Entry(user2).State = EntityState.Modified;
20                     db2.SaveChanges();
21 
22                     Console.WriteLine("測試成功");
23                 }
24                 catch (DbUpdateConcurrencyException ex)
25                 {
26                     Console.WriteLine("測試失敗:" + ex.Message);
27 
28                     //1. 保留上下文中的現有數據(即最新,最後一次輸入)
29                     var oc = ((IObjectContextAdapter)db2).ObjectContext;
30                     oc.Refresh(RefreshMode.ClientWins, user2);
31                     oc.SaveChanges();
32 
33                     //2. 保留原始數據(即數據源中的數據代替當前上下文中的數據)
34                     //var oc = ((IObjectContextAdapter)db2).ObjectContext;
35                     //oc.Refresh(RefreshMode.StoreWins, user2);
36                     //oc.SaveChanges();
37 
38                     //3. 保留原始數據(而Reload處理也就是StoreWins,意味着放棄當前內存中的實體,從新到數據庫中加載當前實體)
39                     //ex.Entries.Single().Reload();
40                     //db2.SaveChanges();
41                 }
42             }
View Code

  (2). 針對CodeFirst模式,須要有這樣的一個屬性 public byte[] RowVersion { get; set; },而且給屬性加上特性[Timestamp],這樣該表中全部字段都監控併發。若是不想監視全部列(在不添加RowVersion的狀況下),只需給特定的字段加上特性 [ConcurrencyCheck],這樣只有被設置的字段被監測併發。

  除了再配置上不一樣於DBFirst模式覺得,是經過加特性的方式來標記併發,其它捕獲併發和積極併發的幾類處理方式均同DBFirst模式相同。(這裏不作測試了)

2. 積極併發處理的三種形式總結:

  利用DbUpdateConcurrencyException能夠捕獲異常,而後:

    a. RefreshMode.ClientWins:保留上下文中的現有數據(即最新,最後一次輸入)

    b. RefreshMode.StoreWins:保留原始數據(即數據源中的數據代替當前上下文中的數據)

    c.ex.Entries.Single().Reload(); 保留原始數據(而Reload處理也就是StoreWins,意味着放棄當前內存中的實體,從新到數據庫中加載當前實體)

3. 該方案總結:

  這種模式實質上就是獲取異常告訴程序,讓開發人員結合需求本身選擇怎麼處理,但這種模式是解決代碼層次上的併發衝突,並非解決大數量同時訪問崩潰問題的。

解決方案三:利用隊列來解決業務上的併發(架構層次上其實也是這種思路解決的)

1.先分析:

  前面說過所謂的高併發,就是海量的用戶同時向服務器發送請求,進行某個業務處理(好比定時秒殺的搶單),而這個業務處理是須要 必定時間的。

2.處理思路:

  將海量用戶的請求放到一個隊列裏(如:Queue),先不進行業務處理,而後另一個服務器從線程中讀取這個請求(MVC框架能夠放到Global全局裏),依次進行業務處理,至於處理完成後,是否須要告訴客戶端,能夠根據實際需求來定,若是須要的話(能夠藉助Socket、Signalr、推送等技術來進行).

  特別注意:讀取隊列的線程是一直在運行,只要隊列中有數據,就給他拿出來.

  這裏使用Queue隊列,能夠參考:http://www.cnblogs.com/yaopengfei/p/8322016.html

  (PS:架構層次上的處理方案無非隊列是單獨一臺服務器,執行從隊列讀取的是另一臺業務服務器,處理思想是相同的)

隊列單例類的代碼:

 1  /// <summary>
 2     /// 單例類
 3     /// </summary>
 4     public class QueueUtils
 5     {
 6         /// <summary>
 7         /// 靜態變量:由CLR保證,在程序第一次使用該類以前被調用,並且只調用一次
 8         /// </summary>
 9         private static readonly QueueUtils _QueueUtils = new QueueUtils();
10 
11         /// <summary>
12         /// 聲明爲private類型的構造函數,禁止外部實例化
13         /// </summary>
14         private QueueUtils()
15         {
16 
17         }
18         /// <summary>
19         /// 聲明屬性,供外部調用,此處也能夠聲明成方法
20         /// </summary>
21         public static QueueUtils instanse
22         {
23             get
24             {
25                 return _QueueUtils;
26             }
27         }
28 
29 
30         //下面是隊列相關的
31          System.Collections.Queue queue = new System.Collections.Queue();
32 
33         private static object o = new object();
34 
35         public int getCount()
36         {
37             return queue.Count;
38         }
39 
40         /// <summary>
41         /// 入隊方法
42         /// </summary>
43         /// <param name="myObject"></param>
44         public void Enqueue(object myObject)
45         {
46             lock (o)
47             {
48                 queue.Enqueue(myObject);
49             }
50         }
51         /// <summary>
52         /// 出隊操做
53         /// </summary>
54         /// <returns></returns>
55         public object Dequeue()
56         {
57             lock (o)
58             {
59                 if (queue.Count > 0)
60                 {
61                     return queue.Dequeue();
62                 }
63             }
64             return null;
65         }
66 
67     }
View Code

PS:這裏的入隊和出隊都要加鎖,由於Queue默認不是線程安全的,不加鎖會存在資源競用問題從而業務出錯,或者直接使用ConcurrentQueue線程安全的隊列,就不須要加鎖了,關於隊列線程安全問題詳見:http://www.cnblogs.com/yaopengfei/p/8322016.html

臨時存儲數據類的代碼:

 1     /// <summary>
 2     /// 該類用來存儲請求信息
 3     /// </summary>
 4     public class TempInfor
 5     {
 6         /// <summary>
 7         /// 用戶編號
 8         /// </summary>
 9         public string userId { get; set; }
10     }

模擬高併發入隊,單獨線程出隊的代碼:

 1  {
 2                 //3.1 模擬高併發請求 寫入隊列
 3                 {
 4                     for (int i = 0; i < 100; i++)
 5                     {
 6                         Task.Run(() =>
 7                         {
 8                             TempInfor tempInfor = new TempInfor();
 9                             tempInfor.userId = Guid.NewGuid().ToString("N");
10                             //下面進行入隊操做
11                             QueueUtils.instanse.Enqueue(tempInfor);
12 
13                         });
14                     }
15                 }        
16                 //3.2 模擬另一個線程隊列中讀取數據請求標記,進行相應的業務處理(該線程一直運行,不中止)
17                 Task.Run(() =>
18                 {
19                     while (true)
20                     {
21                         if (QueueUtils.instanse.getCount() > 0)
22                         {
23                             //下面進行出隊操做
24                             TempInfor tempInfor2 = (TempInfor)QueueUtils.instanse.Dequeue();
25 
26                             //拿到請求標記,進行相應的業務處理
27                             Console.WriteLine("id={0}的業務執行成功", tempInfor2.userId);
28                         }
29                     }           
30                 });
31                 //3.3 模擬過了一段時間(6s後),又有新的請求寫入
32                 Thread.Sleep(6000);
33                 Console.WriteLine("6s的時間已通過去了");
34                 {
35                     for (int j = 0; j < 100; j++)
36                     {
37                         Task.Run(() =>
38                         {
39                             TempInfor tempInfor = new TempInfor();
40                             tempInfor.userId = Guid.NewGuid().ToString("N");
41                             //下面進行入隊操做
42                             QueueUtils.instanse.Enqueue(tempInfor);
43 
44                         });
45                     }
46                 }
47             }

3.下面案例的測試結果:

  一次輸出100條數據,6s事後,再一次輸出100條數據。

4. 總結:

  該方案是一種迂迴的方式處理高併發,在業內這種思想也是很是常見,但該方案也有一個弊端,客戶端請求的實時性很難保證,或者即便要保證(好比引入實時通信技術),

 也要付出很多代價.

 

解決方案四: 利用數據庫自有的鎖機制進行處理

   (在後面數據鎖機制章節進行介紹)

 

 

!

  • 做       者 : Yaopengfei(姚鵬飛)
  • 博客地址 : http://www.cnblogs.com/yaopengfei/
  • 聲     明1 : 本人才疏學淺,用郭德綱的話說「我是一個小學生」,若有錯誤,歡迎討論,請勿謾罵^_^。
  • 聲     明2 : 原創博客請在轉載時保留原文連接或在文章開頭加上本人博客地址,不然保留追究法律責任的權利。
相關文章
相關標籤/搜索