使用Masstransit開發基於消息傳遞的分佈式應用

Masstransit做爲.Net平臺下的一款優秀的開源產品卻沒有獲得應有的關注,這段時間有機會閱讀了Masstransit的源碼,我以爲我有必要普及一下這個框架的使用。html

值得一提的是Masstransit的源碼寫的很是優秀,值得每一個想提升本身編程能力的.Net選手閱讀,整個代碼看起來賞心悅目。反之,每次打開本身公司項目的時候心情都異常沉重。因此不是.Net不行,仍是我們水平不行。git

學會了Masstransit你不再用羨慕別人有Dubbo、Mule、Akka什麼的了,固然在某些方面他們的使用場景仍是有一些區別。另外插播一條廣告:本人目前在西安求職中,若是那位同窗有好的工做機會但願可以幫忙推薦。github

閱讀本篇文章的前提是你須要對消息隊列有一些瞭解,特別是RabbitMq,Masstransit做爲一款輕量級的ESB默認支持RabbitMq和MSMQ。本文的例子都使用RabbitMq來介紹,因此你最好能讀一下我以前寫的《如何優雅的使用RabbitMq》。web

簡單來講,Masstransit提供了使用消息隊列場景的一種抽象,也就是說,若是你有使用消息隊列的需求,均可以經過Masstransit來完成,固然若是僅僅是拿消息隊列來發個短信、郵件之類的並不能體現出Masstransit的優越性。當整個業務系統都經過Masstransit過來構建和交互的時候,才能真正體現ESB的價值所在。編程

我寫了5不一樣場景個Demo,方便你們學習和參考。我會重點講解Real World的案例,也就是如何在真實場景使用Masstransit。若是僅僅是把一些組件融入到了項目中而且可以運行,並不能算是一個合格的架構師,一個合格的架構師必定是能夠將某個組件以最佳實踐的方式融入到了本身的項目中,而且可以爲開發者提供清晰且合理的抽象,而後針對這一方案制定一些約定和規則,隨着項目的推動,整個項目的代碼都可以有章可循,始終在架構師的掌控之中。windows

1、發送命令模型(Send Command Pattern)api

這種模型最多見的就是CQRS中C,用來向DomainHandler發送一個Command。另外系統的發送郵件服務、發送短信服務也能夠經過這種模式來實現。這種模型跟郵遞員向郵箱投遞郵件有點類似。這一模型的特色是你須要知道對方終結點的地址,意味着你要明確要向哪一個地址發送消息。從Masstransit提供的api就能夠看出來:架構

var endPoint =await bus.GetSendEndpoint(sendToUri);
            var command = new GreetingCommandA()
            {
                Id = Guid.NewGuid(),
                DateTime = DateTime.Now
            };

            await endPoint.Send(command);

這個Demo主要由2個工程組成,Client發送消息到Server,Server來響應這一消息。框架

2、發佈/訂閱模型(publish/subscribe pattern)異步

之因此有基於消息傳遞的分佈式應用這種架構模式,很大程度上就是依靠這種模式來完成。一個典型的例子是子系統A發佈了一條消息,子系統B和子系統C均可以訂閱這一消息並異步處理該消息。而這一過程對子系統A來講是不關心的。從而減小不一樣的子系統之間的耦合,提升系統的可擴展性。

3、消息的繼承層次

用過RabbitMQ的同窗應該知道,RabbitMQ提供了3中類型的Exchange,分別爲direct、fanout和topic。全部這一切都是爲了提供一種路由消息的機制。而這一切是經過匹配一種字符串類型的routingKey來實現的,固然有了Masstransit你就不用這麼費勁了。C#做爲一種強類型的語言,咱們能夠經過設計消息的繼承層次來實現消息的路由機制。好比咱們能夠設計下面的消息繼承體系:

 public interface IMessage
    {
        Guid Id { get; set; }
    }

    public class Message : IMessage
    {
        public Guid Id { get; set; }
        public string Type { get; set; }
    }

    public class UserUpdatedMessage : Message
    {
        public Guid Id { get; set; }
    }

有了這樣的繼承體系,咱們能夠定義下面的Consumer類型:

public class BaseInterfaceMessageConsumer:IConsumer<IMessage>
    {
        public async Task Consume(ConsumeContext<IMessage> context)
        {
            await Console.Out.WriteLineAsync($"consumer is BaseInterfaceMessageConsumer,message type is {context.Message.GetType()}");
        }
    }

還能夠定義下面的Consumer類型:

public class UserUpdatedMessageConsumer: IConsumer<UserUpdatedMessage>
    {
        public async Task Consume(ConsumeContext<UserUpdatedMessage> context)
        {
            await Console.Out.WriteLineAsync($"consumer is UserUpdatedMessageConsumer,message type is {context.Message.GetType()}");
        }
    }

這樣就能夠路由不一樣的消息到相應的Consumer中了。

4、使用Topshelf來構建windows服務

咱們最終要將consumer程序集打成windows服務來安裝在產品環境下,Topshelf爲咱們提供了一組DSL描述的api來建立window服務:

HostFactory.Run(x =>                                 
            {
                x.Service<GreetingServer>(s =>                        
                {
                    s.ConstructUsing(name => new GreetingServer());     
                    s.WhenStarted(tc => tc.Start());            
                    s.WhenStopped(tc => tc.Stop());
                });
                x.StartAutomatically();
                x.RunAsLocalSystem();                          
                x.SetDescription("A greeting service");        
                x.SetDisplayName("Greeting Service");                      
                x.SetServiceName("GreetingService");     
            });

