RabbitMQ簡介以及應用

RabbitMQ簡介以及應用

1、簡要介紹

  • 開源AMQP實現,Erlang語言編寫,支持多種客戶端
  • 分佈式、高可用、持久化、可靠、安全
  • 支持多種協議:AMQP、STOMP、MQTT、HTTP
  • 適用於多系統之間的業務解耦的消息中間件

2、基本概念

一、exchange:交換器,負責接收消息,轉發消息至綁定的隊列,有四種類型:

  • direct:徹底匹配的路由
  • topic:模式匹配的路由
  • fanout:廣播模式
  • headers:鍵值對匹配路由

Exchange屬性:java

  • 持久化:若是啓用,那麼rabbit服務重啓以後仍然存在
  • 自動刪除:若是啓用,那麼交換器將會在其綁定的隊列都被刪除掉以後自動刪除掉自身

二、Queue:隊列,rabbitmq的內部對象,用於存儲消息,其屬性相似於Exchange,一樣能夠設置是否持久化、自動刪除等。

消費者從Queue中獲取消息並消費。多個消費者能夠訂閱同一個Queue,這時Queue中的消息會被平均分攤給多個消費者進行處理,而不是每一個消費者都收到全部的消息並處理。安全

三、Binding:綁定,根據路由規則綁定交換器與隊列

四、Routing:路由鍵,路由的關鍵字

3、消息的可靠性

  • Message acknowledgment:消息確認,在消息確認機制下,收到回執纔會刪除消息,未收到回執而斷開了鏈接,消息會轉發給其餘消費者,若是忘記回執,會致使消息堆積,消費者重啓後會重複消費這些消息並重復執行業務邏輯。服務器

  • Message durability:消息持久化,設置消息持久化能夠避免絕大部分消息丟失,好比rabbitmq服務重啓,可是採用非持久化能夠提高隊列的處理效率。若是要確保消息的持久化,那麼消息對應的Exchange和Queue一樣要設置爲持久化。架構

  • Prefetch count,每次發送給消費者消息的數量,默認爲1

另外,若是須要可靠性業務,須要設置持久化和ack機制,若是系統高吞吐,能夠設置爲非持久化、noack、自動刪除機制。併發

4、簡單應用

模擬這樣一個業務場景,用戶下單成功後,須要給用戶增長積分,同時還須要給用戶發送下單成功的消息,這是在電商業務中很常見的一個業務場景。dom

若是系統是微服務架構,可能用戶下單功能在訂單服務,給用戶增長積分的功能在積分服務,給用戶發送通知消息的功能在通知服務,各個服務之間解耦,互不影響。那麼要實現上述的業務場景,消息中間件rabbitmq是一個很好的選擇。分佈式

緣由以下:ide

  • 高性能,它的實現語言是天生具有高併發高可用的erlang 語言
  • 支持消息的持久化,即便服務器掛了,也不會丟失消息
  • 消息應答(ack)機制,消費者消費完消息後發送一個消息應答,rabbitmq纔會刪除消息,確保消息的可靠性
  • 支持高可用集羣
  • 靈活的路由

實現思路:微服務

用戶下單成功後,rabbitmq發送一條消息至EXCHANGE.ORDER_CREATE交換器,該交換器綁定了兩個隊列,QUEUE.ORDER_INCREASESCOREQUEUE.ORDER_NOTIFY,消費者訂閱這兩個隊列分別用來處理增長積分、發送用戶通知。若是後續日誌系統還須要記錄下單的相關日誌,那麼咱們只須要再定義一個隊列並將其綁定到EXCHANGE.ORDER_CREATE便可。高併發

下單發rabbitmq消息

package com.robot.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

/**
 * @author: 會跳舞的機器人
 * @date: 2017/10/13 10:46
 * @description: 模擬用戶下單以後發送rabbitmq消息
 */
public class OrderCreator {
    // 交換器名稱
    private static final String EXCHANGE = "EXCHANGE.ORDER_CREATE";
    // 消息內容
    private static String msg = "create order success";

