.Net Core 商城微服務項目系列(七):使用消息隊列(RabbitMQ)實現服務異步通訊

RabbitMQ是什麼,怎麼使用我就不介紹了,你們能夠到園子裏搜一下教程。本篇的重點在於實現服務與服務之間的異步通訊。web

首先說一下爲何要使用消息隊列來實現服務通訊:1.提升接口併發能力。  2.保證服務各方數據最終一致。  3.解耦。redis

使用消息隊列通訊的優勢就是直接調用的缺點,好比在直接調用過程當中發生未知錯誤,極可能就會出現數據不一致的問題,這個時候就須要人工修補數據,若是有過這個經歷的同窗必定是可憐的,人工修補數據簡直痛苦!!再好比高併發狀況下接口直接掛點,這就更直白了,接口掛了,功能就掛了,事故報告寫起來!!而消息隊列能夠輕鬆解決上面兩個問題,接口發生錯誤,沒關係,MQ重試一下,再不行,人工重試MQ;在使用消息隊列的時候,請求實際是被串行化,簡單說就是排隊,因此不再用擔憂由於併發致使數據不一致或者接口直接掛掉的問題。數據庫

我如今公司使用的消息隊列排隊的請求最高的有上萬個,因此徹底不須要擔憂MQ的性能。json

 

OK,咱們來實現一下微服務裏如何使用消息隊列,主要思路是這樣的:api

        【提供消費者註冊界面,用於綁定RoutingKey和隊列;消息發佈後,根據RoutingKey去Redis中查找對應的服務地址,而後異步調用。】服務器

上面這句話就是消息隊列的主體思路,也是我司如今使用的方式,話很少說,代碼敲起來。restful

首先看下咱們的項目結構:併發

首先咱們須要先建三個這樣的類庫,這裏面有些東西是用不到的,固然最最主要的就是標記出來的消息隊列部分,如今暫時提供了兩個方法,分別是發佈(Publish)和訂閱(Subscribe)。app

首先新增消息·隊列接口類IEventBus,這個未來用於在業務系統中注入使用,這裏提供了發佈訂閱方法:異步

    public interface IEventBus
    {
        void Publish(string RoutingKey, object Model);

        void Subscribe(string QueueName, string RoutingKey);
    }

新增RabbitMQ操做接口類IRabbitMQPersistentConnection,這個用來檢查RabbitMQ的鏈接和釋放:

    public interface IRabbitMQPersistentConnection : IDisposable
    {
        bool IsConnected { get; }

        bool TryConnect();

        IModel CreateModel();
    }

新增IRabbitMQPersistentConnection的實現類DefaultRabbitMQPersistentConnection,這個是RabbitMQ鏈接和釋放方法的具體實現,這個沒什麼可說的,你們一看就知道了,就是檢查RabbitMQ的鏈接狀態,沒有鏈接建立鏈接,發生錯誤的捕捉錯誤從新鏈接,這裏用到了Polly的從新策略:

