RabbitMQ介紹

(一)RabbitMQ基本概念

  RabbitMQ是流行的開源消息隊列系統,用erlang語言開發。我曾經對這門語言挺有興趣,學過一段時間,後來沒堅持。RabbitMQ是 AMQP(高級消息隊列協議)的標準實現。若是不熟悉AMQP,直接看RabbitMQ的文檔會比較困難。不過它也只有幾個關鍵概念,這裏簡單介紹。html

RabbitMQ的結構圖以下:java

一、幾個概念說明:安全

Broker:簡單來講就是消息隊列服務器實體。
Exchange:消息交換機,它指定消息按什麼規則,路由到哪一個隊列。
Queue:消息隊列載體,每一個消息都會被投入到一個或多個隊列。
Binding:綁定,它的做用就是把exchange和queue按照路由規則綁定起來。
Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
vhost:虛擬主機,一個broker裏能夠開設多個vhost,用做不一樣用戶的權限分離。
producer:消息生產者,就是投遞消息的程序。
consumer:消息消費者,就是接受消息的程序。
channel:消息通道,在客戶端的每一個鏈接裏,可創建多個channel,每一個channel表明一個會話任務。服務器

二、消息隊列的使用過程大概以下:架構

(1)客戶端鏈接到消息隊列服務器,打開一個channel。
(2)客戶端聲明一個exchange,並設置相關屬性。
(3)客戶端聲明一個queue,並設置相關屬性。
(4)客戶端使用routing key,在exchange和queue之間創建好綁定關係。
(5)客戶端投遞消息到exchange。dom

exchange接收到消息後,就根據消息的key和已經設置的binding,進行消息路由,將消息投遞到一個或多個隊列裏。異步

exchange也有幾個類型,徹底根據key進行投遞的叫作Direct交換機,例如,綁定時設置了routing key爲」abc」,那麼客戶端提交的消息,只有設置了key爲」abc」的纔會投遞到隊列。對key進行模式匹配後進行投遞的叫作Topic交換機,符 號」#」匹配一個或多個詞,符號」*」匹配正好一個詞。例如」abc.#」匹配」abc.def.ghi」,」abc.*」只匹配」abc.def」。還 有一種不須要key的,叫作Fanout交換機,它採起廣播模式,一個消息進來時,投遞到與該交換機綁定的全部隊列。ide

三、關聯關係工具

從示意圖能夠看出消息生產者並無直接將消息發送給消息隊列,而是經過創建與Exchange的Channel,將消息發送給Exchange,Exchange根據規則,將消息轉發給指定的消息隊列。消費者經過創建與消息隊列相連的Channel,從消息隊列中獲取消息。post

這裏談到的Channel能夠理解爲創建在生產者/消費者和RabbitMQ服務器之間的TCP鏈接上的虛擬鏈接,一個TCP鏈接上能夠創建多個Channel。 RabbitMQ服務器的Exchange對象能夠理解爲生產者發送消息的郵局,消息隊列能夠理解爲消費者的郵箱。Exchange對象根據它定義的規則和消息包含的routing key以及header信息將消息轉發到消息隊列。channel下圖中淺紅色框起來的兩塊所示:

根據轉發消息的規則不一樣,RabbitMQ服務器中使用的Exchange對象有四種,Direct Exchange, Fanout Exchange, Topic Exchange, Header Exchange,若是定義Exchange時沒有指定類型和名稱, RabbitMQ將會爲每一個消息隊列設定一個Default Exchange,它的Routing Key是消息隊列名稱。

RabbitMQ Java Client的官網示例有6個,本篇只使用三個例程,分別是使用默認Default Exchange的消息生產/消費,使用Direct Exchange的消息生產/消費,以及RPC方式的消息生產/消費。

爲了測試方便,咱們新定義了一個virutal host,名字是test_vhosts,定義了兩個用戶rabbitmq_producer和rabbitmq_consumer, 設置其user_tag爲administrator(能夠進行遠程鏈接), 爲它們設置了訪問test_vhosts下全部資源的權限。

建立virutal host,在Admin-->Virtual Hosts(右側的導航欄上)打開:

建立用戶:

爲用戶設置權限:(在用戶列表上點擊某個用戶進入設置頁面)

 

 

使用默認Default Exchange的消息生產/消費

咱們定義一個生產者程序,一個消費者程序。

生產者程序代碼以下:

package com.gl365.payment.util.rabbitmq.demo1;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import io.netty.handler.timeout.TimeoutException;

