IBM.WMQ訂閱主題,連續獲取消息解決辦法

去隊列裏面一直獲取消息,一開始想到了兩種解決方案:this

第一:訂閱一次獲取一次消息,正常的話每次都能獲取到,可是要及時去清理訂閱而且時間粒度很差控制編碼

第二:訂閱一次,再獲取消息這裏加死循環,超時MQ已經作了,因此能夠不用控制線程等待,獲取到消息了之後,直接經過自定義事件的機制去及時處理消息spa

從最終實驗結果來看,第二種是最優的作法,能夠作到隨時獲取到消息,又不佔用資源。接下來我把最終的實現代碼分享處理,但願對你們有所幫助,有不對的地方,請及時聯繫我。線程

訂閱主題:code

private IIBMWMQMsgHandler _msgHandler; public IBMWMQHelper() { _msgHandler = new DefaultIBMWMQMsgHandler(); } public IBMWMQHelper(IIBMWMQMsgHandler msgHandler) { _msgHandler = msgHandler; }

 

/// <summary>
        /// 訂閱主題。訂閱一次並嘗試一直獲取消息 /// </summary>
        public void SubTopic1(string business, bool isGetMsg = true) { IBMWMQEventSource eventSource = new IBMWMQEventSource(); IBMWMQMsgEventListener msgEventListener = new IBMWMQMsgEventListener(_msgHandler); MQSubscription subs = null; try { //訂閱事件
 msgEventListener.Subscribe(eventSource); //MQEnvironment.CCSID = 1381;

                using (var mqmgr = MQQueueManager.Connect(IBMWMQConfig.QUEUE_MGR_NAME, MQC.MQCO_NONE, IBMWMQConfig.CHANNEL, IBMWMQConfig.CONNECTION_INFO)) { subs = new MQSubscription(mqmgr); if (mqmgr.IsConnected) { this.TryAdd(business, subs); int option = MQC.MQSO_CREATE | MQC.MQSO_FAIL_IF_QUIESCING | MQC.MQSO_DURABLE | MQC.MQSO_RESUME; string subName = string.Format(IBMWMQConfig.SUBSCRIPTION_TEMPLATE, business); string topicName = string.Format(IBMWMQConfig.TOPIC_TEMPLATE, business); try { subs.Subscribe(subName, option, topicName); } catch (MQException ex) { string code = ex.Reason.ToString(); //引起事件
 eventSource.RaiseErroeMsgEvent(business, code); } while (isGetMsg) { eventSource.RaiseErroeMsgEvent(business, string.Format("開始嘗試獲取 {0} 消息...", business)); try { MQMessage incoming = new MQMessage() { CharacterSet = MQC.CODESET_UTF, Encoding = MQC.MQENC_NORMAL }; MQGetMessageOptions gmo = new MQGetMessageOptions(); gmo.WaitInterval = 10 * 1000; //MQC.MQWI_UNLIMITED;
                                gmo.Options |= MQC.MQGMO_WAIT; gmo.Options |= MQC.MQGMO_SYNCPOINT; subs.Get(incoming, gmo); string message = incoming.ReadAll(); if (!string.IsNullOrEmpty(message)) { //引起事件
 eventSource.RaiseNewMsgEvent(business, message); } } catch (MQException ex) { string code = ex.Reason.ToString(); //引起事件
 eventSource.RaiseErroeMsgEvent(business, code); } } } } } catch (MQException e) { string code = e.Reason.ToString(); //引起事件
 eventSource.RaiseErroeMsgEvent(business, code); } finally { //if (subs != null) //{ // subs.Close(MQC.MQCO_REMOVE_SUB, closeSubQueue: true, closeSubQueueOptions: MQC.MQCO_NONE); //}
 } }

或者使用原生方式:orm

