.Net Core下使用RabbitMQ比較完備的兩種方案(雖然代碼有點慘淡,不過我會完善)

1、前言html

    上篇說給你們來寫C#和Java的方案,最近工做也比較忙,遲到了一些,我先給你們補上C#的方案,另外若是沒看我上篇博客的人最好看一下,不然你可能看的雲裏霧裏的,這裏我就不進行具體的方案畫圖了;傳送門git

2、使用的插件github

    HangFireweb

    一個開源的.NET任務調度框架,最大特色在於內置提供集成化的控制檯,方便後臺查看及監控,支持多種存儲方式;在方案中主要使用定時任務作補償機制,後期可能會封裝一些,能經過頁面的形式直接添加任務;sql

   NLogapi

   日誌記錄框架,方案中使用記錄日誌,後期可能回集成多個日誌框架;緩存

   Autofac網絡

   依賴注入的框架,應該不用作過多介紹;框架

  SqlSugardom

  ORM框架,這個從剛開始我就在使用了,在如今公司沒有推行起來,不過在上兩家公司都留下的遺產,聽說還用的能夠,固然我仍是最佩服做者;

  Polly

  容錯服務框架,相似於Java下的Hystrix,主要是爲了解決分佈式系統中,系統之間相互依賴,可能會由於多種因素致使服務不可用的而產生的一套框架,支持服務的超時重試、限流、熔斷器等等;

  RabbitMQ.Client

  官方提供的C#鏈接RabbitMQ的SDK;

3、方案

  模擬一個簡單訂單下單的場景,沒有進行具體的實現。同時建議下游服務不要寫在web端,最好以服務的形式奔跑,代碼中是Web端實現的,你們不要這麼搞。總體上仍是實現了以前提到的兩種方案:一是入庫打標,二是延時隊列(這塊沒有進行很好的測試,可是估計也沒有很大的問題);固然也是有一些特色:RabbitMQ宕機狀況下無需重啓服務,網絡異常的狀況下也能夠進行斷線重連。接下來聊下代碼和各方插件在系統中的具體應用:

  項目結構:

  

  RabbitMQExtensions:

  採用Autofac按照單例的形式注入,採用Polly進行斷線重連,也開啓了自身斷線重連和心跳檢測機制,配置方面採用最簡單的URI規範進行配置,有興趣參考下官方,總體上這塊代碼還相對比較規範,之後可能也不會有太多調整;

/// <summary>
    /// rabbitmq持久化鏈接 /// </summary>
    public interface IRabbitMQPersistentConnection { bool IsConnected { get; } bool TryConnect(); IModel CreateModel(); } /// <summary>
    /// rabbitmq持久化鏈接具體實現 /// </summary>
    public class DefaultRabbitMQPersistentConnection : IRabbitMQPersistentConnection { private readonly IConnectionFactory connectionFactory; private readonly ILogger<DefaultRabbitMQPersistentConnection> logger; private IConnection connection; private const int RETTRYCOUNT = 6; private static readonly object lockObj = new object(); public DefaultRabbitMQPersistentConnection(IConnectionFactory connectionFactory, ILogger<DefaultRabbitMQPersistentConnection> logger) { this.connectionFactory = connectionFactory; this.logger = logger; } public bool IsConnected { get { return connection != null && connection.IsOpen; } } public void Cleanup() { try { connection.Dispose(); connection.Close(); connection = null; } catch (IOException ex) { logger.LogCritical(ex.ToString()); } } public IModel CreateModel() { if (!IsConnected) { connection.Close(); throw new InvalidOperationException("鏈接不到rabbitmq"); } return connection.CreateModel(); } public bool TryConnect() { logger.LogInformation("RabbitMQ客戶端嘗試鏈接"); lock (lockObj) { if (connection == null) { var policy = RetryPolicy.Handle<SocketException>() .Or<BrokerUnreachableException>() .WaitAndRetry(RETTRYCOUNT, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) => { logger.LogWarning(ex.ToString()); }); policy.Execute(() => { connection = connectionFactory.CreateConnection(); }); } if (IsConnected) { connection.ConnectionShutdown += OnConnectionShutdown; connection.CallbackException += OnCallbackException; connection.ConnectionBlocked += OnConnectionBlocked; logger.LogInformation($"RabbitMQ{connection.Endpoint.HostName}獲取了鏈接"); return true; } else { logger.LogCritical("沒法建立和打開RabbitMQ鏈接"); return false; } } } private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e) { logger.LogWarning("RabbitMQ鏈接異常,嘗試重連..."); Cleanup(); TryConnect(); } private void OnCallbackException(object sender, CallbackExceptionEventArgs e) { logger.LogWarning("RabbitMQ鏈接異常,嘗試重連..."); Cleanup(); TryConnect(); } private void OnConnectionShutdown(object sender, ShutdownEventArgs reason) { logger.LogWarning("RabbitMQ鏈接異常,嘗試重連..."); Cleanup(); TryConnect(); } }
