分佈式事務之消息補償解決方案

1、數據庫本地事務

先看看數據庫事務的定義:單個邏輯工做單元執行的一系列操做,要麼徹底地執行,要麼徹底地不執行redis

這個比較容易理解,操做過數據庫的通常都懂,既是業務需求涉及到多個數據表操做的時候,須要用到事務數據庫

要麼一塊兒更新,要麼一塊兒不更新,不會出現只更新了部分數據表的狀況,下邊看看數據庫事務的使用服務器

1 begin tran
2     begin try 
3         update Table1 set Field = 1 where ID = 1
4         update Table2 set Field = 2 where ID = 1
5     end try
6     begin catch
7         rollback tran
8     end catch
9 commit tran

上實例在小型項目中通常是問題不大的,由於小型項目通常是單機系統,數據庫、Web服務大都在一臺服務器上,甚至可能只有一個數據庫文件,架構

這種狀況下使用本地事務沒有一點問題;併發

可是本地事務有很大的缺陷,由於開啓事務通常是鎖表的,事務執行期間會一直鎖着,其餘的操做通常都要排隊等待,對性能要求比較高的系統是不能忍受的。app

特別是涉及改動不一樣數據庫的操做,這會形成跨庫事務,性能更加低異步

若是還涉及到不在同一臺服務器、甚至不一樣網段部署的數據庫,那本地事務簡直是系統運行的災難,是首先須要丟棄的解決方案。分佈式

 

那若是遇到上述狀況,該怎麼作呢,這就涉及到分佈式事務了高併發

 

2、分段式事務的補償機制

若是有海量數據須要處理、或者要求高併發請求的話,同步的事務機制已是不現實的了,這種狀況下必須採用異步事務機制,既分段式的事務性能

分段式事務通常作法就是把需求任務分段式地完成,經過事務補償機制來保證業務最終執行成功,補償機制通常能夠歸類爲2種:

1 )定時任務補償:

  經過定時任務去跟進後續任務,根據不一樣的狀態表肯定下一步的操做,從而保證業務最終執行成功,

  這種辦法可能會涉及到不少的後臺服務,維護起來也會比較麻煩,這是應該是早期比較流行的作法

2) 消息補償:

  經過消息中間件觸發下一段任務,既經過實時消息通知下一段任務開始執行,執行完畢後的消息回發通知來保證業務最終完成;

  固然這也是異步進行的,可是能保證數據最終的完整性、一致性,也是近幾年比較熱門的作法

 

定時任務補償就不說了,這篇文章咱們來討論一下經過消息補償來完成分佈式事務的通常作法

 

3、分佈式事務之消息補償

0)咱們以簡單的產品下單場景來講明,(不要較真哈)

1)先來看看分佈式異步事務處理流程示意圖,APP1與APP2須要互相訂閱對方消息

2)首先看數據庫,2個,一個庫存庫,一個已下單成功的庫

 1 -- 下單通知,主要做用保留已下單操做,消息發送失敗能夠根據此表從新發送
 2 CREATE TABLE [dbo].[ProductMessage](
 3     [ID] [int] IDENTITY(1,1) NOT NULL,
 4     [Product] [varchar](50) NULL,
 5     [Amount] [int] NULL,
 6     [UpdateTime] [datetime] NULL
 7 ) 
 8 -- 庫存
 9 CREATE TABLE [dbo].[ProductStock](
10     [ID] [int] IDENTITY(1,1) NOT NULL,
11     [Product] [varchar](50) NULL,
12     [Amount] [int] NULL
13 )
14 -- 下單成功
15 CREATE TABLE [dbo].[ProductSell](
16     [ID] [int] IDENTITY(1,1) NOT NULL,
17     [Product] [varchar](50) NULL,
18     [Customer] [int] NULL,
19     [Amount] [int] NULL
20 )
21 -- 下單成功消息,主要做用防止重複消費
22 CREATE TABLE [dbo].[ProductMessageApply](
23     [ID] [int] IDENTITY(1,1) NOT NULL,
24     [MesageID] [int] NULL,
25     [CreateTime] [datetime] NULL
26 )

3)項目架構Demo

數據底層訪問使用的是Dapper、使用redis做爲消息中間件

4)實體層代碼

 1     public class ProductMessage
 2     {
 3         [Key]
 4         [IgnoreProperty(true)]
 5         public int ID { get; set; }
 6         public string Product { get; set; }
 7         public int Amount { get; set; }
 8         public DateTime UpdateTime { get; set; }
 9     }
