rabbitMq 教程

https://github.com/401Studio/WeekLearn/issues/2java

 

目錄

  • RabbitMQ 概念
  • exchange交換機機制
    • 什麼是交換機
    • binding?
    • Direct Exchange交換機
    • Topic Exchange交換機
    • Fanout Exchange交換機
    • Header Exchange交換機
  • RabbitMQ 的 Hello - Demo(springboot實現)
  • RabbitMQ 的 Hello Demo(spring xml實現)
  • RabbitMQ 在生產環境下運用和出現的問題
    • Spring RabbitMQ 註解
    • 消息的 JSON 傳輸
    • 消息持久化,斷線重連,ACK。

RabbitMQ 概念

RabbitMQ 即一個消息隊列,_主要是用來實現應用程序的異步和解耦,同時也能起到消息緩衝,消息分發的做用。_RabbitMQ使用的是AMQP協議,它是一種二進制協議。默認啓動端口 5672。git

在 RabbitMQ 中,以下圖結構:github

rabbitmq

  • 左側 P 表明 生產者,也就是往 RabbitMQ 發消息的程序。
  • 中間便是 RabbitMQ,_其中包括了 交換機 和 隊列。_
  • 右側 C 表明 消費者,也就是往 RabbitMQ 拿消息的程序。

那麼,_其中比較重要的概念有 4 個,分別爲:虛擬主機,交換機,隊列,和綁定。_spring

  • 虛擬主機:一個虛擬主機持有一組交換機、隊列和綁定。爲何須要多個虛擬主機呢?很簡單,RabbitMQ當中,_用戶只能在虛擬主機的粒度進行權限控制。_ 所以,若是須要禁止A組訪問B組的交換機/隊列/綁定,必須爲A和B分別建立一個虛擬主機。每個RabbitMQ服務器都有一個默認的虛擬主機「/」。
  • 交換機:_Exchange 用於轉發消息,可是它不會作存儲_ ,若是沒有 Queue bind 到 Exchange 的話,它會直接丟棄掉 Producer 發送過來的消息。
    • 這裏有一個比較重要的概念:**路由鍵 ** 。消息到交換機的時候,交互機會轉發到對應的隊列中,那麼究竟轉發到哪一個隊列,就要根據該路由鍵。
  • 綁定:也就是交換機須要和隊列相綁定,這其中如上圖所示,是多對多的關係。

exchange交換機機制

什麼是交換機

rabbitmq的message model實際上消息不直接發送到queue中,中間有一個exchange是作消息分發,producer甚至不知道消息發送到那個隊列中去。所以,當exchange收到message時,必須準確知道該如何分發。是append到必定規則的queue,仍是append到多個queue中,仍是被丟棄?這些規則都是經過exchagne的4種type去定義的。安全

The core idea in the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue. Actually, quite often the producer doesn't even know if a message will be delivered to any queue at all.springboot

Instead, the producer can only send messages to an exchange. An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.服務器

exchange是一個消息的agent,每個虛擬的host中都有定義。它的職責是把message路由到不一樣的queue中。app

binding?

exchange和queue經過routing-key關聯,這二者之間的關係是就是binding。以下圖所示,X表示交換機,紅色表示隊列,交換機經過一個routing-key去binding一個queue,routing-key有什麼做用呢?看Direct exchange類型交換機。less

Directed Exchange

路由鍵exchange,該交換機收到消息後會把消息發送到指定routing-key的queue中。那消息交換機是怎麼知道的呢?其實,producer deliver消息的時候會把routing-key add到 message header中。routing-key只是一個messgae的attribute。異步

A direct exchange delivers messages to queues based on a message routing key. The routing key is a message attribute added into the message header by the producer. The routing key can be seen as an "address" that the exchange use to decide how to route the message. A message goes to the queue(s) whose binding key exactly matches the routing key of the message.

Default Exchange
這種是特殊的Direct Exchange,是rabbitmq內部默認的一個交換機。該交換機的name是空字符串,全部queue都默認binding 到該交換機上。全部binding到該交換機上的queue,routing-key都和queue的name同樣。

Topic Exchange

通配符交換機,exchange會把消息發送到一個或者多個知足通配符規則的routing-key的queue。其中_表號匹配一個word,#匹配多個word和路徑,路徑之間經過.隔開。如知足a._.c的routing-key有a.hello.c;知足#.hello的routing-key有a.b.c.helo。

