RabbitMQ從入門到精通(三)

1. 自定義消費者使用

  • 咱們以前呢都是在代碼中編寫while循環,進行 consumer.nextDelivery 方法進行獲取下一條消息,而後進行消費處理!
  • 其實咱們還可使用自定義的Consumer,它更加的方便,解耦性更加的強,也是在實際工做中最經常使用的使用方式!
  • 自定義消費端實現只須要繼承 DefaultConsumer 類,重寫 handleDelivery 方法便可

 

自定義消費端演示

public class Producer {
     public static void main(String[] args) throws Exception {
            //1 建立ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.244.11");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            connectionFactory.setHandshakeTimeout(20000);
            //2 獲取Connection
            Connection connection = connectionFactory.newConnection();
            //3 經過Connection建立一個新的Channel
            Channel channel = connection.createChannel();
            
            String exchange = "test_consumer_exchange";
            String routingKey = "consumer.save";
            
            String msg = "Hello RabbitMQ Consumer Message";
            //4 發送消息
            for(int i =0; i<5; i ++){
                channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
            }
        }
}

 

public class MyConsumer extends DefaultConsumer {

    public MyConsumer(Channel channel) {
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        //consumerTag: 內部生成的消費標籤  properties: 消息屬性  body: 消息內容  
        System.err.println("-----------consume message----------");
        System.err.println("consumerTag: " + consumerTag);
        //envelope包含屬性:deliveryTag(標籤), redeliver, exchange, routingKey
        //redeliver是一個標記,若是設爲true,表示消息以前可能已經投遞過了,如今是從新投遞消息到監聽隊列的消費者
        System.err.println("envelope: " + envelope);
        System.err.println("properties: " + properties);
        System.err.println("body: " + new String(body));
    }
}

 

public class Consumer {
    public static void main(String[] args) throws Exception {
        //1 建立ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.244.11");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHandshakeTimeout(20000);
        //2 獲取Connection
        Connection connection = connectionFactory.newConnection();
        //3 經過Connection建立一個新的Channel
        Channel channel = connection.createChannel();
        
        String exchangeName = "test_consumer_exchange";
        String routingKey = "consumer.#";
        String queueName = "test_consumer_queue";
        //4 聲明交換機和隊列,而後進行綁定設置路由Key
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        
        //5 設置channel,使用自定義消費者
        channel.basicConsume(queueName, true, new MyConsumer(channel));
    }
}

 

運行說明api

先啓動消費端,訪問管控臺:http://ip:15672,檢查Exchange和Queue是否設置OK,而後啓動生產端。消費端打印內容以下服務器

 

2.消費端的限流策略

2.1 限流的場景與機制

  • 假設一個場景,咱們Rabbitmq服務器有上萬條未處理的消息,咱們隨便打開一個消費者客戶端,會出現這種狀況:巨量的消息瞬間所有推送過來,可是咱們單個客戶端沒法同時處理這麼多數據!此時頗有可能致使服務器崩潰,嚴重的可能致使線上的故障。
  • 除了這種場景,還有一些其餘的場景,好比說單個生產者一分鐘生產出了幾百條數據,可是單個消費者一分鐘可能只能處理60條數據,這個時候生產端和消費端確定是不平衡的。一般生產端是沒辦法作限制的。因此消費端確定須要作一些限流措施,不然若是超出最大負載,可能致使消費端性能降低,服務器卡頓甚至崩潰等一系列嚴重後果。

 

消費端限流機制ide

RabbitMQ提供了一種qos (服務質量保證)功能,即在非自動確認消息的前提下,若是必定數目的消息 (經過基於consume或者channel設置Qos的值) 未被確認前,不進行消費新的消息。性能

須要注意:測試

1.不能設置自動簽收功能(autoAck = false)fetch

2.若是消息沒被確認,就不會到達消費端,目的就是給消費端減壓ui

 

2.2 限流相關API

限流設置 - BasicQos()this

void BasicQos(uint prefetchSize, ushort prefetchCount, bool global);
prefetchSize: 單條消息的大小限制,消費端一般設置爲0,表示不作限制
prefetchCount: 一次最多能處理多少條消息,一般設置爲1
global: 是否將上面設置應用於channel,false表明consumer級別日誌

注意事項

