SpringBoot消息篇Ⅲ --- 整合RabbitMQ

知識儲備: html

關於消息隊列的基本概念我已經在上一篇文章介紹過了(傳送門),本篇文章主要講述的是SpringBoot與RabbitMQ的整合以及簡單的使用。java

一.安裝RabbitMQpython

1.在linux上使用docker下載RabbitMQlinux

docker pull registry.docker-cn.com/library/rabbitmq:3-management

2.使用docker啓動RabbitMQweb

docker run -d -p 5672:5672 -p 15672:15672 --name myrabbitmq d69a5113ceae 

5672端口:客戶端與MQ的通訊端口spring

15672端口:管理界面訪問web頁面的端口docker

3.訪問管理界面瀏覽器

瀏覽器訪問:http://172.16.**.**:15672,默認的管理界面帳號密碼均爲:guestspring-boot

 

測試RabbitMQ測試

1). 登陸RabbitMQ管理界面,建立交換器(Exchanges)

2). 建立Queues

3). 分別給交換器綁定queues

 

4).在direct交換器中給路由器發送消息

 

5). 隊列中接收到的消息

二.環境搭建

1.引入spring-boot-starter-amqp

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.環境配置

#配置主機地址,默認localhost
spring.rabbitmq.host=172.16.80.34
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#默認5672
spring.rabbitmq.port=5672
#默認/
spring.rabbitmq.virtual-host=/

三.RabbitMQ自動配置原理

1.RabbitAutoConfiguration

2.自動配置了鏈接工廠ConnectionFactory

3.RabbitProperties封裝了RabbitMQ的配置

4.RabbitTemplate:給RabbitMQ發送和接收消息的模板

5.AmqpAdmin系統管理組件:建立交換器等

四.RabbitTemplate的簡單使用

發送消息:

@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void contextLoads() {

//Message須要本身構造一個,定義消息體內容和消息頭
// rabbitTemplate.send(exchange,routingKey,message);
//object默認當成消息體,只須要傳入要發送的對象,自動序列化給mq
Map<String,Object> map = new HashMap<>();
map.put("msg","第一次發送消息");
map.put("data",Arrays.asList("<","0.0",">"));
//對象被默認序列化之後發送出去
rabbitTemplate.convertAndSend("exchange.direct","wang.news",map); //使用點對點方式傳播
}

此時查看RabbitMQ管理頁面的wang.news隊列,已經有消息插入進去了,因爲RabbitMQ傳遞的是序列化的對象,因此接收到的值也是序列化事後的值。

接收消息:

 @Test
    public void receive(){
        Object receive = rabbitTemplate.receiveAndConvert("wang.news"); //接收消息。
        System.out.println(receive.getClass());
        System.out.println(receive);
    }

使用該方法獲取到消息後隊列裏的消息就會自動清除。

因爲序列化的對象保存起來很不直觀,那麼該如何解決這個問題呢?

因爲RabbitTemplate默認採用的是JDK的MessageConvert,使用默認的JDK序列化規則,因此須要更改MessageConvert,更改成JSON的序列化規則

 

import org.springframework.amqp.support.converter.MessageConverter;//這裏要導入amqp包下的MessageConverter
@Configuration
public class MyAMQPConfig {
@Bean
public MessageConverter messageConverter(){ //自動配置裏,配置RabbitTemplate的時候會判斷是否有自定義的MessageConvert,若是有則採用自定義的
return new Jackson2JsonMessageConverter();
}

}

 

上面演示的是點對點(direct)的交換器(Exchanges),那麼廣播模式(fanout)的交換器要如何使用的呢?

 @Test
    public void sendMsgs(){
        rabbitTemplate.convertAndSend("exchange.fanout","",new Book("java",2)); //廣播模式只須要指定交換器的模式,自動會向該交換器綁定的全部隊列發送消息。
    }

發佈/訂閱(模糊匹配模式)也是同樣的,只須要指定交換器,修改對應的routingKey就好了

 /**
     * 發佈/訂閱(模糊匹配)方式
     */
    @Test
    public void topicSendMsgs(){
        rabbitTemplate.convertAndSend("exchange.topic","*.news",new Book("python",3));
    }

五.監聽消息

上面簡單演示了使用rabbitTemplate發送和接收消息,實際開發中須要一些監聽場景。例如訂單系統和庫存系統的解耦中,兩個系統之間都是經過消息隊列來通訊的,當某一我的下單以後,將訂單信息存放在消息隊列中,庫存系統要實時的監聽消息裏面的內容一旦有新的訂單進來,庫存系統就須要有相關的操做。那麼該如何實現監聽呢,Spring爲了簡化開發,引入了一些註解來實現消息隊列的監聽。

1.在SpringBoot主啓動類上加上註解@EnableRabbit,開啓RabbitMQ的註解模式

@EnableRabbit//開啓基於註解的RabbitMQ模式
@SpringBootApplication
public class Springboot02AmqpApplication {

    public static void main(String[] args) {
        SpringApplication.run(Springboot02AmqpApplication.class, args);
    }

}

2.使用@RabbitListener監聽某個隊列

@Service
public class BookService {
    @RabbitListener(queues = {"wang.news"}) //監聽隊列wang.news,只要wang.news收到消息,馬上執行該方法,並清空隊列
    public void receive(Book book){
        System.out.println("收到消息"+book);
    }
    @RabbitListener(queues = {"wang"})
    public void receive02(Message message){  //org.springframework.amqp.core.Message;
        System.out.println("消息內容"+message.getBody());
        System.out.println("消息頭"+message.getMessageProperties());
    }
}

五.AMQPAdmin的使用

上面代碼用到的交換器以及隊列都是咱們手動在RabbitMQ管理界面添加的,使用AMQPAdmin可讓咱們用編碼的方式建立這些組件。

1.建立交換器(Exchange)

  @Test
    public void createExchange() {
        DirectExchange directExchange = new DirectExchange("amqpadmin.exchange");
        amqpAdmin.declareExchange(directExchange); //建立一個DirectExchange
        System.out.println("建立完成");
    }

2.建立隊列(Queue)

 @Test
    public void createQueue(){
        amqpAdmin.declareQueue(new Queue("amqpadmin.queue",true));
        System.out.println("隊列建立完成");
    }

3.建立綁定規則(banding)

 @Test
    public void createBanding(){
        amqpAdmin.declareBinding(new Binding("amqpadmin.queue",Binding.DestinationType.QUEUE,"amqpadmin.exchange","amqpadmin.queue",null));
        System.out.println("綁定完成");
    }
相關文章
相關標籤/搜索