public class ProducerApp {
    public static void main(String[] args) throws IOException, TimeoutException {  
        Connection connection = null;  
        Channel channel = null;  
        try  
        {  
            ConnectionFactory factory = new ConnectionFactory();  
            factory.setHost("localhost");  
            factory.setPort(5672);  
            factory.setUsername("rabbitmq_producer");  
            factory.setPassword("rabbitmq_producer");  
            factory.setVirtualHost("test_vhosts");  
   
            //建立與RabbitMQ服務器的TCP鏈接  
            connection  = factory.newConnection();  
            channel = connection.createChannel();  
            channel.queueDeclare("firstQueue", true, false, false, null);  
            String message = "First Message";             
            channel.basicPublish("", "firstQueue", null, message.getBytes());  
            System.out.println("Send Message is:'" + message + "'");              
        }  
        catch(Exception ex)  
        {  
            ex.printStackTrace();  
        }  
        finally  
        {  
            if(channel != null)  
            {  
                try {
                    channel.close();
                } catch (java.util.concurrent.TimeoutException e) {
                    e.printStackTrace();
                }  
            }  
            if(connection != null)  
            {  
                connection.close();  
            }  
        }  
    }
}

關於生產者的代碼有幾點說明:

1) RabbitMQ Java Client示例提供的ConnectionFactory屬性設置的代碼只有一句:

factory.setHost("localhost");  

這句代碼表示使用rabbitmq服務器默認的virutal host(「/」),默認的用戶guest/guest進行鏈接,可是若是這段代碼運行在遠程機器上時, 將由於guest用戶不能用於遠程鏈接RabbitMQ服務器而運行失敗,上面提供的代碼是能夠進行創建遠程鏈接的代碼。

2)Channel創建後,調用Channel.queueDeclare方法建立消息隊列firstQueue。

Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,  
                 Map<String, Object> arguments) throws IOException; 

這個方法的第二個參數durable表示創建的消息隊列是不是持久化(RabbitMQ重啓後仍然存在,並非指消息的持久化),第三個參數exclusive 表示創建的消息隊列是否只適用於當前TCP鏈接,第四個參數autoDelete表示當隊列再也不被使用時,RabbitMQ是否能夠自動刪除這個隊列。 第五個參數arguments定義了隊列的一些參數信息,主要用於Headers Exchange進行消息匹配時。

 

3)生產者發送消息使用Channel.basicPublish方法。

void basicPublish(String exchange, String routingKey,   
 BasicProperties props, byte[] body) throws IOException; 

第一個參數exchange是消息發送的Exchange名稱,若是沒有指定,則使用Default Exchange。 第二個參數routingKey是消息的路由Key,是用於Exchange將消息路由到指定的消息隊列時使用(若是Exchange是Fanout Exchange,這個參數會被忽略), 第三個參數props是消息包含的屬性信息。RabbitMQ的消息屬性和消息體是分開的,不像JMS消息那樣同時包含在javax.jms.Message對象中,這一點須要特別注意。 第四個參數body是RabbitMQ消息體。 咱們這裏調用basicPublish方法發送消息時,props參數爲null,於是咱們發送的消息是非持久化消息,若是要發送持久化消息,咱們須要進行以下設置:

AMQP.BasicProperties props =  
                    new AMQP.BasicProperties("text/plain",  
                            "UTF-8",  
                            null,  
                            2,  
                            0, null, null, null,  
                            null, null, null, null,  
                            null, null);  
 channel.basicPublish("", "firstQueue", props, message.getBytes());  

定義props時的參數2表示消息的類型爲持久化消息。 運行生產者程序後,咱們能夠執行rabbitmqctl命令查看隊列消息,咱們看到firstQueue隊列有一條消息。

消費者代碼以下:

package com.gl365.payment.util.rabbitmq.demo1;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class ConsumerApp {
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setUsername("rabbitmq_consumer");
            factory.setPassword("rabbitmq_consumer");
            factory.setVirtualHost("test_vhosts");
            connection = factory.newConnection();
            channel = connection.createChannel();

            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                        byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println(" Consumer have received '" + message + "'");
                }
            };
            channel.basicConsume("firstQueue", true, consumer);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}

消費者代碼中,創建Connection,Channel的代碼和生產者程序相似。它主要定義了一個Consumer對象,這個對象重載了DefaultCustomer類 的handleDelivery方法:

void handleDelivery(String consumerTag,  
                        Envelope envelope,  
                        AMQP.BasicProperties properties,  
                        byte[] body) 
 

handleDelivery方法的第一個參數consumerTag是接收到消息時的消費者Tag,若是咱們沒有在basicConsume方法中指定Consumer Tag,RabbitMQ將使用隨機生成的Consumer Tag(以下圖所示)

 