5、RPC調用(request/response pattern)

咱們還能夠經過Masstransit實現RPC調用:

var response = await client.Request(new SimpleRequest() {CustomerId = customerId});

Console.WriteLine("Customer Name: {0}", response.CusomerName);

這有點像是一個webservice調用,不過在ESB的設計中咱們應該儘可能避免這種設計,特別是在異構系統之間,應該儘可能採用send command pattern和publish/subscriber pattern。

6、正式場景該如何使用Masstransit

在使用Masstranit的正式場景中,咱們主要考慮如下幾個方面:

一、配置方式

定義一個抽象類,用來統一配置方式:

    public abstract class BusConfiguration
    {
        public abstract string RabbitMqAddress { get; }
        public abstract string QueueName { get; }
        public abstract string RabbitMqUserName { get; }
        public abstract string RabbitMqPassword { get; }
        public abstract Action<IRabbitMqBusFactoryConfigurator,IRabbitMqHost> Configuration { get; }


        public virtual IBus CreateBus()
        {
            var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                var host = cfg.Host(new Uri(RabbitMqAddress), hst =>
                {
                    hst.Username(RabbitMqUserName);
                    hst.Password(RabbitMqPassword);
                });

                Configuration?.Invoke(cfg, host);
            });

            return bus;
        }
    }

具體的項目會繼承該配置類作對應的配置:如UserManagementBusConfiguration、UserManagementServiceBusConfiguration等

二、可以跟DI容器結合,本例以Castle Windsor Container爲例:

在web項目中添加ServiceBusInstaller:

public class ServiceBusInstaller:IWindsorInstaller
    {
        public void Install(IWindsorContainer container, IConfigurationStore store)
        {
            container.Register(
                Component.For<IBus, IBusControl>()
                    .Instance(UserManagementBusConfiguration.BusInstance)
                    .LifestyleSingleton());
        }
    }

而後咱們就能夠在controller中注入IBus了:

private readonly IUserProvider _userProvider;
        private readonly IBus _bus;

        public ValuesController(IUserProvider userProvider,IBus bus)
        {
            _userProvider = userProvider;
            _bus = bus;
        }

        [HttpGet]
        [Route("api/values/createuser")]
        public string CreateUser()
        {
            //save user in local db

            _bus.Publish(new UserCreatedEvent() {UserName = "Tom", Email = "tom@google.com"});

            return "create user named Tom";
        }

一樣的道理,在consumer項目中也能夠作一樣的配置,添加ConsumersInstaller:

public class ConsumersInstaller:IWindsorInstaller
    {
        public void Install(IWindsorContainer container, IConfigurationStore store)
        {
            container.Register(
                Classes.FromThisAssembly().BasedOn(typeof (IConsumer)).WithServiceBase().WithServiceSelf().LifestyleTransient());
        }
    }

在Consumer中注入一個組件試試:

public class UserCreatedEventConsumer : IConsumer<UserCreatedEvent>
    {
        private readonly GreetingWriter _greetingWriter;

        public UserCreatedEventConsumer(GreetingWriter greetingWriter)
        {
            _greetingWriter = greetingWriter;
        }

        public async Task Consume(ConsumeContext<UserCreatedEvent> context)
        {
            _greetingWriter.SayHello();

            await Console.Out.WriteLineAsync($"user name is {context.Message.UserName}");
            await Console.Out.WriteLineAsync($"user email is {context.Message.Email}");
        }
    }

把web項目和consumer服務都跑起來看看:

三、重試配置

 cfg.UseRetry(Retry.Interval(3, TimeSpan.FromMinutes(1)));

消息消費失敗後重試3次,每次間隔1分鐘

四、限速器

cfg.UseRateLimit(1000, TimeSpan.FromSeconds(1));

每分鐘消息消費數限定在1000以內

五、熔斷器

cfg.UseCircuitBreaker(cb =>
                    {
                        cb.TrackingPeriod = TimeSpan.FromMinutes(1);
                        cb.TripThreshold = 15;
                        cb.ActiveThreshold = 10;
                    });

參照Martin Folwer對熔斷器模式的描述:CircuitBreaker

六、異常處理

 public class UserUpdatedEventComsumer
        :IConsumer<UserUpdatedEvent>
        ,IConsumer<Fault<UserUpdatedEvent>>
    {
        public Task Consume(ConsumeContext<UserUpdatedEvent> context)
        {
            throw new System.NotImplementedException();
        }

        public async Task Consume(ConsumeContext<Fault<UserUpdatedEvent>> context)
        {
            await Console.Out.WriteLineAsync($"catch exception: {context.Message.Message}");
        }
    }

只要繼承於對應的Fault<TMessage>便可爲對應的消息編寫異常處理。

七、單元測試(待續)

八、消息定時發送(待續)

九、自定義中間件(待續)

十、自定義觀察者(待續)

十一、長生命週期的消費者:Turnout(待續)

十二、長生命週期的狀態機:saga(待續)

1三、Routing slip pattern的實現:Courier(待續)

整個Demo代碼提供下載:http://git.oschina.net/richieyangs/RabbitMQ.Practice

相關文章
相關標籤/搜索