public class DefaultRabbitMQPersistentConnection:IRabbitMQPersistentConnection
    {
        private readonly IConnectionFactory _connectionFactory;
        private readonly ILogger<DefaultRabbitMQPersistentConnection> _logger;
        private readonly int _retryCount;
        IConnection _connection;
        bool _disposed;

        object sync_root = new object();

        public DefaultRabbitMQPersistentConnection(IConnectionFactory connectionFactory, ILogger<DefaultRabbitMQPersistentConnection> logger, int retryCount = 5)
        {
            _connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory));
            _logger = logger ?? throw new ArgumentNullException(nameof(logger));
            _retryCount = retryCount;
        }

        public bool IsConnected
        {
            get
            {
                return _connection != null && _connection.IsOpen && !_disposed;
            }
        }

        public IModel CreateModel()
        {
            if (!IsConnected)
            {
                throw new InvalidOperationException("No RabbitMQ connections are available to perform this action");
            }

            return _connection.CreateModel();
        }

        public void Dispose()
        {
            if (_disposed) return;

            _disposed = true;

            try
            {
                _connection.Dispose();
            }
            catch (IOException ex)
            {
                _logger.LogCritical(ex.ToString());
            }
        }


        public bool TryConnect()
        {
            _logger.LogInformation("RabbitMQ Client is trying to connect");

            lock (sync_root)
            {
                var policy = RetryPolicy.Handle<SocketException>()
                    .Or<BrokerUnreachableException>()
                    .WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
                    {
                        _logger.LogWarning(ex.ToString());
                    });

                policy.Execute(() =>
                {
                    _connection = _connectionFactory
                          .CreateConnection();
                });

                if (IsConnected)
                {
                    _connection.ConnectionShutdown += OnConnectionShutdown;
                    _connection.CallbackException += OnCallbackException;
                    _connection.ConnectionBlocked += OnConnectionBlocked;

                    _logger.LogInformation($"RabbitMQ persistent connection acquired a connection {_connection.Endpoint.HostName} and is subscribed to failure events");

                    return true;
                }
                else
                {
                    _logger.LogCritical("FATAL ERROR: RabbitMQ connections could not be created and opened");

                    return false;
                }
            }
        }

        private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e)
        {
            if (_disposed) return;

            _logger.LogWarning("A RabbitMQ connection is shutdown. Trying to re-connect...");

            TryConnect();
        }

        void OnCallbackException(object sender, CallbackExceptionEventArgs e)
        {
            if (_disposed) return;

            _logger.LogWarning("A RabbitMQ connection throw exception. Trying to re-connect...");

            TryConnect();
        }

        void OnConnectionShutdown(object sender, ShutdownEventArgs reason)
        {
            if (_disposed) return;

            _logger.LogWarning("A RabbitMQ connection is on shutdown. Trying to re-connect...");

            TryConnect();
        }

    }

接下來是最重要,IEventBus的實現類EventBusRabbitMQ,在這個類裏咱們實現了消息的發佈、訂閱、消費,首先把代碼展現出來,而後一個一個的介紹:

public class EventBusRabbitMQ : IEventBus, IDisposable
    {
        const string BROKER_NAME = "mi_event_bus";
        private readonly IRabbitMQPersistentConnection _persistentConnection;
        private readonly ILogger<EventBusRabbitMQ> _logger;
        private readonly ILifetimeScope _autofac;
        private readonly IApiHelperService _apiHelperService;
        private readonly string AUTOFAC_SCOPE_NAME = "mi_event_bus";
        private readonly int _retryCount;

        private IModel _consumerChannel;
        private string _queueName;


        public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection,ILogger<EventBusRabbitMQ> logger,
            ILifetimeScope autofac, IApiHelperService apiHelperService, string queueName=null,int retryCount=5)
        {
            _persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection));
            _logger = logger ?? throw new ArgumentNullException(nameof(logger));
            _queueName = queueName;
            _consumerChannel = CreateConsumerChannel();
            _autofac = autofac;
            _retryCount = retryCount;
            _apiHelperService = apiHelperService;
        }


        /// <summary>
        /// 發佈消息
        /// </summary>
        public void Publish(string routingKey,object Model)
        {
            if (!_persistentConnection.IsConnected)
            {
                _persistentConnection.TryConnect();
            }

            var policy = RetryPolicy.Handle<BrokerUnreachableException>()
                .Or<SocketException>()
                .WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
                {
                    _logger.LogWarning(ex.ToString());
                });

            using (var channel = _persistentConnection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct");
                var message = JsonConvert.SerializeObject(Model);
                var body = Encoding.UTF8.GetBytes(message);

                policy.Execute(() =>
                {
                    var properties = channel.CreateBasicProperties();
                    properties.DeliveryMode = 2; //持久化

                    channel.BasicPublish(exchange: BROKER_NAME, routingKey: routingKey, mandatory: true, basicProperties: properties, body: body);
                });
            }
        }

        /// <summary>
        /// 訂閱(綁定RoutingKey和隊列)
        /// </summary>
        public void Subscribe(string QueueName, string RoutingKey)
        {
            if (!_persistentConnection.IsConnected)
            {
                _persistentConnection.TryConnect();
            }

            using (var channel = _persistentConnection.CreateModel())
            {
                channel.QueueBind(queue: QueueName, exchange: BROKER_NAME, routingKey: RoutingKey);
            }
        }

        /// <summary>
        /// 建立消費者並投遞消息
        /// </summary>
        /// <returns></returns>
        private IModel CreateConsumerChannel()
        {
            if (!_persistentConnection.IsConnected)
            {
                _persistentConnection.TryConnect();
            }

            var channel = _persistentConnection.CreateModel();

            channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct");

            channel.QueueDeclare(queue: _queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += async (model, ea) =>
            {
                var message = Encoding.UTF8.GetString(ea.Body);

                await ProcessEvent(ea.RoutingKey, message);

                channel.BasicAck(ea.DeliveryTag, multiple: false);
            };

            channel.BasicConsume(queue: _queueName, autoAck: false, consumer: consumer);

            channel.CallbackException += (sender, ea) =>
            {
                _consumerChannel.Dispose();
                _consumerChannel = CreateConsumerChannel();
            };

            return channel;
        }

        /// <summary>
        /// 發送MQ數據到指定服務接口
        /// </summary>
        private async Task ProcessEvent(string routingKey, string message)
        {
            using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME))
            {
                //獲取綁定該routingKey的服務地址集合
                var subscriptions = await StackRedis.Current.GetAllList(routingKey);
                foreach(var apiUrl in subscriptions)
                {
                    _logger.LogInformation(message);
                    await _apiHelperService.PostAsync(apiUrl, message);
                }
            }
        }

        public void Dispose()
        {
            _consumerChannel?.Dispose();
        }
    }

 