prefetchSizeglobal這兩項,rabbitmq沒有實現,暫且不研究
prefetchCountautoAck=false 的狀況下生效,即在自動應答的狀況下這個值是不生效的
 
手工ACK - basicAck()

void basicAck(Integer deliveryTag,boolean multiple)
手工ACK,調用這個方法就會主動回送給Broker一個應答,表示這條消息我處理完了,你能夠給我下一條了。參數multiple表示是否批量簽收,因爲咱們是一次處理一條消息,因此設置爲false

 

2.3 限流演示

生產端

生產端就是正常的邏輯

public class Producer {
    public static void main(String[] args) throws Exception {

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.244.11");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHandshakeTimeout(20000);

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        String exchange = "test_qos_exchange";
        String routingKey = "qos.save";

        String msg = "Hello RabbitMQ QOS Message";
        // 發送消息
        for (int i = 0; i < 5; i++) {
            channel.basicPublish(exchange, routingKey, true, null,
                    msg.getBytes());
        }
    }
}

 

自定義消費者

爲了看到限流效果,這裏不進行ACK

public class MyConsumer extends DefaultConsumer {

    //接收channel
    private Channel channel ;
    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("-----------consume message----------");
        System.err.println("consumerTag: " + consumerTag);
        System.err.println("envelope: " + envelope);
        //System.err.println("properties: " + properties);
        System.err.println("body: " + new String(body));
        //手工ACK,參數multiple表示不批量簽收
        //channel.basicAck(envelope.getDeliveryTag(), false);   
    }
}

 

消費端

關閉autoACK,進行限流設置

public class Consumer {

    public static void main(String[] args) throws Exception {
        //1 建立ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.244.11");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHandshakeTimeout(20000);
        //2 獲取Connection
        Connection connection = connectionFactory.newConnection();
        //3 經過Connection建立一個新的Channel
        Channel channel = connection.createChannel();
        
        String exchangeName = "test_qos_exchange";
        String queueName = "test_qos_queue";
        String routingKey = "qos.#";
        //4 聲明交換機和隊列,而後進行綁定設置路由Key
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        
        //進行參數設置:單條消息的大小限制,一次最多能處理多少條消息,是否將上面設置應用於channel
        channel.basicQos(0, 1, false);
        
        //限流: autoAck設置爲 false
        channel.basicConsume(queueName, false, new MyConsumer(channel));
    }
}

 

運行說明

咱們先註釋掉手工ACK方法,而後啓動消費端和生產端,此時消費端只打印了一條消息2019-06-09_163747

這是由於咱們設置了手工簽收,而且設置了一次只處理一條消息,當咱們沒有回送ack應答時,Broker端就認爲消費端尚未處理完這條消息,基於這種限流機制就不會給消費端發送新的消息了,因此消費端只打印了一條消息。

經過管控臺也能夠看到隊列總共收到了5條消息,有一條消息沒有ack。

將手工簽收代碼取消註釋,再次運行消費端,此時就會打印5條消息的內容。

 

3. 消費端ACK與重回隊列機制

3.1 ACK與NACK

當咱們設置 autoACK=false 時,就可使用手工ACK方式了,那麼其實手工方式包括了手工ACK與NACK。

當咱們手工 ACK 時,會發送給Broker一個應答,表明消息成功處理了,Broker就能夠回送響應給生產端了。NACK 則表示消息處理失敗了,若是設置重回隊列,Broker端就會將沒有成功處理的消息從新發送。

 

使用方式

  1. 消費端進行消費的時候,若是因爲業務異常咱們能夠手工 NACK 並進行日誌的記錄,而後進行補償!
    方法:void basicNack(long deliveryTag, boolean multiple, boolean requeue)
  2. 若是因爲服務器宕機等嚴重問題,那咱們就須要手工進行 ACK 保障消費端消費成功!
    方法:void basicAck(long deliveryTag, boolean multiple)

 

3.2 重回隊列演示

  • 消費端重回隊列是爲了對沒有處理成功的消息,把消息從新會遞給Broker!
  • 重回隊列,會把消費失敗的消息從新添加到隊列的尾端,供消費者繼續消費。
  • 通常咱們在實際應用中,都會關閉重回隊列,也就是設置爲false

 

生產端

對消息設置自定義屬性以便進行區分

public class Producer {

