自己之前走了彎路,以爲學會戰術設計就等於會DDD了,其實DDD的精華在戰略設計。不過對於我們這些菜鳥來說,學習一些技術概念也是挺好的。
常常看到這些術語,概念太多,也想簡單學習一下;自己記憶力比較差,所以在這裏記錄一下實現的細節。
數據庫
/// <summary>
/// Stores the aggregate root's uncommitted events in the event storage and,
/// once they are stored successfully, publishes them and clears them from the aggregate root.
/// </summary>
/// <typeparam name="TAggregationRoot">Aggregate root type exposing the uncommitted events.</typeparam>
/// <param name="event">Aggregate root whose <c>UncommittedEvents</c> are stored and published.</param>
/// <returns>A task that completes after the events have been stored, published and cleared.</returns>
/// <exception cref="Exception">The aggregate root has no uncommitted events.</exception>
public async Task AppendEventStoragePublishEventAsync<TAggregationRoot>(TAggregationRoot @event)
    where TAggregationRoot : IAggregationRoot
{
    var domainEventList = @event.UncommittedEvents.ToList();
    if (domainEventList.Count == 0)
    {
        throw new Exception("請添加事件!");
    }

    // Await the storage step directly. The previous ContinueWith(async ...) produced a
    // Task<Task>; only the outer task was awaited, so publishing and ClearEvents() could
    // still be running (or fail unobserved) after this method had already returned.
    var status = await TryAppendEventStorageAsync(domainEventList).ConfigureAwait(false);
    if (status == (int)EventStorageStatus.Success)
    {
        await TryPublishDomainEventAsync(domainEventList).ConfigureAwait(false);
        @event.ClearEvents();
    }
}

/// <summary>
/// Publishes every domain event in the list through the CAP publisher,
/// using each event's routing key as the topic name.
/// </summary>
/// <param name="domainEventList">Events to publish, in list order.</param>
/// <returns>A task that completes when all events were published and the transaction committed.</returns>
public async Task PublishDomainEventAsync(List<IDomainEvent> domainEventList)
{
    using (var connection = new SqlConnection(ConnectionStr))
    {
        // A freshly constructed connection is always closed, so it is opened unconditionally.
        await connection.OpenAsync().ConfigureAwait(false);

        // NOTE(review): the transaction is never handed to _capPublisher in this code, so it
        // does not visibly cover the publish calls — confirm whether CAP is expected to enlist in it.
        using (var transaction = await connection.BeginTransactionAsync().ConfigureAwait(false))
        {
            try
            {
                foreach (var domainEvent in domainEventList)
                {
                    await _capPublisher.PublishAsync(domainEvent.GetRoutingKey(), domainEvent).ConfigureAwait(false);
                }

                await transaction.CommitAsync().ConfigureAwait(false);
            }
            catch
            {
                await transaction.RollbackAsync().ConfigureAwait(false);
                throw;
            }
        }
    }
}

/// <summary>
/// Publishes the domain events, retrying forever whenever publishing throws.
/// </summary>
/// <param name="domainEventList">Events to publish.</param>
/// <returns>A task that completes once a publish attempt succeeds.</returns>
/// <exception cref="ArgumentNullException"><paramref name="domainEventList"/> is null.</exception>
public async Task TryPublishDomainEventAsync(List<IDomainEvent> domainEventList)
{
    // Reject null up front: inside the retry-forever policy the resulting
    // NullReferenceException would be retried endlessly.
    if (domainEventList == null)
    {
        throw new ArgumentNullException(nameof(domainEventList));
    }

    // Handle<Exception>() already covers SocketException and IOException.
    // NOTE(review): retries are immediate and unbounded — consider a delay/back-off.
    var policy = Policy.Handle<Exception>()
        .RetryForeverAsync(onRetry: exception =>
        {
            Task.Factory.StartNew(() =>
            {
                // Record the retry information.
                _loggerHelper.LogInfo("發佈領域事件異常", exception.Message);
            });
        });

    await policy.ExecuteAsync(async () =>
    {
        await PublishDomainEventAsync(domainEventList).ConfigureAwait(false);
    }).ConfigureAwait(false);
}

/// <summary>
/// Inserts one EventStorageInfo row per domain event inside a single transaction.
/// </summary>
/// <param name="domainEventList">Events to store; must contain at least one event.</param>
/// <returns><c>(int)EventStorageStatus.Success</c> once the transaction has been committed.</returns>
/// <exception cref="Exception">The list is empty.</exception>
public async Task<int> AppendEventStorageAsync(List<IDomainEvent> domainEventList)
{
    if (domainEventList.Count == 0)
    {
        throw new Exception("請添加事件!");
    }

    const string eventStorageSql =
        "INSERT INTO EventStorageInfo(Id,AggregateRootId,AggregateRootType,CreateDateTime,Version,EventData) VALUES (@Id,@AggregateRootId,@AggregateRootType,@CreateDateTime,@Version,@EventData)";

    // The using block disposes (and thereby closes) the connection on every path,
    // so no extra catch block is needed just to call Close().
    using (var connection = new SqlConnection(ConnectionStr))
    {
        await connection.OpenAsync().ConfigureAwait(false);

        using (var transaction = await connection.BeginTransactionAsync().ConfigureAwait(false))
        {
            try
            {
                foreach (var domainEvent in domainEventList)
                {
                    var eventStorage = new EventStorage
                    {
                        Id = Guid.NewGuid(),
                        AggregateRootId = domainEvent.AggregateRootId,
                        AggregateRootType = domainEvent.AggregateRootType,
                        CreateDateTime = domainEvent.CreateDateTime,
                        Version = domainEvent.Version,
                        EventData = Events(domainEvent)
                    };

                    await connection.ExecuteAsync(eventStorageSql, eventStorage, transaction).ConfigureAwait(false);
                }

                await transaction.CommitAsync().ConfigureAwait(false);
            }
            catch
            {
                await transaction.RollbackAsync().ConfigureAwait(false);
                throw;
            }
        }
    }

    // Every failure path above throws, so reaching this point means the commit succeeded.
    return (int)EventStorageStatus.Success;
}