首先是發佈方法,接受一個字符串類型的RoutingKey和Object類型的MQ數據,而後根據RoutingKey將數據發佈到指定的隊列,這裏RoutingKey發佈到隊列的方式用的是direct模式,生產環境下咱們一般會使用Topic模式,後面真正使用的時候這裏也會改掉;同時在MQ發佈方面也採用了Polly的重試策略。

 

接下來是訂閱Subscribe方法,這個比較簡單,就是包RoutingKey和Queue進行綁定,這裏會提供一個專門的註冊界面,用於配置RoutingKey、Queue、ExChange和服務接口地址之間的對應關係,用的就是這個方法。

            using (var channel = _persistentConnection.CreateModel())
            {
                channel.QueueBind(queue: QueueName, exchange: BROKER_NAME, routingKey: RoutingKey);
            }

 

而後是消費者的建立和消費方式方法CreateConsumerChannel,這個是最重要一個,在這個方法裏真正實現了消息的消費,消息的消費經過委託實現,咱們須要關注的是下面這個地方:

            var channel = _persistentConnection.CreateModel();

            channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct");

            channel.QueueDeclare(queue: _queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += async (model, ea) =>
            {
                var message = Encoding.UTF8.GetString(ea.Body);

                await ProcessEvent(ea.RoutingKey, message);

                channel.BasicAck(ea.DeliveryTag, multiple: false);
            };

            channel.BasicConsume(queue: _queueName, autoAck: false, consumer: consumer);

 

解釋下這段代碼,首先建立消息通道,併爲它綁定交換器Exchange和隊列Queue,而後在這條消息通道上建立消費者Consumer,爲這個消費者的接受消息的委託註冊一個處理方法。

當消息被路由到當前隊列Queue上時,就會觸發這個消息的處理方法,處理完成後,自動發送ack確認。

ProcessEvent是消息的具體處理方法,大致流程是這樣的,它接受一個RoutingKey和消息數據message,根據RoutingKey從Redis中拿到對應的服務地址,咱們前面說過會有一個專門的頁面用於綁定RoutingKey和服務地址的關係,拿到地址集合以後循環調用,即Api調用。

        /// <summary>
        /// 發送MQ到指定服務接口
        /// </summary>
        private async Task ProcessEvent(string routingKey, string message)
        {
            using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME))
            {
                //獲取綁定該routingKey的服務地址集合
                var subscriptions = await StackRedis.Current.GetAllList(routingKey);
                foreach(var apiUrl in subscriptions)
                {
                    _logger.LogInformation(message);
                    await _apiHelperService.PostAsync(apiUrl, message);
                }
            }
        }

