【Spring Boot】20.RabbitMQ高級

簡介

前面咱們已經學習瞭如何在RabbitMQ的安裝及簡單使用以及在SpringBoot中集成RabbitMQ組件,接下來咱們來學習RabbitMQ的一些高級特性。java

RabbitMQ監聽器

一、添加Bookweb

爲了測試監聽器的使用場景,咱們先構建一個bean。spring

bean/Book.class
package com.zhaoyi.bweb.bean;

public class Book {
    private String author;
    private String bookName;
    private String introduce;

    public Book() {
    }

    public static Book defaultBook(){
        return new Book("紅樓夢","曹雪芹", "四大名著之一...");
    }

    public Book(String bookName, String author, String introduce) {
        this.author = author;
        this.bookName = bookName;
        this.introduce = introduce;
    }

    public String getIntroduce() {
        return introduce;
    }

    public void setIntroduce(String introduce) {
        this.introduce = introduce;
    }

    public String getAuthor() {
        return author;
    }

    public void setAuthor(String author) {
        this.author = author;
    }

    public String getBookName() {
        return bookName;
    }

    public void setBookName(String bookName) {
        this.bookName = bookName;
    }

    @Override
    public String toString() {
        return "Book{" +
                "author='" + author + '\'' +
                ", bookName='" + bookName + '\'' +
                ", introduce='" + introduce + '\'' +
                '}';
    }
}

使用咱們上一節中學習使用的項目進行接下來的操做。json

在應用中使用RabbitMQ的監聽器。springboot

二、在主程序處開啓基於註解的RabbitMQ模式app

BwebApplication.class
package com.zhaoyi.bweb;

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@EnableRabbit
@SpringBootApplication
public class BwebApplication {
    public static void main(String[] args) {
        SpringApplication.run(BwebApplication.class, args);
    }
}

三、編寫一個Service,監聽消息隊列ide

service/BookService.class
package com.zhaoyi.bweb.service;

import com.zhaoyi.bweb.bean.Book;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class BookService {

    @RabbitListener(queues = {"joyblack.news"})
    public void listernerBook(Book book){
        System.out.println("receive a message:");
        System.out.println(book);
    }
}

該監聽方法會將消息體數據映射到Book對象,若是數據類型不一致會出現應用程序異常。學習

該service中的listernerBook監聽了咱們定製的MQ服務的joyblack.news的隊列信息,當joyblack.news接受到信息的時候,會調用該方法。測試

四、 在測試用例中添加一個測試用例,用於向joyblack.news中發送包含Book數據的消息。網站

test/BwebApplicationTests
@Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    public void send(){
        rabbitTemplate.convertAndSend("exchange.direct", "joyblack.news", Book.defaultBook());
    }

運行主程序。

而後運行咱們的測試運行,能夠看到,每當咱們運行一次測試用例,就會觸發service的listernerBook的一次調用,並打印結果:

...
receive a message:
Book{author='曹雪芹', bookName='紅樓夢', introduce='四大名著之一...'}
receive a message:
Book{author='曹雪芹', bookName='紅樓夢', introduce='四大名著之一...'}
....

也就是說,經過@EnableRabbit以及@RabbitListener兩個註解,咱們就能夠在springboot實現簡單的消息監聽了。

固然,咱們也能夠有其餘的接收消息的模式,好比獲取消息所有內容:

service/BookService.class
package com.zhaoyi.bweb.service;

import com.zhaoyi.bweb.bean.Book;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class BookService {

//    @RabbitListener(queues = {"joyblack.news"})
//    public void listernerBook(Book book){
//        System.out.println("receive a message:");
//        System.out.println(book);
//    }

    @RabbitListener(queues = {"joyblack.news"})
    public void listernerBook(Message message){
        System.out.println("receive a message:");
        System.out.println(message);
    }
}

發送消息後打印的結果爲:

receive a message:
(Body:'{"author":"曹雪芹","bookName":"紅樓夢","introduce":"四大名著之一..."}' MessageProperties [headers={__TypeId__=com.zhaoyi.bweb.bean.Book}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=exchange.direct, receivedRoutingKey=joyblack.news, deliveryTag=1, consumerTag=amq.ctag-a8Co2RP7og21nB5A6R5QbQ, consumerQueue=joyblack.news])

即包含了整個消息內容。