第二個參數envelope是消息的打包信息,包含了四個屬性:

1._deliveryTag,消息發送的編號,表示這條消息是RabbitMQ發送的第幾條消息,咱們能夠看到這條消息是發送的 第一條消息。

2._redeliver,重傳標誌,確認在收到對消息的失敗確認後,是否須要重發這條消息,咱們這裏的值是false,不須要重發。

3._exchange,消息發送到的Exchange名稱,正如咱們上面發送消息時同樣,exchange名稱爲空,使用的是Default Exchange。

4._routingKey,消息發送的路由Key,咱們這裏是發送消息時設置的「firstQueue」。

第三個參數properties就是上面使用basicPublish方法發送消息時的props參數,因爲咱們上面設置它爲null,這裏接收到的properties 是默認的Properties,只有bodySize,其餘全是null。

第四個參數body是消息體.

咱們這裏重載的handleDelivery方法僅僅打印出了生產者發送的消息內容,實際使用時能夠轉發給後臺程序進行處理。

在Consumer對象定義後,咱們調用了Channel.basicConsume方法將Consumer與消息隊列綁定,不然Consumer沒法從消息隊列獲取消息。

String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException  

basicConsume方法的第一個參數是Consumer綁定的隊列名,第二個參數是自動確認標誌,若是爲true,表示Consumer接受到消息後,會自動發確認消息(Ack消息)給消息隊列,消息隊列會將這條消息從消息隊列裏刪除,第三個參數就是Consumer對象,用於處理接收到的消息。

若是咱們想讓消費者接收到消息後對消息進行手動確認(Manual Ack),咱們須要對代碼進行兩處改動:

1)在調用basicConsume方法時,將autoAck屬性設置爲false。

channel.basicConsume("firstQueue", false, consumer);  

2)在handleDelivery方法中調用Channel.basicAck方法,發送手動確認消息給消息隊列。

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)  
                        throws IOException  
{  
      this.getChannel().basicAck(envelope.getDeliveryTag(), false);  
}  

basicAck方法有兩個參數,第一個參數deliverTag是消息的發送編號,第二個參數multiple是消息確認方式,若是值爲true,表示對消息隊列裏全部編號小於或等於當前消息編號的未確認消息進行手動確認,若是爲false,表示僅確認當前消息。

消費者代碼執行後,咱們能夠看到消費者程序的控制檯輸出了這條消息的內容,並且使用rabbitmqctl命令查看隊列消息時,隊列裏的消息數爲0。

使用Direct Exchange的消息生產/消費

使用Direct Exchange的生產者/消費者代碼與Default Exchange比較相似,不過生產者程序的代碼須要添加建立Direct Exchange和 將Exchange和消息隊列綁定的代碼,具體添加和修改的代碼以下:

package com.gl365.payment.util.rabbitmq.demo2;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import io.netty.handler.timeout.TimeoutException;

public class ProducerApp {
    public static void main(String[] args) throws IOException, TimeoutException {  
        Connection connection = null;  
        Channel channel = null;  
        try  
        {  
            ConnectionFactory factory = new ConnectionFactory();  
            factory.setHost("localhost");  
            factory.setPort(5672);  
            factory.setUsername("rabbitmq_producer");  
            factory.setPassword("rabbitmq_producer");  
            factory.setVirtualHost("test_vhosts");  
   
            //建立與RabbitMQ服務器的TCP鏈接  
            connection  = factory.newConnection();  
            channel = connection.createChannel(); 
            channel.exchangeDeclare("directExchange", "direct"); channel.queueDeclare("directQueue", true, false, false, null); channel.queueBind("directQueue", "directExchange", "directMessage");  
            String message = "First Direct Message";  
               
            channel.basicPublish("directExchange", "directMessage", null, message.getBytes());  
            System.out.println("Send Direct Message is:'" + message + "'");
        }  
        catch(Exception ex)  
        {  
            ex.printStackTrace();  
        }  
        finally  
        {  
            if(channel != null)  
            {  
                try {
                    channel.close();
                } catch (java.util.concurrent.TimeoutException e) {
                    e.printStackTrace();
                }  
            }  
            if(connection != null)  
            {  
                connection.close();  
            }  
        }  
    }
}

首先咱們調用Channel.exchangeDeclare方法建立名爲「directExchange」的Direct Exchange。

Exchange.DeclareOk exchangeDeclare(String exchange, String type,boolean durable) throws IOException 

