【轉】RabbitMQ基礎——和——持久化機制

這裏原來有一句話,觸犯啦天條,被閹割!!!!html

首先不去討論個人日誌組件怎麼樣。由於有些日誌須要走網絡,有的又不須要走網路,也是有性能與業務場景的多般變化在其中,就把他拋開,咱們只談消息RabbitMQ。java

那麼什麼是RabbitMQ,它是用來解決什麼問題的,性能如何,又怎麼用?我會在下面一一闡述,若有錯誤,不到之處,還望你們不吝賜教。算法

RabbitMQ簡介

必須一提的是rabbitmq是由LShift提供的一個消息隊列協議(AMQP)的開源實現,由以高性能、健壯以及可伸縮性出名的Erlang寫成(所以也是繼承了這些優勢)。安全

百度百科對RabbitMQ闡述也很是明確,建議去看下,還有amqp協議。網絡

RabbitMQ官網:http://www.rabbitmq.com/ 若是你要下載安裝,那麼必須先把Erlang語言裝上。負載均衡

RabbitMQ的.net客戶端,能夠在nuget中輸入rabbitmq輕鬆得到。dom

RabbitMQ與其餘消息隊列的對比,早有仙人給寫出來。 Message Queue Shootoutpost

這篇文章中的測試案例爲:1百萬條1k的消息,每秒種的收發狀況以下圖。性能

 

若是你安裝好啦,rabbitmq,他會提供一個操做監控頁面,頁面以下,他幾乎提供啦,對rabbitmq的全部操做,與監控,因此,你裝上後,本身多看看,多操做下。測試

 

RabbitMQ中的一些名詞闡述與消息從投遞到消費的整個過程

從上圖的標題中能夠看到一些陌生的英文單詞,讓咱們感受一無所知,更無從操做,那麼我給你們弄啦一個圖片你們能夠看下,或許對您理解這些新鮮的單詞有所幫助。

 

看過這些名詞,以後,或許你還毫無頭緒,那麼我把消息從生產到消費的整個流程給你們說一下,或許會更深刻一點,其中Exchange,與Queue都是能夠設置相關屬性,隊列的持久化,交換器類型制定。

 

Note:首先這個過程走分三個部分,一、客戶端(生產消息隊列),二、RabbitMQ服務端(負責路由規則的綁定與消息的分發),三、客戶端(消費消息隊列中的消息)

 

 

Note:由圖能夠看出,一個消息能夠走一次網絡卻被分發到不一樣的消息隊列中,而後被多個的客戶端消費,那麼這個過程就是RabbitMQ的核心機制,RabbitMQ的路由類型與消費模式。

RabbitMQ中Exchange的類型

類型有4種,direct,fanout,topic,headers。其中headers不經常使用,本篇不作介紹,其餘三種類型,會作詳細介紹。

那麼這些類型是什麼意思呢?就是Exchange與隊列進行綁定後,消息根據exchang的類型,按照不一樣的綁定規則分發消息到消息隊列中,能夠是一個消息被分發給多個消息隊列,也能夠是一個消息分發到一個消息隊列。具體請看下文。

介紹之初還要說下RoutingKey,這是個什麼玩意呢?他是exchange與消息隊列綁定中的一個標識。有些路由類型會按照標識對應消息隊列,有些路由類型忽略routingkey。具體看下文。

一、Exchange類型direct

他是根據交換器名稱與routingkey來找隊列的。

 

Note:消息從client發出,傳送給交換器ChangeA,RoutingKey爲routingkey.ZLH,那麼無論你發送給Queue1,仍是Queue2一個消息都會保存在Queue1,Queue2,Queue3,三個隊列中。這就是交換器的direct類型的路由規則。只要找到路由器與routingkey綁定的隊列,那麼他有多少隊列,他就分發給多少隊列。

二、Exchange類型fanout

這個類型忽略Routingkey,他爲廣播模式。

 

Note:消息從客戶端發出,只要queue與exchange有綁定,那麼他無論你的Routingkey是什麼他都會將消息分發給全部與該exchang綁定的隊列中。

三、Exchange類型topic

這個類型的路由規則若是你掌握啦,那是至關的好用,與靈活。他是根據RoutingKey的設置,來作匹配的,其中這裏還有兩個通配符爲:

*,表明任意的一個詞。例如topic.zlh.*,他可以匹配到,topic.zlh.one ,topic.zlh.two ,topic.zlh.abc, ....

#,表明任意多個詞。例如topic.#,他可以匹配到,topic.zlh.one ,topic.zlh.two ,topic.zlh.abc, ....

 

Note:這個圖看上去很亂,可是他是根據匹配符作匹配的,這裏我建議你本身作下消息隊列的具體操做。

具體操做以下

