RabbitMQ核心概念

AMQP的四個主要概念

一、虛擬主機(virtual host)或(vhosthtml

二、交換機(exchange算法

三、隊列(queue數據庫

四、綁定器(bind編程

什麼是虛擬主機?安全

  一組交換機、隊列和綁定器 被稱爲 虛擬主機(vhost)。服務器

爲何要用虛擬主機?多線程

  RabbitMQ server 能夠說就是一個消息隊列服務器實體(Broker),Broker當中能夠有多個用戶,而用戶只能在虛擬主機的粒度進行權限控制,因此RabbitMQ中須要多個虛擬主機。每個RabbitMQ服務器都有一個默認的虛擬主機「/」負載均衡

 隊列(queue)ide

  隊列是消息載體,每一個消息都會被投入到一個或多個隊列。試圖建立一個已經存在的隊列,RabbitMQ會直接忽略這個請求。(接收消息的實體)spa

把消息放進隊列前,咱們還須要使用另外一個東西:交換機。

交換機(exchange)

  它指定消息按什麼規則,路由到哪一個隊列。它能夠被理解成具備路由表的路由程序。(發送消息的實體)

交換機能夠存在多個,每一個交換機在本身獨立的進程當中執行,所以增長多個交換機就是增長多個進程,能夠充分利用服務器上的CPU核以便達到更高的效率。

交換機如何判斷要把消息送到哪一個隊列?這是咱們須要路由規則,也就須要綁定器了。

綁定器(bind)

  它的做用就是把exchange和queue按照路由規則綁定起來。(將交換器和隊列鏈接起來,而且封裝消息的路由信息)

channel.BasicPublish(exchange: "", routingKey: "writeLog", basicProperties: null, body: body);

每一個消息都有一個稱爲路由關鍵字(routingKey)的屬性,exchange根據這個關鍵字進行消息投遞,其實就是一個簡單的字符串

(綁定操做就能夠理解成:exchange將具備路由關鍵字 「X」 的消息投遞到到名爲「business」的隊列當中去。) 具體實踐請看下文。

從而一個綁定就能夠歸納爲:一個基於路由鍵交換機隊列鏈接起來的路由規則。

須要注意:由Exchange,Queue,RoutingKey三個,才能決定一個從Exchange到Queue的惟一的線路。

更多參考:http://www.ltens.com/article-6.html

 

程序中鏈接與消息使用的兩個關鍵概念

鏈接(Connection):

  與RabbitMQ Server創建的一個鏈接,由ConnectionFactory建立,每一個connection只與一個物理的Server進行鏈接,此鏈接是基於Socket進行鏈接的。AMQP通常使用TCP

通道 (Channel):

  消息通道(主要進行相關定義,發送消息,獲取消息,事務處理等),在客戶端的每一個鏈接裏,可創建多個channel,每一個channel表明一個會話任務

Channel在.net的客戶端程序裏應該是叫「Model」,採用IModel CreateModel()建立的,可是其餘語言的客戶端都叫Channel。須要注意:一個Connection能夠有多個Channel。

爲何設計中引入Channel概念?

  一個比較廣泛的需求:客戶端程序有時候會是一個多線程程序,每個線程都想要和RabbitMQ進行鏈接,可是又不想共享一個鏈接

由於一個Connection就是一個TCP連接,RabbitMQ在設計的時候不但願與每個客戶端保持多個TCP鏈接,但這確實是有些客戶端的需求,每個Channel之間沒有任何聯繫,是徹底分離的。

創建在Connection基礎上的一個Channel,相對於Connection來講,它是輕量級的。Channel能夠在多線程中使用,可是在必須保證任什麼時候候只有一個線程執行命令。

交換機類型

  有4種:direct【默認的類型】,fanout,topic,headers。其中headers不經常使用,本篇不作介紹,其餘三種類型,會作詳細介紹。

Exchange與隊列進行綁定後,消息根據exchang的類型,按照不一樣的綁定規則分發消息到消息隊列中,能夠是一個消息被分發給多個消息隊列,也能夠是一個消息分發到一個消息隊列。具體請看下文。

介紹之初還要說下RoutingKey,這是個什麼玩意呢?他是exchange與消息隊列綁定中的一個標識。有些路由類型會按照標識對應消息隊列,有些路由類型忽略routingkey。

一、Fanout: 廣播模式,會忽略路由鍵Routingkey,將消息廣播給綁定到該交換機的全部隊列。 不論消息的路由關鍵字是什麼,這條消息都會被路由到全部與該交換器綁定的隊列中。

廣播式交換器類型的工做方式以下:

不使用任何參數將消息隊列與交換器綁定在一塊兒。

發佈者(直接式交換器類型描述中的producer變成了publisher,已經隱含了二種交換器類型的區別)向交換器發送一條消息。 消息被無條件的傳遞到全部和這個交換器綁定的消息隊列中。

二、Direct: 根據路由鍵和交換器來找隊列的,對消息路徑進行全文匹配。消息路由鍵 "sunshine" 只能匹配 "sunshine" 綁定,不匹配 "sunshine.warm" 這類綁定。 

經過精確匹配消息的路由關鍵字,將消息路由到零個或者多個隊列中,綁定關鍵字用來將隊列和交換器綁定到一塊兒。這讓咱們能夠構建經典的點對點隊列消息傳輸模型,不過和任何已定義的交換器類型同樣,當消息的路由關鍵字與多個綁定關鍵字匹配時,消息可能會被髮送到多個隊列中。

direct模式下還能夠實現多路綁定,即一個exchange和多個queue綁定時,具備一樣的bindkey,以下圖:

 

三、Topic: 主題模式,處理路由鍵,按模式匹配路由鍵。

模式符號:

"#" 表示一個或多個單詞,"*" 僅匹配一個單詞。

如 "wood.#" 可匹配 "wood.palm.redwood",但 "wood.*" 只匹配 "wood.deadwood"。 

主題式交換器類型提供了這樣的路由機制:經過消息的路由關鍵字和綁定關鍵字的模式匹配,將消息路由到被綁定的隊列中。這種路由器類型能夠被用來支持經典的發佈/訂閱消息傳輸模型——使用主題名字空間做爲消息尋址模式,將消息傳遞給那些部分或者所有匹配主題模式的多個消費者。

主題交換器類型的工做方式以下:

綁定關鍵字用零個或多個標記構成,每個標記之間用.」字符分隔。綁定關鍵字必須用這種形式明確說明,並支持通配符:*」匹配一個詞組#零個或多個詞組
所以綁定關鍵字「*.dask.#」匹配路由關鍵字「class.dask」和「eur.dask.tab」,可是不匹配「dask.rho」。

更多參考:RabbitMQ交換器Exchange介紹與實踐

消息持久化

問題及方案描述

1.當有多個消費者同時收取消息,且每一個消費者在接收消息的同時,還要處理其它的事情,且會消耗很長的時間。在此過程當中可能會出現一些意外,好比消息接收到一半的時候,一個消費者死掉了。

這種狀況要使用消息接收確認機制,能夠執行上次宕機的消費者沒有完成的事情。

2.在默認狀況下,咱們程序建立的消息隊列以及存放在隊列裏面的消息,都是非持久化的。當RabbitMQ死掉了或者重啓了,上次建立的隊列、消息都不會保存。

這種狀況可使用RabbitMQ提供的消息隊列的持久化機制。

相關理論描述

  RabbitMQ支持消息的持久化,也就是數據寫在磁盤上,爲了數據安全考慮,我我的以爲大多數開發人員都會選擇持久化。

隊列和交換機有一個建立時候指定的標誌durabledurable的惟一含義就是具備這個標誌的隊列和交換機會在重啓以後從新創建,它不表示說在隊列當中的消息會在重啓後恢復。

消息隊列持久化包括3個部分:

一、exchange持久化,在聲明時指定durable => true
二、queue持久化,在聲明時指定durable => true
三、消息持久化,在投遞時指定delivery_mode=> 2(1是非持久化)

若是exchange和queue都是持久化的,那麼它們之間的binding也是持久化的。若是exchange和queue二者之間有一個持久化,一個非持久化,就不容許創建綁定。

注意:一旦建立了隊列和交換機,就不能修改其標誌了。例如,若是建立了一個non-durable的隊列,而後想把它改變成durable的,惟一的辦法就是刪除這個隊列而後重現建立

生產者

class Producter
    {
        const string ExchangeName = "eric.exchange";
        const string QueueName = "eric.queue";
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost", UserName = "eric", Password = "123456", };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);//聲明消息隊列,且爲可持久化的
                channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);//聲明消息隊列,且爲可持久化的
                channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName);

                string message = "Eric is very handsome";
                var body = Encoding.UTF8.GetBytes(message);

                //將隊列設置爲持久化以後,還須要將消息也設爲可持久化的
                var props = channel.CreateBasicProperties();
                props.SetPersistent(true);

                channel.BasicPublish(ExchangeName, routingKey: QueueName, basicProperties: props, body: body);

                Console.WriteLine("Producter Sent: {0}", message);
                Console.ReadKey();
            }
        }
    }