/// <summary>
        /// 訂閱主題。訂閱一次並嘗試一直獲取消息(原生) /// </summary>
        /// <param name="business"></param         /// <param name="transportMode"></param>
        /// <param name="durability"></param>
        public void SubTopic2(string business, string transportMode = "managed", string durability = "durable") { Hashtable properties = new Hashtable(); MQTopic topic = null; IBMWMQEventSource eventSource = new IBMWMQEventSource(); IBMWMQMsgEventListener msgEventListener = new IBMWMQMsgEventListener(_msgHandler); try { //訂閱事件
 msgEventListener.Subscribe(eventSource); //MQEnvironment.CCSID = 1381; //自動重連
 properties.Add(MQC.CONNECT_OPTIONS_PROPERTY, MQC.MQCNO_RECONNECT_Q_MGR); properties.Add(MQC.HOST_NAME_PROPERTY, IBMWMQConfig.CONNECTION_HOST); properties.Add(MQC.PORT_PROPERTY, IBMWMQConfig.CONNECTION_PORT); properties.Add(MQC.CHANNEL_PROPERTY, IBMWMQConfig.CHANNEL); if (transportMode == "managed") { properties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_MANAGED); } else if (transportMode == "unmanaged") { properties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT); } using (var queueManager = new MQQueueManager(IBMWMQConfig.QUEUE_MGR_NAME, properties)) { if (queueManager.IsConnected) { int option4Durable = MQC.MQSO_CREATE | MQC.MQSO_FAIL_IF_QUIESCING | MQC.MQSO_DURABLE | MQC.MQSO_RESUME;//持久化訂閱
                        int option4NotDurable = MQC.MQSO_CREATE | MQC.MQSO_FAIL_IF_QUIESCING;//非持久化訂閱

                        string subName = string.Format(IBMWMQConfig.SUBSCRIPTION_TEMPLATE, business); string topicName = string.Format(IBMWMQConfig.TOPIC_TEMPLATE, business); try { if (durability == "durable") { topic = queueManager.AccessTopic(topicName, null, option4Durable, null, subName); } else { topic = queueManager.AccessTopic(topicName, null, MQC.MQTOPIC_OPEN_AS_SUBSCRIPTION, option4NotDurable); } while (true) { eventSource.RaiseErroeMsgEvent(business, string.Format("開始嘗試獲取 {0} 消息...", business)); try { MQMessage incoming = new MQMessage() { CharacterSet = MQC.CODESET_UTF, Encoding = MQC.MQENC_NORMAL }; MQGetMessageOptions gmo = new MQGetMessageOptions(); gmo.WaitInterval = 10 * 1000; //MQC.MQWI_UNLIMITED;
                                    gmo.Options |= MQC.MQGMO_WAIT; gmo.Options |= MQC.MQGMO_SYNCPOINT; topic.Get(incoming, gmo); string message = incoming.ReadString(incoming.MessageLength); if (!string.IsNullOrEmpty(message)) { //引起事件
 eventSource.RaiseNewMsgEvent(business, message); } } catch (MQException ex) { string code = ex.Reason.ToString(); //引起事件
 eventSource.RaiseErroeMsgEvent(business, code); } } } catch (MQException ex) { string code = ex.Reason.ToString(); //引起事件
 eventSource.RaiseErroeMsgEvent(business, code); } } else { eventSource.RaiseErroeMsgEvent(business, "鏈接隊列管理器失敗!"); } } } catch (MQException e) { string code = e.Reason.ToString(); //引起事件
 eventSource.RaiseErroeMsgEvent(business, code); } }

接下來開始自定義事件:blog

定義事件中心:接口

public class IBMWMQEventSource { /// <summary>
        /// 新消息處理委託 /// </summary>
        /// <param name="business"></param>
        /// <param name="msg"></param>
        public delegate void NewMsgHandler(string business, string msg); /// <summary>
        /// 新消息處理事件 /// </summary>
        public event NewMsgHandler NewMsgEventHandler; /// <summary>
        /// 錯誤消息處理委託 /// </summary>
        /// <param name="errorCode"></param>
        public delegate void ErrorMsgHandler(string business, string errorCode); /// <summary>
        /// 錯誤消息處理事件 /// </summary>
        public event ErrorMsgHandler ErrorMsgEventHandler; /// <summary>
        /// 引起新消息處理事件的方法 /// </summary>
        /// <param name="business"></param>
        /// <param name="msg"></param>
        public void RaiseNewMsgEvent(string business, string msg) { if (NewMsgEventHandler != null) { NewMsgEventHandler.Invoke(business, msg); } } /// <summary>
        /// 引起錯誤消息處理事件的方法 /// </summary>
        /// <param name="business"></param>
        /// <param name="errorCode"></param>
        public void RaiseErroeMsgEvent(string business, string errorCode) { if (ErrorMsgEventHandler != null) { ErrorMsgEventHandler.Invoke(business, errorCode); } } }

