RabbitMQ 消息隊列

一:簡介html

  RabbitMQ是一個在AMQP協議標準基礎上完整的,可服用的企業消息系統。他遵循Mozilla Public License開源協議。採用 Erlang 實現的工業級的消息隊列(MQ)服務器json

RabbitMQ的官方站:http://www.rabbitmq.com/ 
     AMQP(高級消息隊列協議) 是一個異步消息傳遞所使用的應用層協議規範,做爲線路層協議,而不是API(例如JMS),AMQP 客戶端可以無視消息的來源任意發送和接受信息。AMQP的原始用途只是爲金融界提供一個能夠彼此協做的消息協議,而如今的目標則是爲通用消息隊列架構提供通用構建工具。所以,面向消息的中間件 (MOM)系統,例如發佈/訂閱隊列,沒有做爲基本元素實現。反而經過發送簡化的AMQ實體,用戶被賦予了構建例如這些實體的能力。這些實體也是規範的一 部分,造成了在線路層協議頂端的一個層級:AMQP模型。這個模型統一了消息模式,諸如以前提到的發佈/訂閱,隊列,事務以及流數據,而且添加了額外的特性,例如更易於擴展,基於內容的路由。 windows

 

1.下載Erlang服務器

http://www.erlang.org/download.html架構

有32位與64位版本可供選擇,根據我的須要下載相應版本,下載好後先安裝Erlang異步

2.下載RabbitMQ服務器安裝文件函數

 http://www.rabbitmq.com/install-windows.html工具

安裝好RabbitMQ服務器端環境後,RabbitMQ將會以服務的形式駐留在服務器上,默認是開啓狀態spa

.在 cmd 中指向 sbin 目錄,並輸入如下命令操作系統

rabbitmq-plugins enable rabbitmq_management

http://localhost:15672,在登錄界面輸入用戶名:guest,密碼:guest,便可進入管理界面查看各類信息 

二:RabbitMQ 示例

發送消息端

  class Program
    {
        static void Main(string[] args)
        {
            UserHelp mes =new UserHelp();
            while (true)
            {
                var result = mes.GetUserMessage();
                Console.WriteLine( result);
            }

        }
    }

 

 public string GetUserMessage()
        {
            try
            {
                var conFactory = new ConnectionFactory();
                conFactory.HostName = "localhost";
                conFactory.UserName = "wangdongsheng";
                conFactory.Password = "123456";
                conFactory.VirtualHost = "/";

                using (var connect = conFactory.CreateConnection())
                {
                    //rabbitmq服務端
                    using (var channel = connect.CreateModel())
                    {
                        //在MQ上定義一個持久化隊列,若是名稱相同不會重複建立
                        channel.QueueDeclare("TestQueue", true, false, false, null);
                        // //輸入1,那若是接收一個消息,可是沒有應答,則客戶端不會收到下一個消息
                        channel.BasicQos(0, 1, false);
                        //在隊列上定義一個消費者
                        var consumer = new QueueingBasicConsumer(channel);
                        //消費隊列,並設置應答模式爲程序主動應答
                        channel.BasicConsume("TestQueue", false, consumer);
                        while (true)
                        {
                            //阻塞函數,獲取隊列中的消息
                            var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                            byte[] bytes = ea.Body;
                            string str = Encoding.UTF8.GetString(bytes);
                            var msg = JsonConvert.DeserializeObject<RequestMsg>(str);
                            //恢復確認
                            channel.BasicAck(ea.DeliveryTag, false);
                            return "接收的數據:"+"發送人-"+msg.Name+";發送內容-"+msg.Code;
                        }

                    }
                }
            }
            catch (Exception ex)
            {
                return ex.Message;
            }
        }
   public class RequestMsg
    {
        public string Name { get; set; }
        public string Code { get; set; }
    }