View Code

  OrderDal

  SqlSugar的一些簡單封裝,有些小特色:你們能夠能夠經過配置來實現讀寫分離,採用倉儲設計。若是不太喜歡這麼寫,也能夠參考傑哥的作法;

//倉儲設計
    public interface IBaseDal<T> where T:class,new() { DbSqlSugarClient DbContext { get; } IBaseDal<T> UserDb(string dbName); IInsertable<T> AsInsertable(T t); IInsertable<T> AsInsertable(T[] t); IInsertable<T> AsInsertable(List<T> t); IUpdateable<T> AsUpdateable(T t); IUpdateable<T> AsUpdateable(T[] t); IUpdateable<T> AsUpdateable(List<T> t); IDeleteable<T> AsDeleteable(); List<T> GetList(); Task<List<T>> GetListAnsync(); List<T> GetList(Expression<Func<T,bool>> whereExpression); Task<List<T>> GetListAnsync(Expression<Func<T, bool>> whereExpression); List<T> GetList(Expression<Func<T, bool>> whereExpression, Expression<Func<T, object>> orderExpression, OrderByType orderByType = OrderByType.Desc); Task<List<T>> GetListAnsync(Expression<Func<T, bool>> whereExpression, Expression<Func<T, object>> orderExpression, OrderByType orderByType = OrderByType.Desc); List<T> GetPageList(Expression<Func<T, bool>> whereExpression, PageModel page); Task<List<T>> GetPageListAsync(Expression<Func<T, bool>> whereExpression, PageModel page); List<T> GetPageList(Expression<Func<T, bool>> whereExpression, PageModel page, Expression<Func<T, object>> orderByExpression = null, OrderByType orderByType = OrderByType.Asc); Task<List<T>> GetPageListAsync(Expression<Func<T, bool>> whereExpression, PageModel page, Expression<Func<T, object>> orderByExpression = null, OrderByType orderByType = OrderByType.Asc); int Count(Expression<Func<T, bool>> whereExpression); Task<int> CountAsync(Expression<Func<T, bool>> whereExpression); T GetById(dynamic id); T GetSingle(Expression<Func<T, bool>> whereExpression); Task<T> GetSingleAsync(Expression<Func<T, bool>> whereExpression); T GetFirst(Expression<Func<T, bool>> whereExpression); Task<T> GetFirstAsync(Expression<Func<T, bool>> whereExpression); bool IsAny(Expression<Func<T, bool>> whereExpression); Task<bool> IsAnyAsync(Expression<Func<T, bool>> whereExpression); bool Insert(T t); Task<bool> InsertAsync(T t); bool InsertRange(List<T> t); Task<bool> InsertRangeAsync(List<T> t); bool InsertRange(T[] t); Task<bool> InsertRangeAsync(T[] t); int InsertReturnIdentity(T t); Task<long> InsertReturnIdentityAsync(T t); bool Delete(Expression<Func<T, bool>> whereExpression); Task<bool> DeleteAsync(Expression<Func<T, bool>> whereExpression); bool Delete(T t); Task<bool> DeleteAsync(T t); bool DeleteById(dynamic id); Task<bool> DeleteByIdAsync(dynamic id); bool DeleteByIds(dynamic[] ids); Task<bool> DeleteByIdsAsync(dynamic[] ids); bool Update(Expression<Func<T, T>> columns, Expression<Func<T, bool>> whereExpression); Task<bool> UpdateAsync(Expression<Func<T, T>> columns, Expression<Func<T, bool>> whereExpression); bool Update(T t); Task<bool> UpdateAsync(T t); bool UpdateRange(T[] t); Task<bool> UpdateRangeAsync(T[] t); void BeginTran(); void CommitTran(); void RollbackTran(); }