定義事件監聽器:隊列

public class IBMWMQMsgEventListener { private readonly IIBMWMQMsgHandler _msgHandler; public IBMWMQMsgEventListener(IIBMWMQMsgHandler msgHandler) { _msgHandler = msgHandler; } /// <summary>
        /// 訂閱事件 /// </summary>
        public void Subscribe(IBMWMQEventSource eventSource) { eventSource.NewMsgEventHandler += _msgHandler.OnNewMsgHandler; eventSource.ErrorMsgEventHandler += _msgHandler.OnErrorMsgHandler; } /// <summary>
        /// 取消訂閱事件 /// </summary>
        public void UnSubscribe(IBMWMQEventSource eventSource) { eventSource.NewMsgEventHandler -= _msgHandler.OnNewMsgHandler; eventSource.ErrorMsgEventHandler -= _msgHandler.OnErrorMsgHandler; } }

定義消息處理接口:事件

public interface IIBMWMQMsgHandler { /// <summary>
        /// 處理新消息 /// </summary>
        /// <param name="business">業務代碼</param>
        /// <param name="msg">消息包</param>
        void OnNewMsgHandler(string business, string msg); /// <summary>
        /// 處理錯誤消息 /// </summary>
        /// <param name="business">業務代碼</param>
        /// <param name="errorCode">錯誤碼</param>
        void OnErrorMsgHandler(string business, string errorCode); }

默認消息處理機制:

public class DefaultIBMWMQMsgHandler : IIBMWMQMsgHandler { /// <summary>
        /// 處理新消息 /// </summary>
        /// <param name="business">業務代碼</param>
        /// <param name="msg">消息包</param>
        public void OnNewMsgHandler(string business, string msg) { Trace.WriteLine(string.Format("新消息到達,數據包:{0}", msg)); } /// <summary>
        /// 處理錯誤消息 /// </summary>
        /// <param name="business">業務代碼</param>
        /// <param name="errorCode">錯誤碼</param>
        public void OnErrorMsgHandler(string business, string errorCode) { Trace.WriteLine(string.Format("處理錯誤消息,錯誤碼:{0}", errorCode)); } }

定義消息處理方法:

