Actor是有狀態的,當每一步執行失敗後,返回失敗地方繼續執行時,但願此時的狀態是正確的,爲了保證這一點,持久化就成了必要的環節了。sql
Proto.Actor提供了三種方式執久化:數據庫
無論是那種持久化方式,首先要構造一個持久化的提供者,這個提者是內存也好,數據庫也罷,本例中用Sqlite做爲持久化的載體;在Actor中,實現持久化,首先要建立一個Persistence對象,用來將快照或事件保存起來,最重要的一點是,咱們用事件溯源或快照,是幫咱們保留住Actor在某刻的狀,保留下來,以便咱們再次啓動時能延續這個狀態,因此Persistence有一個很關鍵的做用就是能從持久化的載體中把原來的狀態回覆過來,這裏,Event Source的把原來的狀態步驟走再走一次,到達當前流程的點,但快照否則,直接取的是最後時刻的狀態;帶快照的事件溯源則是二者的結合。
碼友看碼:
NuGet安裝dom
Proto.Actorasync
Proto.Persistenceide
Proto.Persistence.Sqliteui
1 using Microsoft.Data.Sqlite; 2 using Proto; 3 using Proto.Persistence; 4 using Proto.Persistence.SnapshotStrategies; 5 using Proto.Persistence.Sqlite; 6 using System; 7 using System.Threading.Tasks; 8 9 namespace P008_Persistence 10 { 11 class Program 12 { 13 static void Main(string[] args) 14 { 15 //用sqlite持久化後 16 var actorid = "myactorid"; 17 var dbfile = @"C:\MyFile\Source\Repos\ProtoActorSample\ProtoActorSample\P008_Persistence\data.sqlite"; 18 var sqliteProvider = new SqliteProvider(new SqliteConnectionStringBuilder() { DataSource = dbfile }); 19 while (true) 20 { 21 Console.WriteLine("一、事件溯源 二、快照 三、帶快照的事件溯源 四、退出"); 22 switch (Console.ReadLine()) 23 { 24 case "1": 25 CallEventSource(actorid, sqliteProvider); 26 break; 27 case "2": 28 CallSnapShoot(actorid, sqliteProvider); 29 break; 30 case "3": 31 CallSnapShootEventSource(actorid, sqliteProvider); 32 break; 33 case "4": 34 return; 35 } 36 } 37 } 38 /// <summary> 39 /// 事件溯源 40 /// </summary> 41 /// <param name="actorid"></param> 42 /// <param name="sqliteProvider"></param> 43 private static void CallEventSource(string actorid, SqliteProvider sqliteProvider) 44 { 45 var props = Actor.FromProducer(() => new EventSourceDataActor(sqliteProvider, actorid)); 46 var pid = Actor.Spawn(props); 47 var result = true; 48 while (result) 49 { 50 Console.WriteLine("一、Tell 二、刪除持久化 三、退出"); 51 52 switch (Console.ReadLine()) 53 { 54 case "1": 55 var random = new Random(); 56 var no = random.Next(5, 15); 57 Console.WriteLine($"隨機產生的數字:{no}"); 58 pid.Tell(new Data { Amount = no }); 59 break; 60 case "2": 61 //完成處理後清理持久化的操做 62 sqliteProvider.DeleteEventsAsync(actorid, 100).Wait(); 63 break; 64 case "3": 65 result = false; 66 break; 67 } 68 } 69 } 70 71 /// <summary> 72 /// 快照 73 /// </summary> 74 /// <param name="actorid"></param> 75 /// <param name="sqliteProvider"></param> 76 private static void CallSnapShoot(string actorid, SqliteProvider sqliteProvider) 77 { 78 var props = Actor.FromProducer(() => new SnapShootDataActor(sqliteProvider, actorid)); 79 var pid = Actor.Spawn(props); 80 var result = true; 81 while (result) 82 { 83 Console.WriteLine("一、Tell 二、刪除持久化 三、退出"); 84 85 switch (Console.ReadLine()) 86 { 87 case "1": 88 var random = new Random(); 89 var no = random.Next(5, 15); 90 Console.WriteLine($"隨機產生的數字:{no}"); 91 pid.Tell(new Data { Amount = no }); 92 break; 93 case "2": 94 //完成處理後清理持久化的操做 95 sqliteProvider.DeleteEventsAsync(actorid, 100).Wait(); 96 break; 97 case "3": 98 result = false; 99 break; 100 } 101 } 102 103 } 104 /// <summary> 105 /// 快照事件溯源 106 /// </summary> 107 /// <param name="actorid"></param> 108 /// <param name="sqliteProvider"></param> 109 private static void CallSnapShootEventSource(string actorid, SqliteProvider sqliteProvider) 110 { 111 var props = Actor.FromProducer(() => new SnapShootEventSourceDataActor(sqliteProvider, sqliteProvider, actorid)); 112 var pid = Actor.Spawn(props); 113 var result = true; 114 while (result) 115 { 116 Console.WriteLine("一、Tell 二、刪除持久化 三、退出"); 117 118 switch (Console.ReadLine()) 119 { 120 case "1": 121 var random = new Random(); 122 var no = random.Next(5, 15); 123 Console.WriteLine($"隨機產生的數字:{no}"); 124 pid.Tell(new Data { Amount = no }); 125 break; 126 case "2": 127 //完成處理後清理持久化的操做 128 sqliteProvider.DeleteEventsAsync(actorid, 100).Wait(); 129 sqliteProvider.DeleteSnapshotsAsync(actorid, 100).Wait(); 130 break; 131 case "3": 132 result = false; 133 break; 134 } 135 } 136 } 137 } 138 139 public class Data 140 { 141 public long Amount { get; set; } 142 } 143 144 #region 事件溯源 145 public class EventSourceDataActor : IActor 146 { 147 private long _value = 0; 148 private readonly Persistence _persistence; 149 150 public EventSourceDataActor(IEventStore eventStore, string actorId) 151 { 152 //事件溯源持久化方式 153 _persistence = Persistence.WithEventSourcing(eventStore, actorId, ApplyEvent); 154 } 155 private void ApplyEvent(Proto.Persistence.Event @event) 156 { 157 switch (@event.Data) 158 { 159 case Data msg: 160 _value = _value + msg.Amount; 161 Console.WriteLine($"累計:{_value}"); 162 break; 163 } 164 } 165 public async Task ReceiveAsync(IContext context) 166 { 167 switch (context.Message) 168 { 169 case Started _: 170 await _persistence.RecoverStateAsync(); 171 break; 172 case Data msg: 173 await _persistence.PersistEventAsync(new Data { Amount = msg.Amount }); 174 break; 175 } 176 } 177 } 178 #endregion 179 180 #region 快照 181 public class SnapShootDataActor : IActor 182 { 183 private long _value = 0; 184 private readonly Persistence _persistence; 185 186 public SnapShootDataActor(ISnapshotStore snapshotStore, string actorId) 187 { 188 //快照持久化方式 189 _persistence = Persistence.WithSnapshotting(snapshotStore, actorId, ApplySnapshot); 190 } 191 private void ApplySnapshot(Proto.Persistence.Snapshot snapshot) 192 { 193 switch (snapshot.State) 194 { 195 case long value: 196 _value = value; 197 Console.WriteLine($"累計:{_value}"); 198 break; 199 } 200 } 201 public async Task ReceiveAsync(IContext context) 202 { 203 switch (context.Message) 204 { 205 case Started _: 206 await _persistence.RecoverStateAsync(); 207 break; 208 case Data msg: 209 _value = _value + msg.Amount; 210 await _persistence.DeleteSnapshotsAsync(100); 211 await _persistence.PersistSnapshotAsync(_value); 212 break; 213 } 214 } 215 } 216 #endregion 217 218 #region 事件溯源and快照 219 public class SnapShootEventSourceDataActor : IActor 220 { 221 private long _value = 0; 222 private readonly Persistence _persistence; 223 224 public SnapShootEventSourceDataActor(IEventStore eventStore, ISnapshotStore snapshotStore, string actorId) 225 { 226 //註釋快照策略 227 //_persistence = Persistence.WithEventSourcingAndSnapshotting(eventStore, snapshotStore, actorId, ApplyEvent, ApplySnapshot, new IntervalStrategy(5), () => { return _value; }); 228 //無快照策略 229 _persistence = Persistence.WithEventSourcingAndSnapshotting(eventStore, snapshotStore, actorId, ApplyEvent, ApplySnapshot); 230 } 231 private void ApplyEvent(Proto.Persistence.Event @event) 232 { 233 switch (@event.Data) 234 { 235 case Data msg: 236 _value = _value + msg.Amount; 237 Console.WriteLine($"事件溯源累計:{_value}"); 238 break; 239 } 240 } 241 private void ApplySnapshot(Proto.Persistence.Snapshot snapshot) 242 { 243 switch (snapshot.State) 244 { 245 case long value: 246 _value = value; 247 Console.WriteLine($"快照累計:{_value}"); 248 break; 249 } 250 } 251 public async Task ReceiveAsync(IContext context) 252 { 253 switch (context.Message) 254 { 255 case Started _: 256 await _persistence.RecoverStateAsync(); 257 break; 258 case Data msg: 259 await _persistence.PersistEventAsync(new Data { Amount = msg.Amount }); 260 //無快照策略時啓用 261 await _persistence.PersistSnapshotAsync(_value); 262 break; 263 } 264 } 265 } 266 #endregion 267 }
經過代碼看到,持久化是經過在Actor中定義Persistence時,關聯一個參數爲,Event或Snapshot的方法,而且Actor的Receive方法在Stared到達是恢復(從持久載體中讀取數據來恢復),在具體消息到達時,調用Persistence.PersistEventAsync或Persistence.PersisSnapshotAsync來持久化狀態數據,這兩個方法,都會把調用似遞到Persistence產生是關聯的那個方法,並把消息實體類經過Event.Data或Snapshot.State傳遞進去。
此例分別演示了事件溯源,快照,帶快照事件溯源,例子很簡單,就是把每次產生的隨機數累加起來
一、
事件溯源
三個綠色箭頭,意思是進了三次「一、事件溯源」這個選項
三次藍色箭頭,意思是調用了三次Tell方法,用來獲取三次隨機數,藍色橢圓是產生的三個數字,分別是7,7,6,藍色方框是累計結果,從上往下,第一次是(0+7)7,第二次是(7+7)14,第三次是(14+6)20
紅色箭頭是退出事件溯源的方法,返回上一級
綠色方框是綠色箭頭再次進入,自動恢復,事件溯源後的結果(即從持久化載體中把以前的全部事件從新走一次)仍是以前退出時的結果,累計20,因此無論這個Actor在什麼地方退出,再次運行,都會把以前的補運行回來。
也能夠打開sqlite庫進行查看保存的事件結果
二、
快照
快照與事件溯源相似,差異在於每次再次進來,只取上次退出時的結果,同時,在數據裏,只保存了最後一次的結果。
三、帶快照的事件溯源
與快照相似,上面代碼咱們是一個事件,一個快照。
官方給出帶快照的事件能夠經過快照策略來保存快照
在建立持久化對象時,能夠添加快照策略
1 _persistence = Persistence.WithEventSourcingAndSnapshotting(eventStore, snapshotStore, actorId, ApplyEvent, ApplySnapshot, new IntervalStrategy(5), () => { return _value; });
您能夠選擇ISnapshotStrategy在保存事件時指定自動保存快照。提供的策略是:
同時要在Actor的Receive把保存快照註釋掉,Demo中我用的是5個事件後保存一次快照,以下圖結果
綠色是第一次,要保存一下快照,而後以後第五個事件過來後保存第二次快照,若是在第四個事件後程序就退出,那快照保存的只有第一次的,不有擔憂,當再次調用時,由於記錄下了全部事件,Actor會取出最後一次快照,再支執行快照後的事件,這是由於在保存快照和事件時,會把他們的索引保存起來,索引是同樣的,就能用最後的快照+這個快照索引後的事件,恢復到退出的地方。spa
記得執行後查看Sqlite數據,有助於你更好的瞭解Proto.Actor的Persistence機制哦!3d