這裏用到了Api調用的幫助類,前面已經寫過了,只不過把它放到了這個公共的地方,仍是貼下代碼:

    public interface IApiHelperService
    {
        Task<T> PostAsync<T>(string url, object Model);
        Task<T> GetAsync<T>(string url);
        Task PostAsync(string url, string requestMessage);
    }
public class ApiHelperService : IApiHelperService
    {
        private readonly IHttpClientFactory _httpClientFactory;
        private readonly ILogger<ApiHelperService> _logger;

        public ApiHelperService(ILogger<ApiHelperService> _logger, IHttpClientFactory _httpClientFactory)
        {
            this._httpClientFactory = _httpClientFactory;
            this._logger = _logger;
        }


        /// <summary>
        /// HttpClient實現Post請求
        /// </summary>
        public async Task<T> PostAsync<T>(string url, object Model)
        {
            var http = _httpClientFactory.CreateClient("MI.Web");
            //添加Token
            var token = await GetToken();
            http.SetBearerToken(token);
            //使用FormUrlEncodedContent作HttpContent
            var httpContent = new StringContent(JsonConvert.SerializeObject(Model), Encoding.UTF8, "application/json");
            //await異步等待迴應
            var response = await http.PostAsync(url, httpContent);

            //確保HTTP成功狀態值
            response.EnsureSuccessStatusCode();

            //await異步讀取
            string Result = await response.Content.ReadAsStringAsync();

            var Item = JsonConvert.DeserializeObject<T>(Result);

            return Item;
        }

        /// <summary>
        /// HttpClient實現Post請求(用於MQ發佈功能 無返回)
        /// </summary>
        public async Task PostAsync(string url, string requestMessage)
        {
            var http = _httpClientFactory.CreateClient();
            //添加Token
            var token = await GetToken();
            http.SetBearerToken(token);
            //使用FormUrlEncodedContent作HttpContent
            var httpContent = new StringContent(requestMessage, Encoding.UTF8, "application/json");
            //await異步等待迴應
            var response = await http.PostAsync(url, httpContent);

            //確保HTTP成功狀態值
            response.EnsureSuccessStatusCode();
        }


        /// <summary>
        /// HttpClient實現Get請求
        /// </summary>
        public async Task<T> GetAsync<T>(string url)
        {
            var http = _httpClientFactory.CreateClient("MI.Web");
            //添加Token
            var token = await GetToken();
            http.SetBearerToken(token);
            //await異步等待迴應
            var response = await http.GetAsync(url);
            //確保HTTP成功狀態值
            response.EnsureSuccessStatusCode();

            var Result = await response.Content.ReadAsStringAsync();

            var Items = JsonConvert.DeserializeObject<T>(Result);

            return Items;
        }


        /// <summary>
        /// 轉換URL
        /// </summary>
        /// <param name="str"></param>
        /// <returns></returns>
        public static string UrlEncode(string str)
        {
            StringBuilder sb = new StringBuilder();
            byte[] byStr = System.Text.Encoding.UTF8.GetBytes(str);
            for (int i = 0; i < byStr.Length; i++)
            {
                sb.Append(@"%" + Convert.ToString(byStr[i], 16));
            }
            return (sb.ToString());
        }

        //獲取Token
        //獲取Token
        public async Task<string> GetToken()
        {
            var client = _httpClientFactory.CreateClient("MI.Web");
            string token = await Untity.StackRedis.Current.Get("ApiToken");
            if (!string.IsNullOrEmpty(token))
            {
                return token;
            }
            try
            {
                //DiscoveryClient類:IdentityModel提供給咱們經過基礎地址(如:http://localhost:5000)就能夠訪問令牌服務端;
                //固然能夠根據上面的restful api裏面的url自行構建;上面就是經過基礎地址,獲取一個TokenClient;(對應restful的url:token_endpoint   "http://localhost:5000/connect/token")
                //RequestClientCredentialsAsync方法:請求令牌;
                //獲取令牌後,就能夠經過構建http請求訪問API接口;這裏使用HttpClient構建請求,獲取內容;
                var cache = new DiscoveryCache("http://localhost:7000");
                var disco = await cache.GetAsync();
                if (disco.IsError) throw new Exception(disco.Error);
                var tokenResponse = await client.RequestClientCredentialsTokenAsync(new ClientCredentialsTokenRequest
                {
                    Address = disco.TokenEndpoint,
                    ClientId = "MI.Web",
                    ClientSecret = "miwebsecret",
                    Scope = "MI.Service"
                });
                if (tokenResponse.IsError)
                {
                    throw new Exception(tokenResponse.Error);

                }
                token = tokenResponse.AccessToken;
                await Untity.StackRedis.Current.Set("ApiToken", token, (int)TimeSpan.FromSeconds(tokenResponse.ExpiresIn).TotalMinutes);
            }
            catch (Exception ex)
            {
                throw new Exception(ex.Message);
            }
            return token;
        }
    }