    /**
     * 模擬建立訂單後發送mq消息
     */
    public void createOrder() {
        System.out.println("下單成功,開始發送rabbitmq消息");

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.12.44");
        connectionFactory.setPort(56720);
        connectionFactory.setUsername("baibei");
        connectionFactory.setPassword("baibei");

        Connection connection;
        Channel channel;
        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            // 持久化
            boolean durable = true;
            // topic類型
            String type = "topic";
            // 聲明交換器,若是交換器不存在則建立之
            channel.exchangeDeclare(EXCHANGE, type, durable);

            String messgeId = UUID.randomUUID().toString();
            // deliveryMode>=2表示設置消息持久化
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().deliveryMode(2).messageId(messgeId).build();
            // 發佈消息
            String routingKey = "order_create";
            channel.basicPublish(EXCHANGE, routingKey, props, msg.getBytes("utf-8"));
            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

積分系統訂閱消息

package com.robot.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author: 會跳舞的機器人
 * @date: 2017/10/13 16:02
 * @description: rabbitmq消費者,模擬下單成功後給用戶增長積分
 */
public class IncreaseScoreConsumer implements Consumer {
    private Connection connection;
    private Channel channel;
    // 交換器名稱
    private static final String EXCHANGE = "EXCHANGE.ORDER_CREATE";
    // 增長積分隊列名稱
    private static final String QUEUENAME = "QUEUE.ORDER_INCREASESCORE";

    public void consume() {
        // 初始化rabbitmq鏈接信息
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.12.44");
        connectionFactory.setPort(56720);
        connectionFactory.setUsername("baibei");
        connectionFactory.setPassword("baibei");
        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            // 聲明交換器
            channel.exchangeDeclare(EXCHANGE, "topic", true);
            // 聲明隊列
            channel.queueDeclare(QUEUENAME, true, false, false, null);
            // 交換器與隊列綁定並設置routingKey
            channel.queueBind(QUEUENAME, EXCHANGE, "order_create");
            // 消費消息,callback是該類,關閉自動確認消息,在完成業務邏輯後手動確認確認
            channel.basicConsume(QUEUENAME, false, this);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }

    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String msg = new String(body, "UTF-8");
        System.out.println("《積分系統》收到訂單消息:" + msg + ",給用戶增長積分......");
        // 手動確認消息
        channel.basicAck(envelope.getDeliveryTag(), false);

        /**
         * channel.basicReject(envelope.getDeliveryTag(), false);該方法會丟棄掉隊列中的這條消息
         * channel.basicReject(envelope.getDeliveryTag(), true);該方法會把消息從新放回隊列
         * 通常系統會設定一個重試次數,若是超太重試次數,則會丟棄消息,反之則會把消息再放入隊列
         */
    }

    public void handleConsumeOk(String consumerTag) {

    }

    public void handleCancelOk(String consumerTag) {

    }

    public void handleCancel(String consumerTag) throws IOException {

    }

    public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {

    }

    public void handleRecoverOk(String consumerTag) {

    }

}

通知系統訂閱消息

package com.robot.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author: 會跳舞的機器人
 * @date: 2017/10/13 16:20
 * @description: rabbitmq消費者,模擬下單成功後給用戶發送通知
 */
public class NotifyConsumer implements Consumer {
    private Connection connection;
    private Channel channel;

    // 交換器名稱
    private static final String EXCHANGE = "EXCHANGE.ORDER_CREATE";
    // 通知用戶下單成功通知隊列名稱
    private static final String QUEUENAME = "QUEUE.ORDER_NOTIFY";

    public void consume() {
        // 初始化rabbitmq鏈接信息
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.12.44");
        connectionFactory.setPort(56720);
        connectionFactory.setUsername("baibei");
        connectionFactory.setPassword("baibei");
        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            // 聲明交換器
            channel.exchangeDeclare(EXCHANGE, "topic", true);
            // 聲明隊列
            channel.queueDeclare(QUEUENAME, true, false, false, null);
            // 交換器與隊列綁定並設置routingKey
            channel.queueBind(QUEUENAME, EXCHANGE, "order_create");
            // 消費消息,callback是該類,關閉自動確認消息,在完成業務邏輯後手動確認確認
            channel.basicConsume(QUEUENAME, false, this);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }

    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String msg = new String(body, "UTF-8");
        System.out.println("《通知系統》收到訂單消息:" + msg + ",開始給用戶發送通知......");
        // 手動確認消息
        channel.basicAck(envelope.getDeliveryTag(), false);

        /**
         * channel.basicReject(envelope.getDeliveryTag(), false);該方法會丟棄掉隊列中的這條消息
         * channel.basicReject(envelope.getDeliveryTag(), true);該方法會把消息從新放回隊列
         * 通常系統會設定一個重試次數,若是超太重試次數,則會丟棄消息,反之則會把消息再放入隊列
         */
    }

    public void handleConsumeOk(String consumerTag) {

    }

    public void handleCancelOk(String consumerTag) {

    }

    public void handleCancel(String consumerTag) throws IOException {

    }

    public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {

    }

    public void handleRecoverOk(String consumerTag) {

    }
}

測試

package com.robot.rabbitmq;

/**
 * @author: 會跳舞的機器人
 * @date: 2017/10/13 16:27
 * @description:
 */
public class Test {
    public static void main(String[] args) {

        IncreaseScoreConsumer increaseScoreConsumer = new IncreaseScoreConsumer();
        increaseScoreConsumer.consume();

        NotifyConsumer notifyConsumer = new NotifyConsumer();
        notifyConsumer.consume();

        OrderCreator orderCreator = new OrderCreator();
        for (int i = 0; i < 3; i++) {
            orderCreator.createOrder();
        }
    }
}

輸出:

下單成功,開始發送rabbitmq消息
《積分系統》收到訂單消息:create order success,給用戶增長積分......
《通知系統》收到訂單消息:create order success,開始給用戶發送通知......
下單成功,開始發送rabbitmq消息
《積分系統》收到訂單消息:create order success,給用戶增長積分......
《通知系統》收到訂單消息:create order success,開始給用戶發送通知......
下單成功,開始發送rabbitmq消息
《積分系統》收到訂單消息:create order success,給用戶增長積分......
《通知系統》收到訂單消息:create order success,開始給用戶發送通知......

原文轉載:https://www.jianshu.com/p/2f55cd7a3e1c做者:會跳舞的機器人

相關文章
相關標籤/搜索