rabbitmq的基本使用

如今微服務盛行, 咱們一般會進行解耦, 這時候就須要異步的消息隊列來幫助各個服務之間解耦spring

rabbitmq的基本概念介紹

rabbitmq的基本概念有消息producer(消息生產者)、exchange(交換機)、queue(隊列)、consumer(消費者)、routingKeysegmentfault

clipboard.png
(圖中的P是producer, 即消息生產者, 中簡的Server是Exchange(交換機) 和 Queue(隊列))瀏覽器

  • Queue(隊列)

queue是存放消息的隊列, 實際上就是一個存放消息數據結構爲隊列的一個容器數據結構

  • exchange(交換機)

咱們可能會簡單的覺得發送者會把消息發送到隊列中, 而後消費者對隊列進行監聽。事實上, 消息發送者永遠不會將消息直接發送到隊列中, 而是將消息發送到exhang中, 再由exchange經過必定的路由規則路由到對應的消息隊列中。
交換機有四種類型:異步

Direct
Topic
Headers
Fanout

Direct Exchange:

clipboard.png

  • routingKey

在上面介紹exchange中說到消息經過必定的路由規則路由到對應的隊列中, routingKey就是起着這樣的一個做用,一般咱們發送消息到exchane中的時候會攜帶一個routingKey, 而這個routingKey就是exchange和queue綁定的一個規則, 由此即可以將消息從exchange再發送到對應的queue上spring-boot

參考文章https://segmentfault.com/a/11...微服務

SpringBoot中使用rabbitmq

首先添加如下依賴:spa

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

配置項以下:3d

spring:
  rabbitmq:
    port: 5672
    password: guest
    username: guest
    host: localhost
    listener:
      simple:
        acknowledge-mode: manual
        concurrency: 1
        max-concurrency: 1
        retry:
          enabled: true

在瀏覽器輸入http://localhost:15672/, 在mq上咱們新建了一個名爲exchange1, routingKey爲exhcange1-queue1的exchange, 而且映射到名爲queue1的隊列, code

圖片描述

發送消息代碼Prodcuer:

public class Sender{

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(Object object) {
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId("exchange1-queue1-id");
        String message = "hello world";
        rabbitTemplate.convertAndSend("exchange1", "exchange1-queue1", "helloworld", new CorrelationData());
    }
}

在上面的代碼中咱們發送了一個消息到名爲"exchange1", routingKey爲"exchange1-queue1"的消息。咱們啓動rabbitmq。
發送後能夠在mq上看到以下圖已經成功發送了。

圖片描述

接下來貼上接受消息的receiver代碼:

@Component
public class Receiver {

    @RabbitListener(queues = "queue1")
    public void receive(Message message, Channel channel) {
        try {
            System.out.println(message.getBody());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

接受消息後再看mq上以下圖:
圖片描述

能夠看出queue1上Ready一欄是0,可是Unacked一欄和Total一欄依然有消息, 這是由於咱們再配置文件中設置的是手動的ack,這時候代碼中沒有進行ack, mq會認爲消費者沒有成功消費掉這條消息, 這時候就須要在配置文件中設置成自動ack, 或者在代碼中手動進行ack,在消費者後添加以下代碼:

channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
相關文章
相關標籤/搜索