exchangeDeclare方法的第一個參數exchange是exchange名稱,第二個參數type是Exchange類型,有「direct」,「fanout」,「topic」,「headers」四種,分別對應RabbitMQ的四種Exchange。第三個參數durable是設置Exchange是否持久化( 即在RabbitMQ服務器重啓後Exchange是否仍存在,若是沒有設置,默認是非持久化的)

建立「directQueue」消息隊列後,咱們再調用Channel.queueBind方法,將咱們建立的Direct Exchange和消息隊列綁定。

Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;  

queueBind方法第一個參數queue是消息隊列的名稱,第二個參數exchange是Exchange的名稱,第三個參數routingKey是消息隊列和Exchange之間綁定的路由key,咱們這裏綁定的路由key是「directMessage」。從Exchange過來的消息,只有routing key爲「directMessage」的消息會被轉到消息隊列「directQueue」,其餘消息將不會被轉發,下面將證明這一點。

運行ProducerApp程序,使用rabbitmq_producer用戶登陸管理頁面,咱們能夠看到名爲「directExchange」的Direct Exchange被建立出來。

消息隊列directQueue與它綁定,routing key爲directMessage。

消息隊列directQueue裏有一條消息

咱們修改ProducerApp的程序,將消息的routing key改成「indirectMessage」

package com.gl365.payment.util.rabbitmq.demo2;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import io.netty.handler.timeout.TimeoutException;

public class ProducerApp {
    public static void main(String[] args) throws IOException, TimeoutException {  
        Connection connection = null;  
        Channel channel = null;  
        try  
        {  
            ConnectionFactory factory = new ConnectionFactory();  
            factory.setHost("localhost");  
            factory.setPort(5672);  
            factory.setUsername("rabbitmq_producer");  
            factory.setPassword("rabbitmq_producer");  
            factory.setVirtualHost("test_vhosts");  
   
            //建立與RabbitMQ服務器的TCP鏈接  
            connection  = factory.newConnection();  
            channel = connection.createChannel(); 
            channel.exchangeDeclare("directExchange", "direct");  
            channel.queueDeclare("directQueue", true, false, false, null);  
            channel.queueBind("directQueue", "directExchange", "directMessage");  
            //String message = "First Direct Message";  
            String message = "First Indirect Message";  
            channel.basicPublish("directExchange", "indirectMessage", null, message.getBytes());  
            System.out.println("Send Indirect Message is:'" + message + "'"); 
            
            //channel.basicPublish("directExchange", "indirectQueue", null, message.getBytes());  
            //System.out.println("Send Direct Message is:'" + message + "'");
        }  
        catch(Exception ex)  
        {  
            ex.printStackTrace();  
        }  
        finally  
        {  
            if(channel != null)  
            {  
                try {
                    channel.close();
                } catch (java.util.concurrent.TimeoutException e) {
                    e.printStackTrace();
                }  
            }  
            if(connection != null)  
            {  
                connection.close();  
            }  
        }  
    }
}

再次運行程序後,打開管理頁面,咱們看到「directQueue」隊列裏仍然只有一條消息。

 

咱們向Exchange發送的第二條消息因爲和綁定的routing key不一致,沒有被轉發到「directQueue」消息隊列,說明被RabbitMQ丟棄了

 

咱們經過管理界面再建立一個消息隊列「indirectQueue」,在它和「directExchange」之間創建bind關係,routingkey爲「indirectMessage」 。

再綁定一個

 

再次運行ProducerApp程序,咱們能夠看到「directQueue」消息隊列消息數還是1,但「indirectQueue」消息隊列接收到了從Exchange轉發來的消息。

 

使用RPC方式的消息生產/消費

RPC方式的消息生產和消費示意圖以下:

 

 

在這種方式下,生產者和消費者之間的消息發送/接收流程以下:

1)生產者在發送消息的同時,將返回消息的消息隊列名(replyTo中指定)以及消息關聯Id(correlationId)附帶在消息Properties中發送給消費者。

2)消費者在接收到消息,處理完成後,將結果做爲返回消息發送到replyTo指定的返回消息隊列中,同時附帶接收消息中的corrleationId, 以便讓生產者接收到到返回消息後,根據corrleationId確認是針對1)中發送消息的返回消息,若是correlationId確認一致,則將返回消息 取出,進行後續處理。

示意圖中的生產者和消費者在發送消息時使用的都是Default Exchange,咱們接下來的程序作一點改動,使用Direct Exchange。

在咱們的程序中,生產者發送一個數字給消費者,消費者接收到消息後,計算這個數字的階乘結果,返回給生產者。 生產者程序的主要代碼以下:

[java]  view plain  copy
 
  1.   //建立RPC發送消息的Direct Exchange,消息隊列和綁定關係。  
  2.   channel.exchangeDeclare("rpcSendExchange", "direct",true);  
  3.   channel.queueDeclare("rpcSendQueue", true, false, false, null);  
  4.   channel.queueBind("rpcSendQueue", "rpcSendExchange", "rpcSendMessage");  
  5.   
  6.   //創建RPC返回消息的Direct Exchange, 消息隊列和綁定關係           
  7.   channel.exchangeDeclare("rpcReplyExchange", "direct",true);  
  8.   channel.queueDeclare("rpcReplyQueue", true, false, false, null);  
  9.   channel.queueBind("rpcReplyQueue", "rpcReplyExchange", "rpcReplyMessage");  
  10.   
  11.   //建立接收RPC返回消息的消費者,並將它與RPC返回消息隊列相關聯。  
  12.   QueueingConsumer replyCustomer = new QueueingConsumer(channel);  
  13.   channel.basicConsume("rpcReplyQueue", true,replyCustomer);  
  14.   
  15.   String number = "10";  
  16.   
  17.   //生成RPC請求消息的CorrelationId  
  18.   String correlationId = UUID.randomUUID().toString();  
  19.   //在RabbitMQ消息的Properties中設置RPC請求消息的CorrelationId以及  
  20.   //ReplyTo名稱(咱們這裏使用的是Exchange名稱,  
  21.   //而不是消息隊列名稱)  
  22.   BasicProperties props = new BasicProperties  
  23.                       .Builder()  
  24.                       .correlationId(correlationId)  
  25.                       .replyTo("rpcReplyExchange")  
  26.                       .build();  
  27.   
  28.   System.out.println("The send message's correlation id is:" + correlationId);              
  29.   channel.basicPublish("rpcSendExchange", "rpcSendMessage", props, number.getBytes());  
  30.   
  31.   String response = null;  
  32.   
  33.   while(true)  
  34.   {  
  35.           //從返回消息中取一條消息  
  36.    Delivery delivery = replyCustomer.nextDelivery();  
  37.    //若是消息的CorrelationId與發送消息的CorrleationId一致,表示這條消息是  
  38.           //發送消息對應的返回消息,是階乘運算的計算結果。  
  39.           System.out.println("The received reply message's correlation id is:" + messageCorrelationId);  
  40.           String messageCorrelationId = delivery.getProperties().getCorrelationId();  
  41.    if (!Strings.isNullOrEmpty(messageCorrelationId) && messageCorrelationId.equals(correlationId))   
  42.           {  
  43.     response = new String(delivery.getBody());  
  44.     break;  
  45.    }  
  46.   }  
  47.   
  48.   //輸出階乘運算結果  
  49.   if(!Strings.isNullOrEmpty(response))  
  50.   {  
  51. System.out.println("Factorial(" + number + ") = " + response);  
  52.   }  
消費者程序的主要代碼以下:
[java] view plain copy
 
  1.  Consumer consumer = new DefaultConsumer(channel)  
  2.  {  
  3.     @Override  
  4.     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException  
  5.     {  
  6.        //獲取返回消息發送到的Exchange名稱  
  7.        String replyExchange = properties.getReplyTo();  
  8.    
  9.        //設置返回消息的Properties,附帶發送消息的CorrelationId.  
  10.        AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()  
  11.                             .correlationId(properties.getCorrelationId())  
  12.                             .build();  
  13.    
  14.        String message = new String(body,"UTF-8");  
  15.        System.out.println("The received message is:" + message);  
  16.        System.out.println("The received message's correlation id is:" + properties.getCorrelationId());  
  17.    
  18.        //計算階乘,factorial方法是計算階乘的方法。  
  19.        int number = Integer.parseInt(message);  
  20.        String response = factorial(number);  
  21.    
  22.        //將階乘消息發送到Reply Exchange  
  23.        this.getChannel().basicPublish(replyExchange, "rpcReplyMessage",replyProps, response.getBytes());  
  24.    }  
  25. };  
  26.    
  27. channel.basicConsume("rpcSendQueue", true, consumer);  

先運行生產者程序,發送請求消息到Send Exchange,而後等待消費者發送的返回消息。 
再啓動消費者程序,計算階乘並返回結果給Reply Exchange。 兩個程序的控制檯信息以下圖所示
生產者程序控制臺

消費者程序控制臺

從控制檯信息能夠看出生產者端根據返回消息中包含的Correlation Id判斷出這是發送消息對應的返回消息,獲取了階乘的計算結果。

