消息中間件之RabbitMQ初識

做者:threedayman 恆生LIGHT雲社區html

RabbitMQ是什麼

RabbitMQ是部署最普遍的開源消息代理。RabbitMQ有輕量級且易部署的特色。支持多種消息協議。java

爲何使用RabbitMQ

常見的使用場景有解耦、異步、削峯填谷。下面咱們經過例子來感覺下各自場景下使用MQ帶來的效益。mysql

解耦git

假設有系統A,依賴系統B、系統C、系統D,依賴關係在代碼中已經寫死,結構以下圖。程序員

1621939617(1).png

假設此時又來了一個新需求,系統A須要調用系統E進行一些新的業務操做,那麼系統A的程序員又免不了一頓操做,處理接入系統E的需求。同理若是要去掉某個系統的依賴好比系統C,也須要系統A負責的開發進行處理。github

那麼此時咱們若是引入了MQ來看看會帶來什麼樣的變化。sql

image-20210525185231787.png

系統A發送消息到MQ,系統B、C、D訂閱對應的消息進行業務處理。那麼咱們再來看看以前的場景,假設須要增長一個依賴系統E,只須要系統E的開發人員進行對應的訂閱消費便可,同理若是要取消系統C的依賴,只須要系統C取消訂閱對應的消息。數據庫

異步app

假設系統A操做耗時30ms,系統A還將同步調用系統B(300ms)、系統C(600ms)、系統D(200ms)那麼這個請求的響應時間將會達到1130ms。過長的響應時間會給客戶帶來很差的用戶體驗。異步

1621940629(1).png

引入MQ以後咱們看看會發生什麼變化

image-20210525190839346.png

系統A將消息發送給MQ(7ms)以後就返回,系統B、C、D分別監聽MQ進行業務處理。那麼咱們看到針對剛纔長耗時的同步依賴,引入MQ進行異步處理後,整體的響應時間從1130ms降到了37ms。

削峯填谷

假設咱們有個業務高峯期的請求量可以到達7000 /s而業務低谷流量只有100/s,可是咱們的mysql數據庫只能承受2000/s的請求。

1621941575(1).png

在這種狀況下會致使在高峯期超過了mqsql最高的負載能力而直接打掛,而低峯期沒有將mqsql的資源合理利用起來。

引入MQ以後咱們看看會發生什麼變化

image-20210525192439287.png

此時系統能夠按照本身最大的消費能力2000/s去拉取消息,能夠平穩度過業務高峯期,同時將一部分消息延遲到業務低谷時期進行處理。不至於出現因爲高流量致使數據庫被打掛,出現總體服務不可用的現象。

怎樣使用RabbitMQ

本小節主要針對RabbitMQ的java客戶端編寫的幾個經常使用的例子,若是您對使用RabbitMQ已熟練掌握,可跳過本小節。查看完整的RabbitMQ使用說明,請訪問官方文檔

Hello world

咱們經過一個Hello world 的例子來感覺下RabbitMQ。首先介紹下本例中使用到的術語

  • Producer:生產者,用來發送消息。
  • Queue:消息隊列,用於存儲消息,消息經由生產者投遞到消息隊列,最終被投遞到消費者進行消費,消息隊列收到機器內存和硬盤資源的限制。
  • Consumer:消費者,用於接收並處理消息。

本例中咱們咱們將生產Hello World的消息,經過消費者接受並打印出消息。

1621942986(1).png

生產者Send 關鍵步驟見註釋說明

public class Send {
	//隊列名稱
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        //建立和server之間的鏈接 connection、channel
        ConnectionFactory factory = new ConnectionFactory();
        //請設置實際部署節點ip
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            //聲明一個queue去發送消息
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            //發佈消息
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

完整Send代碼查閱

消費者Recv 關鍵步驟見註釋說明

public class Recv {
	//隊列名稱
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        //建立和server之間的鏈接 connection、channel
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
		//聲明要去消費的隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
		//經過該類來處理消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}

完整Recv代碼查閱

Work Queues

在本例中咱們將介紹經過RabbitMQ分發耗時任務給多個工做者。RabbitMQ會經過輪詢(round-robin)的方式將消息投遞給消費者,這是的咱們可以很容易的擴展消費能力。

1621944984(1).png

生產者NewTask 關鍵步驟見註釋說明

public class NewTask {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            //將隊列設置成持久化
            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

            String message = String.join(" ", argv);
			//將消息設置成持久化
            channel.basicPublish("", TASK_QUEUE_NAME,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }

}

完整NewTask代碼查閱

消費者Woker 關鍵步驟見註釋說明

public class Worker {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
		//將隊列設置成持久化
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
		//一個消費者最多同時處理一個未確認的消息
        channel.basicQos(1);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");