    public static void main(String[] args) throws Exception {
        //1 建立ConnectionFactorys
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.244.11");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHandshakeTimeout(20000);
        //2 獲取Connection
        Connection connection = connectionFactory.newConnection();
        //3 經過Connection建立一個新的Channel
        Channel channel = connection.createChannel();
        
        String exchange = "test_ack_exchange";
        String routingKey = "ack.save";
        
        for(int i =0; i<5; i ++){
            //設置消息屬性
            Map<String, Object> headers = new HashMap<String, Object>();
            headers.put("num", i);
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .deliveryMode(2)
                    .contentEncoding("UTF-8")
                    .headers(headers)
                    .build();
            //發送消息
            String msg = "Hello RabbitMQ ACK Message " + i;
            channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
        }   
    }
}

 

自定義消費

對第一條消息進行NACK,並設置重回隊列

public class MyConsumer extends DefaultConsumer {

    private Channel channel ;

    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("-----------consume message----------");
        System.err.println("body: " + new String(body));
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if((Integer)properties.getHeaders().get("num") == 0) {
            //NACK,參數三requeue:是否重回隊列
            channel.basicNack(envelope.getDeliveryTag(), false, true);
        } else {
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
    }
}

 

消費端

關閉自動簽收功能

public class Consumer {
    
    public static void main(String[] args) throws Exception {
        
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.244.11");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHandshakeTimeout(20000);
        
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        
        String exchangeName = "test_ack_exchange";
        String queueName = "test_ack_queue";
        String routingKey = "ack.#";
        //聲明交換機和隊列,而後進行綁定設置路由Key
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        
        //手工簽收 必需要設置 autoAck = false
        channel.basicConsume(queueName, false, new MyConsumer(channel));
    }
}

 

運行說明

先啓動消費端,而後啓動生產端,消費端打印以下,顯然第一條消息因爲咱們調用了NACK,而且設置了重回隊列,因此會致使該條消息一直重複發送,消費端就會一直循環消費。

 

通常工做中不會設置重回隊列這個屬性,都是本身去作補償或者投遞到延遲隊列裏的,而後指定時間去處理便可。

 

4. TTL

TTL說明

  • TTL是Time To Live的縮寫,也就是生存時間
  • RabbitMQ支持消息的過時時間,在消息發送時能夠進行指定
  • RabbitMQ支持爲每一個隊列設置消息的超時時間,從消息入隊列開始計算,只要超過了隊列的超時時間配置,那麼消息會自動的清除

 

TTL演示

此次演示咱們不寫代碼,只經過管控臺進行操做,實際測試也會更爲方便一些。
 

1. 建立Exchange

選擇Exchange菜單,找到下面的Add a new exchange

 

2.建立Queue

選擇Queue菜單,找到下面的Add a new queue

 

3.創建隊列和交換機的綁定關係

點擊Exchange表格中的test002_exchange,在下面添加綁定規則

 

4.發送消息

點擊Exchange表格中的test002_exchange,在下面找到Publish message,設置消息進行發送

 

5.驗證

點擊Queue菜單,查看錶格中test002已經有了一條消息,10秒後表格顯示0條,說明過時時間到了消息被自動清除了。
14795543-2f85c29d91d33480

 

6.設置單條消息過時時間

點擊Exchange表格中的test002_exchange,在下面找到Publish message,設置消息的過時時間並進行發送,此時觀察test002隊列,發現消息5s後就過時被清除了,即便隊列設置的過時時間是10s。

 
TTL代碼設置過時時間

AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .expiration("10000") //10s過時
                .build();
        //發送消息
        channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());

 

隊列過時時間設置

//設置隊列的過時時間10s
        Map<String,Object> param = new HashMap<>();
        param.put("x-message-ttl", 10000);
        //聲明隊列
        channel.queueDeclare(queueName, true, false, false, null);

 
注意事項

  1. 二者的區別是設置隊列的過時時間是對該隊列的全部消息生效的。
  2. 爲消息設置TTL有一個問題:RabbitMQ只對處於隊頭的消息判斷是否過時(即不會掃描隊列),因此,極可能隊列中已存在死消息,可是隊列並不知情。這會影響隊列統計數據的正確性,妨礙隊列及時釋放資源。

 

5.死信隊列

