c#經過Redis實現輕量級消息組件

最近在開發一個輕量級ASP.NET MVC開發框架,須要加入日誌記錄,郵件發送,短信發送等功能,爲了保持模塊的獨立性,因此須要經過消息通訊的方式進行處理,爲了保持框架在部署,使用,二次開發過程當中的簡易便捷性,因此沒有選擇傳統的MQ,而是基於Redis的訂閱發佈實現一個系統內部消息組件,話很少說,上碼!html

數據結構定義

消息實體包含幾個部分,訂閱通道名稱,信息頭,信息體,信息差別化額外信息字典,信息頭主要包含消息標識,消息日期,信息體包含信息內容,信息實體類型等git

public class Message
    {
        public string MessageChannel { set; get; }
        public MessageHead @MessageHead { set; get; }
        public MessageBody @MessageBody { set; get; }

        [JsonExtensionData]
        public Dictionary<string,Object> @MessageExtra { set; get; }

        public Message()
        {

        }

        public void AddExtra(string Name, string Value)
        {
            if (@MessageExtra == null)
            {
                @MessageExtra = new Dictionary<string, object>();
            }
            @MessageExtra.Add(Name, Value);
        }

        public Object GetExtra(string Name)
        {
            return @MessageExtra[Name];
        }
    }

    public class MessageHead
    {
        public string MessageID { set; get; }
        public DateTime MessageDate { set; get; }

        public MessageHead()
        {
            MessageID = CommonUtil.CreateCommonGuid();
            MessageDate = DateTime.Now;
        }
    }

    public class MessageBody
    {
        public string MessageJsonContent { set; get; }
        public Type MessageMapperType { set; get; }
    }

注:由於消息訂閱發佈傳遞過程當中,我是經過Json序列化傳輸的,使用過程當中可能須要一些額外的鍵值對信息,這裏在對象中定義的是Dictinary對象,可是Dictinary自己是不支持序列化的,因此須要加上註解JsonExtensionData數據庫

訂閱通道聲明

咱們須要達到的效果是,在系統啓動時,全部消息通道能夠根據系統中的應用自動訂閱,這裏就須要一個註解來標識咱們的訂閱通道接收消息的實現類數據結構

[AttributeUsage(AttributeTargets.Class)]
    public class MessageChanelAttribute : Attribute
    {
        private string _ChannleName;
        public string ChannelName
        {
            get
            {
                return this._ChannleName;
            }
            set
            {
                this._ChannleName = value;
            }

        }
    }

消息的個性化策略處理

Redis的三方庫我這裏使用的是StackExchange.Redis.dll,在消息訂閱時,須要爲Channel指定接收到消息時的處理委託,咱們在自動訂閱的過程當中確定也要收集好各種消息處理類並與Channel一一對應,這時候咱們就須要一個基類FastDefaultMessageHandler,咱們的具體的消息處理類繼承自FastDefaultMessageHandler,重寫處理方法便可app

[Component]
    [MessageChanelAttribute(ChannelName = "DefaultMessage")]
    public class FastDefaultMessageHandler : IFastMessageHandle
    {
        [AutoWired]
        public DBUtil @DBUtil;

        public void HandleMessage(RedisChannel ChannelName, RedisValue Message)
        {
            FastExecutor.Message.Design.Message Entity = JsonConvert.DeserializeObject<FastExecutor.Message.Design.Message>(Message);
            try
            {
                if (!CheckMessageIsConsume(Entity))
                {
                    this.CustomHandle(Entity);
                }
            }
            catch (Exception e)
            {
                StringBuilder ExceptionLog = new StringBuilder();
                ExceptionLog.AppendFormat("異常Message所屬Channel:{0}", Entity.MessageChannel + Environment.NewLine);
                ExceptionLog.AppendFormat("異常Message插入時間:{0}", Entity.MessageHead.MessageDate.ToString() + Environment.NewLine);
                ExceptionLog.AppendFormat("異常Message內容:{0}", Message + Environment.NewLine);
                ExceptionLog.AppendFormat("異常信息:{0}", e.Message + Environment.NewLine);
                LogUtil.WriteLog("Logs/MessageErrorLog", "log_", ExceptionLog.ToString() + Environment.NewLine);
                ExceptionLog.AppendFormat("========================================================================================================================================================================" + Environment.NewLine);
                MessageACK.MoveMessageToExceptionChannel(Entity.MessageChannel, Entity);
            }
            finally
            {
                MessageACK.ConfirmMessageFinish(Entity.MessageChannel, Entity.MessageHead.MessageID);
            }

        }

        public virtual void CustomHandle(FastExecutor.Message.Design.Message @Message)
        {

        }

        public virtual bool CheckMessageIsConsume(FastExecutor.Message.Design.Message @Message)
        {
            return false;
        }
    }