管理組件AmqpAdmin

前面咱們已經經過MQ的組件測試了不少有意思的功能,可是別忘了,咱們不少組件,好比交換器、消息隊列等,都是咱們經過RabbitMQ的管理網站事先建立的。那麼,咱們會有這樣的疑問,可不能夠經過RabbitMQ提供的什麼組件幫咱們在應用程序中完成這樣的操做呢?答案是能!

這個組件就是咱們這一章節將要講到的AmqpAdmin。經過AmqpAdmin咱們能夠建立交換器、消息隊列以及綁定規則等。

要使用AmqpAdmin很簡單,還記得咱們以前講過的自動配置類嗎,他提供的兩個重要組件之一就是AmqpAdmin。咱們直接在應用程序中注入該組件就可使用了。

建立交換器

Exchange.inteface是MQ組件中定義的一個接口

org.springframework.amqp.core.Exchange
package org.springframework.amqp.core;

import java.util.Map;

public interface Exchange extends Declarable {
    String getName();

    String getType();

    boolean isDurable();

    boolean isAutoDelete();

    Map<String, Object> getArguments();

    boolean isDelayed();

    boolean isInternal();
}

咱們查看該接口的實現類有5個(其實有一個抽象類做爲中間件),他們分別是DirectExchange、FanoutExchange、CustomExchange、TopicExchange以及HeadersExchange。其中有2個咱們不用在乎,其餘三個恰好對應咱們以前所講到的3種交換器類型,所以,要建立交換器,直接建立對應類型的交換器便可,例如,咱們建立一個direct類型的交換器,並命名爲exchange.myDirect.

Test.class
package com.zhaoyi.bweb;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class BwebApplicationTests {

    @Autowired
    AmqpAdmin amqpAdmin;

    @Test
    public void createExchange(){
        amqpAdmin.declareExchange(new DirectExchange("exchange.myDirect"));
        System.out.println("create exchange success.");
    }
}

運行該測試用例,咱們就能夠在管理網站處的Exchanges選項卡查看到新建立的direct類型的exchange了。

amqpAdmin.declarexxxx能夠建立xxx類型的RabbitMQ組件。

同理,咱們能夠經過amqpAdmin.declareQueue建立其餘的組件,提供的參數一一對應於咱們配置對應組件時指定的那些配置選項。

建立隊列

Test.class
@Test
    public void createQueue(){
        amqpAdmin.declareQueue(new Queue("queue.myQueue", true));
        System.out.println("create Queue success.");
    }

該測試用例建立了一個name=queue.myQueue,以及durable爲true(便可持久化)的隊列。

建立綁定規則

建立綁定規則時咱們須要查看一下Binding類對應的方法參數:

org.springframework.amqp.core.Binding
public Binding(String destination, Binding.DestinationType destinationType, String exchange, String routingKey, Map<String, Object> arguments) {
		// 目的地
        this.destination = destination;
		// 目的的類型
        this.destinationType = destinationType;
		// 交換器
        this.exchange = exchange;
		// 路由鍵
        this.routingKey = routingKey;
		// 額外參數
        this.arguments = arguments;
    }

所以,咱們對應這些參數進行配置就能夠了,你也能夠感受獲得,這些參數都是和咱們的管理網站的可視化配置一一對應起來的:

Test.class
@Test
    public void createBinding(){
        amqpAdmin.declareBinding(new Binding("queue.myQueue",
                Binding.DestinationType.QUEUE,
                "exchange.myDirect",
                "queue.myQueue",
                null
                ));
        System.out.println("create Binding success.");
    }

能夠看到,咱們定義了一個綁定規則,他是綁定在交換器exchange.myDirect上,路由鍵爲queue.myQueue,並指向目的地爲queue.myQueue的隊列。

如今,查看管理網站,能夠清晰的看到咱們此次建立的三個組件,以及他們之間的綁定關係。

注意路由鍵這個屬性,一般狀況下,咱們會將其命名爲和目的地隊列同樣的名稱,但請不要混淆兩者。若是你的應用足夠複雜,顯然是不少綁定規則,而且路由鍵是多種多樣的。

amqpAdmin.deleteXxx 能夠幫助咱們刪除指定名稱的交換器和隊列,你們能夠本身嘗試使用。

相關文章
相關標籤/搜索