以前使用MQ的時候是經過封裝成dll發佈Nuget包來使用,消息的發佈和消費都耦合在使用的站點和服務裏,這樣會形成兩個問題:數據庫
1.增長服務和站點的壓力,由於每次消息的消費就意味着接口的調用,這部分的壓力都加在了使用的站點和服務的機器上。api
2.增長修改的複雜性,若是咱們須要加兩條消費日誌,都須要再發佈一個版本從新經過dll引用。服務器
因此咱們須要作如下兩方面的工做:async
1.MQ的接收拆分爲Windows服務,經過zokeerper實現主從防止單點故障。ide
2.MQ的消費這裏作成單獨的WebApi服務。性能
這樣作的好處有如下幾方面:ui
1.解耦。MQ的消費從使用的站點和服務中被拆分出來,減輕服務壓力。spa
2.增長程序的可維護和可調試性。調試
3.單獨部署提升吞吐量。日誌
首先咱們先來看下MQ的消費服務端,其實就是把以前調接口的方法單獨放到了WebApi中,這樣能夠單獨部署,減輕服務器壓力:
/// <summary> /// MQ消費到指定的服務接口 /// </summary> [HttpPost] public async Task<ConsumerProcessEventResponse> ConsumerProcessEventAsync([FromBody]ConsumerProcessEventRequest request) { ConsumerProcessEventResponse response = new ConsumerProcessEventResponse(); try { _logger.LogInformation($"MQ準備執行ConsumerProcessEvent方法,RoutingKey:{request.RoutingKey} Message:{request.MQBodyMessage}"); using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME)) { //獲取綁定該routingKey的服務地址集合 var subscriptions = await StackRedis.Current.GetAllList(request.RoutingKey); if (!subscriptions.Any()) { //若是Redis中不存在 則從數據庫中查詢 加入Redis中 var queryRoutingKeyApiUrlResponse = _apiHelperService.PostAsync<QueryRoutingKeyApiUrlResponse>(ServiceAddress.QueryRoutingKeyApiUrlAsync, new QueryRoutingKeyApiUrlRequest { RoutingKey = request.RoutingKey }); if (queryRoutingKeyApiUrlResponse.Result != null && queryRoutingKeyApiUrlResponse.Result.ApiUrlList.Any()) { subscriptions = queryRoutingKeyApiUrlResponse.Result.ApiUrlList; Task.Run(() => { StackRedis.Current.SetLists(request.RoutingKey, queryRoutingKeyApiUrlResponse.Result.ApiUrlList); }); } } if(subscriptions!=null && subscriptions.Any()) { foreach (var apiUrl in subscriptions) { Task.Run(() => { _logger.LogInformation(request.MQBodyMessage); }); //這裏須要作判斷 假如MQ要發送到多個服務接口 其中一個消費失敗 應該將其單獨記錄到數據庫 而不影響這個消息的確認
//先作個備註 之後添加這個處理
await _apiHelperService.PostAsync(apiUrl, request.MQBodyMessage); } _logger.LogInformation($"MQ執行ProcessEvent方法完成,RoutingKey:{request.RoutingKey} Message:{request.MQBodyMessage}"); } } } catch(Exception ex) { response.Successful = false; response.Message = ex.Message; _logger.LogError(ex, $"MQ消費失敗 RoutingKey:{request.RoutingKey} Message:{request.MQBodyMessage}"); } return response; }
這個WebApi只有這一個方法,就是根據RoutingKey查找對應的MQ配置,而後根據配置的接口地址調用指定的接口,比較簡單哈,以前也寫過,就不細說了。
咱們來看接收MQ消息的Windows服務端,MQ首次使用都須要從新綁定Routingkey、隊列和交換器,因此我在Monitor服務裏寫了一個綁定的方法,在Windows服務端啓動的時候調用一次:
public class MQConsumerService { private readonly IApiHelperService _apiHelperService; private ILog _logger; public MQConsumerService(IApiHelperService apiHelperService,ILog logger) { _apiHelperService = apiHelperService; _logger = logger; } /// <summary> /// 發送MQ到MQ消費服務端 /// </summary> /// <param name="routingKey"></param> /// <param name="message"></param> public void ProcessEvent(string routingKey, string message) { try { _logger.Info($"MQ準備執行ProcessEvent方法,RoutingKey:{routingKey} Message:{message}"); _apiHelperService.PostAsync<ConsumerProcessEventResponse>(ServiceUrls.ConsumerProcessEvent,new ConsumerProcessEventRequest { RoutingKey=routingKey,MQBodyMessage=message}); } catch(Exception ex) { _logger.Error($"MQ發送消費服務端失敗 RoutingKey:{routingKey} Message:{message}",ex); } } /// <summary> /// MQ初始化 調用隊列交換器綁定接口 /// </summary> /// <returns></returns> public async Task MQSubscribeAsync() { try { var response= await _apiHelperService.PostAsync<MQSubscribeResponse>(ServiceUrls.MQSubscribe, new MQSubscribeRequest()); if(!response.Successful) { _logger.Error($"MQ綁定RoutingKey隊列失敗: {response.Message}"); } } catch(Exception ex) { _logger.Error($"MQ綁定RoutingKey隊列失敗",ex); } } }
這裏爲了簡單起見,交換器和隊列使用的都是同一個,路由方式是「direct」模式,以後會繼續修改的,先跑起來再說:
static void Main(string[] args) { //交換器(Exchange) const string BROKER_NAME = "mi_event_bus"; //隊列(Queue) var SubscriptionClientName = "RabbitMQ_Bus_MI"; //log4net日誌加載 ILoggerRepository repository = LogManager.CreateRepository("MI.WinService.MQConsumer"); XmlConfigurator.Configure(repository, new FileInfo("log4net.config")); ILog log = LogManager.GetLogger(repository.Name, "MI.WinService.MQConsumer"); //依賴注入加載 IServiceCollection serviceCollection = new ServiceCollection(); //WebApi調用類 serviceCollection.AddTransient<IApiHelperService, ApiHelperService>(); var serviceProvider = serviceCollection.AddHttpClient().BuildServiceProvider(); serviceProvider.GetService<ILogger>(); var apiHelperService = serviceProvider.GetService<IApiHelperService>(); //MQ消費類(發送MQ消息調用接口、綁定隊列交換器) MQConsumerService consumerService = new MQConsumerService(apiHelperService,log); //MQ鏈接類 ConnectionFactory factory = new ConnectionFactory { UserName = "", Password = "", HostName = "" }; var connection = factory.CreateConnection(); var channel = connection.CreateModel(); channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct"); channel.QueueDeclare(queue: SubscriptionClientName, durable: true, exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); consumer.Received += (ch, ea) => { //發送到MQ消費服務端 var message = Encoding.UTF8.GetString(ea.Body); log.Info($"MQ準備消費消息 RoutingKey:{ea.RoutingKey} Message:{message}"); //發送到MQ消費服務端MQStationServer Task result= Task.Run(() => { consumerService.ProcessEvent(ea.RoutingKey, message); }); if(!result.IsFaulted) { //確認ack channel.BasicAck(ea.DeliveryTag, false); } }; channel.BasicConsume(SubscriptionClientName, false, consumer); Console.WriteLine("消費者已啓動!"); //綁定隊列RoutingKey Task taskResult= Task.Run(async() => { await consumerService.MQSubscribeAsync(); }); taskResult.Wait(); Console.WriteLine("隊列RoutingKey綁定完成!"); Console.ReadKey(); channel.Dispose(); connection.Close(); }
最後梳理下消費端消費MQ流程:
MQ發佈後,Windows服務端會受到MQ消息,而後經過調用接口將消息發送到MQ消費服務端,經過RoutingKey從數據庫查找對應的MQ和接口配置,調用指定接口,固然,這裏只是簡單的代碼列子,想用在生產中必需要作好完善的日誌調用記錄、性能監控、健康檢查以及服務器層面的集羣防止單點故障。