            System.out.println(" [x] Received '" + message + "'");
            try {
                doWork(message);
            } finally {
                System.out.println(" [x] Done");
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
    }
	//模擬耗時任務,一個.表明耗時1S
    private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            if (ch == '.') {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

完整Worker代碼查閱

Publish/Subscribe

上面咱們已經介紹過RabbitMQ的核心消息模型,生產者、消費者、隊列,在本小節咱們將接觸到另外一個消息模型exchange** ,它負責從生產者中接收消息,並把消息投遞到隊列中。exchage主要有如下幾種類型**

  • direct
  • topic
  • headers
  • fanout

本例中咱們將已fanout類型做爲講解,經過名稱咱們大概也能猜到此類型exchange會廣播接收到的消息到其綁定的隊列中。

1622082071(1).jpg

生產者EmitLog 關鍵步驟見註釋說明

public class EmitLog {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            //建立一個exchange 並指定類型
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

            String message = argv.length < 1 ? "info: Hello World!" :
                    String.join(" ", argv);
			//此處和以前發消息不同,指定具體的exchange沒有指定具體的queue
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }

}

EmitLog完整代碼查閱

消費者ReceiveLogs 關鍵步驟見註釋說明

public class ReceiveLogs {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
		//建立fanout類型的exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        //獲取一個獨有的,非持久化的,自動刪除的隊列
        String queueName = channel.queueDeclare().getQueue();
        //經過綁定方法將exchage和queue之間簡歷關係
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}

ReceiveLogs完整代碼查閱

Routing

上一個例子中exchange將接收到的信息廣播給了綁定的隊列中,本例中咱們將增長綁定的一些特定,使exchange有能力經過routingKey(全匹配)來投遞不一樣的消息到不一樣的隊列中。例如平常日誌區分error日誌進單獨的隊列。

image-20210526095222998.png

生產者EmitLogDirect 關鍵步驟見註釋說明

public class EmitLogDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            //聲明一個direct類型的exchange
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

            String severity = getSeverity(argv);
            String message = getMessage(argv);

            channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
        }
    }

    private static String getSeverity(String[] strings) {
        if (strings.length < 1)
            return "info";
        return strings[0];
    }

    private static String getMessage(String[] strings) {
        if (strings.length < 2)
            return "Hello World!";
        return joinStrings(strings, " ", 1);
    }

    private static String joinStrings(String[] strings, String delimiter, int startIndex) {
        int length = strings.length;
        if (length == 0) return "";
        if (length <= startIndex) return "";
        StringBuilder words = new StringBuilder(strings[startIndex]);
        for (int i = startIndex + 1; i < length; i++) {
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }
}

EmitLogDirect完整代碼查閱

消費者ReceiveLogsDirect 關鍵步驟見註釋說明

public class ReceiveLogsDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        String queueName = channel.queueDeclare().getQueue();

        if (argv.length < 1) {
            System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
            System.exit(1);
        }

        for (String severity : argv) {
            //創建exchange和queue之間關係並設置routingKey
            channel.queueBind(queueName, EXCHANGE_NAME, severity);
        }
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}

ReceiveLogsDirect完整代碼查閱

Topics

提供更豐富的exchange到queue之間的路由規則。規則經過.分隔的routingKey,最高限制 255bytes。跟以前的全匹配routingKey不一樣,topic類型的exchange的routingKey主要增長了兩個特性。

  • *表明一個單詞**。**
  • **#** 表明0個或一個單詞。

image-20210526101017541.png

生產者EmitLogTopic 關鍵步驟見註釋說明

public class EmitLogTopic {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            channel.exchangeDeclare(EXCHANGE_NAME, "topic");

            String routingKey = getRouting(argv);
            String message = getMessage(argv);

            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
        }
    }

    private static String getRouting(String[] strings) {
        if (strings.length < 1)
            return "anonymous.info";
        return strings[0];
    }

    private static String getMessage(String[] strings) {
        if (strings.length < 2)
            return "Hello World!";
        return joinStrings(strings, " ", 1);
    }

    private static String joinStrings(String[] strings, String delimiter, int startIndex) {
        int length = strings.length;
        if (length == 0) return "";
        if (length < startIndex) return "";
        StringBuilder words = new StringBuilder(strings[startIndex]);
        for (int i = startIndex + 1; i < length; i++) {
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }
}

EmitLogTopic完整代碼查閱

消費者ReceiveLogsTopic 關鍵步驟見註釋說明

public class ReceiveLogsTopic {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queueName = channel.queueDeclare().getQueue();

        if (argv.length < 1) {
            System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
            System.exit(1);
        }

        for (String bindingKey : argv) {
            channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
        }

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}

ReceiveLogsTopic完整代碼查閱

引入RabbitMQ帶來什麼挑戰

看到這,各位看官是否是越越欲試想在項目中引入RabbitMQ去優化如今的使用場景,那麼是否是咱們部署一個RabbitMQ服務,而後發送消息就高枕無憂了呢?其實在引入一箇中間件時,同時伴隨着一些問題,若是咱們對這些問題了解不夠深刻或者全面,那恭喜你將進入挖坑選手序列。爲了成爲一個靠譜的程序員,咱們要充分了解引入中間件給咱們 項目帶來的挑戰,才能在以後的應用上從容應對。下面列了下消息中間件中常見的幾類問題

  • 消息丟失
  • 消息重複
  • 消息堆積
  • RabbitMQ的可用性保證

以後的文章,咱們將逐個去講解上述問題的解決方案。 下一講:RabbitMQ消息可靠性傳輸

參考文檔

https://www.rabbitmq.com/ RabbitMQ官方文檔

tips:做者我的經驗有限,不足之處煩請指正。

相關文章
相關標籤/搜索