複製代碼
 public static void Producer(int value)
        {
            try
            {
                var qName = "lhtest1";
                var exchangeName = "fanoutchange1";
                var exchangeType = "fanout";//topic、fanout
                var routingKey = "*";
                var uri = new Uri("amqp://192.168.10.121:5672/");
                var factory = new ConnectionFactory
                {
                    UserName = "123",
                    Password = "123",
                    RequestedHeartbeat = 0,
                    Endpoint = new AmqpTcpEndpoint(uri)
                };
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        //設置交換器的類型
                        channel.ExchangeDeclare(exchangeName, exchangeType);
                        //聲明一個隊列,設置隊列是否持久化,排他性,與自動刪除
                        channel.QueueDeclare(qName, true, false, false, null);
                        //綁定消息隊列,交換器,routingkey
                        channel.QueueBind(qName, exchangeName, routingKey);
                        var properties = channel.CreateBasicProperties();
                        //隊列持久化
                        properties.Persistent = true;
                        var m = new QMessage(DateTime.Now, value+"");
                        var body = Encoding.UTF8.GetBytes(DoJson.ModelToJson<QMessage>(m));
                        //發送信息
                        channel.BasicPublish(exchangeName, routingKey, properties, body);
                    }
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
        }
複製代碼

消息隊列的消費與消息確認Ack

一、消息隊列的消費

Note:若是一個消息隊列中有大量消息等待操做時,咱們能夠用多個客戶端來處理消息,這裏的分發機制是採用負載均衡算法中的輪詢。第一個消息給A,下一個消息給B,下下一個消息給A,下下下一個消息給B......以此類推。

二、爲啦保證消息的安全性,保證此消息被正確處理後才能在服務端的消息隊列中刪除。那麼rabbitmq提供啦ack應答機制,來實現這一功能。

ack應答有兩種方式:一、自動應答,二、手動應答。具體實現以下。

複製代碼
 public static void Consumer()
        {
            try
            {
                var qName = "lhtest1";
                var exchangeName = "fanoutchange1";
                var exchangeType = "fanout";//topic、fanout
                var routingKey = "*";
                var uri = new Uri("amqp://192.168.10.121:5672/");
                var factory = new ConnectionFactory
                {
                    UserName = "123",
                    Password = "123",
                    RequestedHeartbeat = 0,
                    Endpoint = new AmqpTcpEndpoint(uri)
                };
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        channel.ExchangeDeclare(exchangeName, exchangeType);
                        channel.QueueDeclare(qName, true, false, false, null);
                        channel.QueueBind(qName, exchangeName, routingKey);
                        //定義這個隊列的消費者
                        QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
                        //false爲手動應答,true爲自動應答
                        channel.BasicConsume(qName, false, consumer);
                        while (true)
                        {
                            BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();                           
                            byte[] bytes = ea.Body;
                            var messageStr = Encoding.UTF8.GetString(bytes);
                            var message = DoJson.JsonToModel<QMessage>(messageStr);
                            Console.WriteLine("Receive a Message, DateTime:" + message.DateTime.ToString("yyyy-MM-dd HH:mm:ss") + " Title:" + message.Title);
                            //若是是自動應答,下下面這句代碼不用寫啦。
                            if ((Convert.ToInt32(message.Title) % 2) == 1)
                            {
                                channel.BasicAck(ea.DeliveryTag, false);
                            }
                        }
                    }
                }
            }

 

RabbitMQ持久化機制

核心代碼:

        channel.queueDeclare(queue_name, durable, false, false, null); //聲明消息隊列,且爲可持久化的
         String message="Hello world"+Math.random();
         //將隊列設置爲持久化以後,還須要將消息也設爲可持久化的,MessageProperties.PERSISTENT_TEXT_PLAIN
         channel.basicPublish("", queue_name, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());

消息何時須要持久化?

根據 官方博文 的介紹,RabbitMQ在兩種狀況下會將消息寫入磁盤:

 

  1. 消息自己在publish的時候就要求消息寫入磁盤;
  2. 內存緊張,須要將部份內存中的消息轉移到磁盤;

 

消息何時會刷到磁盤?

 

  1. 寫入文件前會有一個Buffer,大小爲1M(1048576),數據在寫入文件時,首先會寫入到這個Buffer,若是Buffer已滿,則會將Buffer寫入到文件(未必刷到磁盤);
  2. 有個固定的刷盤時間:25ms,也就是無論Buffer滿不滿,每隔25ms,Buffer裏的數據及未刷新到磁盤的文件內容一定會刷到磁盤;
  3. 每次消息寫入後,若是沒有後續寫入請求,則會直接將已寫入的消息刷到磁盤:使用Erlang的receive x after 0來實現,只要進程的信箱裏沒有消息,則產生一個timeout消息,而timeout會觸發刷盤操做。

 

消息在磁盤文件中的格式

消息保存於$MNESIA/msg_store_persistent/x.rdq文件中,其中x爲數字編號,從1開始,每一個文件最大爲16M(16777216),超過這個大小會生成新的文件,文件編號加1。消息以如下格式存在於文件中:

 

<<Size:64, MsgId:16/binary, MsgBody>>

 

MsgId爲RabbitMQ經過rabbit_guid:gen()每個消息生成的GUID,MsgBody會包含消息對應的exchange,routing_keys,消息的內容,消息對應的協議版本,消息內容格式(二進制仍是其它)等等。

