Saga單詞翻譯過來是指尤指古代挪威或冰島講述冒險經歷和英雄業績的長篇故事,對,這裏強調長篇故事。許多系統都存在長時間運行的業務流程,NServiceBus使用基於事件驅動的體系結構將容錯性和可伸縮性融入這些業務處理過程當中。
固然一個單一接口調用則算不上一個長時間運行的業務場景,那麼若是在給定的用例中有兩個或多個調用,則應該考慮數據一致性的問題,這裏有可能第一個接口調用成功,第二次調用則可能失敗或者超時,Saga的設計以簡單而健壯的方式處理這樣的業務用例。mysql
先來經過一段代碼簡單認識一下Saga,在NServiceBus裏,使用Saga的話則須要實現抽象類Saga
public class Saga:Saga<State>, IAmStartedByMessages<StartOrder>, IHandleMessages<CompleteOrder> { protected override void ConfigureHowToFindSaga(SagaPropertyMapper<State> mapper) { mapper.ConfigureMapping<StartOrder>(message=>message.OrderId).ToSaga(saga=>saga.OrderId); mapper.ConfigureMapping<CompleteOrder>(message=>message.OrderId).ToSaga(saga=>saga.OrderId); } public Task Handle(StartOrder message, IMessageHandlerContext context) { return Task.CompletedTask; } public Task Handle(CompleteOrder message, IMessageHandlerContext context) { MarkAsComplete(); return Task.CompletedTask; } }
長時間運行則意味着有狀態,任何涉及多個網絡調用的進程都須要一個臨時狀態,這個臨時狀態能夠存儲在內存中,序列化在磁盤中,也能夠存儲在分佈式緩存中。在NServiceBus中咱們定義實體,繼承抽象類ContainSagaData便可,默認狀況下,全部公開訪問的屬性都會被持久化。數據庫
public class State:ContainSagaData { public Guid OrderId { get; set; } }
在NServiceBus裏,處理消息的有兩種接口:IHandlerMessages
在前面的代碼片斷裏,咱們看到已經實現了接口IAmStartedByMessages
若是你的業務用例中確實存在無序消息的狀況,則還須要業務流程正常輪轉,那麼則須要多個messaeg都要事先接口IAmStartedByMessages接口,也就是說多個message均可以建立Saga實例。網絡
在處理無序消息和多個消息類型的時候,就存在消息丟失的可能,必須在你的Saga狀態完成之後,這個Saga實例又收到一條消息,但這時Saga狀態已是完結狀態,這條消息則仍然須要處理,這裏則實現NServiceBus的IHandleSagaNotFound接口。app
public class SagaNotFoundHandler:IHandleSagaNotFound { public Task Handle(object message, IMessageProcessingContext context) { return context.Reply(new SagaNotFoundMessage()); } } public class SagaNotFoundMessage { }
當你的業務用例再也不須要Saga實例時,則調用MarkComplete()來結束Saga實例。這個方法在前面的代碼片斷中也能夠看到,其實本質也就是設置Saga.Complete屬性,這是個bool值,你在業務用例中也能夠用此值來判斷Saga流程是否結束。async
namespace NServiceBus { using System; using System.Threading.Tasks; using Extensibility; public abstract class Saga { /// <summary> /// The saga's typed data. /// </summary> public IContainSagaData Entity { get; set; } public bool Completed { get; private set; } internal protected abstract void ConfigureHowToFindSaga(IConfigureHowToFindSagaWithMessage sagaMessageFindingConfiguration); protected Task RequestTimeout<TTimeoutMessageType>(IMessageHandlerContext context, DateTime at) where TTimeoutMessageType : new() { return RequestTimeout(context, at, new TTimeoutMessageType()); } protected Task RequestTimeout<TTimeoutMessageType>(IMessageHandlerContext context, DateTime at, TTimeoutMessageType timeoutMessage) { if (at.Kind == DateTimeKind.Unspecified) { throw new InvalidOperationException("Kind property of DateTime 'at' must be specified."); } VerifySagaCanHandleTimeout(timeoutMessage); var options = new SendOptions(); options.DoNotDeliverBefore(at); options.RouteToThisEndpoint(); SetTimeoutHeaders(options); return context.Send(timeoutMessage, options); } protected Task RequestTimeout<TTimeoutMessageType>(IMessageHandlerContext context, TimeSpan within) where TTimeoutMessageType : new() { return RequestTimeout(context, within, new TTimeoutMessageType()); } protected Task RequestTimeout<TTimeoutMessageType>(IMessageHandlerContext context, TimeSpan within, TTimeoutMessageType timeoutMessage) { VerifySagaCanHandleTimeout(timeoutMessage); var sendOptions = new SendOptions(); sendOptions.DelayDeliveryWith(within); sendOptions.RouteToThisEndpoint(); SetTimeoutHeaders(sendOptions); return context.Send(timeoutMessage, sendOptions); } protected Task ReplyToOriginator(IMessageHandlerContext context, object message) { if (string.IsNullOrEmpty(Entity.Originator)) { throw new Exception("Entity.Originator cannot be null. Perhaps the sender is a SendOnly endpoint."); } var options = new ReplyOptions(); options.SetDestination(Entity.Originator); context.Extensions.Set(new AttachCorrelationIdBehavior.State { CustomCorrelationId = Entity.OriginalMessageId }); options.Context.Set(new PopulateAutoCorrelationHeadersForRepliesBehavior.State { SagaTypeToUse = null, SagaIdToUse = null }); return context.Reply(message, options); } //這個方法結束saga流程,標記Completed屬性 protected void MarkAsComplete() { Completed = true; } void VerifySagaCanHandleTimeout<TTimeoutMessageType>(TTimeoutMessageType timeoutMessage) { var canHandleTimeoutMessage = this is IHandleTimeouts<TTimeoutMessageType>; if (!canHandleTimeoutMessage) { var message = $"The type '{GetType().Name}' cannot request timeouts for '{timeoutMessage}' because it does not implement 'IHandleTimeouts<{typeof(TTimeoutMessageType).FullName}>'"; throw new Exception(message); } } void SetTimeoutHeaders(ExtendableOptions options) { options.SetHeader(Headers.SagaId, Entity.Id.ToString()); options.SetHeader(Headers.IsSagaTimeoutMessage, bool.TrueString); options.SetHeader(Headers.SagaType, GetType().AssemblyQualifiedName); } } }
本機開發環境咱們使用LearningPersistence,可是投產的話則須要使用數據庫持久化,這裏咱們基於MySQL,SQL持久化須要引入NServiceBus.Persistence.Sql。SQL Persistence會生成幾種關係型數據庫的sql scripts,而後會根據你的斷言配置選擇所需數據庫,好比SQL Server、MySQL、PostgreSQL、Oracle。
持久化Saga自動建立所需表結構,你只需手動配置便可,配置後編譯成功後項目執行目錄下會生成sql腳本,文件夾名稱是NServiceBus.Persistence.Sql,下面會有Saga子目錄。分佈式
/* TableNameVariable */ set @tableNameQuoted = concat('`', @tablePrefix, 'Saga`'); set @tableNameNonQuoted = concat(@tablePrefix, 'Saga'); /* Initialize */ drop procedure if exists sqlpersistence_raiseerror; create procedure sqlpersistence_raiseerror(message varchar(256)) begin signal sqlstate 'ERROR' set message_text = message, mysql_errno = '45000'; end; /* CreateTable */ set @createTable = concat(' create table if not exists ', @tableNameQuoted, '( Id varchar(38) not null, Metadata json not null, Data json not null, PersistenceVersion varchar(23) not null, SagaTypeVersion varchar(23) not null, Concurrency int not null, primary key (Id) ) default charset=ascii; '); prepare script from @createTable; execute script; deallocate prepare script; /* AddProperty OrderId */ select count(*) into @exist from information_schema.columns where table_schema = database() and column_name = 'Correlation_OrderId' and table_name = @tableNameNonQuoted; set @query = IF( @exist <= 0, concat('alter table ', @tableNameQuoted, ' add column Correlation_OrderId varchar(38) character set ascii'), 'select \'Column Exists\' status'); prepare script from @query; execute script; deallocate prepare script; /* VerifyColumnType Guid */ set @column_type_OrderId = ( select concat(column_type,' character set ', character_set_name) from information_schema.columns where table_schema = database() and table_name = @tableNameNonQuoted and column_name = 'Correlation_OrderId' ); set @query = IF( @column_type_OrderId <> 'varchar(38) character set ascii', 'call sqlpersistence_raiseerror(concat(\'Incorrect data type for Correlation_OrderId. Expected varchar(38) character set ascii got \', @column_type_OrderId, \'.\'));', 'select \'Column Type OK\' status'); prepare script from @query; execute script; deallocate prepare script; /* WriteCreateIndex OrderId */ select count(*) into @exist from information_schema.statistics where table_schema = database() and index_name = 'Index_Correlation_OrderId' and table_name = @tableNameNonQuoted; set @query = IF( @exist <= 0, concat('create unique index Index_Correlation_OrderId on ', @tableNameQuoted, '(Correlation_OrderId)'), 'select \'Index Exists\' status'); prepare script from @query; execute script; deallocate prepare script; /* PurgeObsoleteIndex */ select concat('drop index ', index_name, ' on ', @tableNameQuoted, ';') from information_schema.statistics where table_schema = database() and table_name = @tableNameNonQuoted and index_name like 'Index_Correlation_%' and index_name <> 'Index_Correlation_OrderId' and table_schema = database() into @dropIndexQuery; select if ( @dropIndexQuery is not null, @dropIndexQuery, 'select ''no index to delete'';') into @dropIndexQuery; prepare script from @dropIndexQuery; execute script; deallocate prepare script; /* PurgeObsoleteProperties */ select concat('alter table ', table_name, ' drop column ', column_name, ';') from information_schema.columns where table_schema = database() and table_name = @tableNameNonQuoted and column_name like 'Correlation_%' and column_name <> 'Correlation_OrderId' into @dropPropertiesQuery; select if ( @dropPropertiesQuery is not null, @dropPropertiesQuery, 'select ''no property to delete'';') into @dropPropertiesQuery; prepare script from @dropPropertiesQuery; execute script; deallocate prepare script; /* CompleteSagaScript */
生成的表結構:
ide
Saga持久化須要依賴NServiceBus.Persistence.Sql。引入後須要實現SqlSaga抽象類,抽象類須要重寫ConfigureMapping,配置Saga工做流程業務主鍵。
public class Saga:SqlSaga<State>, IAmStartedByMessages<StartOrder> { protected override void ConfigureMapping(IMessagePropertyMapper mapper) { mapper.ConfigureMapping<StartOrder>(message=>message.OrderId); } protected override string CorrelationPropertyName => nameof(StartOrder.OrderId); public Task Handle(StartOrder message, IMessageHandlerContext context) { Console.WriteLine($"Receive message with OrderId:{message.OrderId}"); MarkAsComplete(); return Task.CompletedTask; } } static async Task MainAsync() { Console.Title = "Client-UI"; var configuration = new EndpointConfiguration("Client-UI"); //這個方法開啓自動建表、自動建立RabbitMQ隊列 configuration.EnableInstallers(); configuration.UseSerialization<NewtonsoftSerializer>(); configuration.UseTransport<LearningTransport>(); string connectionString = "server=127.0.0.1;uid=root;pwd=000000;database=nservicebus;port=3306"; var persistence = configuration.UsePersistence<SqlPersistence>(); persistence.SqlDialect<SqlDialect.MySql>(); //配置mysql鏈接串 persistence.ConnectionBuilder(()=>new MySqlConnection(connectionString)); var instance = await Endpoint.Start(configuration).ConfigureAwait(false); var command = new StartOrder() { OrderId = Guid.NewGuid() }; await instance.SendLocal(command).ConfigureAwait(false); Console.ReadKey(); await instance.Stop().ConfigureAwait(false); }
在消息驅動類型的環境中,雖然傳遞的無鏈接特性能夠防止在線等待過程當中消耗資源,可是畢竟等待時間須要有一個上線。在NServiceBus裏已經提供了Timeout方法,咱們只需訂閱便可,能夠在你的Handle方法中根據須要訂閱Timeout,可參考以下代碼:
public class Saga:Saga<State>, IAmStartedByMessages<StartOrder>, IHandleMessages<CompleteOrder>, IHandleTimeouts<TimeOutMessage> { public Task Handle(StartOrder message, IMessageHandlerContext context) { var model=new TimeOutMessage(); //訂閱超時消息 return RequestTimeout(context,TimeSpan.FromMinutes(10)); } public Task Handle(CompleteOrder message, IMessageHandlerContext context) { MarkAsComplete(); return Task.CompletedTask; } protected override string CorrelationPropertyName => nameof(StartOrder.OrderId); public Task Timeout(TimeOutMessage state, IMessageHandlerContext context) { //處理超時消息 } protected override void ConfigureHowToFindSaga(SagaPropertyMapper<State> mapper) { mapper.ConfigureMapping<StartOrder>(message=>message.OrderId).ToSaga(saga=>saga.OrderId); mapper.ConfigureMapping<CompleteOrder>(message=>message.OrderId).ToSaga(saga=>saga.OrderId); } }
//從Timeout的源碼看,這個方法是經過設置SendOptions,而後再把當前這個消息發送給本身來實現 protected Task RequestTimeout<TTimeoutMessageType>(IMessageHandlerContext context, TimeSpan within, TTimeoutMessageType timeoutMessage) { VerifySagaCanHandleTimeout(timeoutMessage); var sendOptions = new SendOptions(); sendOptions.DelayDeliveryWith(within); sendOptions.RouteToThisEndpoint(); SetTimeoutHeaders(sendOptions); return context.Send(timeoutMessage, sendOptions); }
NServiceBus由於是商業產品,對分佈式消息系統所涉及到的東西都作了實現,包括分佈式事務(Outbox)、DTC都有,還有心跳檢測,監控都有,全而大,目前咱們用到的也只是NServiceBus裏很小的一部分功能。