Fanout Exchange

扇形交換機,該交換機會把消息發送到全部binding到該交換機上的queue。這種是publisher/subcribe模式。用來作廣播最好。
全部該exchagne上指定的routing-key都會被ignore掉。

The fanout copies and routes a received message to all queues that are bound to it regardless of routing keys or pattern matching as with direct and topic exchanges. Keys provided will simply be ignored.

Header Exchange

設置header attribute參數類型的交換機。

RabbitMQ 的 Hello Demo

安裝就不說了,建議按照官方文檔上作。先貼代碼,稍後解釋,代碼以下:

配置 交換機,隊列,交換機與隊列的綁定,消息監視容器:

@Configuration
@Data
public class RabbitMQConfig {

    final static String queueName = "spring-boot";

    @Bean
    Queue queue() {
        return new Queue(queueName, false);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange("spring-boot-exchange");
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(queueName);
    }

    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(queueName);
        container.setMessageListener(listenerAdapter);
        return container;
    }

    @Bean
    Receiver receiver() {
        return new Receiver();
    }
    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }
}

配置接收信息者(即消費者):

public class Receiver {

    private CountDownLatch latch = new CountDownLatch(1);

    public void receiveMessage(String message) {
        System.out.println("Received <" + message + ">");
        latch.countDown();
    }

    public CountDownLatch getLatch() {
        return latch;
    }
}

配置發送信息者(即生產者):

@RestController
public class Test {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @RequestMapping(value = "/test/{abc}",method = RequestMethod.GET)
    public String test(@PathVariable(value = "abc") String abc){
        rabbitTemplate.convertAndSend("spring-boot", abc + " from RabbitMQ!");
        return  "abc";
    }
}

以上即可實現一個簡單的 RabbitMQ Demo,具體代碼在:點這裏

那麼,這裏,分爲三個部分分析:發消息,交換機隊列,收消息。

  • 對於發送消息:咱們通常可使用 RabbitTemplate,這個是 Spring 封裝給了咱們,便於咱們發送信息,咱們調用 rabbitTemplate.convertAndSend("spring-boot", xxx); 便可發送信息。
  • 對於交換機隊列:如上代碼,咱們須要配置交換機 TopicExchange,配置隊列 Queue,而且配置他們之間的綁定 Binding
  • 對於接受消息:首先須要建立一個消息監聽容器,而後把咱們的接受者註冊到該容器中,這樣,隊列中有信息,那麼就會調用接收者的對應的方法。如上代碼 container.setMessageListener(listenerAdapter); 其中,MessageListenerAdapter 能夠看作是 咱們接收者的一個包裝類,new MessageListenerAdapter(receiver, "receiveMessage"); 指明瞭若是有消息來,那麼調用接收者哪一個方法進行處理。

RabbitMQ 的 Hello Demo(spring xml實現)

spring xml方式實現RabbitMQ簡單,可讀性較好,配置簡單,配置和實現以下所示。

上文已經講述了rabbitmq的配置,xml方式經過properites文件存放用戶配置信息:

mq.host=127.0.0.1
mq.username=guest
mq.password=guest
mq.port=5672

配置application-mq.xml配置文件,聲明鏈接、交換機、queue以及consumer監聽。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
     http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
    http://www.springframework.org/schema/rabbit
    http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd" >
    <description>rabbitmq 鏈接服務配置</description>

    <!-- 鏈接配置 -->
    <context:property-placeholder location="classpath:mq.properties" />
    <rabbit:connection-factory id="connectionFactory" host="${mq.host}" username="${mq.username}" password="${mq.password}" port="${mq.port}"/>
    <rabbit:admin connection-factory="connectionFactory"/>
    <!-- spring template聲明-->
    <rabbit:template exchange="amqpExchange" id="amqpTemplate"  connection-factory="connectionFactory" />

    <!--申明queue-->
    <rabbit:queue id="test_queue_key" name="test_queue_key" durable="true" auto-delete="false" exclusive="false" />
    <!--申明exchange交換機並綁定queue-->
    <rabbit:direct-exchange name="amqpExchange" durable="true" auto-delete="false" id="amqpExchange">
        <rabbit:bindings>
            <rabbit:binding queue="test_queue_key" key="test_queue_key"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>


    <!--consumer配置監聽-->
    <bean id="reveiver" class="com.demo.mq.receive.Reveiver" />
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
        <rabbit:listener queues="test_queue_key" ref="reveiver" method="receiveMessage"/>
    </rabbit:listener-container>
