RabbitMQ基礎教程之使用進階篇

RabbitMQ基礎教程之使用進階篇

相關博文,推薦查看:html

  1. RabbitMq基礎教程之安裝與測試
  2. RabbitMq基礎教程之基本概念
  3. RabbitMQ基礎教程之基本使用篇

I. 背景

前一篇基本使用篇的博文中,介紹了rabbitmq的三種使用姿式,能夠知道如何向RabbitMQ發送消息以及如何消費,但遺留下幾個疑問,本篇則主要但願弄清楚這幾點java

  • Exchange聲明的問題(是否必須聲明,若是不聲明會怎樣)
  • Exchange聲明的幾個參數(durable, autoDelete)有啥區別
  • 當沒有隊列和Exchange綁定時,直接往隊列中塞數據,好像不會有數據增長(即先塞數據,而後建立queue,創建綁定,從控制檯上看這個queue裏面也不會有數據)
  • 消息消費的兩種姿式(一個主動去拿數據,一個是rabbit推數據)對比

<!--more-->git

II. 基本進階篇

1. Exchange默認場景

將前面的消息發送代碼撈出來,幹掉Exchange的聲明,以下github

public class DefaultProducer {
    public static void publishMsg(String queue, String message) throws IOException, TimeoutException {
        ConnectionFactory factory = RabbitUtil.getConnectionFactory();

        //建立鏈接
        Connection connection = factory.newConnection();

        //建立消息通道
        Channel channel = connection.createChannel();
        channel.queueDeclare(queue, true, false, true, null);

        // 發佈消息
        channel.basicPublish("", queue, null, message.getBytes());

        channel.close();
        connection.close();
    }

    public static void main(String[] args) throws IOException, TimeoutException {
        for (int i = 0; i < 20; i++) {
            publishMsg("hello", "msg" + i);
        }
    }
}

在發佈消息時,傳入的Exchange名爲「」,再到控制檯查看,發現數據被投遞到了(AMQP default)這個交換器,對應的截圖以下api

image

看一下上面的綁定描述內容,重點以下ide

  • 默認交換器選擇Direct策略
  • 將rountingKey綁定到同名的queue上
  • 不支持顯示的綁定和解綁

上面的代碼爲了演示數據的流向,在發佈消息的同時也定義了一個同名的Queue,所以能夠在控制檯上看到同名的 "hello" queue,且內部有20條數據學習

當咱們去掉queue的聲明時,會發現另外一個問題,投入的數據好像並無存下來(由於沒有queue來接收這些數據,而以後再聲明queue時,以前的數據也不會分配過來)測試

2. 綁定以後纔有數據

首先是將控制檯中的hello這個queue刪掉,而後再次執行下面的代碼(相對於前面的就是註釋了queue的聲明)ui

public class DefaultProducer {
    public static void publishMsg(String queue, String message) throws IOException, TimeoutException {
        ConnectionFactory factory = RabbitUtil.getConnectionFactory();

        //建立鏈接
        Connection connection = factory.newConnection();

        //建立消息通道
        Channel channel = connection.createChannel();
        //        channel.queueDeclare(queue, true, false, true, null);

        // 發佈消息
        channel.basicPublish("", queue, null, message.getBytes());

        channel.close();
        connection.close();
    }

    public static void main(String[] args) throws IOException, TimeoutException {
        for (int i = 0; i < 20; i++) {
            publishMsg("hello", "msg" + i);
        }
    }
}

而後從控制檯上看,能夠看到有數據寫入Exchange,可是沒有queue來接收這些數據code

IMAGE

而後開啓消費進程,而後再次執行上面的塞入數據,新後面從新塞入的數據能夠被消費;可是以前塞入的數據則沒有,消費消息的代碼以下:

public class MyDefaultConsumer {
    public void consumerMsg(String queue) throws IOException, TimeoutException {
        ConnectionFactory factory = RabbitUtil.getConnectionFactory();

        //建立鏈接
        Connection connection = factory.newConnection();

        //建立消息通道
        Channel channel = connection.createChannel();
        channel.queueDeclare(queue, true, false, true, null);
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                    byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                try {
                    System.out.println(" [ " + queue + " ] Received '" + message);
                } finally {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        // 取消自動ack
        channel.basicConsume(queue, false, consumer);
    }

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        MyDefaultConsumer consumer = new MyDefaultConsumer();
        consumer.consumerMsg("hello");

        Thread.sleep(1000 * 60 * 10);
    }
}

