.Net Core 商城微服務項目系列(十一):MQ消費端獨立爲Window服務+消息處理服務

以前使用MQ的時候是經過封裝成dll發佈Nuget包來使用,消息的發佈和消費都耦合在使用的站點和服務裏,這樣會形成兩個問題:數據庫

1.增長服務和站點的壓力,由於每次消息的消費就意味着接口的調用,這部分的壓力都加在了使用的站點和服務的機器上。api

2.增長修改的複雜性,若是咱們須要加兩條消費日誌,都須要再發佈一個版本從新經過dll引用。服務器

 

因此咱們須要作如下兩方面的工做:async

1.MQ的接收拆分爲Windows服務,經過zokeerper實現主從防止單點故障。ide

2.MQ的消費這裏作成單獨的WebApi服務。性能

 

這樣作的好處有如下幾方面:ui

1.解耦。MQ的消費從使用的站點和服務中被拆分出來,減輕服務壓力。spa

2.增長程序的可維護和可調試性。調試

3.單獨部署提升吞吐量。日誌

 

首先咱們先來看下MQ的消費服務端,其實就是把以前調接口的方法單獨放到了WebApi中,這樣能夠單獨部署,減輕服務器壓力:

        /// <summary>
        /// MQ消費到指定的服務接口
        /// </summary>
        [HttpPost]
        public async Task<ConsumerProcessEventResponse> ConsumerProcessEventAsync([FromBody]ConsumerProcessEventRequest request)
        {
            ConsumerProcessEventResponse response = new ConsumerProcessEventResponse();
            try
            {
                _logger.LogInformation($"MQ準備執行ConsumerProcessEvent方法,RoutingKey:{request.RoutingKey} Message:{request.MQBodyMessage}");
                using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME))
                {
                    //獲取綁定該routingKey的服務地址集合
                    var subscriptions = await StackRedis.Current.GetAllList(request.RoutingKey);
                    if (!subscriptions.Any())
                    {
                        //若是Redis中不存在 則從數據庫中查詢 加入Redis中
                        var queryRoutingKeyApiUrlResponse = _apiHelperService.PostAsync<QueryRoutingKeyApiUrlResponse>(ServiceAddress.QueryRoutingKeyApiUrlAsync, new QueryRoutingKeyApiUrlRequest { RoutingKey = request.RoutingKey });
                        if (queryRoutingKeyApiUrlResponse.Result != null && queryRoutingKeyApiUrlResponse.Result.ApiUrlList.Any())
                        {
                            subscriptions = queryRoutingKeyApiUrlResponse.Result.ApiUrlList;
                            Task.Run(() =>
                            {
                                StackRedis.Current.SetLists(request.RoutingKey, queryRoutingKeyApiUrlResponse.Result.ApiUrlList);
                            });
                        }
                    }
                    if(subscriptions!=null && subscriptions.Any())
                    {
                        foreach (var apiUrl in subscriptions)
                        {
                            Task.Run(() =>
                            {
                                _logger.LogInformation(request.MQBodyMessage);
                            });

                            //這裏須要作判斷 假如MQ要發送到多個服務接口 其中一個消費失敗 應該將其單獨記錄到數據庫 而不影響這個消息的確認
//先作個備註 之後添加這個處理
await _apiHelperService.PostAsync(apiUrl, request.MQBodyMessage); } _logger.LogInformation($"MQ執行ProcessEvent方法完成,RoutingKey:{request.RoutingKey} Message:{request.MQBodyMessage}"); } } } catch(Exception ex) { response.Successful = false; response.Message = ex.Message; _logger.LogError(ex, $"MQ消費失敗 RoutingKey:{request.RoutingKey} Message:{request.MQBodyMessage}"); } return response; }

 

這個WebApi只有這一個方法,就是根據RoutingKey查找對應的MQ配置,而後根據配置的接口地址調用指定的接口,比較簡單哈,以前也寫過,就不細說了。

 

咱們來看接收MQ消息的Windows服務端,MQ首次使用都須要從新綁定Routingkey、隊列和交換器,因此我在Monitor服務裏寫了一個綁定的方法,在Windows服務端啓動的時候調用一次:

public class MQConsumerService
    {
        private readonly IApiHelperService _apiHelperService;
        private ILog _logger;

        public MQConsumerService(IApiHelperService apiHelperService,ILog logger)
        {
            _apiHelperService = apiHelperService;
            _logger = logger;
        }

        /// <summary>
        /// 發送MQ到MQ消費服務端
        /// </summary>
        /// <param name="routingKey"></param>
        /// <param name="message"></param>
        public void ProcessEvent(string routingKey, string message)
        {
            try
            {
                _logger.Info($"MQ準備執行ProcessEvent方法,RoutingKey:{routingKey} Message:{message}");
                _apiHelperService.PostAsync<ConsumerProcessEventResponse>(ServiceUrls.ConsumerProcessEvent,new ConsumerProcessEventRequest { RoutingKey=routingKey,MQBodyMessage=message});
            }
            catch(Exception ex)
            {
                _logger.Error($"MQ發送消費服務端失敗 RoutingKey:{routingKey} Message:{message}",ex);
            }
        }

        /// <summary>
        /// MQ初始化 調用隊列交換器綁定接口
        /// </summary>
        /// <returns></returns>
        public async Task MQSubscribeAsync()
        {
            try
            {
                var response= await _apiHelperService.PostAsync<MQSubscribeResponse>(ServiceUrls.MQSubscribe, new MQSubscribeRequest());
                if(!response.Successful)
                {
                    _logger.Error($"MQ綁定RoutingKey隊列失敗: {response.Message}");
                }
            }
            catch(Exception ex)
            {
                _logger.Error($"MQ綁定RoutingKey隊列失敗",ex);
            }            
        }
    }

 

這裏爲了簡單起見,交換器和隊列使用的都是同一個,路由方式是「direct」模式,以後會繼續修改的,先跑起來再說:

static void Main(string[] args)
        {
            //交換器(Exchange)
            const string BROKER_NAME = "mi_event_bus";
            //隊列(Queue)
            var SubscriptionClientName = "RabbitMQ_Bus_MI";
            //log4net日誌加載
            ILoggerRepository repository = LogManager.CreateRepository("MI.WinService.MQConsumer");
            XmlConfigurator.Configure(repository, new FileInfo("log4net.config"));
            ILog log = LogManager.GetLogger(repository.Name, "MI.WinService.MQConsumer");
            //依賴注入加載
            IServiceCollection serviceCollection = new ServiceCollection();
            //WebApi調用類
            serviceCollection.AddTransient<IApiHelperService, ApiHelperService>();
            var serviceProvider = serviceCollection.AddHttpClient().BuildServiceProvider();
            serviceProvider.GetService<ILogger>();
            var apiHelperService = serviceProvider.GetService<IApiHelperService>();
            //MQ消費類(發送MQ消息調用接口、綁定隊列交換器)
            MQConsumerService consumerService = new MQConsumerService(apiHelperService,log);

            //MQ鏈接類
            ConnectionFactory factory = new ConnectionFactory
            {
                UserName = "",
                Password = "",
                HostName = ""
            };

            var connection = factory.CreateConnection();
            var channel = connection.CreateModel();

            channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct");

            channel.QueueDeclare(queue: SubscriptionClientName, durable: true, exclusive: false, autoDelete: false, arguments: null);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (ch, ea) =>
            {
                //發送到MQ消費服務端
                var message = Encoding.UTF8.GetString(ea.Body);
                log.Info($"MQ準備消費消息 RoutingKey:{ea.RoutingKey} Message:{message}");

                //發送到MQ消費服務端MQStationServer
                Task result= Task.Run(() =>
                {
                    consumerService.ProcessEvent(ea.RoutingKey, message);
                });
                if(!result.IsFaulted)
                {
                    //確認ack
                    channel.BasicAck(ea.DeliveryTag, false);
                }
            };
            channel.BasicConsume(SubscriptionClientName, false, consumer);
            Console.WriteLine("消費者已啓動!");

            //綁定隊列RoutingKey
            Task taskResult= Task.Run(async() =>
            {
                await consumerService.MQSubscribeAsync();
            });

            taskResult.Wait();


            Console.WriteLine("隊列RoutingKey綁定完成!");

            Console.ReadKey();
            channel.Dispose();
            connection.Close();
        }

 

最後梳理下消費端消費MQ流程:

MQ發佈後,Windows服務端會受到MQ消息,而後經過調用接口將消息發送到MQ消費服務端,經過RoutingKey從數據庫查找對應的MQ和接口配置,調用指定接口,固然,這裏只是簡單的代碼列子,想用在生產中必需要作好完善的日誌調用記錄、性能監控、健康檢查以及服務器層面的集羣防止單點故障。

相關文章
相關標籤/搜索