</beans>

上述代碼中,引入properties文件就很少說了。

<rabbit:connection-factory>標籤聲明建立connection的factory工廠。

<rabbit-template>聲明spring template,和上文spring中使用template同樣。template可聲明exchange。

<rabbit:queue>聲明一個queue並設置queue的配置項,直接看標籤屬性就能夠明白queue的配置項。

<rabbit:direct-exchange>聲明交換機並綁定queue。

<rabbit:listener-container>申明監聽container並配置consumer和監聽routing-key。

剩下就簡單了,application-context.xml中把rabbitmq配置import進去。

<?xml version="1.0" encoding="UTF-8"?>
<beans
        xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:task="http://www.springframework.org/schema/task"
        xmlns:context="http://www.springframework.org/schema/context"
        xmlns:aop="http://www.springframework.org/schema/aop"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
       http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd
       http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd">

    <context:component-scan base-package="com.demo.**" />
    <import resource="application-mq.xml" />
</beans>

Producer實現,發送消息仍是使用template的convertAndSend() deliver消息。

@Service
public class Producer {

    @Autowired
    private AmqpTemplate amqpTemplate;

    private final static Logger logger = LoggerFactory.getLogger(Producer.class);

    public void sendDataToQueue(String queueKey, Object object) {
        try {
            amqpTemplate.convertAndSend(queueKey, object);
        } catch (Exception e) {
            e.printStackTrace();
            logger.error("exeception={}",e);
        }

    }
}

配置consumer

package com.demo.mq.receive;

import org.springframework.stereotype.Service;
import java.util.concurrent.CountDownLatch;

@Service
public class Reveiver {
    private CountDownLatch latch = new CountDownLatch(1);

    public void receiveMessage(String message) {
        System.out.println("reveice msg=" + message.toString());
        latch.countDown();
    }
}

測試deliver消息

Controller
@RequestMapping("/demo/")
public class TestController {
    private final static Logger logger = LoggerFactory.getLogger(TestController.class);
    @Resource
    private Producer producer;


    @RequestMapping("/test/{msg}")
    public String send(@PathVariable("msg") String msg){
        logger.info("#TestController.send#abc={msg}", msg);
        System.out.println("msg="+msg);
        producer.sendDataToQueue("test_queue_key",msg);
        return "index";
    }
}

RabbitMQ 在生產環境下運用和出現的問題

在生產環境中,因爲 Spring 對 RabbitMQ 提供了一些方便的註解,因此首先可使用這些註解。例如:

  • @enablerabbit:@enablerabbit 和 @configuration 註解在一個類中結合使用,若是該類可以返回一個 RabbitListenerContainerFactory 類型的 bean,那麼就至關於可以把該終端(消費端)和 RabbitMQ 進行鏈接。Ps:(生成端不是經過 RabbitListenerContainerFactory 來和 RabbitMQ 鏈接,而是經過 RabbitTemplate )
  • @rabbitlistener:當對應的隊列中有消息的時候,該註解修飾下的方法會被執行。
  • @rabbithandler:接收者能夠監聽多個隊列,不一樣的隊列消息的類型可能不一樣,該註解可使得不一樣的消息讓不一樣方法來響應。

具體這些註解的使用,能夠參考這裏的代碼:點這裏

首先,生產環境下的 RabbitMQ 可能不會在生產者或者消費者本機上,因此須要從新定義 ConnectionFactory,即:

@Bean
ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
    connectionFactory.setUsername(userName);
    connectionFactory.setPassword(password);
    connectionFactory.setVirtualHost(vhost);
    return connectionFactory;
}

這裏,能夠從新設置須要鏈接的 RabbitMQ 的 ip,端口,虛擬主機,用戶名,密碼。

而後,能夠先從生產端考慮,生產端須要鏈接 RabbitMQ,那麼能夠經過 RabbitTemplate 進行鏈接。 Ps:(RabbitTemplate 用於生產端發送消息到交換機中),以下代碼:

@Bean(name="myTemplate")
RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    template.setMessageConverter(integrationEventMessageConverter());
    template.setExchange(exchangeName);
    return template;
}