其中的HandleMessage方法就是咱們在訂閱Channel時對應的委託,會調用類中的CustomHandle的虛方法,子類繼承重寫該方法就會基於多態進行策略調用,CheckMessageIsConsume方法是用於確認消息是否重複消費的,也能夠被重寫,下面看一個訪問日誌類的實例,使用MessageChanelAttribute標註聲明該實現類須要訂閱發佈的Channel名稱爲Visit,CustomHandle方法中實現了插入數據庫操做,CheckMessageIsConsume方法判斷該條日誌數據是否已消費(已經存在於數據庫)框架

[MessageChanelAttribute(ChannelName = "Visit")]
    public class VisitLog : FastDefaultMessageHandler
    {
        public override void CustomHandle(Message.Design.Message Message)
        {
            Frame_VisitLog LogEntity = JsonConvert.DeserializeObject<Frame_VisitLog>(Message.MessageBody.MessageJsonContent);
            @DBUtil.Insert(LogEntity);
            base.CustomHandle(Message);
        }

        public override bool CheckMessageIsConsume(Message.Design.Message Message)
        {
            Frame_VisitLog LogEntity = JsonConvert.DeserializeObject<Frame_VisitLog>(Message.MessageBody.MessageJsonContent);
            DBRow Row = new DBRow("Frame_VisitLog", "RowGuid", LogEntity.RowGuid);
            if (Row.IsExist())
            {
                return true;
            }
            else
            {
                return false;
            }
        }
    }

消息自動訂閱

咱們但願系統在啓動時就尋找出定義好Channel和實現類,自動實現訂閱,這裏就須要用到IOC容器,啓動系統時將全部的消息處理類放入容器中,在自動訂閱時所有取出來,根據消息處理類中聲明的Channel名稱進行自動訂閱ide

public void Init()
        {
            List<Type> HandlerTypeList = InjectUtil.Container.GetRegistType(typeof(IFastMessageHandle));
            foreach (Type HandlerType in HandlerTypeList)
            {
                MessageChanelAttribute Channel = Attribute.GetCustomAttribute(HandlerType, typeof(MessageChanelAttribute)) as MessageChanelAttribute;
                RedisUtil.Subscribe(Channel.ChannelName, ((FastDefaultMessageHandler)InjectUtil.Container.Resolve(HandlerType)).HandleMessage);
            }
        }

注:學習

1.這裏的IOC容器是我本身實現的,地址:https://gitee.com/grassprogramming/FastIOC,你們能夠用AutoFac代替ui

2.RedisUtil是對StackExchange.Redis.dll封裝的處理類,地址:https://gitee.com/grassprogramming/FastUtilthis

消息發送

消息只須要調用Redis的發佈方法便可,將Channel名稱與定義好的數據實體類傳入,序列化爲Json

public void SendMessage<T>(string ChannleName, T CustomMessageEntity, Dictionary<string, string> ExtraData = null)
        {
            FastExecutor.Message.Design.Message MessageEntity = new Design.Message();
            MessageEntity.MessageChannel = ChannleName;
            MessageHead Head = new MessageHead();
            MessageBody Body = new MessageBody();
            Body.MessageMapperType = typeof(T);
            Body.MessageJsonContent = JsonConvert.SerializeObject(CustomMessageEntity);
            MessageEntity.MessageHead = Head;
            MessageEntity.MessageBody = Body;
            if (ExtraData != null)
            {
                foreach (var item in ExtraData)
                {
                    MessageEntity.AddExtra(item.Key, item.Value);
                }
            }
            RedisUtil.Publish(ChannleName, MessageEntity);
            MessageACK.CopyMessageToACKList(ChannleName, MessageEntity);
        }

消息確認與存儲

Redis做訂閱發佈模式做爲消息組件的問題有兩方面

問題:消息消費完沒有確認機制

解決方案

基於Redis的Hash存儲方式創建一個消息存儲字段,在發送消息時拷貝到消息Hash字典中,消費完畢後再刪除,對應SendMessage中的MessageACK.CopyMessageToACKList方法和FastDefaultMessageHandler中的MessageACK.ConfirmMessageFinish方法,本質就是對Hash字典的增長與刪除功能

問題:消息處理端掛了再次重啓消息會丟失

解決方案

確認機制已經保證了消息即便沒有被消費完可是處理端宕機消息也不會丟失,須要注意的是,消息沒有丟失僅僅是Hash字典中有存儲,可是消息通道中不存在了,因此咱們在系統每次啓動時掃描這個Hash字典,從新發布消息到Channel,這樣可能致使重複消費,因此須要靠FastDefaultMessageHandler中的CheckMessageIsConsume方法判斷,同時消息處理者自己處理異常咱們也須要記錄下來,好比發短信供應商接口有問題,消息處理異常會進入Redis的ChannelException通道,咱們能夠根據需求實現一個可視化界面決定是否經過手動恢復

最後

Message組件相關代碼地址:https://gitee.com/grassprogramming/FastExecutor/tree/master/code/FastExecutor/FastExecutor.Message

存在不足問題:若是消息是單純記錄日誌問題,沒辦法確認消息是否消費了

若是你們有什麼好的建議,可留言一塊兒交流學習,共同進步

原文出處:https://www.cnblogs.com/yanpeng19940119/p/11603865.html

相關文章
相關標籤/搜索