1.RabbitMQ介紹java
RabbitMQ是實現AMQP(高級消息隊列協議)的消息中間件的一種,最初起源於金融系統,用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。RabbitMQ主要是爲了實現系統之間的雙向解耦而實現的。當生產者大量產生數據時,消費者沒法快速消費,那麼須要一箇中間層。保存這個數據。web
AMQP,即Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標準,爲面向消息的中間件設計。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。AMQP的主要特徵是面向消息、隊列、路由(包括點對點和發佈/訂閱)、可靠性、安全。spring
RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。apache
2.AmqpTemplate,RabbitTemplateapi
Spring AMQP提供了一個發送和接收消息的操做模板類AmqpTemplate。 AmqpTemplate它定義包含了發送和接收消息等的一些基本的操做功能。RabbitTemplate是AmqpTemplate的一個實現。安全
RabbitTemplate支持消息的確認與返回,爲了返回消息,RabbitTemplate 須要設置mandatory 屬性爲true,而且CachingConnectionFactory 的publisherReturns屬性也須要設置爲true。返回的消息會根據它註冊的RabbitTemplate.ReturnCallback setReturnCallback 回調發送到給客戶端,服務器
一個RabbitTemplate僅能支持一個ReturnCallback 。app
爲了確認Confirms消息, CachingConnectionFactory 的publisherConfirms 屬性也須要設置爲true,確認的消息會根據它註冊的RabbitTemplate.ConfirmCallback setConfirmCallback回調發送到給客戶端。一個RabbitTemplate也僅能支持一個ConfirmCallback.maven
3.SpringBoot集成RabbitMQ分佈式
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.example</groupId> <artifactId>demo</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>rabbitMQ</name> <description>Demo project for Spring Boot</description> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.2.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
自動配置信息 這裏我開啓ACK消息確認
server.port=8083
#服務器配置
spring.application.name=rabbitmq-hello-sending
#rabbitmq鏈接參數
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=linpeng
spring.rabbitmq.password=123456
# 開啓發送確認
spring.rabbitmq.publisher-confirms=true
# 開啓發送失敗退回
spring.rabbitmq.publisher-returns=true
# 開啓ACK
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual
建立消息隊列 隊列名:hello 和 helloObj
package com.example.demo; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { @Bean public Queue QueueA() { return new Queue("hello"); } @Bean public Queue QueueB() { return new Queue("helloObj"); } /** * Fanout 就是咱們熟悉的廣播模式或者訂閱模式,給Fanout交換機發送消息,綁定了這個交換機的全部隊列都收到這個消息。 * @return */ @Bean FanoutExchange fanoutExchange() { return new FanoutExchange("ABExchange"); } @Bean Binding bindingExchangeA(Queue QueueA, FanoutExchange fanoutExchange) { return BindingBuilder.bind(QueueA).to(fanoutExchange); } @Bean Binding bindingExchangeB(Queue QueueB, FanoutExchange fanoutExchange) { return BindingBuilder.bind(QueueB).to(fanoutExchange); } }
消息發送者 Sender 使用 RabbitTemplate 不採用 AmqpTemplate
package com.example.demo; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.stereotype.Service; import java.util.Date; //RabbitTemplate.ConfirmCallback @Service public class HelloSender implements RabbitTemplate.ReturnCallback { @Autowired // private AmqpTemplate rabbitTemplate; private RabbitTemplate rabbitTemplate; public void send() { String context = "你好如今是 " + new Date() +""; System.out.println("HelloSender發送內容 : " + context); // this.rabbitTemplate.setConfirmCallback(this); this.rabbitTemplate.setReturnCallback(this); this.rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (!ack) { System.out.println("HelloSender消息發送失敗" + cause + correlationData.toString()); } else { System.out.println("HelloSender 消息發送成功 "); } }); this.rabbitTemplate.convertAndSend("hello", context); } public void sendObj() { MessageObj obj = new MessageObj(); obj.setACK(false); obj.setId(123); obj.setName("zhangsan"); obj.setValue("data"); System.out.println("發送 : " + obj); this.rabbitTemplate.convertAndSend("helloObj", obj); } @Override public void returnedMessage(Message message, int i, String s, String s1, String s2) { System.out.println("sender return success" + message.toString()+"==="+i+"==="+s1+"==="+s2); } }
消息接受者 Receiver 註解方式接受消息
package com.example.demo; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.beans.factory.annotation.Configurable; import org.springframework.context.annotation.Bean; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.Date; import java.util.Map; @Component @RabbitListener(queues = "hello") public class HelloReceiver { @RabbitHandler public void process(String hello,Channel channel, Message message) throws IOException { System.out.println("HelloReceiver收到 : " + hello +"收到時間"+new Date()); try { //告訴服務器收到這條消息 已經被我消費了 能夠在隊列刪掉 這樣之後就不會再發了 不然消息服務器覺得這條消息沒處理掉 後續還會在發 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); System.out.println("receiver success"); } catch (IOException e) { e.printStackTrace(); //丟棄這條消息 //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false); System.out.println("receiver fail"); } } }
備註:咱們用註解的方式來接受消息 就不要用 本身建立對象實現ChannelAwareMessageListener的方式來接受消息 這種方式還要去全局裏面配置 麻煩,直接用@RabbitListener(queues = "hello")最簡單
消息確認 由於我在屬性配置文件裏面開啓了ACK確認 因此若是代碼沒有執行ACK確認 你在RabbitMQ的後臺會看到消息會一直留在隊列裏面未消費掉 只要程序一啓動開始接受該隊列消息的時候 又會收到
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
//消息的標識,false只確認當前一個消息收到,true確認全部consumer得到的消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
//ack返回false,並從新回到隊列,api裏面解釋得很清楚
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
//拒絕消息
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
TestController測試
@Autowired private HelloSender helloSender; /** * 單生產者-單個消費者 */ @RequestMapping("/test") public void hello() throws Exception { helloSender.send(); }