IBM.WMQ訂閱消息

網上關於IBM這個消息隊列中間件的資料相對比較少,C#相關的資料就更少了,最近由於要對接這個隊列中間件,花了很多功夫去搜索、整理各類資料,遇到不少問題,所以記錄下來。git

一、基於 amqmdnet.dll 進行開發,這個是官方提供的DLL,安裝了IBM WebSphere MQ後在安裝目錄能夠找到(C:\Program Files\IBM\WebSphere MQ\bin)github

二、基於 MmqiNetLite.dll 開發,這是一個開源組件,地址:https://github.com/fglaeser/mmqinet,這個項目代碼有部分未完善,原做者也是好幾年沒更新,可是基礎功能能夠使用,本文代碼主要基於此編寫spa

源碼以下:code

public class IBMWMQConfig
    {
        /// <summary>
        /// MQ主機地址
        /// </summary>
        private const string CONNECTION_HOST = "";
        /// <summary>
        /// 通信端口
        /// </summary>
        private const int CONNECTION_PORT = 4421;
        /// <summary>
        /// CLIENT_ID
        /// </summary>
        private const string CLIENT_ID = "";
        /// <summary>
        /// 通道名稱
        /// </summary>
        public const string CHANNEL = "SYSTEM.ADMIN.SVRCONN";
        /// <summary>
        /// 隊列管理器名稱
        /// </summary>
        public const string QUEUE_MGR_NAME = "PHIBHUBGW1";
        /// <summary>
        /// 訂閱主題持久化標識,{0}標識具體業務
        /// </summary>
        public static readonly string SUBSCRIPTION_TEMPLATE = "JMS:" + QUEUE_MGR_NAME + ":" + CLIENT_ID + "_{0}.REQ:{0}.REQ";
        /// <summary>
        /// 主題名稱模板,{0}標識具體業務
        /// </summary>
        public static readonly string TOPIC_TEMPLATE = "{0}.REQ";
        /// <summary>
        /// IBM.WMQ鏈接字符串
        /// </summary>
        public static readonly string CONNECTION_INFO = string.Format("{0}({1})", CONNECTION_HOST, CONNECTION_PORT);
    }

訂閱消息:orm

/// <summary>
        /// 訂閱主題
        /// </summary>
        /// <param name="business"></param>
        /// <returns></returns>
        private string SubTopic(string business)
        {
            string message = string.Empty;
            try
            {
                using (var mqmgr = MQQueueManager.Connect(IBMWMQConfig.QUEUE_MGR_NAME, MQC.MQCO_NONE, IBMWMQConfig.CHANNEL, IBMWMQConfig.CONNECTION_INFO))
                {
                    MQSubscription subs = new MQSubscription(mqmgr);
                    if (mqmgr.IsConnected)
                    {
                        int option = MQC.MQSO_CREATE + MQC.MQSO_RESUME + MQC.MQSO_NON_DURABLE + MQC.MQSO_MANAGED + MQC.MQSO_FAIL_IF_QUIESCING;
                        string subName = string.Format(IBMWMQConfig.SUBSCRIPTION_TEMPLATE, business);
                        string topicName = string.Format(IBMWMQConfig.TOPIC_TEMPLATE, business);

                        subs.Subscribe(subName, option, topicName);

                        MQMessage incoming = new MQMessage();
                        MQGetMessageOptions gmo = new MQGetMessageOptions();
                        gmo.WaitInterval = 10 * 1000;//MQC.MQWI_UNLIMITED;
                        gmo.Options |= MQC.MQGMO_WAIT;
                        gmo.Options |= MQC.MQGMO_SYNCPOINT;

                        subs.Get(incoming, gmo);
                        message = incoming.ReadAll();

                        //subs.Close(MQC.MQCO_REMOVE_SUB, closeSubQueue: true, closeSubQueueOptions: MQC.MQCO_NONE);
                    }
                }
            }
            catch (MQException e)
            {
                message = e.Reason.ToString();
            }
            return message;
        }

向隊列推送一條消息:中間件

/// <summary>
        /// 向消息隊列推送一條消息
        /// </summary>
        /// <param name="msg">消息內容</param>
        /// <param name="queueName">隊列名稱</param>
        public void PushMsgToQueue(string msg, string queueName)
        {
            using (var mqmgr = MQQueueManager.Connect(IBMWMQConfig.QUEUE_MGR_NAME, MQC.MQCO_NONE, IBMWMQConfig.CHANNEL, IBMWMQConfig.CONNECTION_INFO))
            using (var q = new MQQueue(mqmgr, queueName, MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING))
            {
                var outgoing = new MQMessage()
                {
                    CharacterSet = MQC.CODESET_UTF,
                    Encoding = MQC.MQENC_NORMAL
                };
                outgoing.WriteString(msg);
                q.Put(outgoing, new MQPutMessageOptions());
            }
        }

        /// <summary>
        /// 向消息隊列推送一條消息
        /// </summary>
        /// <param name="msg">消息內容</param>
        /// <param name="queueName">隊列名稱</param>
        public void PushMsgToQueue1(string msg, string queueName)
        {
            using (var mqmgr = MQQueueManager.Connect(IBMWMQConfig.QUEUE_MGR_NAME, MQC.MQCO_NONE, IBMWMQConfig.CHANNEL, IBMWMQConfig.CONNECTION_INFO))
            {
                if (mqmgr.IsConnected)
                {
                    var outgoing = new MQMessage()
                    {
                        CharacterSet = MQC.CODESET_UTF,
                        Encoding = MQC.MQENC_NORMAL
                    };
                    outgoing.WriteString(msg);

                    var od = new MQObjectDescriptor
                    {
                        ObjectType = MQC.MQOT_Q,
                        ObjectName = queueName
                    };
                    mqmgr.Put1(od, outgoing, new MQPutMessageOptions());
                }
            }
        }
相關文章
相關標籤/搜索