RabbitMQ是什麼,怎麼使用我就不介紹了,你們能夠到園子裏搜一下教程。本篇的重點在於實現服務與服務之間的異步通訊。web
首先說一下爲何要使用消息隊列來實現服務通訊:1.提升接口併發能力。 2.保證服務各方數據最終一致。 3.解耦。redis
使用消息隊列通訊的優勢就是直接調用的缺點,好比在直接調用過程當中發生未知錯誤,極可能就會出現數據不一致的問題,這個時候就須要人工修補數據,若是有過這個經歷的同窗必定是可憐的,人工修補數據簡直痛苦!!再好比高併發狀況下接口直接掛點,這就更直白了,接口掛了,功能就掛了,事故報告寫起來!!而消息隊列能夠輕鬆解決上面兩個問題,接口發生錯誤,沒關係,MQ重試一下,再不行,人工重試MQ;在使用消息隊列的時候,請求實際是被串行化,簡單說就是排隊,因此不再用擔憂由於併發致使數據不一致或者接口直接掛掉的問題。數據庫
我如今公司使用的消息隊列排隊的請求最高的有上萬個,因此徹底不須要擔憂MQ的性能。json
OK,咱們來實現一下微服務裏如何使用消息隊列,主要思路是這樣的:api
【提供消費者註冊界面,用於綁定RoutingKey和隊列;消息發佈後,根據RoutingKey去Redis中查找對應的服務地址,而後異步調用。】服務器
上面這句話就是消息隊列的主體思路,也是我司如今使用的方式,話很少說,代碼敲起來。restful
首先看下咱們的項目結構:併發
首先咱們須要先建三個這樣的類庫,這裏面有些東西是用不到的,固然最最主要的就是標記出來的消息隊列部分,如今暫時提供了兩個方法,分別是發佈(Publish)和訂閱(Subscribe)。app
首先新增消息·隊列接口類IEventBus,這個未來用於在業務系統中注入使用,這裏提供了發佈訂閱方法:異步
public interface IEventBus { void Publish(string RoutingKey, object Model); void Subscribe(string QueueName, string RoutingKey); }
新增RabbitMQ操做接口類IRabbitMQPersistentConnection,這個用來檢查RabbitMQ的鏈接和釋放:
public interface IRabbitMQPersistentConnection : IDisposable { bool IsConnected { get; } bool TryConnect(); IModel CreateModel(); }
新增IRabbitMQPersistentConnection的實現類DefaultRabbitMQPersistentConnection,這個是RabbitMQ鏈接和釋放方法的具體實現,這個沒什麼可說的,你們一看就知道了,就是檢查RabbitMQ的鏈接狀態,沒有鏈接建立鏈接,發生錯誤的捕捉錯誤從新鏈接,這裏用到了Polly的從新策略:
public class DefaultRabbitMQPersistentConnection:IRabbitMQPersistentConnection { private readonly IConnectionFactory _connectionFactory; private readonly ILogger<DefaultRabbitMQPersistentConnection> _logger; private readonly int _retryCount; IConnection _connection; bool _disposed; object sync_root = new object(); public DefaultRabbitMQPersistentConnection(IConnectionFactory connectionFactory, ILogger<DefaultRabbitMQPersistentConnection> logger, int retryCount = 5) { _connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _retryCount = retryCount; } public bool IsConnected { get { return _connection != null && _connection.IsOpen && !_disposed; } } public IModel CreateModel() { if (!IsConnected) { throw new InvalidOperationException("No RabbitMQ connections are available to perform this action"); } return _connection.CreateModel(); } public void Dispose() { if (_disposed) return; _disposed = true; try { _connection.Dispose(); } catch (IOException ex) { _logger.LogCritical(ex.ToString()); } } public bool TryConnect() { _logger.LogInformation("RabbitMQ Client is trying to connect"); lock (sync_root) { var policy = RetryPolicy.Handle<SocketException>() .Or<BrokerUnreachableException>() .WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) => { _logger.LogWarning(ex.ToString()); }); policy.Execute(() => { _connection = _connectionFactory .CreateConnection(); }); if (IsConnected) { _connection.ConnectionShutdown += OnConnectionShutdown; _connection.CallbackException += OnCallbackException; _connection.ConnectionBlocked += OnConnectionBlocked; _logger.LogInformation($"RabbitMQ persistent connection acquired a connection {_connection.Endpoint.HostName} and is subscribed to failure events"); return true; } else { _logger.LogCritical("FATAL ERROR: RabbitMQ connections could not be created and opened"); return false; } } } private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e) { if (_disposed) return; _logger.LogWarning("A RabbitMQ connection is shutdown. Trying to re-connect..."); TryConnect(); } void OnCallbackException(object sender, CallbackExceptionEventArgs e) { if (_disposed) return; _logger.LogWarning("A RabbitMQ connection throw exception. Trying to re-connect..."); TryConnect(); } void OnConnectionShutdown(object sender, ShutdownEventArgs reason) { if (_disposed) return; _logger.LogWarning("A RabbitMQ connection is on shutdown. Trying to re-connect..."); TryConnect(); } }
接下來是最重要,IEventBus的實現類EventBusRabbitMQ,在這個類裏咱們實現了消息的發佈、訂閱、消費,首先把代碼展現出來,而後一個一個的介紹:
public class EventBusRabbitMQ : IEventBus, IDisposable { const string BROKER_NAME = "mi_event_bus"; private readonly IRabbitMQPersistentConnection _persistentConnection; private readonly ILogger<EventBusRabbitMQ> _logger; private readonly ILifetimeScope _autofac; private readonly IApiHelperService _apiHelperService; private readonly string AUTOFAC_SCOPE_NAME = "mi_event_bus"; private readonly int _retryCount; private IModel _consumerChannel; private string _queueName; public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection,ILogger<EventBusRabbitMQ> logger, ILifetimeScope autofac, IApiHelperService apiHelperService, string queueName=null,int retryCount=5) { _persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _queueName = queueName; _consumerChannel = CreateConsumerChannel(); _autofac = autofac; _retryCount = retryCount; _apiHelperService = apiHelperService; } /// <summary> /// 發佈消息 /// </summary> public void Publish(string routingKey,object Model) { if (!_persistentConnection.IsConnected) { _persistentConnection.TryConnect(); } var policy = RetryPolicy.Handle<BrokerUnreachableException>() .Or<SocketException>() .WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) => { _logger.LogWarning(ex.ToString()); }); using (var channel = _persistentConnection.CreateModel()) { channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct"); var message = JsonConvert.SerializeObject(Model); var body = Encoding.UTF8.GetBytes(message); policy.Execute(() => { var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2; //持久化 channel.BasicPublish(exchange: BROKER_NAME, routingKey: routingKey, mandatory: true, basicProperties: properties, body: body); }); } } /// <summary> /// 訂閱(綁定RoutingKey和隊列) /// </summary> public void Subscribe(string QueueName, string RoutingKey) { if (!_persistentConnection.IsConnected) { _persistentConnection.TryConnect(); } using (var channel = _persistentConnection.CreateModel()) { channel.QueueBind(queue: QueueName, exchange: BROKER_NAME, routingKey: RoutingKey); } } /// <summary> /// 建立消費者並投遞消息 /// </summary> /// <returns></returns> private IModel CreateConsumerChannel() { if (!_persistentConnection.IsConnected) { _persistentConnection.TryConnect(); } var channel = _persistentConnection.CreateModel(); channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct"); channel.QueueDeclare(queue: _queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); consumer.Received += async (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); await ProcessEvent(ea.RoutingKey, message); channel.BasicAck(ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: _queueName, autoAck: false, consumer: consumer); channel.CallbackException += (sender, ea) => { _consumerChannel.Dispose(); _consumerChannel = CreateConsumerChannel(); }; return channel; } /// <summary> /// 發送MQ數據到指定服務接口 /// </summary> private async Task ProcessEvent(string routingKey, string message) { using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME)) { //獲取綁定該routingKey的服務地址集合 var subscriptions = await StackRedis.Current.GetAllList(routingKey); foreach(var apiUrl in subscriptions) { _logger.LogInformation(message); await _apiHelperService.PostAsync(apiUrl, message); } } } public void Dispose() { _consumerChannel?.Dispose(); } }
首先是發佈方法,接受一個字符串類型的RoutingKey和Object類型的MQ數據,而後根據RoutingKey將數據發佈到指定的隊列,這裏RoutingKey發佈到隊列的方式用的是direct模式,生產環境下咱們一般會使用Topic模式,後面真正使用的時候這裏也會改掉;同時在MQ發佈方面也採用了Polly的重試策略。
接下來是訂閱Subscribe方法,這個比較簡單,就是包RoutingKey和Queue進行綁定,這裏會提供一個專門的註冊界面,用於配置RoutingKey、Queue、ExChange和服務接口地址之間的對應關係,用的就是這個方法。
using (var channel = _persistentConnection.CreateModel()) { channel.QueueBind(queue: QueueName, exchange: BROKER_NAME, routingKey: RoutingKey); }
而後是消費者的建立和消費方式方法CreateConsumerChannel,這個是最重要一個,在這個方法裏真正實現了消息的消費,消息的消費經過委託實現,咱們須要關注的是下面這個地方:
var channel = _persistentConnection.CreateModel(); channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct"); channel.QueueDeclare(queue: _queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); consumer.Received += async (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); await ProcessEvent(ea.RoutingKey, message); channel.BasicAck(ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: _queueName, autoAck: false, consumer: consumer);
解釋下這段代碼,首先建立消息通道,併爲它綁定交換器Exchange和隊列Queue,而後在這條消息通道上建立消費者Consumer,爲這個消費者的接受消息的委託註冊一個處理方法。
當消息被路由到當前隊列Queue上時,就會觸發這個消息的處理方法,處理完成後,自動發送ack確認。
ProcessEvent是消息的具體處理方法,大致流程是這樣的,它接受一個RoutingKey和消息數據message,根據RoutingKey從Redis中拿到對應的服務地址,咱們前面說過會有一個專門的頁面用於綁定RoutingKey和服務地址的關係,拿到地址集合以後循環調用,即Api調用。
/// <summary> /// 發送MQ到指定服務接口 /// </summary> private async Task ProcessEvent(string routingKey, string message) { using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME)) { //獲取綁定該routingKey的服務地址集合 var subscriptions = await StackRedis.Current.GetAllList(routingKey); foreach(var apiUrl in subscriptions) { _logger.LogInformation(message); await _apiHelperService.PostAsync(apiUrl, message); } } }
這裏用到了Api調用的幫助類,前面已經寫過了,只不過把它放到了這個公共的地方,仍是貼下代碼:
public interface IApiHelperService { Task<T> PostAsync<T>(string url, object Model); Task<T> GetAsync<T>(string url); Task PostAsync(string url, string requestMessage); }
public class ApiHelperService : IApiHelperService { private readonly IHttpClientFactory _httpClientFactory; private readonly ILogger<ApiHelperService> _logger; public ApiHelperService(ILogger<ApiHelperService> _logger, IHttpClientFactory _httpClientFactory) { this._httpClientFactory = _httpClientFactory; this._logger = _logger; } /// <summary> /// HttpClient實現Post請求 /// </summary> public async Task<T> PostAsync<T>(string url, object Model) { var http = _httpClientFactory.CreateClient("MI.Web"); //添加Token var token = await GetToken(); http.SetBearerToken(token); //使用FormUrlEncodedContent作HttpContent var httpContent = new StringContent(JsonConvert.SerializeObject(Model), Encoding.UTF8, "application/json"); //await異步等待迴應 var response = await http.PostAsync(url, httpContent); //確保HTTP成功狀態值 response.EnsureSuccessStatusCode(); //await異步讀取 string Result = await response.Content.ReadAsStringAsync(); var Item = JsonConvert.DeserializeObject<T>(Result); return Item; } /// <summary> /// HttpClient實現Post請求(用於MQ發佈功能 無返回) /// </summary> public async Task PostAsync(string url, string requestMessage) { var http = _httpClientFactory.CreateClient(); //添加Token var token = await GetToken(); http.SetBearerToken(token); //使用FormUrlEncodedContent作HttpContent var httpContent = new StringContent(requestMessage, Encoding.UTF8, "application/json"); //await異步等待迴應 var response = await http.PostAsync(url, httpContent); //確保HTTP成功狀態值 response.EnsureSuccessStatusCode(); } /// <summary> /// HttpClient實現Get請求 /// </summary> public async Task<T> GetAsync<T>(string url) { var http = _httpClientFactory.CreateClient("MI.Web"); //添加Token var token = await GetToken(); http.SetBearerToken(token); //await異步等待迴應 var response = await http.GetAsync(url); //確保HTTP成功狀態值 response.EnsureSuccessStatusCode(); var Result = await response.Content.ReadAsStringAsync(); var Items = JsonConvert.DeserializeObject<T>(Result); return Items; } /// <summary> /// 轉換URL /// </summary> /// <param name="str"></param> /// <returns></returns> public static string UrlEncode(string str) { StringBuilder sb = new StringBuilder(); byte[] byStr = System.Text.Encoding.UTF8.GetBytes(str); for (int i = 0; i < byStr.Length; i++) { sb.Append(@"%" + Convert.ToString(byStr[i], 16)); } return (sb.ToString()); } //獲取Token //獲取Token public async Task<string> GetToken() { var client = _httpClientFactory.CreateClient("MI.Web"); string token = await Untity.StackRedis.Current.Get("ApiToken"); if (!string.IsNullOrEmpty(token)) { return token; } try { //DiscoveryClient類:IdentityModel提供給咱們經過基礎地址(如:http://localhost:5000)就能夠訪問令牌服務端; //固然能夠根據上面的restful api裏面的url自行構建;上面就是經過基礎地址,獲取一個TokenClient;(對應restful的url:token_endpoint "http://localhost:5000/connect/token") //RequestClientCredentialsAsync方法:請求令牌; //獲取令牌後,就能夠經過構建http請求訪問API接口;這裏使用HttpClient構建請求,獲取內容; var cache = new DiscoveryCache("http://localhost:7000"); var disco = await cache.GetAsync(); if (disco.IsError) throw new Exception(disco.Error); var tokenResponse = await client.RequestClientCredentialsTokenAsync(new ClientCredentialsTokenRequest { Address = disco.TokenEndpoint, ClientId = "MI.Web", ClientSecret = "miwebsecret", Scope = "MI.Service" }); if (tokenResponse.IsError) { throw new Exception(tokenResponse.Error); } token = tokenResponse.AccessToken; await Untity.StackRedis.Current.Set("ApiToken", token, (int)TimeSpan.FromSeconds(tokenResponse.ExpiresIn).TotalMinutes); } catch (Exception ex) { throw new Exception(ex.Message); } return token; } }
而後Redis幫助類的代碼也貼一下,Redis這裏你們能夠根據本身習慣,如何使用沒什麼區別:
public class StackRedis : IDisposable { #region 配置屬性 基於 StackExchange.Redis 封裝 //鏈接串 (注:IP:端口,屬性=,屬性=) //public string _ConnectionString = "47.99.92.76:6379,password=shenniubuxing3"; public string _ConnectionString = "47.99.92.76:6379"; //操做的庫(注:默認0庫) public int _Db = 0; #endregion #region 管理器對象 /// <summary> /// 獲取redis操做類對象 /// </summary> private static StackRedis _StackRedis; private static object _locker_StackRedis = new object(); public static StackRedis Current { get { if (_StackRedis == null) { lock (_locker_StackRedis) { _StackRedis = _StackRedis ?? new StackRedis(); return _StackRedis; } } return _StackRedis; } } /// <summary> /// 獲取併發連接管理器對象 /// </summary> private static ConnectionMultiplexer _redis; private static object _locker = new object(); public ConnectionMultiplexer Manager { get { if (_redis == null) { lock (_locker) { _redis = _redis ?? GetManager(_ConnectionString); return _redis; } } return _redis; } } /// <summary> /// 獲取連接管理器 /// </summary> /// <param name="connectionString"></param> /// <returns></returns> public ConnectionMultiplexer GetManager(string connectionString) { return ConnectionMultiplexer.Connect(connectionString); } /// <summary> /// 獲取操做數據庫對象 /// </summary> /// <returns></returns> public IDatabase GetDb() { return Manager.GetDatabase(_Db); } #endregion #region 操做方法 #region string 操做 /// <summary> /// 根據Key移除 /// </summary> /// <param name="key"></param> /// <returns></returns> public async Task<bool> Remove(string key) { var db = this.GetDb(); return await db.KeyDeleteAsync(key); } /// <summary> /// 根據key獲取string結果 /// </summary> /// <param name="key"></param> /// <returns></returns> public async Task<string> Get(string key) { var db = this.GetDb(); return await db.StringGetAsync(key); } /// <summary> /// 根據key獲取string中的對象 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="key"></param> /// <returns></returns> public async Task<T> Get<T>(string key) { var t = default(T); try { var _str = await this.Get(key); if (string.IsNullOrWhiteSpace(_str)) { return t; } t = JsonConvert.DeserializeObject<T>(_str); } catch (Exception ex) { } return t; } /// <summary> /// 存儲string數據 /// </summary> /// <param name="key"></param> /// <param name="value"></param> /// <param name="expireMinutes"></param> /// <returns></returns> public async Task<bool> Set(string key, string value, int expireMinutes = 0) { var db = this.GetDb(); if (expireMinutes > 0) { return db.StringSet(key, value, TimeSpan.FromMinutes(expireMinutes)); } return await db.StringSetAsync(key, value); } /// <summary> /// 存儲對象數據到string /// </summary> /// <typeparam name="T"></typeparam> /// <param name="key"></param> /// <param name="value"></param> /// <param name="expireMinutes"></param> /// <returns></returns> public async Task<bool> Set<T>(string key, T value, int expireMinutes = 0) { try { var jsonOption = new JsonSerializerSettings() { ReferenceLoopHandling = ReferenceLoopHandling.Ignore }; var _str = JsonConvert.SerializeObject(value, jsonOption); if (string.IsNullOrWhiteSpace(_str)) { return false; } return await this.Set(key, _str, expireMinutes); } catch (Exception ex) { } return false; } #endregion #region List操做(注:能夠當作隊列使用) /// <summary> /// list長度 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="key"></param> /// <returns></returns> public async Task<long> GetListLen<T>(string key) { try { var db = this.GetDb(); return await db.ListLengthAsync(key); } catch (Exception ex) { } return 0; } /// <summary> /// 獲取隊列出口數據並移除 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="key"></param> /// <returns></returns> public async Task<T> GetListAndPop<T>(string key) { var t = default(T); try { var db = this.GetDb(); var _str = await db.ListRightPopAsync(key); if (string.IsNullOrWhiteSpace(_str)) { return t; } t = JsonConvert.DeserializeObject<T>(_str); } catch (Exception ex) { } return t; } /// <summary> /// 集合對象添加到list左邊 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="key"></param> /// <param name="values"></param> /// <returns></returns> public async Task<long> SetLists<T>(string key, List<T> values) { var result = 0L; try { var jsonOption = new JsonSerializerSettings() { ReferenceLoopHandling = ReferenceLoopHandling.Ignore }; var db = this.GetDb(); foreach (var item in values) { var _str = JsonConvert.SerializeObject(item, jsonOption); result += await db.ListLeftPushAsync(key, _str); } return result; } catch (Exception ex) { } return result; } /// <summary> /// 單個對象添加到list左邊 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="key"></param> /// <param name="value"></param> /// <returns></returns> public async Task<long> SetList<T>(string key, T value) { var result = 0L; try { result = await this.SetLists(key, new List<T> { value }); } catch (Exception ex) { } return result; } /// <summary> /// 獲取List全部數據 /// </summary> public async Task<List<string>> GetAllList(string list) { var db = this.GetDb(); var redisList = await db.ListRangeAsync(list); List<string> listMembers = new List<string>(); foreach (var item in redisList) { listMembers.Add(JsonConvert.DeserializeObject<string>(item)); } return listMembers; } #endregion #region 額外擴展 /// <summary> /// 手動回收管理器對象 /// </summary> public void Dispose() { this.Dispose(_redis); } public void Dispose(ConnectionMultiplexer con) { if (con != null) { con.Close(); con.Dispose(); } } #endregion #endregion }
OK,核心代碼部分介紹到這裏,具體來看怎麼使用,推送當前類庫到本身的Nuget包,不知道怎麼建Nuget服務器的能夠看下我以前的那篇文章。
打開MI.Web項目,在Startup中註冊RabbitMQ的相關信息:
/// <summary> /// 消息總線RabbitMQ /// </summary> private void RegisterEventBus(IServiceCollection services) { #region 加載RabbitMQ帳戶 services.AddSingleton<IRabbitMQPersistentConnection>(sp => { var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>(); var factory = new ConnectionFactory() { HostName = Configuration["EventBusConnection"] }; if (!string.IsNullOrEmpty(Configuration["EventBusUserName"])) { factory.UserName = Configuration["EventBusUserName"]; } if (!string.IsNullOrEmpty(Configuration["EventBusPassword"])) { factory.Password = Configuration["EventBusPassword"]; } var retryCount = 5; if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"])) { retryCount = int.Parse(Configuration["EventBusRetryCount"]); } return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount); }); #endregion var subscriptionClientName = Configuration["SubscriptionClientName"]; services.AddSingleton<IEventBus, EventBusRabbitMQ.EventBusRabbitMQ>(sp => { var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>(); var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>(); var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ.EventBusRabbitMQ>>(); var apiHelper = sp.GetRequiredService<IApiHelperService>(); var retryCount = 5; if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"])) { retryCount = int.Parse(Configuration["EventBusRetryCount"]); } return new EventBusRabbitMQ.EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, apiHelper, subscriptionClientName, retryCount); }); }
這裏暫時還沒作出專門用於註冊RoutingKey的界面,因此暫時用在這裏用方法註冊下,後面再修改,這裏的RoutingKey用於用戶註冊使用:
//綁定RoutingKey與隊列 private void ConfigureEventBus(IApplicationBuilder app) { var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>(); eventBus.Subscribe(Configuration["SubscriptionClientName"], "UserRegister"); }
上面用的都是appsettings.json裏的配置,貼下代碼,標藍的部分是須要用到的:
{ "Logging": { "IncludeScopes": false, "LogLevel": { "Default": "Warning" } }, "ConnectionStrings": { "ElasticSearchServerAddress": "", "Redis": "47.99.92.76:6379" }, "ServiceAddress": { "Service.Identity": "http://localhost:7000", "Service.Account": "http://localhost:7001", "Service.Ocelot": "http://localhost:7003", "Service.Picture": "http://localhost:7005" }, "MehtodName": { "Account.MiUser.SSOLogin": "/Account/MiUser/SSOLogin", //登陸 "Identity.Connect.Token": "/connect/token", //獲取token "Picture.QueryPicture.QueryStartProduct": "/Picture/QueryPicture/QueryStartProduct", //查詢明星產品 "Picture.QueryPicture.QuerySlideImg": "/Picture/QueryPicture/QuerySlideImg", //查詢輪播圖 "Picture.QueryPicture.QueryHadrWare": "/Picture/QueryPicture/QueryHadrWare" //查詢智能硬件表數據 }, "EventBusConnection": "******", //RabbitMQ地址 "EventBusUserName": "guest", "EventBusPassword": "guest", "EventBusRetryCount": 5, "SubscriptionClientName": "RabbitMQ_Bus_MI" }
OK,配置部分算是完成了,接下咱們就要去發送MQ了,咱們這裏使用IEventBus對象調用發佈方法,用於發送用戶的註冊信息,最終最調用新增用戶接口:
private readonly IEventBus _eventBus; public LoginController(IEventBus _eventBus) { this._eventBus = _eventBus; } public JsonResult RegisterUser(string UserName, string UserPwd) { try { if (!string.IsNullOrEmpty(UserName) && !string.IsNullOrEmpty(UserPwd)) { RegisterRequest request = new RegisterRequest { UserName = UserName, Password = UserPwd }; _eventBus.Publish("UserRegister", request); } } catch (Exception ex) { _logger.LogError(ex, "註冊失敗!"); } return Json(""); }
最終會新增當前傳入的用戶信息。
固然,這不是消息隊列的最終使用方式,後面會繼續修改,這裏的問題在於發佈和消費都耦合再了業務層,對於業務系統來講這是一種負擔,舉個例子,咱們公司當前隊列消息多的能達到上百萬個,若是把消息的消費和業務系統放在一塊兒可能會影響,因此使用的時候會把消費端單獨拿出來作成Windows服務,並添加自動重試和補償機制,畢竟RabbitMQ也不是沒有錯誤的,好比調用Api出現問題,遲遲沒法返回ack確認,這個時候就會報出 wait ack timeout的錯誤。
OK,今天先到這裏,我去煮包泡麪吃。。。