文件什麼時候刪除?

當全部文件中的垃圾消息(已經被刪除的消息)比例大於閾值(GARBAGE_FRACTION = 0.5)時,會觸發文件合併操做(至少有三個文件存在的狀況下),以提升磁盤利用率。

publish消息時寫入內容,ack消息時刪除內容(更新該文件的有用數據大小),當一個文件的有用數據等於0時,刪除該文件。

消息索引何時須要持久化?

索引的持久化與消息的持久化相似,也是在兩種狀況下須要寫入到磁盤中:要麼自己須要持久化,要麼由於內存緊張,須要釋放部份內存。

消息索引何時會刷到磁盤?

 

  1. 有個固定的刷盤時間:25ms,索引文件內容一定會刷到磁盤;
  2. 每次消息(及索引)寫入後,若是沒有後續寫入請求,則會直接將已寫入的索引刷到磁盤,實現上與消息的timeout刷盤一致。

RabbitMQ(二)隊列與消息的持久化

 

當有多個消費者同時收取消息,且每一個消費者在接收消息的同時,還要作其它的事情,且會消耗很長的時間,在此過程當中可能會出現一些意外,好比消息接收到一半的時候,一個消費者宕掉了,這時候就要使用消息接收確認機制,可讓其它的消費者再次執行剛纔宕掉的消費者沒有完成的事情。另外,在默認狀況下,咱們建立的消息隊列以及存放在隊列裏面的消息,都是非持久化的,也就是說當RabbitMQ宕掉了或者是重啓了,建立的消息隊列以及消息都不會保存,爲了解決這種狀況,保證消息傳輸的可靠性,咱們可使用RabbitMQ提供的消息隊列的持久化機制。

 

 

 生產者:

複製代碼
 1 import com.rabbitmq.client.ConnectionFactory;
 2 import com.rabbitmq.client.Connection;
 3 import com.rabbitmq.client.Channel;
 4 import com.rabbitmq.client.MessageProperties;
 5 public class ClientSend1 {
 6     public static final String queue_name="my_queue";
 7     public static final boolean durable=true; //消息隊列持久化
 8     public static void main(String[] args)
 9     throws java.io.IOException{
10         ConnectionFactory factory=new ConnectionFactory(); //建立鏈接工廠
11         factory.setHost("localhost");
12         factory.setVirtualHost("my_mq");
13         factory.setUsername("zhxia");
14         factory.setPassword("123456");
15         Connection connection=factory.newConnection(); //建立鏈接
16         Channel channel=connection.createChannel();//建立信道
17         channel.queueDeclare(queue_name, durable, false, false, null); //聲明消息隊列,且爲可持久化的
18         String message="Hello world"+Math.random();
19         //將隊列設置爲持久化以後,還須要將消息也設爲可持久化的,MessageProperties.PERSISTENT_TEXT_PLAIN
20         channel.basicPublish("", queue_name, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
21         System.out.println("Send message:"+message);
22         channel.close();
23         connection.close();
24     }
25 
26 }
複製代碼

 

說明:

行17 和行20 須要同時設置,也就是將隊列設置爲持久化以後,還須要將發送的消息也要設置爲持久化才能保證隊列和消息一直存在

 消費者:

複製代碼
 1 import com.rabbitmq.client.ConnectionFactory;
 2 import com.rabbitmq.client.Connection;
 3 import com.rabbitmq.client.Channel;
 4 import com.rabbitmq.client.QueueingConsumer;
 5 public class ClientReceive1 {
 6     public static final String queue_name="my_queue";
 7     public static final boolean autoAck=false;
 8     public static final boolean durable=true;
 9     public static void main(String[] args)
10     throws java.io.IOException,java.lang.InterruptedException{
11         ConnectionFactory factory=new ConnectionFactory();
12         factory.setHost("localhost");
13         factory.setVirtualHost("my_mq");
14         factory.setUsername("zhxia");
15         factory.setPassword("123456");
16         Connection connection=factory.newConnection();
17         Channel channel=connection.createChannel();
18         channel.queueDeclare(queue_name, durable, false, false, null);
19         System.out.println("Wait for message");
20         channel.basicQos(1); //消息分發處理
21         QueueingConsumer consumer=new QueueingConsumer(channel);
22         channel.basicConsume(queue_name, autoAck, consumer);
23         while(true){
24             Thread.sleep(500);
25             QueueingConsumer.Delivery deliver=consumer.nextDelivery();
26             String message=new String(deliver.getBody());
27             System.out.println("Message received:"+message);
28             channel.basicAck(deliver.getEnvelope().getDeliveryTag(), false);
29         }
30     }
31 }
複製代碼

 說明:

行22: 設置RabbitMQ調度分發消息的方式,也就是告訴RabbitMQ每次只給消費者處理一條消息,也就是等待消費者處理完而且已經對剛纔處理的消息進行確認以後, 才發送下一條消息,防止消費者太過於忙碌。以下圖所示:

 

相關文章
相關標籤/搜索