而後Redis幫助類的代碼也貼一下,Redis這裏你們能夠根據本身習慣,如何使用沒什麼區別:

    public class StackRedis : IDisposable
    {
        #region 配置屬性   基於 StackExchange.Redis 封裝
        //鏈接串 (注:IP:端口,屬性=,屬性=)
        //public string _ConnectionString = "47.99.92.76:6379,password=shenniubuxing3";
        public string _ConnectionString = "47.99.92.76:6379";
        //操做的庫(注:默認0庫)
        public int _Db = 0;
        #endregion

        #region 管理器對象

        /// <summary>
        /// 獲取redis操做類對象
        /// </summary>
        private static StackRedis _StackRedis;
        private static object _locker_StackRedis = new object();
        public static StackRedis Current
        {
            get
            {
                if (_StackRedis == null)
                {
                    lock (_locker_StackRedis)
                    {
                        _StackRedis = _StackRedis ?? new StackRedis();
                        return _StackRedis;
                    }
                }

                return _StackRedis;
            }
        }

        /// <summary>
        /// 獲取併發連接管理器對象
        /// </summary>
        private static ConnectionMultiplexer _redis;
        private static object _locker = new object();
        public ConnectionMultiplexer Manager
        {
            get
            {
                if (_redis == null)
                {
                    lock (_locker)
                    {

                        _redis = _redis ?? GetManager(_ConnectionString);
                        return _redis;
                    }
                }

                return _redis;
            }
        }

        /// <summary>
        /// 獲取連接管理器
        /// </summary>
        /// <param name="connectionString"></param>
        /// <returns></returns>
        public ConnectionMultiplexer GetManager(string connectionString)
        {
            return ConnectionMultiplexer.Connect(connectionString);
        }

        /// <summary>
        /// 獲取操做數據庫對象
        /// </summary>
        /// <returns></returns>
        public IDatabase GetDb()
        {
            return Manager.GetDatabase(_Db);
        }
        #endregion

        #region 操做方法

        #region string 操做

        /// <summary>
        /// 根據Key移除
        /// </summary>
        /// <param name="key"></param>
        /// <returns></returns>
        public async Task<bool> Remove(string key)
        {
            var db = this.GetDb();

            return await db.KeyDeleteAsync(key);
        }

        /// <summary>
        /// 根據key獲取string結果
        /// </summary>
        /// <param name="key"></param>
        /// <returns></returns>
        public async Task<string> Get(string key)
        {
            var db = this.GetDb();
            return await db.StringGetAsync(key);
        }

        /// <summary>
        /// 根據key獲取string中的對象
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="key"></param>
        /// <returns></returns>
        public async Task<T> Get<T>(string key)
        {
            var t = default(T);
            try
            {
                var _str = await this.Get(key);
                if (string.IsNullOrWhiteSpace(_str)) { return t; }

                t = JsonConvert.DeserializeObject<T>(_str);
            }
            catch (Exception ex) { }
            return t;
        }

        /// <summary>
        /// 存儲string數據
        /// </summary>
        /// <param name="key"></param>
        /// <param name="value"></param>
        /// <param name="expireMinutes"></param>
        /// <returns></returns>
        public async Task<bool> Set(string key, string value, int expireMinutes = 0)
        {
            var db = this.GetDb();
            if (expireMinutes > 0)
            {
                return db.StringSet(key, value, TimeSpan.FromMinutes(expireMinutes));
            }
            return await db.StringSetAsync(key, value);
        }

        /// <summary>
        /// 存儲對象數據到string
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="key"></param>
        /// <param name="value"></param>
        /// <param name="expireMinutes"></param>
        /// <returns></returns>
        public async Task<bool> Set<T>(string key, T value, int expireMinutes = 0)
        {
            try
            {
                var jsonOption = new JsonSerializerSettings()
                {
                    ReferenceLoopHandling = ReferenceLoopHandling.Ignore
                };
                var _str = JsonConvert.SerializeObject(value, jsonOption);
                if (string.IsNullOrWhiteSpace(_str)) { return false; }

                return await this.Set(key, _str, expireMinutes);
            }
            catch (Exception ex) { }
            return false;
        }
        #endregion

        #region List操做(注:能夠當作隊列使用)

        /// <summary>
        /// list長度
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="key"></param>
        /// <returns></returns>
        public async Task<long> GetListLen<T>(string key)
        {
            try
            {
                var db = this.GetDb();
                return await db.ListLengthAsync(key);
            }
            catch (Exception ex) { }
            return 0;
        }

        /// <summary>
        /// 獲取隊列出口數據並移除
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="key"></param>
        /// <returns></returns>
        public async Task<T> GetListAndPop<T>(string key)
        {
            var t = default(T);
            try
            {
                var db = this.GetDb();
                var _str = await db.ListRightPopAsync(key);
                if (string.IsNullOrWhiteSpace(_str)) { return t; }
                t = JsonConvert.DeserializeObject<T>(_str);
            }
            catch (Exception ex) { }
            return t;
        }

        /// <summary>
        /// 集合對象添加到list左邊
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="key"></param>
        /// <param name="values"></param>
        /// <returns></returns>
        public async Task<long> SetLists<T>(string key, List<T> values)
        {
            var result = 0L;
            try
            {
                var jsonOption = new JsonSerializerSettings()
                {
                    ReferenceLoopHandling = ReferenceLoopHandling.Ignore
                };
                var db = this.GetDb();
                foreach (var item in values)
                {
                    var _str = JsonConvert.SerializeObject(item, jsonOption);
                    result += await db.ListLeftPushAsync(key, _str);
                }
                return result;
            }
            catch (Exception ex) { }
            return result;
        }

        /// <summary>
        /// 單個對象添加到list左邊
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="key"></param>
        /// <param name="value"></param>
        /// <returns></returns>
        public async Task<long> SetList<T>(string key, T value)
        {
            var result = 0L;
            try
            {
                result = await this.SetLists(key, new List<T> { value });
            }
            catch (Exception ex) { }
            return result;
        }

        /// <summary>
        /// 獲取List全部數據
        /// </summary>
        public async Task<List<string>> GetAllList(string list)
        {
            var db = this.GetDb();
            var redisList = await db.ListRangeAsync(list);
            List<string> listMembers = new List<string>();
            foreach (var item in redisList)
            {
                listMembers.Add(JsonConvert.DeserializeObject<string>(item));
            }
            return listMembers;
        }


        #endregion

        #region 額外擴展

        /// <summary>
        /// 手動回收管理器對象
        /// </summary>
        public void Dispose()
        {
            this.Dispose(_redis);
        }

        public void Dispose(ConnectionMultiplexer con)
        {
            if (con != null)
            {
                con.Close();
                con.Dispose();
            }
        }

        #endregion

        #endregion
    }

 

 

