.net core使用EasyNetQ作EventBus

轉自 https://www.cnblogs.com/focus-lei/p/9121095.htmlhtml

尚未親測,有時間反饋測試結果api

      隨着SOA、微服務、CQRS的盛行,EventBus愈來愈流行,上GitHub搜了一下,仍是有蠻多的這類實現,老牌的有NServiceBus(收費)、MassTransit,最近的有CAP(國人寫的,1.4k個Star,很是不錯)、ReBus(張隊長在NanoFabric中推薦的)、RawRabbit等,今天我介紹的是另一款產品:EasyNetQ,支持.net core,正如其名,用起來確實very easy。app

       首先在asp.net core項目中添加EasyNetQ的nuget引用,注入:asp.net

public void ConfigureServices(IServiceCollection services)
{
       string rabbitMqConnection = Configuration["RabbitMqConnection"];
       services.AddSingleton(RabbitHutch.CreateBus(rabbitMqConnection));
}

       一、發佈消息:

複製代碼
public class ValuesController : Controller
    {
        private IBus _bus;
        public ValuesController(IBus bus)
        {
            _bus = bus;
        }


        // POST api/values
        [HttpPost]
        public async Task Post([FromBody]Order message)
        {
            await _bus.PublishAsync(message);
        }
    }
複製代碼

          可自定義消息的Exchange和Queue,須要修改消息實體:異步

    [Queue("Qka.Order", ExchangeName = "Qka.Order")]
    public class Order
    {
        public int OrderId { get; set; }
    }

 

          二、訂閱消息

   咱們訂閱的服務也是寄宿在asp.net core程序中,首先定義消費者:async

複製代碼
    public class OrderConsumer : IConsume<Order>
    {
        [AutoSubscriberConsumer(SubscriptionId = "OrderService")]
        public void Consume(Order message)
        {
            //業務代碼
        }
    }
複製代碼

          也能夠使用異步方法:  ide

複製代碼
    public class OrderConsumer : IConsumeAsync<Order>
    {
        [AutoSubscriberConsumer(SubscriptionId = "OrderService")]
        public Task ConsumeAsync(Order message)
        {
            //業務代碼
        }
    }
複製代碼

         利用EasyNetQ的自動訂閱者進行訂閱,這裏擴展IApplicationBuilder:微服務

複製代碼
public static class ApplicationExtenssion
{
        public static IApplicationBuilder UseSubscribe(this IApplicationBuilder appBuilder, string subscriptionIdPrefix, Assembly assembly)
        {
            var services = appBuilder.ApplicationServices.CreateScope().ServiceProvider;

            var lifeTime = services.GetService<IApplicationLifetime>();
            var bus = services.GetService<IBus>();
            lifeTime.ApplicationStarted.Register(() =>
            {
                var subscriber = new AutoSubscriber(bus, subscriptionIdPrefix);
                subscriber.Subscribe(assembly);         
subscriber.SubscribeAsync(assembly);
});

        lifeTime.ApplicationStopped.Register(() => { bus.Dispose(); });

        return appBuilder;
     }
}
複製代碼

          使用擴展方法:測試

        public void Configure(IApplicationBuilder app, IHostingEnvironment env)
        {
            app.UseSubscribe("OrderService", Assembly.GetExecutingAssembly());
        }
相關文章
相關標籤/搜索