小結:

  • 經過上面的演示得知一點
  • 當沒有Queue綁定到Exchange時,往Exchange中寫入的消息也不會從新分發到以後綁定的queue上

3. Durable, autoDeleted參數

在定義Queue時,能夠指定這兩個參數,這兩個參數的區別是什麼呢?

a. durable

持久化,保證RabbitMQ在退出或者crash等異常狀況下數據沒有丟失,須要將queue,exchange和Message都持久化。

如果將queue的持久化標識durable設置爲true,則表明是一個持久的隊列,那麼在服務重啓以後,也會存在,由於服務會把持久化的queue存放在硬盤上,當服務重啓的時候,會從新什麼以前被持久化的queue。隊列是能夠被持久化,可是裏面的消息是否爲持久化那還要看消息的持久化設置。也就是說,重啓以前那個queue裏面尚未發出去的消息的話,重啓以後那隊列裏面是否是還存在原來的消息,這個就要取決於發生着在發送消息時對消息的設置

b. autoDeleted

自動刪除,若是該隊列沒有任何訂閱的消費者的話,該隊列會被自動刪除。這種隊列適用於臨時隊列

這個比較容易演示了,當一個Queue被設置爲自動刪除時,當消費者斷掉以後,queue會被刪除,這個主要針對的是一些不是特別重要的數據,不但願出現消息積累的狀況

// 倒數第二個參數,true表示開啓自動刪除
// 正數第二個參數,true表示持久化
channel.queueDeclare(queue, true, false, true, null);

c. 小結

  • 當一個Queue已經聲明好了以後,不能更新durable或者autoDelted值;當須要修改時,須要先刪除再從新聲明
  • 消費的Queue聲明應該和投遞的Queue聲明的 durable,autoDelted屬性一致,不然會報錯
  • 對於重要的數據,通常設置 durable=true, autoDeleted=false
  • 對於設置 autoDeleted=true 的隊列,當沒有消費者以後,隊列會自動被刪除

4. ACK

執行一個任務可能須要花費幾秒鐘,你可能會擔憂若是一個消費者在執行任務過程當中掛掉了。一旦RabbitMQ將消息分發給了消費者,就會從內存中刪除。在這種狀況下,若是正在執行任務的消費者宕機,會丟失正在處理的消息和分發給這個消費者但還沒有處理的消息。 可是,咱們不想丟失任何任務,若是有一個消費者掛掉了,那麼咱們應該將分發給它的任務交付給另外一個消費者去處理。

爲了確保消息不會丟失,RabbitMQ支持消息應答。消費者發送一個消息應答,告訴RabbitMQ這個消息已經接收而且處理完畢了。RabbitMQ就能夠刪除它了。

所以手動ACK的常見手段

// 接收消息以後,主動ack/nak
Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
            byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        try {
            System.out.println(" [ " + queue + " ] Received '" + message);
            channel.basicAck(envelope.getDeliveryTag(), false);
        } catch (Exception e) {
            channel.basicNack(envelope.getDeliveryTag(), false, true);
        }
    }
};

// 取消自動ack
channel.basicConsume(queue, false, consumer);

手動ack時,有個multiple,其含義表示:

能夠理解爲每一個Channel維護一個unconfirm的消息序號集合,每publish一條數據,集合中元素加1,每回調一次handleAck方法,unconfirm集合刪掉相應的一條(multiple=false)或多條(multiple=true)記錄

III. 其餘

1. 參考

Java Client API Guide

2. 一灰灰Bloghttps://liuyueyi.github.io/hexblog

一灰灰的我的博客,記錄全部學習和工做中的博文,歡迎你們前去逛逛

3. 聲明

盡信書則不如,已上內容,純屬一家之言,因我的能力有限,不免有疏漏和錯誤之處,如發現bug或者有更好的建議,歡迎批評指正,不吝感激

4. 掃描關注

QrCode

相關文章
相關標籤/搜索