View Code

  OrderCommon

  定義全局異常的中間件,還有包含一些用到的實體等等,這部分代碼還可優化拆分一下;

  OrderService

  生產者和消費者的具體實現,這塊我還想在改造一番,將消費和業務分割開,如今寫的很凌亂,不建議這麼寫,先把代碼放出來,看看你們贊同不贊同個人這些用法,能夠討論,也歡迎爭論,雖然這塊代碼寫的很差,可是其實裏面涉及一些RabbitMQ回調函數的用法,也是比較重要的,沒有這些函數也就實現不了我上面說那兩個特色;

//RabbitMQ宕機之後回調
//客戶端這塊你們不要採用遞歸調用恢復連接
//具體爲何你們能夠測試下,這裏留點小疑問哈哈
connection.ConnectionShutdown += OnConnectionShutdown; //消費端異常之後回調 consumerchannel.CallbackException += OnOnConsumerMessageAndWriteMessageLogException;

  Order

  具體的調用者,你們應該根據方法名字就能區分出我上面提到的兩種方案的設計,總體的設計思路都是最終一致,延時隊列發送消息這塊最終也是能夠經過定時任務來實現最終一致,實現方式有不少種,簡單來講下能夠經過入庫時生成的緩存機制,經過定時任務來進行補償實現,這塊我沒有進行具體實現,有興趣咱們能夠探討下這個方案;