OK,核心代碼部分介紹到這裏,具體來看怎麼使用,推送當前類庫到本身的Nuget包,不知道怎麼建Nuget服務器的能夠看下我以前的那篇文章。

打開MI.Web項目,在Startup中註冊RabbitMQ的相關信息:

        /// <summary>
        /// 消息總線RabbitMQ
        /// </summary>
        private void RegisterEventBus(IServiceCollection services)
        {
            #region 加載RabbitMQ帳戶
            services.AddSingleton<IRabbitMQPersistentConnection>(sp =>
            {
                var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>();
                var factory = new ConnectionFactory()
                {
                    HostName = Configuration["EventBusConnection"]
                };

                if (!string.IsNullOrEmpty(Configuration["EventBusUserName"]))
                {
                    factory.UserName = Configuration["EventBusUserName"];
                }

                if (!string.IsNullOrEmpty(Configuration["EventBusPassword"]))
                {
                    factory.Password = Configuration["EventBusPassword"];
                }

                var retryCount = 5;
                if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"]))
                {
                    retryCount = int.Parse(Configuration["EventBusRetryCount"]);
                }

                return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount);
            });
            #endregion

            var subscriptionClientName = Configuration["SubscriptionClientName"];

            services.AddSingleton<IEventBus, EventBusRabbitMQ.EventBusRabbitMQ>(sp =>
            {
                var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>();
                var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
                var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ.EventBusRabbitMQ>>();
                var apiHelper = sp.GetRequiredService<IApiHelperService>();

                var retryCount = 5;
                if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"]))
                {
                    retryCount = int.Parse(Configuration["EventBusRetryCount"]);
                }

                return new EventBusRabbitMQ.EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, apiHelper, subscriptionClientName, retryCount);
            });
        }

