RabbitMQ快速入門

1、前言

RabbitMQ實際上是我最先接觸的一個MQ框架,我記得當時是在大學的時候跑到圖書館一我的去看,因爲RabbitMQ官網的英文還不算太難,所以也是參考官網學習的,一共有6章,當時是用Node來開發的,當時花了一下午看完了,也理解了。而如今回過頭來再看,發現已經忘記了個差很少了,如今再回過頭來繼續看看,然乎記之。以防再忘,讀者看時最好有必定的MQ基礎。框架

2、RabbitMQ

首先咱們須要知道的是RabbitMQ它是基於高級隊列協議(AMQP)的,它是Elang編寫的,下面將圍繞RabbitMQ隊列、交換機、RPC三個重點進行展開。dom

2.一、隊列

存儲消息的地方,多個生產者能夠將消息發送到一個隊列,多個消費者也能夠消費同一個隊列的消息。學習

注意:當多個消費者監聽一個隊列,此時生產者發送消息到隊列只有一個消費者被消費,而且消費端的消費方式是按照消費端在內部啓動的順序輪詢(round-robin)。

2.二、消費者

消費消息的一方fetch

public class Send {

    private final static String QUEUE_NAME = "hello";
    private final static String IP = "172.16.12.162";
    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(IP);
        factory.setUsername("admin");
        factory.setPassword("admin");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
public class Recv {

    private final static String QUEUE_NAME = "hello";
    private final static String IP = "172.16.12.162";

    public static void main(String[] args) {
        try {

            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(IP);
            factory.setUsername("admin");
            factory.setPassword("admin");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            };
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

2.三、小結

一、Rabbit是如何保證消息被消費的?
答:經過ack機制。每當一個消息被消費端消費的時候,消費端能夠發送一個ack給RabbitMQ,這樣RabbitMQ就知道了該條消息已經被完整消費而且能夠被delete了。;若是一條消息被消費可是沒有發送ack,那麼此時RabbitMQ將會認爲須要從新消費該消息,若是此時還有其它的消費者,那麼此時RabbitMQ將會把這條消息交給它處理。ui

注意:開啓ack機制的是autoAck= false;

二、消息如何進行持久化?spa

  • 將queue持久化,即設置 channel.queueDeclare(QUEUE_NAME, true, false, false, null);第二個參數durable爲true
  • 設置消息持久化,即設置MessageProperties.PERSISTENT_TEXT_PLAIN
注意:消息持久化並不必定保證消息不會被丟失

三、RabbitMQ如何避免兩個消費者一個很是忙一個很是閒的狀況?
經過以下設置,保證一個消費者一次只能消費一個消息,只有當它消費完成而且返回ack給RabbitMQ以後纔給它派發新的消息。3d

int prefetchCount = 1 ;
channel.basicQos(prefetchCount)

四、RabbitMQ異常狀況下如何保證消息不會被重複消費?
須要業務自身實現密等性,RabbitMQ沒有提供比較好的方式去保證。code

2.二、交換機

在RabbitMQ中,生產者其實歷來不會發送消息到隊列,甚至,它不知道消息被髮送到了哪一個隊列。那它被髮送到了哪裏呢?就是本節的重點:交換機,下面就是它在RabbitMQ中的介紹圖。(X就是交換機)生產者發送消息給交換機,而後由交換機將消息轉發給隊列。server

從上圖就產生一個問題:X怎麼將消息發給queue呢?它是把消息發給全部queue仍是發給一個指定的queue或者丟棄消息呢?這就是看交換機的類型了。下面一塊兒談談這幾種類型blog

2.2.一、fanout

fanout:廣播模式,這個比較好理解,就是全部的隊列都能收到交換機的消息。
clipboard.png
如上面,兩個隊列都能收到交換機的消息。

2.2.二、direct

這個模式至關於發佈/訂閱模式的一種,當交換機類型爲direct的時候,此時咱們須要設置兩個參數:

  1. channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));第二個參數,咱們能夠把它稱呼爲routeKey
  2. channel.queueBind(queueName, EXCHANGE_NAME, "");第三個參數,咱們把它稱呼爲bindKey

有了這兩個參數,咱們就能夠指定咱們訂閱哪些消息了。

clipboard.png
如圖,Q1訂閱了orange的消息,Q2訂閱了black、green的消息。

2.2.三、topic

其實topic和direct有一點相似,它至關於對direct做了加強。在direct中,咱們上面所說的bind routeKey爲black、green的它是有限制的,它只能絕對的等於routeKey,可是有時候咱們的需求不是這樣,咱們可能想要的是正則匹配便可,那麼Topic就派上用場了。

clipboard.png

當類型爲topic時,它的bindKey對應字符串須要是以「.」分割,同時RabbitMQ還提供了兩個符號:

  • 星號(*):表示1個單詞
  • 井號(#):表示0、多個單詞

上圖的意思是:全部第二個單詞爲orange的消息發送個Q1,全部最後一個單詞爲rabbit或者第一個單詞爲lazy的消息發送給Q2。

2.2.四、header

這一種類型官方demo沒有過多解釋,這裏也不研究了。

2.三、RPC

RabbitMQ 還能夠實現RPC(遠程過程調用)。什麼是RPC,簡單來講就是local調用remote方法。對應於RabbitMQ中則是Client發送一個request message,Server處理完成以後將其返回給Client。這裏就有了一個疑問?Server是如何將response返回給Client的,這裏RabbitMQ定義了一個概念:Callback Queue。
Callback Queue
注意這個隊列是獨一無二的String replyQueueName = channel.queueDeclare().getQueue();
首先咱們須要明白一點的是爲何須要這個queue?咱們知道在RabbitMQ做消息隊列的時候,Client只須要將消息投放到queue中,而後Server從queue去取就能夠了。可是在RabbitMQ做爲RPC的時候多了一點就是,Client還須要返回結果,這時Server端怎麼知道把消息發送給Client,這就是Callback Queue的用處了。
Correlation Id
在上面咱們知道Server返回數據給Client是經過Callback Queue的,那麼是爲每個request都建立一個queue嗎?這未免太過浪費資源,RabbitMQ有更好的方案。在咱們發送request,綁定一個惟一ID(correlationId),而後在消息被處理返回的時候取出這個ID和發出去的ID進行匹配。這樣來講一個Callback Queue是Client級別而不是request級別的了。

實現
上面介紹了RabbitMQ實現RPC最重要的兩個概念,具體代碼比較簡單仍是貼下把。
client 端

public class RPCClient {
    private Connection connection;
    private Channel channel;
    private String requestQueueName = "rpc_queue";

    public RPCClient() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        connection = factory.newConnection();
        channel = connection.createChannel();
    }

    public static void main(String[] argv) throws Exception{
        RPCClient fibonacciRpc = new RPCClient();
        try {
            for (int i = 0; i < 32; i++) {
                String i_str = Integer.toString(i);
                System.out.println(" [x] Requesting fib(" + i_str + ")");
                String response = fibonacciRpc.call(i_str);
                System.out.println(" [.] Got '" + response + "'");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public String call(String message) throws IOException, InterruptedException {
        final String corrId = UUID.randomUUID().toString();

        String replyQueueName = channel.queueDeclare().getQueue();
        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();

        channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);

        String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
            if (delivery.getProperties().getCorrelationId().equals(corrId)) {
                response.offer(new String(delivery.getBody(), "UTF-8"));
            }
        }, consumerTag -> {
        });

        String result = response.take();
        channel.basicCancel(ctag);
        return result;
    }

    public void close() throws IOException {
        connection.close();
    }
}

服務端

public class RPCServer {

    private static final String RPC_QUEUE_NAME = "rpc_queue";

    private static int fib(int n) {
        if (n == 0) return 0;
        if (n == 1) return 1;
        return fib(n - 1) + fib(n - 2);
    }

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
            channel.queuePurge(RPC_QUEUE_NAME);

            channel.basicQos(1);

            System.out.println(" [x] Awaiting RPC requests");

            Object monitor = new Object();
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                        .Builder()
                        .correlationId(delivery.getProperties().getCorrelationId())
                        .build();

                String response = "";

                try {
                    String message = new String(delivery.getBody(), "UTF-8");
                    int n = Integer.parseInt(message);

                    System.out.println(" [.] fib(" + message + ")");
                    response += fib(n);
                } catch (RuntimeException e) {
                    System.out.println(" [.] " + e.toString());
                } finally {
                    channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                    // RabbitMq consumer worker thread notifies the RPC server owner thread
                    synchronized (monitor) {
                        monitor.notify();
                    }
                }
            };

            channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> { }));
            // Wait and be prepared to consume the message from RPC client.
            while (true) {
                synchronized (monitor) {
                    try {
                        monitor.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}

3、總結

此次回頭再看RabbitMQ,再次從新理解了如下RabbitMQ,有些東西仍是要慢慢嚼的。固然這些也都是官網的入門例子,後續有機會的話再深刻研究。

相關文章
相關標籤/搜索