10     public class ProductMessageApply
11     {
12         [Key]
13         [IgnoreProperty(true)]
14         public int ID { get; set; }
15         public int MesageID { get; set; }
16         public DateTime CreateTime { get; set; }
17     }
18     public class ProductSell
19     {
20         [Key]
21         [IgnoreProperty(true)]
22         public int ID { get; set; }
23         public string Product { get; set; }
24         public int Customer { get; set; }
25         public int Amount { get; set; }
26     }
27     public class ProductStock
28     {
29         [Key]
30         [IgnoreProperty(true)]
31         public int ID { get; set; }
32         public string Product { get; set; }
33         public int Amount { get; set; }
34     }

5)服務接口層代碼

 1     public interface IProductMessageApplyService
 2     {
 3         void Add(ProductMessageApply entity);
 4         ProductMessageApply Get(int id);
 5     }
 6     public interface IProductMessageService
 7     {
 8         void Add(ProductMessage entity);
 9         IEnumerable<ProductMessage> Gets(object paramPairs = null);
10         void Delete(int id);
11     }
12     public interface IProductSellService
13     {
14         void Add(ProductSell entity);
15     }
16     public interface IProductStockService
17     {
18         void ReduceReserve(int id, int amount);
19     }

6)庫存、消息通知

 1     public class ProductMessageService : IProductMessageService
 2     {
 3         private IRepository<ProductMessage> repository;
 4 
 5         public ProductMessageService(IRepository<ProductMessage> repository)
 6         {
 7             this.repository = repository;
 8         }
 9 
10         public void Add(ProductMessage entity)
11         {
12             this.repository.Add(entity);
13         }
14 
15         public IEnumerable<ProductMessage> Gets(object paramPairs = null)
16         {
17             return this.repository.Gets(paramPairs);
18         }
19 
20         public void Delete(int id)
21         {
22             this.repository.Delete(id);
23         }
24     }
25 
26     public class ProductStockService : IProductStockService
27     {
28         private IRepository<ProductStock> repository;
29 
30         public ProductStockService(IRepository<ProductStock> repository)
31         {
32             this.repository = repository;
33         }
34 
35         public void ReduceReserve(int id, int amount)
36         {
37             var entity = this.repository.Get(id);
38             if (entity == null) return;
39 
40             entity.Amount = entity.Amount - amount;
41             this.repository.Update(entity);
42         }
43     }

7)下單、下單成功消息

 1     public class ProductMessageApplyService : IProductMessageApplyService
 2     {
 3         private IRepository<ProductMessageApply> repository;
 4 
 5         public ProductMessageApplyService(IRepository<ProductMessageApply> repository)
 6         {
 7             this.repository = repository;
 8         }
 9 
10         public void Add(ProductMessageApply entity)
11         {
12             this.repository.Add(entity);
13         }
14 
15         public ProductMessageApply Get(int id)
16         {
17             return this.repository.Get(id);
18         }
19     }
20 
21     public class ProductSellService : IProductSellService
22     {
23         private IRepository<ProductSell> repository;
24 
25         public ProductSellService(IRepository<ProductSell> repository)
26         {
27             this.repository = repository;
28         }
29 
30         public void Add(ProductSell entity)
31         {
32             this.repository.Add(entity);
33         }
34     }