/// <summary>
/// Calls <see cref="AppendEventStorageAsync"/>, retrying forever whenever it throws.
/// </summary>
/// <param name="domainEventList">Events to store; must contain at least one event.</param>
/// <returns>The status returned by the first successful <see cref="AppendEventStorageAsync"/> call.</returns>
/// <exception cref="ArgumentNullException"><paramref name="domainEventList"/> is null.</exception>
/// <exception cref="Exception">The list is empty.</exception>
public async Task<int> TryAppendEventStorageAsync(List<IDomainEvent> domainEventList)
{
    // Validate before entering the policy: the policy retries on every exception, so an
    // invalid argument would otherwise be retried forever and this method would never return.
    if (domainEventList == null)
    {
        throw new ArgumentNullException(nameof(domainEventList));
    }

    if (domainEventList.Count == 0)
    {
        throw new Exception("請添加事件!");
    }

    // Handle<Exception>() already covers SocketException and IOException.
    // NOTE(review): retries are immediate and unbounded — consider a delay/back-off.
    var policy = Policy.Handle<Exception>()
        .RetryForeverAsync(onRetry: exception =>
        {
            Task.Factory.StartNew(() =>
            {
                // Record the retry information.
                _loggerHelper.LogInfo("存儲事件異常", exception.Message);
            });
        });

    return await policy.ExecuteAsync(async () =>
    {
        return await AppendEventStorageAsync(domainEventList).ConfigureAwait(false);
    }).ConfigureAwait(false);
}

/// <summary>
/// Serializes a domain event into the JSON envelope stored in EventData:
/// a JSON object with a single property whose name is the event's type name
/// and whose value is the event's own JSON, as a string.
/// </summary>
/// <param name="domainEvent">Event to serialize.</param>
/// <returns>The JSON envelope.</returns>
public string Events(IDomainEvent domainEvent)
{
    // The type name is the key so the reader can pick the type to deserialize into.
    var domainEventTypeName = domainEvent.GetType().Name;

    // A plain dictionary is enough: the map is local and holds exactly one entry.
    var envelope = new Dictionary<string, string>
    {
        [domainEventTypeName] = JsonConvert.SerializeObject(domainEvent)
    };

    return JsonConvert.SerializeObject(envelope);
}
解析EventStorage中存儲的事件
服務器
/// <summary>
/// Loads all EventStorageInfo rows of one aggregate root and deserializes them back into domain events.
/// </summary>
/// <param name="AggregateRootId">Id of the aggregate root whose stored events are loaded.</param>
/// <returns>
/// The deserialized events, ordered by the Version column. Entries whose type name cannot be
/// resolved, or whose payload does not deserialize to an <c>IDomainEvent</c>, are skipped.
/// </returns>
public async Task<List<IDomainEvent>> GetAggregateRootEventStorageById(Guid AggregateRootId)
{
    // Parameterized instead of string-interpolated SQL; ORDER BY makes the
    // event order deterministic instead of whatever order the server returns.
    const string sql =
        "SELECT * FROM dbo.EventStorageInfo WHERE AggregateRootId=@AggregateRootId ORDER BY Version";

    using (var connection = new SqlConnection(ConnectionStr))
    {
        var eventStorageList = await connection
            .QueryAsync<EventStorage>(sql, new { AggregateRootId })
            .ConfigureAwait(false);

        var domainEventList = new List<IDomainEvent>();
        foreach (var item in eventStorageList)
        {
            // EventData is a JSON object mapping the event type name to the event's JSON string.
            var dictionaryDomainEvent = JsonConvert.DeserializeObject<Dictionary<string, string>>(item.EventData);
            if (dictionaryDomainEvent == null)
            {
                continue;
            }

            foreach (var entry in dictionaryDomainEvent)
            {
                var domainEventType = TypeNameProvider.GetType(entry.Key);
                if (domainEventType == null)
                {
                    continue;
                }

                // Only add real events: a payload that is not an IDomainEvent
                // previously ended up in the list as a null entry.
                if (JsonConvert.DeserializeObject(entry.Value, domainEventType) is IDomainEvent domainEvent)
                {
                    domainEventList.Add(domainEvent);
                }
            }
        }

        return domainEventList;
    }
}
1.事件沒持久化就表明事件還沒有發生成功;事件存儲可能失敗,所以必須先存儲事件,再發佈事件,以保證存儲事件與發佈事件的一致性
1.使用事件驅動,必須要做好冪等的處理
2.如果業務場景中有狀態:通過狀態來控制
3.新建一張表,用來記錄消費的信息;在消費端的代碼裏面,根據唯一的標識判斷是否處理過該事件
4.Q端的任何更新都應該把聚合根ID和事件版本號作爲條件;Q端的更新不用遵循聚合的原則,可以使用最簡單的方式處理
5.倉儲是用來重建聚合的,它的行爲和集合一樣,只有Get、Add、Delete
6.DDD不是技術,是思想,核心在戰略模塊;戰術設計只是實現的一種選擇。戰略設計需要面向對象的分析能力、職責分配的能力,以及對業務深層次的分析
併發