在該代碼中,new RabbitTemplate(connectionFactory); 設置了生產端鏈接到RabbitMQ,template.setMessageConverter(integrationEventMessageConverter()); 設置了 生產端發送給交換機的消息是以什麼格式的,在 integrationEventMessageConverter() 代碼中:

public MessageConverter integrationEventMessageConverter() {
    Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
    return messageConverter;
}

如上 Jackson2JsonMessageConverter 指明瞭 JSON。上述代碼的最後 template.setExchange(exchangeName); 指明瞭 要把生產者要把消息發送到哪一個交換機上。

有了上述,那麼,咱們便可使用 rabbitTemplate.convertAndSend("spring-boot", xxx); 發送消息,xxx 表示任意類型,由於上述的設置會幫咱們把這些類型轉化成 JSON 傳輸。

接着,生產端發送咱們說過了,那麼如今能夠看看消費端:

對於消費端,咱們能夠只建立 SimpleRabbitListenerContainerFactory,它可以幫咱們生成 RabbitListenerContainer,而後咱們再使用 @rabbitlistener 指定接收者收到信息時處理的方法。

@Bean(name="myListenContainer")
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setMessageConverter(integrationEventMessageConverter());
    factory.setConnectionFactory(connectionFactory());
    return factory;
}

這其中 factory.setMessageConverter(integrationEventMessageConverter()); 指定了咱們接受消息的時候,以 JSON 傳輸的消息能夠轉換成對應的類型傳入到方法中。例如:

@Slf4j
@Component
@RabbitListener(containerFactory = "helloRabbitListenerContainer",queues = "spring-boot")
public class Receiver {
    @RabbitHandler
    public void receiveTeacher(Teacher teacher) {
        log.info("##### = {}",teacher);
    }
}

可能出現的問題:

消息持久化

在生產環境中,咱們須要考慮萬一輩子產者掛了,消費者掛了,或者 rabbitmq 掛了怎麼樣。通常來講,若是生產者掛了或者消費者掛了,實際上是沒有影響,由於消息就在隊列裏面。那麼萬一 rabbitmq 掛了,以前在隊列裏面的消息怎麼辦,其實能夠作消息持久化,RabbitMQ 會把信息保存在磁盤上。

作法是能夠先從 Connection 對象中拿到一個 Channel 信道對象,而後再能夠經過該對象設置 消息持久化。

生產者或者消費者斷線重連

這裏 Spring 有自動重連機制。

ACK 確認機制

每一個Consumer可能須要一段時間才能處理完收到的數據。若是在這個過程當中,Consumer出錯了,異常退出了,而數據尚未處理完成,那麼 很是不幸,這段數據就丟失了。由於咱們採用no-ack的方式進行確認,也就是說,每次Consumer接到數據後,而不論是否處理完 成,RabbitMQ Server會當即把這個Message標記爲完成,而後從queue中刪除了。

若是一個Consumer異常退出了,它處理的數據可以被另外的Consumer處理,這樣數據在這種狀況下就不會丟失了(注意是這種狀況下)。
爲了保證數據不被丟失,RabbitMQ支持消息確認機制,即acknowledgments。爲了保證數據能被正確處理而不只僅是被Consumer收到,那麼咱們不能採用no-ack。而應該是在處理完數據後發送ack。

在處理數據後發送的ack,就是告訴RabbitMQ數據已經被接收,處理完成,RabbitMQ能夠去安全的刪除它了。
若是Consumer退出了可是沒有發送ack,那麼RabbitMQ就會把這個Message發送到下一個Consumer。這樣就保證了在Consumer異常退出的狀況下數據也不會丟失。

我的對 RabbitMQ ACK 的一些疑問,求助:點這裏

總結

  1. RabbitMQ 做用:異步,解耦,緩衝,消息分發。
  2. RabbitMQ 主要分爲3個部分,生產者,交換機和隊列,消費者。
  3. 須要注意消息持久化,目的爲了防止 RabbitMQ 宕機;考慮 ACK 機制,目的爲了若是消費者對消息的處理失敗了,那麼後續要如何處理。

寫在最後

    1. 寫出來,說出來才知道對不對,知道不對才能改正,改正了才能成長。
    2. 在技術方面,但願你們眼裏都容不得沙子。若是有不對的地方或者須要改進的地方但願能夠指出,萬分感謝。
相關文章
相關標籤/搜索