8)下單減庫存測試

 1 namespace Demo.Reserve.App
 2 {
 3     class Program
 4     {
 5         static void Main(string[] args)
 6         {
 7             Console.WriteLine(string.Format("{0} 程序已啓動", DateTime.Now.ToString()));
 8 
 9             Send();
10             Subscribe();
11            
12             Console.ReadKey();
13         }
14 
15         private static void Send()
16         {
17             var unitOfWork = new UnitOfWork(Enums.Reserve);
18 
19             try
20             {
21                 var productStockRepository = new BaseRepository<ProductStock>(unitOfWork);
22                 var productStockServic = new ProductStockService(productStockRepository);
23                 var productMessageRepository = new BaseRepository<ProductMessage>(unitOfWork);
24                 var productMessageService = new ProductMessageService(productMessageRepository);
25 
26                 var id = 1;
27                 var amount = 2;
28                 var productMessage = new ProductMessage()
29                 {
30                     Product = "ProductCode",
31                     Amount = amount,
32                     UpdateTime = DateTime.Now
33                 };
34 
35                 productStockServic.ReduceReserve(id, amount);
36                 productMessageService.Add(productMessage);
37                 unitOfWork.Commit();
38                 Console.WriteLine(string.Format("{0} 減庫存完成", DateTime.Now.ToString()));
39                 Thread.Sleep(1000);
40 
41                 var message = JsonConvert.SerializeObject(productMessage);
42                 RedisConfig.Instrace.Publish("channel.Send", message);
43                 Console.WriteLine(string.Format("{0} 發送減庫存消息: {1}", DateTime.Now.ToString(), message));
44             }
45             catch (Exception ex)
46             {
47                 //Logger.Error(ex);
48                 unitOfWork.Rollback();
49             }
50         }
51 
52         private static void Subscribe()
53         {
54             var client = RedisConfig.Instrace.NewClient();
55             var subscriber = client.GetSubscriber();
56 
57             subscriber.Subscribe("channel.Success", (chl, message) =>
58             {
59                 try
60                 {
61                     var unitOfWork = new UnitOfWork(Enums.Reserve);
62                     var productMessageRepository = new BaseRepository<ProductMessage>(unitOfWork);
63                     var productMessageService = new ProductMessageService(productMessageRepository);
64 
65                     var messageID = message.ToString().ToInt();
66                     if (messageID > 0)
67                     {
68                         productMessageService.Delete(messageID);
69                         Console.WriteLine(string.Format("{0} 收到消費成功消息:{1}", DateTime.Now.ToString(), message));
70                     }
71                 }
72                 catch (Exception ex)
73                 {
74                     //Logger.Error(ex);
75                 }
76             });
77         }
78     }
79 }

9)下單成功及消息回發測試

 1 namespace Demo.Sell.App
 2 {
 3     class Program
 4     {
 5         static void Main(string[] args)
 6         {
 7             Subscribe();
 8 
 9             Console.WriteLine(string.Format("{0} 程序已啓動", DateTime.Now.ToString()));
10             Console.ReadKey();
11         }
12 
13         private static void Subscribe()
14         {
15             var client = RedisConfig.Instrace.NewClient();
16             var subscriber = client.GetSubscriber();
17 
18             subscriber.Subscribe("channel.Send", (chl, message) =>
19             {
20                 Consume(message);
21             });
22         }
23 
24         private static void Consume(string message)
25         {
26             var unitOfWork = new UnitOfWork(Enums.Sell);
27 
28             try
29             {
30                 Console.WriteLine(string.Format("{0} 收到減庫存消息: {1}", DateTime.Now.ToString(), message));
31 
32                 var productMessage = JsonConvert.DeserializeObject<ProductMessage>(message);
33 
34                 var productSellRepository = new BaseRepository<ProductSell>(unitOfWork);
35                 var productSellService = new ProductSellService(productSellRepository);
36 
37                 var productMessageApplyRepository = new BaseRepository<ProductMessageApply>(unitOfWork);
38                 var productMessageApplyService = new ProductMessageApplyService(productMessageApplyRepository);
39 
40                 var noExists = productMessageApplyService.Get(productMessage.ID) == null;
41                 if (noExists)
42                 {
43                     productSellService.Add(new ProductSell()
44                     {
45                         Product = productMessage.Product,
46                         Amount = productMessage.Amount,
47                         Customer = 123
48                     });
49 
50                     productMessageApplyService.Add(new ProductMessageApply()
51                     {
52                         MesageID = productMessage.ID,
53                         CreateTime = DateTime.Now
54                     });
55 
56                     unitOfWork.Commit();
57                     Console.WriteLine(string.Format("{0} 消息消費完成", DateTime.Now.ToString()));
58                     Thread.Sleep(1000);
59                 }
60 
61                 RedisConfig.Instrace.Publish("channel.Success", productMessage.ID.ToString());
62                 Console.WriteLine(string.Format("{0} 發送消費完成通知:{1}", DateTime.Now.ToString(), productMessage.ID.ToString()));
63             }
64             catch (Exception ex)
65             {
66                 //Logger.Error(ex);
67                 unitOfWork.Rollback();
68             }
69         }
70     }
71 }

 10)好了,到了最後檢驗成果的時候了

先打開Demo.Sell.App.exe、而後打開Demo.Reserve.App.exe

 

大功告成!

相關文章
相關標籤/搜索