這個例子只是簡單的生產者和消費者之間的方法調用,實際使用時,咱們能夠基於這個實例,實現更爲複雜的操做。

 

RabbitMQ Client的重連機制

 

RabbitMQ Java Client提供了重連機制,不過在RabbitMQ Java Client 4.0版本以前,自動重連默認是關閉的。從Rabbit Client 4.0版本開始,自動重連默認是打開的。控制自動重連的屬性是com.rabbitmq.client.ConnectionFactory類的automaticRecovery和topologyRecovery屬性。

設置automaticRecovery屬性爲true時,會執行如下recovery:

1)Connection的重連。

2)偵聽Connection的Listener的恢復。

3)從新創建在Connection基礎上的Channel。

4)偵聽Channel的Listener的恢復。

5)Channel上的設置,如basicQos,publisher confirm以及事務屬性等的恢復。

當設置topologyRecovery屬性爲true時,會執行如下recovery:

1)exchange的從新定義(不包含預約義的exchange)

2)queue的從新定義(不包含預約義的queue)

3)binding的從新定義(不包含預約義的binding)

4)全部Consumer的恢復

咱們定義一個帶auto recovery的消費者程序,咱們使用RabbitMQ Java Client 4.0.0版本,這個版本引入了AutorecoveringConnection和

AutorecoveringChannel類,能夠添加RecoveryListener對Recovery過程進行監控。

 

[java] view plain copy
 
  1. public class RecoveryConsumerApp  
  2. {  
  3.     public static void main( String[] args ) throws IOException, TimeoutException {  
  4.             ConnectionFactory connectionFactory = new ConnectionFactory();  
  5.             ...................  
  6.    
  7.             AutorecoveringConnection connection = (AutorecoveringConnection)connectionFactory.newConnection();  
  8.             String originalLocalAddress =  
  9.                     connection.getLocalAddress() + ":" + connection.getLocalPort();  
  10.             System.out.println("The origin connection's local address is:" + originalLocalAddress);  
  11.    
  12.             AutorecoveringChannel  channel = (AutorecoveringChannel)connection.createChannel();  
  13.             System.out.println("The origin channel's channel number is:" + channel.getChannelNumber());  
  14.    
  15.             channel.exchangeDeclare("recoveryExchange", BuiltinExchangeType.DIRECT, false, true ,null);  
  16.             channel.queueDeclare("recoveryQueue", false, false, true,null);  
  17.             channel.queueBind("recoveryQueue", "recoveryExchange", "recoveryMessage");  
  18.    
  19.             connection.addRecoveryListener(new RecoveryListener() {  
  20.                 public void handleRecovery(Recoverable recoverable) {  
  21.                     System.out.println("Connection handleRecovery method is called");  
  22.                     AutorecoveringConnection recoveredConnection =  
  23.                             (AutorecoveringConnection)recoverable;  
  24.                     String recoveredLocalAddress =  
  25.                             recoveredConnection.getLocalAddress() + ":" + recoveredConnection.getLocalPort();  
  26.                     System.out.println("The recovered connection's local address is:" + recoveredLocalAddress);  
  27.                 }  
  28.    
  29.                 public void handleRecoveryStarted(Recoverable recoverable) {  
  30.                     System.out.println("Connection handleRecoveryStarted method is called");  
  31.                 }  
  32.             });  
  33.    
  34.             channel.addRecoveryListener(new RecoveryListener() {  
  35.                     public void handleRecovery(Recoverable recoverable) {  
  36.                         System.out.println("Channel handleRecovery method is called");  
  37.                         AutorecoveringChannel recoveryChannel =  
  38.                                 (AutorecoveringChannel)recoverable;  
  39.                         System.out.println("The recovered Channel's number is:" + recoveryChannel.getChannelNumber());  
  40.                     }  
  41.    
  42.                     public void handleRecoveryStarted(Recoverable recoverable) {  
  43.                         System.out.println("Channel handleRecoveryStarted method is called");  
  44.                     }  
  45.             });  
  46.    
  47.     }  
  48. }  

 

這個程序中Exchange, Queue都是非持久化而且自動刪除的。 咱們爲Connection和Channel分別添加了Recovery Listener匿名對象,

便於確認他們確實進行了Recovery操做。

啓動程序後,咱們能夠看到recoveryExchange和recoveryQueue都被建立出來,且Binding關係創建了。

 

鏈接的本地地址是0.0.0.0:8109,Channel編號是1

此時咱們關閉RabbitMQ服務器,再重啓RabbitMQ服務器,咱們能夠從控制檯界面看到有鏈接超時的警告信

息以及重連信息。

