《經過C#學Proto.Actor模型》之Persistence

Actor是有狀態的,當每一步執行失敗後,返回失敗地方繼續執行時,但願此時的狀態是正確的,爲了保證這一點,持久化就成了必要的環節了。sql

Proto.Actor提供了三種方式執久化:數據庫

  • Event Sourcing事件溯源
  • Snapshotting快照
  • Event Sourcing with Snapshotting帶快照的事件溯源
無論是那種持久化方式,首先要構造一個持久化的提供者,這個提者是內存也好,數據庫也罷,本例中用Sqlite做爲持久化的載體;在Actor中,實現持久化,首先要建立一個Persistence對象,用來將快照或事件保存起來,最重要的一點是,咱們用事件溯源或快照,是幫咱們保留住Actor在某刻的狀,保留下來,以便咱們再次啓動時能延續這個狀態,因此Persistence有一個很關鍵的做用就是能從持久化的載體中把原來的狀態回覆過來,這裏,Event Source的把原來的狀態步驟走再走一次,到達當前流程的點,但快照否則,直接取的是最後時刻的狀態;帶快照的事件溯源則是二者的結合。
碼友看碼:

NuGet安裝dom

Proto.Actorasync

Proto.Persistenceide

Proto.Persistence.Sqliteui

  1 using Microsoft.Data.Sqlite;
  2 using Proto;
  3 using Proto.Persistence;
  4 using Proto.Persistence.SnapshotStrategies;
  5 using Proto.Persistence.Sqlite;
  6 using System;
  7 using System.Threading.Tasks;
  8  
  9 namespace P008_Persistence
 10 {
 11     class Program
 12     {
 13         static void Main(string[] args)
 14         {
 15             //用sqlite持久化後
 16             var actorid = "myactorid";
 17             var dbfile = @"C:\MyFile\Source\Repos\ProtoActorSample\ProtoActorSample\P008_Persistence\data.sqlite";
 18             var sqliteProvider = new SqliteProvider(new SqliteConnectionStringBuilder() { DataSource = dbfile });
 19             while (true)
 20             {
 21                 Console.WriteLine("一、事件溯源   二、快照   三、帶快照的事件溯源  四、退出");
 22                 switch (Console.ReadLine())
 23                 {
 24                     case "1":
 25                         CallEventSource(actorid, sqliteProvider);
 26                         break;
 27                     case "2":
 28                         CallSnapShoot(actorid, sqliteProvider);
 29                         break;
 30                     case "3":
 31                         CallSnapShootEventSource(actorid, sqliteProvider);
 32                         break;
 33                     case "4":
 34                         return;
 35                 }
 36             }
 37         }
 38         /// <summary>
 39         /// 事件溯源
 40         /// </summary>
 41         /// <param name="actorid"></param>
 42         /// <param name="sqliteProvider"></param>
 43         private static void CallEventSource(string actorid, SqliteProvider sqliteProvider)
 44         {          
 45             var props = Actor.FromProducer(() => new EventSourceDataActor(sqliteProvider, actorid));
 46             var pid = Actor.Spawn(props);
 47             var result = true;
 48             while (result)
 49             {
 50                 Console.WriteLine("一、Tell  二、刪除持久化  三、退出");
 51  
 52                 switch (Console.ReadLine())
 53                 {
 54                     case "1":
 55                         var random = new Random();
 56                         var no = random.Next(5, 15);
 57                         Console.WriteLine($"隨機產生的數字:{no}");
 58                         pid.Tell(new Data { Amount = no });
 59                         break;
 60                     case "2":
 61                         //完成處理後清理持久化的操做          
 62                         sqliteProvider.DeleteEventsAsync(actorid, 100).Wait();
 63                         break;
 64                     case "3":
 65                         result = false;
 66                         break;
 67                 }
 68             }       
 69         }
 70  
 71         /// <summary>
 72         /// 快照
 73         /// </summary>
 74         /// <param name="actorid"></param>
 75         /// <param name="sqliteProvider"></param>
 76         private static void CallSnapShoot(string actorid, SqliteProvider sqliteProvider)
 77         {
 78             var props = Actor.FromProducer(() => new SnapShootDataActor(sqliteProvider, actorid));
 79             var pid = Actor.Spawn(props);
 80             var result = true;
 81             while (result)
 82             {
 83                 Console.WriteLine("一、Tell  二、刪除持久化  三、退出");
 84  
 85                 switch (Console.ReadLine())
 86                 {
 87                     case "1":
 88                         var random = new Random();
 89                         var no = random.Next(5, 15);
 90                         Console.WriteLine($"隨機產生的數字:{no}");
 91                         pid.Tell(new Data { Amount = no });
 92                         break;
 93                     case "2":
 94                         //完成處理後清理持久化的操做          
 95                         sqliteProvider.DeleteEventsAsync(actorid, 100).Wait();
 96                         break;
 97                     case "3":
 98                         result = false;
 99                         break;
100                 }
101             }
102             
103         }
104         /// <summary>
105         /// 快照事件溯源
106         /// </summary>
107         /// <param name="actorid"></param>
108         /// <param name="sqliteProvider"></param>
109         private static void CallSnapShootEventSource(string actorid, SqliteProvider sqliteProvider)
110         {
111             var props = Actor.FromProducer(() => new SnapShootEventSourceDataActor(sqliteProvider, sqliteProvider, actorid));
112             var pid = Actor.Spawn(props);
113             var result = true;
114             while (result)
115             {
116                 Console.WriteLine("一、Tell  二、刪除持久化  三、退出");
117  
118                 switch (Console.ReadLine())
119                 {
120                     case "1":
121                         var random = new Random();
122                         var no = random.Next(5, 15);
123                         Console.WriteLine($"隨機產生的數字:{no}");
124                         pid.Tell(new Data { Amount = no });
125                         break;
126                     case "2":
127                         //完成處理後清理持久化的操做          
128                         sqliteProvider.DeleteEventsAsync(actorid, 100).Wait();
129                         sqliteProvider.DeleteSnapshotsAsync(actorid, 100).Wait();
130                         break;
131                     case "3":
132                         result = false;
133                         break;
134                 }
135             }         
136         }
137     }
138  
139     public class Data
140     {
141         public long Amount { get; set; }
142     }
143  
144     #region 事件溯源
145     public class EventSourceDataActor : IActor
146     {
147         private long _value = 0;
148         private readonly Persistence _persistence;
149  
150         public EventSourceDataActor(IEventStore eventStore, string actorId)
151         {
152             //事件溯源持久化方式
153             _persistence = Persistence.WithEventSourcing(eventStore, actorId, ApplyEvent);
154         }
155         private void ApplyEvent(Proto.Persistence.Event @event)
156         {
157             switch (@event.Data)
158             {
159                 case Data msg:
160                     _value = _value + msg.Amount;
161                     Console.WriteLine($"累計:{_value}");
162                     break;
163             }
164         }
165         public async Task ReceiveAsync(IContext context)
166         {
167             switch (context.Message)
168             {
169                 case Started _:
170                     await _persistence.RecoverStateAsync();
171                     break;
172                 case Data msg:
173                     await _persistence.PersistEventAsync(new Data { Amount = msg.Amount });
174                     break;
175             }
176         }
177     }
178     #endregion
179  
180     #region 快照
181     public class SnapShootDataActor : IActor
182     {
183         private long _value = 0;
184         private readonly Persistence _persistence;
185  
186         public SnapShootDataActor(ISnapshotStore snapshotStore, string actorId)
187         {
188             //快照持久化方式
189             _persistence = Persistence.WithSnapshotting(snapshotStore, actorId, ApplySnapshot);
190         }
191         private void ApplySnapshot(Proto.Persistence.Snapshot snapshot)
192         {
193             switch (snapshot.State)
194             {
195                 case long value:
196                     _value = value;
197                     Console.WriteLine($"累計:{_value}");
198                     break;
199             }
200         }
201         public async Task ReceiveAsync(IContext context)
202         {
203             switch (context.Message)
204             {
205                 case Started _:
206                     await _persistence.RecoverStateAsync();
207                     break;
208                 case Data msg:
209                     _value = _value + msg.Amount;
210                     await _persistence.DeleteSnapshotsAsync(100);
211                     await _persistence.PersistSnapshotAsync(_value);
212                     break;
213             }
214         }
215     }
216     #endregion
217  
218     #region 事件溯源and快照
219     public class SnapShootEventSourceDataActor : IActor
220     {
221         private long _value = 0;
222         private readonly Persistence _persistence;
223  
224         public SnapShootEventSourceDataActor(IEventStore  eventStore, ISnapshotStore snapshotStore, string actorId)
225         {
226             //註釋快照策略
227             //_persistence = Persistence.WithEventSourcingAndSnapshotting(eventStore, snapshotStore, actorId, ApplyEvent, ApplySnapshot, new IntervalStrategy(5), () => { return _value; });
228             //無快照策略
229             _persistence = Persistence.WithEventSourcingAndSnapshotting(eventStore, snapshotStore, actorId, ApplyEvent, ApplySnapshot);
230         }
231         private void ApplyEvent(Proto.Persistence.Event @event)
232         {
233             switch (@event.Data)
234             {
235                 case Data msg:
236                     _value = _value + msg.Amount;
237                     Console.WriteLine($"事件溯源累計:{_value}");
238                     break;
239             }
240         }
241         private void ApplySnapshot(Proto.Persistence.Snapshot snapshot)
242         {
243             switch (snapshot.State)
244             {
245                 case long value:
246                     _value = value;
247                     Console.WriteLine($"快照累計:{_value}");
248                     break;
249             }
250         }
251         public async Task ReceiveAsync(IContext context)
252         {
253             switch (context.Message)
254             {
255                 case Started _:
256                     await _persistence.RecoverStateAsync();
257                     break;
258                 case Data msg:
259                     await _persistence.PersistEventAsync(new Data { Amount = msg.Amount });
260                     //無快照策略時啓用
261                     await _persistence.PersistSnapshotAsync(_value);
262                     break;
263             }
264         }
265     }
266     #endregion
267 }
經過代碼看到,持久化是經過在Actor中定義Persistence時,關聯一個參數爲,Event或Snapshot的方法,而且Actor的Receive方法在Stared到達是恢復(從持久載體中讀取數據來恢復),在具體消息到達時,調用Persistence.PersistEventAsync或Persistence.PersisSnapshotAsync來持久化狀態數據,這兩個方法,都會把調用似遞到Persistence產生是關聯的那個方法,並把消息實體類經過Event.Data或Snapshot.State傳遞進去。
 此例分別演示了事件溯源,快照,帶快照事件溯源,例子很簡單,就是把每次產生的隨機數累加起來

  
  
  
  
  一、 
   事件溯源

 