[Route("api/[controller]/[action]")] [ApiController] public class OrderController : ControllerBase { private readonly IBaseDal<OrderMessageLogEntity> orderBaseDal; private readonly IMessageService<OrderMessageLogEntity> messageService; private readonly IConsumerMessageService consumerMessageService; private const string EXCHANGENAME = "order"; private const string QUEUENAME = "order"; private const string ROUTINGKEY = "order"; public OrderController(IBaseDal<OrderMessageLogEntity> orderBaseDal, IMessageService<OrderMessageLogEntity> messageService,IConsumerMessageService consumerMessageService) { this.orderBaseDal = orderBaseDal; this.messageService = messageService; this.consumerMessageService = consumerMessageService; } /// <summary>
        /// 建立訂單 /// </summary>
        /// <returns></returns>
        public ActionResult<bool> CreateOrder(long userId) { //建立訂單成功
            OrderEntity orderEntity = new OrderEntity(); Random random= new Random(); orderEntity.OrderId = random.Next(); orderEntity.OrderNo = random.Next(); orderEntity.UserId = userId; orderEntity.OrderInfo = random.Next() + "詳情"; //bool isCreateOrderSuccress = orderService.CreateOrder(orderId); //if (!isCreateOrderSuccress) //{ // throw new Exception("建立訂單失敗"); //} //建立訂單成功之後開始入消息記錄庫 //消息建議設計的冗餘一些方便之後好查詢 //千萬級之後連表太困難 //建議冗餘的信息有用戶信息、訂單信息、方便之後按照這個覈對信息 //消息表的建議是按照不一樣的業務進行分表存儲
            Random messageRandom = new Random(); OrderMessageLogEntity orderMessageLog = new OrderMessageLogEntity(); orderMessageLog.MessageId = messageRandom.Next(); orderMessageLog.MessageInfo = orderEntity.OrderId+"訂單信息"; orderMessageLog.Status = (int)MessageStatusEnum.SENDING; orderMessageLog.OrderId = orderEntity.OrderId; orderMessageLog.UserId = orderEntity.UserId; orderMessageLog.CreateTime = DateTime.Now; orderMessageLog.UpdateTime = DateTime.Now; orderMessageLog.TryCount = 0; orderMessageLog.NextRetryTime = DateTime.Now.AddMinutes(5); //必須保證消息先落庫
            bool isCreateOrderMessageLosSuccess = orderBaseDal.Insert(orderMessageLog); if (!isCreateOrderMessageLosSuccess) throw new Exception("消息入庫異常"); Message message = new Message(); message.ExchangeName = EXCHANGENAME; message.QueueName = QUEUENAME; message.MessageId = orderMessageLog.MessageId; message.RoutingKey = ROUTINGKEY; message.Body = Encoding.UTF8.GetBytes(orderMessageLog.MessageInfo); //落庫成功之後開始發送消息到MQ //這個地方採用最終一致而不去使用分佈式事物最終一致
 messageService.SendMessage(message, orderMessageLog); return true; } /// <summary>
        /// 消費訂單 /// </summary>
        /// <returns></returns>
        public ActionResult<bool> ConsumerOrder() { Message message = new Message(); message.ExchangeName = EXCHANGENAME; message.QueueName = QUEUENAME; message.RoutingKey = ROUTINGKEY; consumerMessageService.ConsumerMessage(); return true; } /// <summary>
        /// 經過延時隊列發送消息 /// </summary>
        /// <param name="userId"></param>
        /// <returns></returns>
        public ActionResult<bool> CreateDelayCreateOrder(long userId) { //建立訂單成功
            OrderEntity orderEntity = new OrderEntity(); Random random = new Random(); orderEntity.OrderId = random.Next(); orderEntity.OrderNo = random.Next(); orderEntity.UserId = userId; orderEntity.OrderInfo = random.Next() + "詳情"; //bool isCreateOrderSuccress = orderService.CreateOrder(orderId); //if (!isCreateOrderSuccress) //{ // throw new Exception("建立訂單失敗"); //} //建立訂單成功之後開始入消息記錄庫 //消息建議設計的冗餘一些方便之後好查詢 //千萬級之後連表太困難 //建議冗餘的信息有用戶信息、訂單信息、方便之後按照這個覈對信息 //消息表的建議是按照不一樣的業務進行分表存儲
            Random messageRandom = new Random(); OrderMessageLogEntity orderMessageLog = new OrderMessageLogEntity(); orderMessageLog.MessageId = messageRandom.Next(); orderMessageLog.MessageInfo = orderEntity.OrderId + "訂單信息"; orderMessageLog.Status = (int)MessageStatusEnum.SENDING; orderMessageLog.OrderId = orderEntity.OrderId; orderMessageLog.UserId = orderEntity.UserId; orderMessageLog.CreateTime = DateTime.Now; orderMessageLog.UpdateTime = DateTime.Now; orderMessageLog.TryCount = 0; orderMessageLog.NextRetryTime = DateTime.Now.AddMinutes(5); ////必須保證消息先落庫
            //bool isCreateOrderMessageLosSuccess = orderBaseDal.Insert(orderMessageLog); //if (!isCreateOrderMessageLosSuccess) // throw new Exception("消息入庫異常");
 Message message = new Message(); message.ExchangeName = EXCHANGENAME; message.QueueName = QUEUENAME; message.MessageId = orderMessageLog.MessageId; message.RoutingKey = ROUTINGKEY; message.Body = Encoding.UTF8.GetBytes(orderMessageLog.MessageInfo); //這裏的設計是不進行落庫 //假如兩條消息都失敗必須藉助定時任務去對比消息庫和訂單庫的消息id而後進行再補發 //剩下的只要有一條發送成功其實就能保證下游必然會消費調這條消息,排除下游消費異常的狀況 這個地方我不在進行實現本身可腦補一下 //開始發送消息到MQ
 messageService.SendMessage(message, orderMessageLog); //發送延時消息
 messageService.SendDelayMessage(message, orderMessageLog); return true; } /// <summary>
        /// 消費消息之後併入庫 /// </summary>
        /// <returns></returns>
        public ActionResult<bool> ConsumerOrderAndWirteMessageLog() { consumerMessageService.ConsumerMessageAndWriteMessageLog(); return true; } /// <summary>
        /// 消費延時消息 /// 進行二次檢查覈對 /// </summary>
        /// <returns></returns>
        public ActionResult<bool> ConsumerDelayOrder() { consumerMessageService.ConsumerDelayMessage(); return true; } }
View Code

  HangfireExtensions

  Hangfire定時框架,採用Mysql做爲持久層的存儲,寫的也比較清晰,後期就是針對這些進行擴展,實如今界面就能添加定時任務;

4、結束

  生產端和消費端這段代碼寫的凌亂,但願你們不要介意這一點,是有緣由的,這裏我就不說了。但願你們看到閃光點,不要在一點上糾結;下次會加入Elasticsearch和監控部分的時候我會把這塊代碼改掉,還你們一片整潔的世界;

  Github地址:https://github.com/wangtongzhou520/rabbitmq.git  有什麼問題你們能夠問我;

  歡迎你們加羣438836709!歡迎你們關注我!

  

原文出處:https://www.cnblogs.com/wtzbk/p/10914337.html

相關文章
相關標籤/搜索