知識儲備: html
關於消息隊列的基本概念我已經在上一篇文章介紹過了(傳送門),本篇文章主要講述的是SpringBoot與RabbitMQ的整合以及簡單的使用。java
一.安裝RabbitMQpython
1.在linux上使用docker下載RabbitMQlinux
docker pull registry.docker-cn.com/library/rabbitmq:3-management
2.使用docker啓動RabbitMQweb
docker run -d -p 5672:5672 -p 15672:15672 --name myrabbitmq d69a5113ceae
5672端口:客戶端與MQ的通訊端口spring
15672端口:管理界面訪問web頁面的端口docker
3.訪問管理界面瀏覽器
瀏覽器訪問:http://172.16.**.**:15672,默認的管理界面帳號密碼均爲:guestspring-boot
測試RabbitMQ測試
1). 登陸RabbitMQ管理界面,建立交換器(Exchanges)
2). 建立Queues
3). 分別給交換器綁定queues
4).在direct交換器中給路由器發送消息
5). 隊列中接收到的消息
二.環境搭建
1.引入spring-boot-starter-amqp
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2.環境配置
#配置主機地址,默認localhost spring.rabbitmq.host=172.16.80.34 spring.rabbitmq.username=guest spring.rabbitmq.password=guest #默認5672 spring.rabbitmq.port=5672 #默認/ spring.rabbitmq.virtual-host=/
三.RabbitMQ自動配置原理
1.RabbitAutoConfiguration
2.自動配置了鏈接工廠ConnectionFactory
3.RabbitProperties封裝了RabbitMQ的配置
4.RabbitTemplate:給RabbitMQ發送和接收消息的模板
5.AmqpAdmin系統管理組件:建立交換器等
四.RabbitTemplate的簡單使用
發送消息:
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void contextLoads() {
//Message須要本身構造一個,定義消息體內容和消息頭
// rabbitTemplate.send(exchange,routingKey,message);
//object默認當成消息體,只須要傳入要發送的對象,自動序列化給mq
Map<String,Object> map = new HashMap<>();
map.put("msg","第一次發送消息");
map.put("data",Arrays.asList("<","0.0",">"));
//對象被默認序列化之後發送出去
rabbitTemplate.convertAndSend("exchange.direct","wang.news",map); //使用點對點方式傳播
}
此時查看RabbitMQ管理頁面的wang.news隊列,已經有消息插入進去了,因爲RabbitMQ傳遞的是序列化的對象,因此接收到的值也是序列化事後的值。
接收消息:
@Test public void receive(){ Object receive = rabbitTemplate.receiveAndConvert("wang.news"); //接收消息。 System.out.println(receive.getClass()); System.out.println(receive); }
使用該方法獲取到消息後隊列裏的消息就會自動清除。
因爲序列化的對象保存起來很不直觀,那麼該如何解決這個問題呢?
因爲RabbitTemplate默認採用的是JDK的MessageConvert,使用默認的JDK序列化規則,因此須要更改MessageConvert,更改成JSON的序列化規則
import org.springframework.amqp.support.converter.MessageConverter;//這裏要導入amqp包下的MessageConverter
@Configuration
public class MyAMQPConfig {
@Bean
public MessageConverter messageConverter(){ //自動配置裏,配置RabbitTemplate的時候會判斷是否有自定義的MessageConvert,若是有則採用自定義的
return new Jackson2JsonMessageConverter();
}
}
上面演示的是點對點(direct)的交換器(Exchanges),那麼廣播模式(fanout)的交換器要如何使用的呢?
@Test public void sendMsgs(){ rabbitTemplate.convertAndSend("exchange.fanout","",new Book("java",2)); //廣播模式只須要指定交換器的模式,自動會向該交換器綁定的全部隊列發送消息。 }
發佈/訂閱(模糊匹配模式)也是同樣的,只須要指定交換器,修改對應的routingKey就好了
/** * 發佈/訂閱(模糊匹配)方式 */ @Test public void topicSendMsgs(){ rabbitTemplate.convertAndSend("exchange.topic","*.news",new Book("python",3)); }
五.監聽消息
上面簡單演示了使用rabbitTemplate發送和接收消息,實際開發中須要一些監聽場景。例如訂單系統和庫存系統的解耦中,兩個系統之間都是經過消息隊列來通訊的,當某一我的下單以後,將訂單信息存放在消息隊列中,庫存系統要實時的監聽消息裏面的內容一旦有新的訂單進來,庫存系統就須要有相關的操做。那麼該如何實現監聽呢,Spring爲了簡化開發,引入了一些註解來實現消息隊列的監聽。
1.在SpringBoot主啓動類上加上註解@EnableRabbit,開啓RabbitMQ的註解模式
@EnableRabbit//開啓基於註解的RabbitMQ模式 @SpringBootApplication public class Springboot02AmqpApplication { public static void main(String[] args) { SpringApplication.run(Springboot02AmqpApplication.class, args); } }
2.使用@RabbitListener監聽某個隊列
@Service public class BookService { @RabbitListener(queues = {"wang.news"}) //監聽隊列wang.news,只要wang.news收到消息,馬上執行該方法,並清空隊列 public void receive(Book book){ System.out.println("收到消息"+book); } @RabbitListener(queues = {"wang"}) public void receive02(Message message){ //org.springframework.amqp.core.Message; System.out.println("消息內容"+message.getBody()); System.out.println("消息頭"+message.getMessageProperties()); } }
五.AMQPAdmin的使用
上面代碼用到的交換器以及隊列都是咱們手動在RabbitMQ管理界面添加的,使用AMQPAdmin可讓咱們用編碼的方式建立這些組件。
1.建立交換器(Exchange)
@Test public void createExchange() { DirectExchange directExchange = new DirectExchange("amqpadmin.exchange"); amqpAdmin.declareExchange(directExchange); //建立一個DirectExchange System.out.println("建立完成"); }
2.建立隊列(Queue)
@Test public void createQueue(){ amqpAdmin.declareQueue(new Queue("amqpadmin.queue",true)); System.out.println("隊列建立完成"); }
3.建立綁定規則(banding)
@Test public void createBanding(){ amqpAdmin.declareBinding(new Binding("amqpadmin.queue",Binding.DestinationType.QUEUE,"amqpadmin.exchange","amqpadmin.queue",null)); System.out.println("綁定完成"); }