這裏暫時還沒作出專門用於註冊RoutingKey的界面,因此暫時用在這裏用方法註冊下,後面再修改,這裏的RoutingKey用於用戶註冊使用:

        //綁定RoutingKey與隊列
        private void ConfigureEventBus(IApplicationBuilder app)
        {
            var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();
            eventBus.Subscribe(Configuration["SubscriptionClientName"], "UserRegister");
        }

上面用的都是appsettings.json裏的配置,貼下代碼,標藍的部分是須要用到的:

{
  "Logging": {
    "IncludeScopes": false,
    "LogLevel": {
      "Default": "Warning"
    }
  },
  "ConnectionStrings": {
    "ElasticSearchServerAddress": "",
    "Redis": "47.99.92.76:6379"
  },
  "ServiceAddress": {
    "Service.Identity": "http://localhost:7000",
    "Service.Account": "http://localhost:7001",
    "Service.Ocelot": "http://localhost:7003",
    "Service.Picture": "http://localhost:7005"
  },
  "MehtodName": {
    "Account.MiUser.SSOLogin": "/Account/MiUser/SSOLogin", //登陸
    "Identity.Connect.Token": "/connect/token", //獲取token
    "Picture.QueryPicture.QueryStartProduct": "/Picture/QueryPicture/QueryStartProduct", //查詢明星產品
    "Picture.QueryPicture.QuerySlideImg": "/Picture/QueryPicture/QuerySlideImg", //查詢輪播圖
    "Picture.QueryPicture.QueryHadrWare": "/Picture/QueryPicture/QueryHadrWare" //查詢智能硬件表數據
  },
  "EventBusConnection": "******", //RabbitMQ地址
  "EventBusUserName": "guest",
  "EventBusPassword": "guest",
  "EventBusRetryCount": 5,
  "SubscriptionClientName": "RabbitMQ_Bus_MI"
}

OK,配置部分算是完成了,接下咱們就要去發送MQ了,咱們這裏使用IEventBus對象調用發佈方法,用於發送用戶的註冊信息,最終最調用新增用戶接口:

        private readonly IEventBus _eventBus;
        public LoginController(IEventBus _eventBus)
        {
            this._eventBus = _eventBus;
        }


        public JsonResult RegisterUser(string UserName, string UserPwd)
        {
            try
            {
                if (!string.IsNullOrEmpty(UserName) && !string.IsNullOrEmpty(UserPwd))
                {
                    RegisterRequest request = new RegisterRequest
                    {
                        UserName = UserName,
                        Password = UserPwd
                    };

                    _eventBus.Publish("UserRegister", request);
                }
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "註冊失敗!");
            }
            return Json("");
        }

 

最終會新增當前傳入的用戶信息。

固然,這不是消息隊列的最終使用方式,後面會繼續修改,這裏的問題在於發佈和消費都耦合再了業務層,對於業務系統來講這是一種負擔,舉個例子,咱們公司當前隊列消息多的能達到上百萬個,若是把消息的消費和業務系統放在一塊兒可能會影響,因此使用的時候會把消費端單獨拿出來作成Windows服務,並添加自動重試和補償機制,畢竟RabbitMQ也不是沒有錯誤的,好比調用Api出現問題,遲遲沒法返回ack確認,這個時候就會報出 wait ack timeout的錯誤。

OK,今天先到這裏,我去煮包泡麪吃。。。

相關文章
相關標籤/搜索