RabbitMQ使用詳解

RabbitMQ是基於AMQP的一款消息管理系統。AMQP(Advanced Message Queuing Protocol),是一個提供消息服務的應用層標準高級消息隊列協議,其中RabbitMQ就是基於這種協議的一種實現。算法

常見mq:spring

  • ActiveMQ:基於JMS
  • RabbitMQ:基於AMQP協議,erlang語言開發,穩定性好
  • RocketMQ:基於JMS,阿里巴巴產品,目前交由Apache基金會
  • Kafka:分佈式消息系統,高吞吐量

1 消息模型

RabbitMq有5種經常使用的消息模型網絡

1.1 基本消息模型

這是最簡單的消息模型,以下圖:
app

生產者將消息發送到隊列,消費者從隊列中獲取消息,隊列是存儲消息的緩衝區。負載均衡

再演示代碼以前,咱們先建立一個工程rabbitmq-demo,並編寫一個工具類,用於提供與mq服務建立鏈接異步

public class ConnectionUtil {
    /**
     * 創建與RabbitMQ的鏈接
     * @return
     * @throws Exception
     */
    public static Connection getConnection() throws Exception {
        //定義鏈接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設置服務地址
        factory.setHost("192.168.18.130");
        //端口
        factory.setPort(5672);
        //設置帳號信息,用戶名、密碼、vhost
        factory.setUsername("admin");
        factory.setPassword("admin");
        // 經過工程獲取鏈接
        Connection connection = factory.newConnection();
        return connection;
    }
}

生產者發送消息

接下來是生產者發送消息,其過程包括:1.與mq服務創建鏈接,2.創建通道,3.聲明隊列(有相同隊列則不建立,沒有則建立),4.發送消息,代碼以下:分佈式

public class Send {
    private static final String QUEUE_NAME = "basic_queue";
    public static void main(String[] args) throws Exception {
        //消息發送端與mq服務建立鏈接
        Connection connection = ConnectionUtil.getConnection();
        //創建通道
        Channel channel = connection.createChannel();
        //聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "hello world";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println("生產者已發送:" + message);
        channel.close();
        connection.close();
    }
}

消費者接受消息

消費者在接收消息的過程須要經歷以下幾個步驟: 1.與mqfuwu創建鏈接,2.創建通道,3.聲明隊列,4,接收消息,代碼以下:ide

public class Consumer1 {
    private static final String QUEUE_NAME = "basic_queue";
    public static void main(String[] args) throws Exception {
        //消息消費者與mq服務創建鏈接
        Connection connection = ConnectionUtil.getConnection();
        //創建通道
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 獲取消息,而且處理,這個方法相似事件監聽,若是有消息的時候,會被自動調用
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                // body 即消息體
                String msg = new String(body);
                System.out.println("消費者1接收到消息:" + msg);
            }
        };
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

消息的接收與消費使用都須要在一個匿名內部類DefaultConsumer中完成spring-boot

注意:隊列須要提早聲明,若是未聲明就使用隊列,則會報錯。若是不清楚生產者和消費者誰先聲明,爲了保證不報錯,生產者和消費者都聲明隊列,隊列的建立會保證冪等性,也就是說生產者和消費者都聲明同一個隊列,則只會建立一個隊列工具

1.2 Work Queues工做隊列模型

在基本消息模型中,一個生產者對應一個消費者,而實際生產過程當中,每每消息生產會發送不少條消息,若是消費者只有一個的話效率就會很低,所以rabbitmq有另一種消息模型,這種模型下,一個生產發送消息到隊列,容許有多個消費者接收消息,可是一條消息只會被一個消費者獲取。

生產者發送消息

與基本消息模型基本一致,這裏測試循環發佈20條消息:

public class Send {
    private static final String QUEUE_NAME = "work_queue";
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 循環發佈任務
        for (int i = 1; i <= 20; i++) {
            // 消息內容
            String message = "task .. " + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("生產者發送消息:" + message);
            Thread.sleep(500);
        }
        channel.close();
        connection.close();
    }
}

消費者1

