RabbitMQ消息隊列幫助類

調用示例

          // Usage example: publish one message through MqHelper.
            MqConfigInfo config = new MqConfigInfo();
            config.MQExChange = "DrawingOutput";
            config.MQQueueName = "DrawingOutput";
            config.MQRoutingKey = "DrawingOutput";

            // MqHelper implements IDisposable and holds a broker connection and a
            // channel; the using block releases both even if SendMsg throws.
            using (MqHelper helper = new MqHelper(config))
            {
                byte[] body = Encoding.UTF8.GetBytes("98K"); // payload to send
                helper.SendMsg(body);
            }

消息隊列幫助類MqHelper

using Newtonsoft.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Content;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace LogTest
{
    /// <summary>
    /// Helper that opens a RabbitMQ connection and channel, optionally declares
    /// and binds the configured exchange/queue, and publishes messages to it.
    /// Dispose the instance to release the channel and the connection.
    /// </summary>
    public class MqHelper : IDisposable
    {
        #region Message queue configuration

        /// <summary>Connection opened by the constructor.</summary>
        public IConnection MQConnection { get; set; }

        /// <summary>Channel created on <see cref="MQConnection"/>; used for declaring and publishing.</summary>
        public IModel MQModel { get; set; }

        /// <summary>Exchange, queue, routing key and persistence settings used by this helper.</summary>
        public MqConfigInfo MqConfigInfo { get; set; }

        #endregion

        /// <summary>
        /// Connects to the broker and, when an exchange type is configured,
        /// declares the exchange, declares a durable queue and binds the two
        /// with the configured routing key.
        /// </summary>
        /// <param name="configInfo">Exchange/queue settings; must not be null.</param>
        /// <exception cref="ArgumentNullException"><paramref name="configInfo"/> is null.</exception>
        /// <exception cref="Exception">
        /// Connecting or declaring failed; the original error is the inner exception.
        /// </exception>
        public MqHelper(MqConfigInfo configInfo)
        {
            // Fail before touching the network instead of surfacing a wrapped
            // NullReferenceException after a connection has already been opened.
            if (configInfo == null)
            {
                throw new ArgumentNullException("configInfo");
            }

            MqConfigInfo = configInfo;

            // NOTE(review): user name, password, virtual host and broker address are
            // hard-coded here; consider moving them to configuration.
            var connectionFactory = new ConnectionFactory
            {
                UserName = "guest",
                Password = "guest",
                VirtualHost = "mq_test",
                RequestedHeartbeat = 0,
                HostName = "192.168.1.49", // broker IP
                Port = 5672
            };

            try
            {
                MQConnection = connectionFactory.CreateConnection();
                MQModel = MQConnection.CreateModel();
                if (MqConfigInfo.MQExChangeType != null)
                {
                    MQModel.ExchangeDeclare(MqConfigInfo.MQExChange, MqConfigInfo.MQExChangeType);

                    // Arguments: durable = true, exclusive = false, autoDelete = false, no extra arguments.
                    MQModel.QueueDeclare(MqConfigInfo.MQQueueName, true, false, false, null);

                    MQModel.QueueBind(MqConfigInfo.MQQueueName, MqConfigInfo.MQExChange, MqConfigInfo.MQRoutingKey);
                }
            }
            catch (Exception ex)
            {
                // The caller never receives this instance, so nobody else can
                // dispose what was opened before the failure.
                ReleaseQuietly();
                throw new Exception("MQHelper建立鏈接失敗", ex);
            }
        }

        /// <summary>
        /// Serializes <paramref name="message"/> to JSON, encodes it as UTF-8 and publishes it.
        /// </summary>
        /// <param name="message">Object to serialize and send.</param>
        /// <returns>Always <c>true</c>; failures are reported by exception.</returns>
        public bool SendMsg(object message)
        {
            string json = JsonConvert.SerializeObject(message);
            Publish(Encoding.UTF8.GetBytes(json));
            return true;
        }

        /// <summary>
        /// Publishes a raw byte payload unchanged.
        /// </summary>
        /// <param name="message">Message body.</param>
        /// <returns>Always <c>true</c>; failures are reported by exception.</returns>
        public bool SendMsg(byte[] message)
        {
            Publish(message);
            return true;
        }

        /// <summary>
        /// Encodes <paramref name="message"/> as UTF-8 and publishes it.
        /// </summary>
        /// <param name="message">Message text.</param>
        /// <returns>Always <c>true</c>; failures are reported by exception.</returns>
        public bool SendMsg(string message)
        {
            Publish(Encoding.UTF8.GetBytes(message));
            return true;
        }

        /// <summary>
        /// Disposes the channel first and then the connection, skipping whichever is null.
        /// </summary>
        public void Dispose()
        {
            if (MQModel != null)
            {
                MQModel.Dispose();
            }

            if (MQConnection != null)
            {
                MQConnection.Dispose();
            }
        }

        /// <summary>
        /// Publishes <paramref name="body"/> to the configured exchange and routing key,
        /// marking the message persistent (delivery mode 2) when MQPersistModel is set.
        /// Exceptions propagate unchanged so the original stack trace is preserved.
        /// </summary>
        private void Publish(byte[] body)
        {
            IMapMessageBuilder builder = new MapMessageBuilder(MQModel);

            // Kept from the original code although the value is unused.
            // NOTE(review): reading Headers may lazily create the header table on the
            // message properties - confirm before removing this line.
            IDictionary<string, object> headers = builder.Headers;

            // NOTE(review): the builder presumably stamps its own default content type on
            // the properties, which may not describe a JSON/text body - confirm with consumers.
            IBasicProperties properties = (IBasicProperties)builder.GetContentHeader();
            if (MqConfigInfo.MQPersistModel)
            {
                properties.DeliveryMode = 2;
            }

            MQModel.BasicPublish(MqConfigInfo.MQExChange, MqConfigInfo.MQRoutingKey, properties, body);
        }

        /// <summary>
        /// Best-effort cleanup used when construction fails: disposes the channel and the
        /// connection independently and ignores cleanup errors so they cannot mask the
        /// failure that triggered the cleanup.
        /// </summary>
        private void ReleaseQuietly()
        {
            try
            {
                if (MQModel != null)
                {
                    MQModel.Dispose();
                }
            }
            catch (Exception)
            {
                // Ignored on purpose: the constructor is about to report the real failure.
            }

            try
            {
                if (MQConnection != null)
                {
                    MQConnection.Dispose();
                }
            }
            catch (Exception)
            {
                // Ignored on purpose: the constructor is about to report the real failure.
            }
        }
    }

    /// <summary>
    /// Settings describing where and how a message is published:
    /// exchange, exchange type, routing key, header, persistence and queue name.
    /// </summary>
    public class MqConfigInfo
    {
        /// <summary>
        /// Exchange name.
        /// </summary>
        public string MQExChange { get; set; }

        /// <summary>
        /// Exchange type (fanout, direct, topic, headers); defaults to direct.
        /// </summary>
        public string MQExChangeType { get; set; }

        /// <summary>
        /// Routing key.
        /// </summary>
        public string MQRoutingKey { get; set; }

        /// <summary>
        /// Message header.
        /// </summary>
        public string MQHeader { get; set; }

        /// <summary>
        /// Whether messages are persisted; defaults to true.
        /// </summary>
        public bool MQPersistModel { get; set; }

        /// <summary>
        /// Queue name.
        /// </summary>
        public string MQQueueName { get; set; }

        /// <summary>
        /// Creates a configuration with a "direct" exchange type and persistence enabled;
        /// every other property is left at its default value.
        /// </summary>
        public MqConfigInfo()
        {
            this.MQPersistModel = true;
            this.MQExChangeType = "direct";
        }
    }
}
相關文章
相關標籤/搜索