在C#中使用消息隊列RabbitMQ

一、什麼是RabbitMQ。詳見 http://www.rabbitmq.com/html

    做用就是提升系統的併發性,將一些不須要及時響應客戶端且佔用較多資源的操做,放入隊列,再由另一個線程,去異步處理這些隊列,可極大的提升系統的併發能力。web

二、安裝json

    RabbitMQ服務:http://www.rabbitmq.com/download.html
    (安裝完RabbitMQ服務後,會在Windows服務中看到。若是沒有Erlang運行環境,在安裝過程當中會提醒先安裝Erlang環境。http://www.erlang.org/downloads)安全

    .net客戶端類庫:http://www.rabbitmq.com/dotnet.html併發

 

下載Erlang環境並安裝異步

地址:http://www.erlang.org/downloadstcp

而後安裝RabbitMQ,安裝成功後會在服務中看到該服務。函數

 

三、插件工具

     RabbitMQ提供了不少好用的插件,最經常使用的就是web管理工具,啓動此插件。oop

     CMD中運行命令:rabbitmq-plugins enable rabbitmq_management

     注:rabbitmq-plugins 所在路徑爲:D:\Program Files\RabbitMQ Server\rabbitmq_server-3.4.0\sbin

     web管理工具的地址是:http://localhost:15672,初始用戶名:guest 初始密碼:guest

四、配置

    配置文件地址爲:C:\Documents and Settings\Administrator\Application Data\RabbitMQ\rabbitmq.config,默認沒有rabbit.config文件,須要手工新建(默認會有rabbitmq.config.example 做爲參考)。基於安全,作了兩個配置,以下:

    

複製代碼
[
{rabbit,
[
{loopback_users, [<<"guest">>]},
{tcp_listeners, [{"127.0.0.1", 1234},
{"10.121.1.48", 8009}]}

]}
].
複製代碼

loopback_users:設置只能在與RabbitMq服務同一臺機器上訪問服務的用戶。

tcp_listeners:設置RabbitMQ監聽的IP地址與端口。只監聽局域網內網iP、修改默認端口,防止被入侵攻擊。

設置完後,別忘記了如下操做,不然配置不起做用。

  • 中止RabbitMQ服務;
  • 從新安裝服務使配置生效:rabbitmq-service.bat install

        此命令要切換到路徑:D:\Program Files\RabbitMQ Server\rabbitmq_server-3.4.0\sbin

  • 啓動RabbitMQ服務;

五、Demo練習。

複製代碼
消息生產者:

class Program
    {
        static void Main(string[] args)
        {
            try
            {
                ConnectionFactory factory = new ConnectionFactory();
                factory.HostName = Constants.MqHost;
                factory.Port = Constants.MqPort;
                factory.UserName = Constants.MqUserName;
                factory.Password = Constants.MqPwd;
                using (IConnection conn = factory.CreateConnection())
                {
                    using (IModel channel = conn.CreateModel())
                    {
                        //在MQ上定義一個持久化隊列,若是名稱相同不會重複建立
                        channel.QueueDeclare("MyFirstQueue", true, false, false, null);
                        while (true)
                        {
                            string customStr = Console.ReadLine();
                            RequestMsg requestMsg = new RequestMsg();
                            requestMsg.Name = string.Format("Name_{0}", customStr);
                            requestMsg.Code = string.Format("Code_{0}", customStr);
                            string jsonStr = JsonConvert.SerializeObject(requestMsg);
                            byte[] bytes = Encoding.UTF8.GetBytes(jsonStr);
                            
                            //設置消息持久化
                            IBasicProperties properties = channel.CreateBasicProperties();
                            properties.DeliveryMode = 2;
                            channel.BasicPublish("", "MyFirstQueue", properties, bytes);

                            //channel.BasicPublish("", "MyFirstQueue", null, bytes);

                            Console.WriteLine("消息已發送:" + requestMsg.ToString());
                        }
                    }
                }
            }
            catch (Exception e1)
            {
                Console.WriteLine(e1.ToString());
            }
            Console.ReadLine();
        }
    }
複製代碼

 

複製代碼
class Program
    {
        static void Main(string[] args)
        {
            try
            {
                ConnectionFactory factory = new ConnectionFactory();
                factory.HostName = Constants.MqHost;
                factory.Port = Constants.MqPort;
                factory.UserName = Constants.MqUserName;
                factory.Password = Constants.MqPwd;
                using (IConnection conn = factory.CreateConnection())
                {
                    using (IModel channel = conn.CreateModel())
                    {
                        //在MQ上定義一個持久化隊列,若是名稱相同不會重複建立
                        channel.QueueDeclare("MyFirstQueue", true, false, false, null);

                        //輸入1,那若是接收一個消息,可是沒有應答,則客戶端不會收到下一個消息
                        channel.BasicQos(0, 1, false);
                        
                        Console.WriteLine("Listening...");

                        //在隊列上定義一個消費者
                        QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
                        //消費隊列,並設置應答模式爲程序主動應答
                        channel.BasicConsume("MyFirstQueue", false, consumer);

                        while (true)
                        {
                            //阻塞函數,獲取隊列中的消息
                            BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                            byte[] bytes = ea.Body;
                            string str = Encoding.UTF8.GetString(bytes);
                            RequestMsg msg = JsonConvert.DeserializeObject<RequestMsg>(str);
                            Console.WriteLine("HandleMsg:" + msg.ToString());
                            //回覆確認
                            channel.BasicAck(ea.DeliveryTag, false);
                        }
                    }
                }
            }
            catch (Exception e1)
            {
                Console.WriteLine(e1.ToString());
            }
            Console.ReadLine();
        }
    }
複製代碼

 

相關文章
相關標籤/搜索