public class Consumer1 {
    private static final String QUEUE_NAME = "work_queue";
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body);
                System.out.println("消費者1接收到消息:" + msg);
                try {
                    Thread.sleep(50);//模擬消費耗時
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

消費者2

public class Consumer2 {
    private static final String QUEUE_NAME = "work_queue";
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {
                String msg = new String(body);
                System.out.println("消費者2接收到消息:" + msg);
                try {
                    Thread.sleep(50);//模擬消費耗時
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

此時有兩個消費者監聽同一個隊列,當兩個消費者都工做時,生成者發送消息,就會按照負載均衡算法分配給不一樣消費者,以下圖:

1.3 訂閱模型

在以前的模型中,一條消息只能被一個消費者獲取,而在訂閱模式中,能夠實現一條消息被多個消費者獲取。在這種模型下,消息傳遞過程當中比以前多了一個exchange交換機,生產者不是直接發送消息到隊列,而是先發送給交換機,經由交換機分配到不一樣的隊列,而每一個消費者都有本身的隊列:

解讀:

一、1個生產者,多個消費者

二、每個消費者都有本身的一個隊列

三、生產者沒有將消息直接發送到隊列,而是發送到了交換機

四、每一個隊列都要綁定到交換機

五、生產者發送的消息,通過交換機到達隊列,實現一個消息被多個消費者獲取的目的

X(exchange)交換機的類型有如下幾種:

Fanout:廣播,交換機將消息發送到全部與之綁定的隊列中去

Direct:定向,交換機按照指定的Routing Key發送到匹配的隊列中去

Topics:通配符,與Direct大體相同,不一樣在於Routing Key能夠根據通配符進行匹配

注意:在發佈訂閱模型中,生產者只負責發消息到交換機,至於消息該怎麼發,以及發送到哪一個隊列,生產者都不負責。通常由消費者建立隊列,而且綁定到交換機

訂閱模型之Fanout

在廣播模式下,消息發送的流程以下:

  1. 能夠有多個消費者,每一個消費者都有本身的隊列
  2. 每一個隊列都要與exchange綁定
  3. 生產者發送消息到exchange
  4. exchange將消息把消息發送到全部綁定的隊列中去
  5. 消費者從各自的隊列中獲取消息
生產者發送消息
public class Send {
    private static final String EXCHANGE_NAME = "fanout_exchange";
    
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 聲明exchange,指定類型爲fanout
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        String message = "hello world";
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println("生產者發送消息:" + message);
        channel.close();
        connection.close();
    }
}
消費者
public class Consumer1 {
    private static final String QUEUE_NAME = "fanout_queue_1";
    private static final String EXCHANGE_NAME = "fanout_exchange";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        //消費者聲明本身的隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 聲明exchange,指定類型爲direct
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        //消費者將隊列與交換機進行綁定
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException
            {
                String msg = new String(body);
                System.out.println("消費者1獲取到消息:" + msg);
            }
        });
    }
}

其餘消費者只需修改QUEUE_NAME便可

注意:exchange與隊列同樣都須要提早聲明,若是未聲明就使用交換機,則會報錯。若是不清楚生產者和消費者誰先聲明,爲了保證不報錯,生產者和消費者都聲明交換機,一樣的,交換機的建立也會保證冪等性。

訂閱模型之Direct

在fanout模型中,生產者發佈消息,全部消費者均可以獲取全部消息。在路由模式(Direct)中,能夠實現不一樣的消息被不一樣的隊列消費,在Direct模式下,交換機再也不將消息發送給全部綁定的隊列,而是根據Routing Key將消息發送到指定的隊列,隊列在與交換機綁定時會設定一個Routing Key,而生產者發送的消息時也須要攜帶一個Routing Key。

如圖所示,消費者C1的隊列與交換機綁定時設置的Routing Key是「error」, 而C2的隊列與交換機綁定時設置的Routing Key包括三個:「info」,「error」,「warning」,假如生產者發送一條消息到交換機,並設置消息的Routing Key爲「info」,那麼交換機只會將消息發送給C2的隊列。

生產者發送消息
public class Send {
    private static final String EXCHANGE_NAME = "direct_exchange";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 聲明exchange,指定類型爲direct
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        String message = "新增一個訂單";
        //生產者發送消息時,設置消息的Routing Key:"insert"
        channel.basicPublish(EXCHANGE_NAME, "insert", null, message.getBytes());
        System.out.println("生產者發送消息:" + message);
        channel.close();
        connection.close();
    }
}
消費者1
public class Consumer1 {
    private static final String QUEUE_NAME = "direct_queue_1";
    private static final String EXCHANGE_NAME = "direct_exchange";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        //消費者聲明本身的隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //聲明交換機
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //消費者將隊列與交換機進行綁定,而且設置Routing Key:"insert"
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
        channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException
            {
                String msg = new String(body);
                System.out.println("消費者1獲取到消息:" + msg);
            }
        });
    }
}

其餘消費者須要修改隊列名QUEUE_NAME和Routing Key,上述生成者發送的消息,消費者1是能夠獲取到的

發佈訂閱之Topics

Topic類型的Exchange與Direct相比,都是能夠根據RoutingKey把消息路由到不一樣的隊列。只不過Topic類型Exchange可讓隊列在綁定Routing key 的時候使用通配符