三個綠色箭頭,意思是進了三次「一、事件溯源」這個選項
三次藍色箭頭,意思是調用了三次Tell方法,用來獲取三次隨機數,藍色橢圓是產生的三個數字,分別是7,7,6,藍色方框是累計結果,從上往下,第一次是(0+7)7,第二次是(7+7)14,第三次是(14+6)20
紅色箭頭是退出事件溯源的方法,返回上一級
綠色方框是綠色箭頭再次進入,自動恢復,事件溯源後的結果(即從持久化載體中把以前的全部事件從新走一次)仍是以前退出時的結果,累計20,因此無論這個Actor在什麼地方退出,再次運行,都會把以前的補運行回來。
也能夠打開sqlite庫進行查看保存的事件結果

  
  
  
  
  二、 
   快照

快照與事件溯源相似,差異在於每次再次進來,只取上次退出時的結果,同時,在數據裏,只保存了最後一次的結果。
 
三、帶快照的事件溯源

與快照相似,上面代碼咱們是一個事件,一個快照。 
官方給出帶快照的事件能夠經過快照策略來保存快照
在建立持久化對象時,能夠添加快照策略
1 _persistence = Persistence.WithEventSourcingAndSnapshotting(eventStore, snapshotStore, actorId, ApplyEvent, ApplySnapshot, new IntervalStrategy(5), () => { return _value; });
您能夠選擇ISnapshotStrategy在保存事件時指定自動保存快照。提供的策略是:
  • EventTypeStrategy - 根據保存的事件類型保存快照
  • IntervalStrategy - 根據保存的事件數量,即每100個事件,按期保存快照
  • TimeStrategy - 根據時間以固定間隔保存快照,即在快照之間等待至少6小時
同時要在Actor的Receive把保存快照註釋掉,Demo中我用的是5個事件後保存一次快照,以下圖結果

綠色是第一次,要保存一下快照,而後以後第五個事件過來後保存第二次快照,若是在第四個事件後程序就退出,那快照保存的只有第一次的,不有擔憂,當再次調用時,由於記錄下了全部事件,Actor會取出最後一次快照,再支執行快照後的事件,這是由於在保存快照和事件時,會把他們的索引保存起來,索引是同樣的,就能用最後的快照+這個快照索引後的事件,恢復到退出的地方。spa

 

記得執行後查看Sqlite數據,有助於你更好的瞭解Proto.Actor的Persistence機制哦!3d

相關文章
相關標籤/搜索