死信隊列介紹

  • 死信隊列:DLX,dead-letter-exchange
  • 利用DLX,當消息在一個隊列中變成死信 (dead message) 以後,它能被從新publish到另外一個Exchange,這個Exchange就是DLX

 

消息變成死信有如下幾種狀況

  • 消息被拒絕(basic.reject / basic.nack),而且requeue = false
  • 消息TTL過時
  • 隊列達到最大長度

 

死信處理過程

  • DLX也是一個正常的Exchange,和通常的Exchange沒有區別,它能在任何的隊列上被指定,實際上就是設置某個隊列的屬性。
  • 當這個隊列中有死信時,RabbitMQ就會自動的將這個消息從新發布到設置的Exchange上去,進而被路由到另外一個隊列。
  • 能夠監聽這個隊列中的消息作相應的處理。

 

死信隊列設置

  1. 首先須要設置死信隊列的exchange和queue,而後進行綁定:

  1. 而後須要有一個監聽,去監聽這個隊列進行處理
  2. 而後咱們進行正常聲明交換機、隊列、綁定,只不過咱們須要在隊列加上一個參數便可:arguments.put(" x-dead-letter-exchange","dlx.exchange");,這樣消息在過時、requeue、 隊列在達到最大長度時,消息就能夠直接路由到死信隊列!

 

死信隊列演示

生產端

public class Producer {
    public static void main(String[] args) throws Exception {
        //1 建立ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.244.11");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHandshakeTimeout(20000);
        //2 獲取Connection
        Connection connection = connectionFactory.newConnection();
        //3 經過Connection建立一個新的Channel
        Channel channel = connection.createChannel();
        
        String exchange = "test_dlx_exchange";
        String routingKey = "dlx.save";
        
        String msg = "Hello RabbitMQ DLX Message";
        
        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .deliveryMode(2)
                .contentEncoding("UTF-8")
                .expiration("10000")
                .build();
        //發送消息
        channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
    }
}

 

自定義消費者

public class MyConsumer extends DefaultConsumer {

    public MyConsumer(Channel channel) {
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("-----------consume message----------");
        System.err.println("consumerTag: " + consumerTag);
        System.err.println("envelope: " + envelope);
        System.err.println("properties: " + properties);
        System.err.println("body: " + new String(body));
    }
}

 

消費端

  • 聲明正常處理消息的交換機、隊列及綁定規則
  • 在正常交換機上指定死信發送的Exchange
  • 聲明死信交換機、隊列及綁定規則
  • 監聽死信隊列,進行後續處理,這裏省略
public class Consumer {
    public static void main(String[] args) throws Exception {

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.244.11");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHandshakeTimeout(20000);

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        // 聲明一個普通的交換機 和 隊列 以及路由
        String exchangeName = "test_dlx_exchange";
        String routingKey = "dlx.#";
        String queueName = "test_dlx_queue";
        String deadQueueName = "dlx.queue";
        
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        // 指定死信發送的Exchange
        Map<String, Object> agruments = new HashMap<String, Object>();
        agruments.put("x-dead-letter-exchange", "dlx.exchange");
        // 這個agruments屬性,要設置到聲明隊列上
        channel.queueDeclare(queueName, true, false, false, agruments);
        channel.queueBind(queueName, exchangeName, routingKey);

        // 要進行死信隊列的聲明
        channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
        channel.queueDeclare(deadQueueName, true, false, false, null);
        channel.queueBind(deadQueueName, "dlx.exchange", "#");

        channel.basicConsume(queueName, true, new MyConsumer(channel));
        //channel.basicConsume(deadQueueName, true, new MyConsumer(channel));

    }
}

 

運行說明

啓動消費端,此時查看管控臺,新增了兩個Exchange,兩個Queue。在test_dlx_queue上咱們設置了DLX,也就表明死信消息會發送到指定的Exchange上,最終其實會路由到dlx.queue上。

此時關閉消費端,而後啓動生產端,查看管控臺隊列的消息狀況,test_dlx_queue的值爲1,而dlx_queue的值爲0。
10s後的隊列結果如圖,因爲生產端發送消息時指定了消息的過時時間爲10s,而此時沒有消費端進行消費,消息便被路由到死信隊列中。

實際環境咱們還須要對死信隊列進行一個監聽和處理,固然具體的處理邏輯和業務相關,這裏只是簡單演示死信隊列是否生效。

相關文章
相關標籤/搜索