Routingkey 通常都是有一個或多個單詞組成,多個單詞之間以」.」分割,例如: item.insert

通配符規則:

#:匹配一個或多個詞

     *:匹配很少很多剛好1個詞

舉例:

audit.#:可以匹配audit.irs.corporate 或者 audit.irs

     audit.*:只能匹配audit.irs

Topics生產者代碼與Direct大體相同,只不過子聲明交換機時,將類型設爲BuiltinExchangeType.TOPIC(topic),

消費者代碼也與Direct大體相同,也是在聲明交換機時設置類型爲topic,代碼再也不演示

Spring AMQP

Spring AMQP是對AMQP的一種封裝,目的是可以讓咱們更簡便的使用消息隊列,下面介紹一下Spring AMQP在Spring boot中的使用方法

依賴和配置

添加AMQP的啓動器:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

在application.yml中添加RabbitMQ的地址:

spring:
  rabbitmq:
    host: 192.168.18.130
    username: admin
    password: admin

消費者

消費者須要定義一個類,類中定義監聽隊列的方法

@Component
public class Listener {

    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "spring.test.queue", durable = "false"),
                    exchange = @Exchange(value = "spring.test.exchange", type = ExchangeTypes.DIRECT),
                    key = "insert"
            )
    )
    public void listen(String msg){
        System.out.println("消費者接受到消息:" + msg);
    }
}

註解:

@Component:保證監聽類被spring掃描到

@RabbitListener:

@RabbitListener包含不少內容,在發佈訂閱模式中,咱們可使用其中的「QueueBinding[] bindings」,其中QueueBinding底層以下:

其中Queue表示隊列,Exchange表示交換機,key表示Routing Key

@RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "spring.test.queue", durable = "false"),
                    exchange = @Exchange(value = "spring.test.exchange", type = ExchangeTypes.DIRECT),
                    key = "insert"
            )
    )

@Queue會建立隊列

@Exchange會建立交換機

@QueueBinding會綁定隊列和交換機

生產者發送消息

能夠經過註解引入AmqpTemplate:

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAMQPTest {
    @Resource
    private AmqpTemplate template;

    @Test
    public void testSendMsg() throws InterruptedException {
        String message = "hello spring";
        template.convertAndSend("spring.test.exchange", "insert", message);
        System.out.println("生產者發送消息:" + message);
        Thread.sleep(10000);//等待10s,讓測試方法延遲結束,防止消費者將來得及獲取消息
    }
}

RabbitMQ如何防止消息丟失

1. 消息確認機制(ACK)

RabbitMQ有一個ACK機制,消費者在接收到消息後會向mq服務發送回執ACK,告知消息已被接收。這種ACK分爲兩種狀況:

  • 自動ACK:消息一旦被接收,消費者會自動發送ACK
  • 手動ACK:消息接收後,不會自動發送ACK,而是須要手動發送ACK

若是消費者沒有發送ACK,則消息會一直保留在隊列中,等待下次接收。但這裏存在一個問題,就是一旦消費者發送了ACK,若是消費者後面宕機,則消息會丟失。所以自動ACK不能保證消費者在接收到消息以後可以正常完成業務功能,所以須要在消息被充分利用以後,手動ACK確認

自動ACK,basicConsume方法中將autoAck參數設爲true便可:

手動ack,在匿名內部類中,手動發送ACK:

固然,若是設置了手動ack,但又不手動發送ACK確認,消息會一直停留在隊列中,可能形成消息的重複獲取

2. 持久化

消息確認機制(ACK)可以保證消費者不丟失消息,但假如消費者在獲取消息以前mq服務宕機,則消息也會丟失,所以要保證消息在服務端不丟失,則須要將消息進行持久化。隊列、交換機、消息都要持久化。

隊列持久化

exchange持久化

消息持久化

3. 生產者確認

生成者在發送消息過程當中也可能出現錯誤或者網絡延遲燈故障,致使消息未成功發送到交換機或者隊列,或重複發送消息,爲了解決這個問題,rabbitmq中有多個解決辦法:

事務:

用事務將消息發送代碼包圍起來:

Confirm模式:

以下所示,在發送代碼前執行channel.confirmSelect(),若是消息未正常發送,就會進入if代碼塊,能夠進行重發也能夠對失敗消息進行記錄

異步confirm方法:

顧名思義,就是生產者發送消息後不用等待服務端回饋發送狀態,能夠繼續執行後面的代碼,對於失敗消息重發進行異步處理:

Spring AMQP中添加配置:

生產者確認機制,確保消息正確發送,若是發送失敗會有錯誤回執,從而觸發重試

spring:
  rabbitmq:
    publisher-confirms: true
相關文章
相關標籤/搜索