public class CustomIBMWMQMsgHandler : BaseJobManager, IIBMWMQMsgHandler { /// <summary>
        /// 消息自定義業務處理 /// </summary>
        private static CustomBusinessHandler customBusinessHandler = new CustomBusinessHandler(); /// <summary>
        /// 處理新消息 /// </summary>
        /// <param name="business"></param>
        /// <param name="msg"></param>
        public void OnNewMsgHandler(string business, string msg) { //獲取配置文件
            List<JobConfigEntity> configs = InitJobConfig(); string businessName = configs.First(c => c.JobName.Replace("_", "").Contains(business)).Name; switch (business) { case IBMWMQConfig.BUSINESS_NAME_ZDFZ09: msg = customBusinessHandler.RemoveMsgHeader(msg, IBMWMQConfig.BUSINESS_NAME_ZDF_Z09); customBusinessHandler.DO_ZDF_Z09(business, msg); break; case IBMWMQConfig.BUSINESS_NAME_ZAPZ10: msg = customBusinessHandler.RemoveMsgHeader(msg, IBMWMQConfig.BUSINESS_NAME_ZAP_Z10); customBusinessHandler.DO_ZAP_Z10(business, msg); break; case IBMWMQConfig.BUSINESS_NAME_OULR24: msg = customBusinessHandler.RemoveMsgHeader(msg, IBMWMQConfig.BUSINESS_NAME_OUL_R24); customBusinessHandler.DO_OUL_R24(business, msg); break; } this.WriteInfo(this.GetType(), string.Format("收到來自{0}的消息,數據包:{1}", businessName, msg)); } /// <summary>
        /// 處理錯誤消息 /// </summary>
        /// <param name="business"></param>
        /// <param name="errorCode"></param>
        public void OnErrorMsgHandler(string business, string errorCode) { if (!string.IsNullOrEmpty(errorCode)) { //獲取配置文件
                List<JobConfigEntity> configs = InitJobConfig(); string businessName = configs.First(c => c.JobName.Replace("_", "").Contains(business)).Name; //TODO 其餘消息內容校驗
                if (errorCode.Equals("2033")) { this.WriteInfo(this.GetType(), string.Format("MQRC_NO_MSG_AVAILABLE.{0} 無消息({1})。", businessName, errorCode)); } else if (errorCode.Equals("2085")) { this.WriteInfo(this.GetType(), string.Format("MQRC_UNKNOWN_OBJECT_NAME.{0} 主題不存在({1})。", businessName, errorCode)); } else if (errorCode.Equals("2429")) { this.WriteInfo(this.GetType(), string.Format("MQRC_SUBSCRIPTION_IN_USE.{0} 主題已被訂閱({1})。", businessName, errorCode)); } else if (errorCode.Equals("2537")) { this.WriteInfo(this.GetType(), string.Format("MQRC_CHANNEL_NOT_AVAILABLE.頻道不可用({0})。", errorCode)); } else if (errorCode.Equals("2538")) { this.WriteInfo(this.GetType(), string.Format("MQRC_HOST_NOT_AVAILABLE.沒法鏈接消息隊列主機({0})。", errorCode)); } else if (errorCode.Equals("2539")) { this.WriteInfo(this.GetType(), string.Format("MQRC_CHANNEL_CONFIG_ERROR.頻道配置錯誤({0})。", errorCode)); } else if (errorCode.Equals("2540")) { this.WriteInfo(this.GetType(), string.Format("MQRC_UNKNOWN_CHANNEL_NAME.未知頻道稱({0})。", errorCode)); } else { if (errorCode.Length == 4) { this.WriteInfo(this.GetType(), string.Format("未知錯誤消息,錯誤緣由編碼:{0}。", errorCode)); } else { this.WriteInfo(this.GetType(), string.Format("{0} {1}", businessName, errorCode)); } } } } }

去掉消息頭:

/// <summary>
        /// 去掉消息頭 /// </summary>
        /// <param name="msg"></param>
        /// <param name="name"></param>
        /// <returns></returns>
        public string RemoveMsgHeader(string msg, string name) { msg = msg.Trim(); //去掉消息頭
            int index = msg.IndexOf("<" + name, StringComparison.Ordinal); if (index > 0) { string temp = msg.Substring(0, index); msg = msg.Substring(index, msg.Length - temp.Length); msg = msg.Trim(); } return msg; }

配置信息:

public class IBMWMQConfig { /// <summary>
        /// MQ主機地址 /// </summary>
        private static readonly string CONNECTION_HOST = ConfigHelper.GetValue("IBM_WMQ_CONNECTION_HOST"); /// <summary>
        /// 通信端口 /// </summary>
        private const int CONNECTION_PORT = 4421; /// <summary>
        /// CLIENT_ID /// </summary>
        private const string CLIENT_ID = ""; /// <summary>
        /// 通道名稱 /// </summary>
        public const string CHANNEL = ""; /// <summary>
        /// 隊列管理器名稱 /// </summary>
        public const string QUEUE_MGR_NAME = ""; /// <summary>
        /// 訂閱主題持久化標識,{0}標識具體業務 /// </summary>
        public static readonly string SUBSCRIPTION_TEMPLATE = "JMS:" + QUEUE_MGR_NAME + ":" + CLIENT_ID + "_{0}.REQ:{0}.REQ"; /// <summary>
        /// 主題名稱模板,{0}標識具體業務 /// </summary>
        public static readonly string TOPIC_TEMPLATE = "{0}.REQ"; /// <summary>
        /// IBM.WMQ鏈接字符串 /// </summary>
        public static readonly string CONNECTION_INFO = string.Format("{0}({1})", CONNECTION_HOST, CONNECTION_PORT); }

調用:

IBMWMQHelper helper = new IBMWMQHelper(new CustomIBMWMQMsgHandler()); helper.SubTopic1(IBMWMQConfig.BUSINESS_NAME_ZAPZ10);
相關文章
相關標籤/搜索