接收消息端

 class Program
    {
        private static void Main(string[] args)
        {

            SendHelp help=new SendHelp();
            while (true)
            {
                Console.WriteLine("請輸入內容:");
                var write = Console.ReadLine();
                if (!string.IsNullOrWhiteSpace(write))
                {
                    var result = help.SendMessage(write);
                    Console.ForegroundColor = ConsoleColor.DarkYellow;
                    Console.WriteLine(result);
                    Console.ForegroundColor = ConsoleColor.White;
                }
            }
            Console.ReadKey();
        }
    }
        public string SendMessage(string msg)
        {
            try
            {
                var conFactory = new ConnectionFactory();
                conFactory.HostName = "localhost";
                conFactory.UserName = "wangdongsheng";
                conFactory.Password = "123456";
                conFactory.VirtualHost = "/";

                using (IConnection connect = conFactory.CreateConnection())
                {
                    //rabbitmq服務端
                    using (var channel = connect.CreateModel())
                    {
                        //在MQ上定義一個持久化隊列,若是名稱相同不會重複建立
                        channel.QueueDeclare("TestQueue", true, false, false, null);
                        while (true)
                        {
                            var requetMsg = new RequestMsg();
                            requetMsg.Name = "東昇";
                            requetMsg.Code = msg;
                            string jsonStr = JsonConvert.SerializeObject(requetMsg);
                            byte[] bytes = Encoding.UTF8.GetBytes(jsonStr);
                            IBasicProperties pro = channel.CreateBasicProperties();
                            pro.DeliveryMode = 2;
                            channel.BasicPublish("", "TestQueue", pro, bytes);
                            return "發送消息成功:發送人-"+requetMsg.Name+";消息內容-" + requetMsg.Code;
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                return "發送的消息爲:" + ex.Message;
            }
        }

 

1.Server(broker): 接受客戶端鏈接,實現AMQP消息隊列和路由功能的進程。

2.Virtual Host:實際上是一個虛擬概念,相似於權限控制組,一個Virtual Host裏面能夠有若干個Exchange和Queue,可是權限控制的最小粒度是Virtual Host

3.Exchange:接受生產者發送的消息,並根據Binding規則將消息路由給服務器中的隊列。ExchangeType決定了Exchange路由消息的行爲,例如,在RabbitMQ中,ExchangeType有direct、Fanout和Topic三種,不一樣類型的Exchange路由的行爲是不同的。

4.Message Queue:消息隊列,用於存儲還未被消費者消費的消息。

5.Message: 由Header和Body組成,Header是由生產者添加的各類屬性的集合,包括Message是否被持久化、由哪一個Message Queue接受、優先級是多少等。而Body是真正須要傳輸的APP數據。

6.Binding:Binding聯繫了Exchange與Message Queue。Exchange在與多個Message Queue發生Binding後會生成一張路由表,路由表中存儲着Message Queue所需消息的限制條件即Binding Key。當Exchange收到Message時會解析其Header獲得Routing Key,Exchange根據Routing Key與Exchange Type將Message路由到Message Queue。Binding Key由Consumer在Binding Exchange與Message Queue時指定,而Routing Key由Producer發送Message時指定,二者的匹配方式由Exchange Type決定。 

7.Connection:鏈接,對於RabbitMQ而言,其實就是一個位於客戶端和Broker之間的TCP鏈接。

8.Channel:信道,僅僅建立了客戶端到Broker之間的鏈接後,客戶端仍是不能發送消息的。須要爲每個Connection建立Channel,AMQP協議規定只有經過Channel才能執行AMQP的命令。一個Connection能夠包含多個Channel。之因此須要Channel,是由於TCP鏈接的創建和釋放都是十分昂貴的,若是一個客戶端每個線程都須要與Broker交互,若是每個線程都創建一個TCP鏈接,暫且不考慮TCP鏈接是否浪費,就算操做系統也沒法承受每秒創建如此多的TCP鏈接。RabbitMQ建議客戶端線程之間不要共用Channel,至少要保證共用Channel的線程發送消息必須是串行的,可是建議儘可能共用Connection。

9.Command:AMQP的命令,客戶端經過Command完成與AMQP服務器的交互來實現自身的邏輯。例如在RabbitMQ中,客戶端能夠經過publish命令發送消息,txSelect開啓一個事務,txCommit提交一個事務。
相關文章
相關標籤/搜索