1.RabbitMQ介紹
RabbitMQ是實現AMQP(高級消息隊列協議)的消息中間件的一種,最初起源於金融系統,用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。RabbitMQ主要是爲了實現系統之間的雙向解耦而實現的。當生產者大量產生數據時,消費者沒法快速消費,那麼須要一箇中間層。保存這個數據。java
AMQP,即Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標準,爲面向消息的中間件設計。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。AMQP的主要特徵是面向消息、隊列、路由(包括點對點和發佈/訂閱)、可靠性、安全。web
RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。spring
2.AmqpTemplate,RabbitTemplateapache
Spring AMQP提供了一個發送和接收消息的操做模板類AmqpTemplate。 AmqpTemplate它定義包含了發送和接收消息等的一些基本的操做功能。RabbitTemplate是AmqpTemplate的一個實現。api
RabbitTemplate支持消息的確認與返回,爲了返回消息,RabbitTemplate 須要設置mandatory 屬性爲true,而且CachingConnectionFactory 的publisherReturns屬性也須要設置爲true。返回的消息會根據它註冊的RabbitTemplate.ReturnCallback setReturnCallback 回調發送到給客戶端,安全
一個RabbitTemplate僅能支持一個ReturnCallback 。服務器
爲了確認Confirms消息, CachingConnectionFactory 的publisherConfirms 屬性也須要設置爲true,確認的消息會根據它註冊的RabbitTemplate.ConfirmCallback setConfirmCallback回調發送到給客戶端。一個RabbitTemplate也僅能支持一個ConfirmCallback.app
3.SpringBoot集成RabbitMQmaven
pom分佈式
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>rabbitMQ</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.2.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
自動配置信息 這裏我開啓ACK消息確認
server.port=8083
#服務器配置
spring.application.name=rabbitmq-hello-sending
#rabbitmq鏈接參數
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=linpeng
spring.rabbitmq.password=123456
# 開啓發送確認
spring.rabbitmq.publisher-confirms=true
# 開啓發送失敗退回
spring.rabbitmq.publisher-returns=true
# 開啓ACK
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual
建立消息隊列 隊列名:hello 和 helloObj
package com.example.demo;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
@Bean
public Queue QueueA() {
return new Queue("hello");
}
@Bean
public Queue QueueB() {
return new Queue("helloObj");
}
/**
* Fanout 就是咱們熟悉的廣播模式或者訂閱模式,給Fanout交換機發送消息,綁定了這個交換機的全部隊列都收到這個消息。
* @return
*/
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("ABExchange");
}
@Bean
Binding bindingExchangeA(Queue QueueA, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(QueueA).to(fanoutExchange);
}
@Bean
Binding bindingExchangeB(Queue QueueB, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(QueueB).to(fanoutExchange);
}
}
消息發送者 Sender 使用 RabbitTemplate 不採用 AmqpTemplate
package com.example.demo;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import java.util.Date;
//RabbitTemplate.ConfirmCallback
@Service
public class HelloSender implements RabbitTemplate.ReturnCallback {
@Autowired
// private AmqpTemplate rabbitTemplate;
private RabbitTemplate rabbitTemplate;
public void send() {
String context = "你好如今是 " + new Date() +"";
System.out.println("HelloSender發送內容 : " + context);
// this.rabbitTemplate.setConfirmCallback(this);
this.rabbitTemplate.setReturnCallback(this);
this.rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (!ack) {
System.out.println("HelloSender消息發送失敗" + cause + correlationData.toString());
} else {
System.out.println("HelloSender 消息發送成功 ");
}
});
this.rabbitTemplate.convertAndSend("hello", context);
}
public void sendObj() {
MessageObj obj = new MessageObj();
obj.setACK(false);
obj.setId(123);
obj.setName("zhangsan");
obj.setValue("data");
System.out.println("發送 : " + obj);
this.rabbitTemplate.convertAndSend("helloObj", obj);
}
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
System.out.println("sender return success" + message.toString()+"==="+i+"==="+s1+"==="+s2);
}
// @Override
// public void confirm(CorrelationData correlationData, boolean b, String s) {
// System.out.println("sender success");
// }
}
消息接受者 Receiver 註解方式接受消息
package com.example.demo;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Date;
import java.util.Map;
@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {
@RabbitHandler
public void process(String hello,Channel channel, Message message) throws IOException {
System.out.println("HelloReceiver收到 : " + hello +"收到時間"+new Date());
try {
//告訴服務器收到這條消息 已經被我消費了 能夠在隊列刪掉 這樣之後就不會再發了 不然消息服務器覺得這條消息沒處理掉 後續還會在發
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
System.out.println("receiver success");
} catch (IOException e) {
e.printStackTrace();
//丟棄這條消息
//channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
System.out.println("receiver fail");
}
}
}
備註:咱們用註解的方式來接受消息 就不要用 本身建立對象實現ChannelAwareMessageListener的方式來接受消息 這種方式還要去全局裏面配置 麻煩,直接用@RabbitListener(queues = "hello")最簡單
消息確認 由於我在屬性配置文件裏面開啓了ACK確認 因此若是代碼沒有執行ACK確認 你在RabbitMQ的後臺會看到消息會一直留在隊列裏面未消費掉 只要程序一啓動開始接受該隊列消息的時候 又會收到
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
//消息的標識,false只確認當前一個消息收到,true確認全部consumer得到的消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
//ack返回false,並從新回到隊列,api裏面解釋得很清楚
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
//拒絕消息
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
TestController測試
@Autowired
private HelloSender helloSender;
/**
* 單生產者-單個消費者
*/
@RequestMapping("/test")
public void hello() throws Exception {
helloSender.send();
}
RabbitMQ後臺 兩個隊列
發送消息
ACK場景測試
咱們把HelloReceiver的ACK確認代碼註釋掉 那消息就算程序收到了 可是未確認ACK致使消息服務器覺得他是未成功消費的 後續還會再發
重啓程序