從重連日誌信息中咱們能夠看出Channel的編號仍是1,可是Connection的本地地址已經變成了0.0.0.0:8470,證實進行了重連。

鏈接到recoveryQueue隊列上的Consumer Tag也進行了恢復,並且Consumer Tag與以前的Consumer Tag一致,這是由於設置了

topologyRecovery屬性爲true。

咱們再在生產者程序中使用重連機制,依然使用Rabbit Java Client 4.0版本 生產者程序的片斷以下:

 

[java] view plain copy
 
  1. <span style="font-size: 17.5px;">  </span>factory.setAutomaticRecoveryEnabled(true);  
  2.    factory.setNetworkRecoveryInterval(60000);  
  3.    factory.setTopologyRecoveryEnabled(true);  
  4.    
  5.    AutorecoveringConnection connection = (AutorecoveringConnection)factory.newConnection();  
  6.    AutorecoveringChannel channel = (AutorecoveringChannel)connection.createChannel();     
  7.    //設置Channel爲Publish Confirm模式  
  8.    channel.confirmSelect();  <span style="font-size: 17.5px;">  </span>  


 

登陸管理界面,咱們能夠看到生產者創建的Channel是Confirm模式(圖中Mode列用C表示)

咱們關掉RabbitMQ服務器,再重啓RabbitMQ服務器,能夠看到生產者Channel被恢復,可是本地端口號已經從13684變成了13874,

說明這是從新建立的Channel,建立的Channel仍然是Confirm模式,和最初的Channel一致。

若是咱們設置Channel爲Transaction模式(調用Channel.txSelect()方法),重連後恢復的Channel的模式也仍然是Transaction模式。

RabbitMQ支持消息的持久化,也就是數據寫在磁盤上,爲了數據安全考慮,我想大多數用戶都會選擇持久化。消息隊列持久化包括3個部分:
(1)exchange持久化,在聲明時指定durable => 1
(2)queue持久化,在聲明時指定durable => 1
(3)消息持久化,在投遞時指定delivery_mode => 2(1是非持久化)

若是exchange和queue都是持久化的,那麼它們之間的binding也是持久化的。若是exchange和queue二者之間有一個持久化,一個非持久化,就不容許創建綁定。

(2、基本概念介紹)

      AMQP(高級消息隊列協議) 是一個異步消息傳遞所使用的應用層協議規範,做爲線路層協議,而不是API(例如JMS),AMQP 客戶端可以無視消息的來源任意發送和接受信息。AMQP的原始用途只是爲金融界提供一個能夠彼此協做的消息協議,而如今的目標則是爲通用消息隊列架構提供通用構建工具。所以,面向消息的中間件 (MOM)系統,例如發佈/訂閱隊列,沒有做爲基本元素實現。反而經過發送簡化的AMQ實體,用戶被賦予了構建例如這些實體的能力。這些實體也是規範的一 部分,造成了在線路層協議頂端的一個層級:AMQP模型。這個模型統一了消息模式,諸如以前提到的發佈/訂閱,隊列,事務以及流數據,而且添加了額外的特性,例如更易於擴展,基於內容的路由。

AMQP當中有四個概念很是重要

  1. virtual host,虛擬主機
  2. exchange,交換機
  3. queue,隊列
  4. binding,綁定

一個虛擬主機持有一組交換機、隊列和綁定。

爲何須要多個虛擬主機呢?由於RabbitMQ當中,用戶只能在虛擬主機的粒度進行權限控制。所以,若是須要禁止A組訪問B組的交換機/隊列/綁定,必須爲A和B分別建立一個虛擬主機。每個RabbitMQ服務器都有一個默認的虛擬主機/

何謂虛擬主機(virtual host),交換機(exchange),隊列(queue)和綁定(binding)

隊列(Queues)是你的消息(messages)的終點,能夠理解成裝消息的容器。消息就一直在裏面,直到有客戶端(也就是消費者,Consumer)鏈接到這個隊列而且將其取走爲止。不過,也能夠將一個隊列配置成這樣的:一旦消息進入這個隊列,此消息就被刪除。

隊列是由消費者(Consumer)經過程序創建的,不是經過配置文件或者命令行工具。這沒什麼問題,若是一個消費者試圖建立一個已經存在的隊列,RabbitMQ會直接忽略這個請求。所以咱們能夠將消息隊列的配置寫在應用程序的代碼裏面。

而要把一個消息放進隊列前,須要有一個交換機(Exchange)。