View Code

注:ack是 acknowledgments 的縮寫,noAck 是("no manual acks")

程序運行結果:

 

消費者

class Recevice
    {
        const string ExchangeName = "eric.exchange";
        const string QueueName = "eric.queue";
        public static void Main()
        {
            var factory = new ConnectionFactory() { HostName = "localhost", UserName = "eric", Password = "123456", VirtualHost = "/" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);//聲明消息隊列,且爲可持久化的
                channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);//聲明消息隊列,且爲可持久化的
                channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName);

                BasicGetResult msgResponse = channel.BasicGet(QueueName, noAck: true);
                //NoAck:true 告訴RabbitMQ當即從隊列中刪除消息,另外一個很是受歡迎的方式是從隊列中刪除已經確認接收的消息,能夠經過單獨調用BasicAck 進行確認:
                //BasicGetResult msgResponse = channel.BasicGet(QueueName, noAck:false);
                var msgContent = Encoding.UTF8.GetString(msgResponse.Body);

                Console.WriteLine("The received content:"+msgContent);

                channel.BasicAck(msgResponse.DeliveryTag, multiple: false);
                //使用BasicAck方式來告之是否從隊列中移除該條消息
                //須要額外注意,好比從隊列中獲取消息並用它來操做數據庫或日誌文件時,若是出現操做失敗時,則該條消息應該保留在隊列中,只到操做成功時才從隊列中移除。
                Console.ReadKey();
            }
        }
    }
View Code

接受消息還有一種方法,就是經過基於推送的事件訂閱。可使用內置的 QueueingBasicConsumer 提供簡化的編程模型,容許在共享隊列上阻塞,直到收到一條消息。

var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(QueueName, noAck: true, consumer: consumer);
var msgResponse = consumer.Queue.Dequeue(); 
var msgContent = Encoding.UTF8.GetString(msgResponse.Body);

程序運行結果:

消費者消息的確認

一、消息隊列的消費

Note:若是一個消息隊列中有大量消息等待操做時,咱們能夠用多個客戶端來處理消息,這裏的分發機制是採用負載均衡算法中的輪詢。第一個消息給A,下一個消息給B,下下一個消息給A,下下下一個消息給B......以此類推。

二、爲保證消息的安全性,保證此消息被正確處理後才能在服務端的消息隊列中刪除。那麼rabbitmq提供了ack應答機制,來實現這一功能

ack應答有兩種方式:一、自動應答,二、手動應答。具體實現以下。

public static void Consumer()
        {
            try
            {
                var qName = "lhtest1";
                var exchangeName = "fanoutchange1";
                var exchangeType = "fanout";//topic、fanout
                var routingKey = "*";
                var uri = new Uri("amqp://xxxx:5672/");
                var factory = new ConnectionFactory
                {
                    UserName = "123",
                    Password = "123",
                    RequestedHeartbeat = 0,
                    Endpoint = new AmqpTcpEndpoint(uri)
                };
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        channel.ExchangeDeclare(exchangeName, exchangeType);
                        channel.QueueDeclare(qName, true, false, false, null);
                        channel.QueueBind(qName, exchangeName, routingKey);
                        //定義這個隊列的消費者
                        QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
                        //false爲手動應答,true爲自動應答
                        channel.BasicConsume(qName, false, consumer);
                        while (true)
                        {
                            BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();                           
                            byte[] bytes = ea.Body;
                            var messageStr = Encoding.UTF8.GetString(bytes);
                            var message = DoJson.JsonToModel<QMessage>(messageStr);
                            Console.WriteLine("Receive a Message, DateTime:" + message.DateTime.ToString("yyyy-MM-dd HH:mm:ss") + " Title:" + message.Title);
                            //若是是自動應答,下下面這句代碼不用寫啦。
                            if ((Convert.ToInt32(message.Title) % 2) == 1)
                            {
                                channel.BasicAck(ea.DeliveryTag, false);
                            }
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
        }
View Code
相關文章
相關標籤/搜索