交換機(Exchange)能夠理解成具備路由表的路由程序。每一個消息都有一個稱爲路由鍵(routing key)的屬性,就是一個簡單的字符串。交換機當中有一系列的綁定(binding),即路由規則(routes)。(例如,指明具備路由鍵 「X」 的消息要到名爲timbuku的隊列當中去。)

消費者程序(Consumer)要負責建立你的交換機。交換機能夠存在多個,每一個交換機在本身獨立的進程當中執行,所以增長多個交換機就是增長多個進程,能夠充分利用服務器上的CPU核以便達到更高的效率。例如,在一個8核的服務器上,能夠建立5個交換機來用5個核,另外3個核留下來作消息處理。相似的,在RabbitMQ的集羣當中,你能夠用相似的思路來擴展交換機一邊獲取更高的吞吐量。

交換機如何判斷要把消息送到哪一個隊列?你須要路由規則,即綁定(binding)。一個綁定就是一個相似這樣的規則:將交換機「desert(沙漠)」當中具備路由鍵「阿里巴巴」的消息送到隊列「hideout(山洞)」裏面去。換句話說,一個綁定就是一個基於路由鍵將交換機和隊列鏈接起來的路由規則。例如,具備路由鍵「audit」的消息須要被送到兩個隊列,「log-forever」和「alert-the-big-dude」。要作到這個,就須要建立兩個綁定,每一個都鏈接一個交換機和一個隊列,二者都是由「audit」路由鍵觸發。在這種狀況下,交換機會複製一份消息而且把它們分別發送到兩個隊列當中。交換機不過就是一個由綁定構成的路由表。

交換機有多種類型。他們都是作路由的,可是它們接受不一樣類型的綁定。爲何不建立一種交換機來處理全部類型的路由規則呢?由於每種規則用來作匹配分子的CPU開銷是不一樣的。例如,一個「topic」類型的交換機試圖將消息的路由鍵與相似「dogs.*」的模式進行匹配。匹配這種末端的通配符比直接將路由鍵與「dogs」比較(「direct」類型的交換機)要消耗更多的CPU。若是你不須要「topic」類型的交換機帶來的靈活性,你能夠經過使用「direct」類型的交換機獲取更高的處理效率。那麼有哪些類型,他們又是怎麼處理的呢?

  Exchange

  1. Exchange Direct

     

    Exchange Fanout

    Exchange Topic

     


持久化

你花了大量的時間來建立隊列、交換機和綁定,而後,服務器程序掛了。你的隊列、交換機和綁定怎麼樣了?還有,放在隊列裏面可是還沒有處理的消息們呢?

若是你是用默認參數構造的這一切的話,那麼,他們都灰飛煙滅了。RabbitMQ重啓以後會乾淨的像個新生兒。你必須重作全部的一切,亡羊補牢,如何避免未來再度發生此類杯具?

隊列和交換機有一個建立時候指定的標誌durable。durable的惟一含義就是具備這個標誌的隊列和交換機會在重啓以後從新創建,它不表示說在隊列當中的消息會在重啓後恢復。那麼如何才能作到不僅是隊列和交換機,還有消息都是持久的呢?

可是首先須要考慮的問題是:是否真的須要消息的持久化?若是須要重啓後消息能夠回覆,那麼它須要被寫入磁盤。但即便是最簡單的磁盤操做也是要消耗時間的。因此須要衡量判斷。

當你將消息發佈到交換機的時候,能夠指定一個標誌「Delivery Mode」(投遞模式)。根據你使用的AMQP的庫不一樣,指定這個標誌的方法可能不太同樣。簡單的說,就是將Delivery Mode設置成2,也就是持久的(persistent)便可。通常的AMQP庫都是將Delivery Mode設置成1,也就是非持久的。因此要持久化消息的步驟以下:

  1. 將交換機設成 durable。
  2. 將隊列設成 durable。
  3. 將消息的 Delivery Mode 設置成2 。

綁定(Bindings)怎麼辦?綁定沒法在建立的時候設置成durable。沒問題,若是你綁定了一個durable的隊列和一個durable的交換機,RabbitMQ會自動保留這個綁定。相似的,若是刪除了某個隊列或交換機(不管是否是durable),依賴它的綁定都會自動刪除。

注意:

  • RabbitMQ 不容許你綁定一個非堅固(non-durable)的交換機和一個durable的隊列。反之亦然。要想成功必須隊列和交換機都是durable的。
  • 一旦建立了隊列和交換機,就不能修改其標誌了。例如,若是建立了一個non-durable的隊列,而後想把它改變成durable的,惟一的辦法就是刪除這個隊列而後重現建立。所以,最好仔細檢查建立的標誌。
相關文章
相關標籤/搜索