<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.4.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>cn.tedu</groupId> <artifactId>rabbitmq-springboot</artifactId> <version>0.0.1-SNAPSHOT</version> <name>rabbitmq-springboot</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
spring: rabbitmq: host: 192.168.64.140 username: admin password: admin
刪除自動建立的主程序java
咱們爲每種模式建立一個包,在每一個包中建立各自的主程序,單獨測試.spring
Spring提供的Queue類,是隊列的封裝對象,它封裝了隊列的參數信息.apache
RabbitMQ的自動配置類,會發現這些Queue實例,並在RabbitMQ服務器中定義這些隊列.api
package cn.tedu.rabbitmqspringboot.m1; import org.springframework.amqp.core.Queue; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import javax.annotation.PostConstruct; @SpringBootApplication public class Main { @Autowired private Producer producer; public static void main(String[] args) { SpringApplication.run(Main.class, args); } @Bean public Queue helloworldQueue(){ /** * 可用如下形式: * new Queue("helloworld") - 默認屬性:持久(true),非排他(false),非自動刪除(false) * new Queue("helloworld",false,false,false,null) */ return new Queue("helloworld",false);//返回一個非持久隊列 } /** *@PostConstruct 方法會被自動執行,spring掃描建立了全部對象,並完成全部注入操做後會執行 */ @PostConstruct public void test(){ producer.send(); System.out.println("消息已經發送"); } }
AmqpTemplate是rabbitmq客戶端API的一個封裝工具,提供了簡便的方法來執行消息操做.springboot
AmqpTemplate由自動配置類自動建立服務器
package cn.tedu.rabbitmqspringboot.m1; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class Producer { // 在RabbtiAutoConfiguration 自動配置類中建立的工具對象 @Autowired private AmqpTemplate amqpTemplate; public void send(){ amqpTemplate.convertAndSend("helloworld", "Hello world!"); } }
經過@RabbitListener
從指定的隊列接收消息
使用@RebbitHandler
註解的方法來處理消息app
package cn.tedu.rabbitmqspringboot.m1; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component //@RabbitListener(queues = "helloworld")//這樣使用須要用@RabbitHandler配合使用 public class Consumer { //@RabbitHandler//這樣只能接收一個隊列的消息,簡單模式 @RabbitListener(queues = "helloworld")//這種用法能夠寫多個方法接收多個隊列,工廠模式 public void receive(String msg){ System.out.println("收到: " + msg); } }
在主程序中建立名爲task_queue
的持久隊列dom
package cn.tedu.rabbitmqspringboot.m2; import org.springframework.amqp.core.Queue; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import javax.annotation.PostConstruct; /** * 合理分發 * 1.手動ack - springboot整合後默認就是手動ack模式 * 消費者方法執行成功後,springboot會幫助發送回執 * 2.qos=1 - yml中配置prefetch * 持久化 * 1.隊列持久化 * 2.消息持久化 - 默認是持久消息 */ @SpringBootApplication public class Main { @Autowired private Producer producer; public static void main(String[] args) { SpringApplication.run(Main.class, args); } @Bean public Queue taskQueue(){ /** * 可用如下形式: * new Queue("helloworld") - 默認屬性:持久(true),非排他(false),非自動刪除(false) * new Queue("helloworld",false,false,false,null) */ return new Queue("task_queue",true);//返回一個持久隊列 } /** *@PostConstruct 方法會被自動執行,spring掃描建立了全部對象,並完成全部注入操做後會執行 */ @PostConstruct public void test(){ producer.send(); } }
package cn.tedu.rabbitmqspringboot.m2; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Scanner; @Component public class Producer { // 在RabbtiAutoConfiguration 自動配置類中建立的工具對象 @Autowired private AmqpTemplate amqpTemplate; public void send(){ // new Thread(new Runnable() { // @Override // public void run() { // // } // }).start(); // lambda.匿名內部類的簡寫 new Thread(() ->{ while (true){//用單獨的線程執行,不要影響主線程 System.out.println("輸入消息: "); String msg = new Scanner(System.in).nextLine(); amqpTemplate.convertAndSend("task_queue",msg); } }).start(); } }
spring boot封裝的 rabbitmq api 中, 發送的消息默認是持久化消息.
若是但願發送非持久化消息, 須要在發送消息時作如下設置:異步
//若是須要設置消息爲非持久化,能夠取得消息的屬性對象,修改它的deliveryMode屬性 t.convertAndSend("task_queue", (Object) s, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { MessageProperties props = message.getMessageProperties(); props.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT); return message; } });
package cn.tedu.rabbitmqspringboot.m2; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class Consumer { @RabbitListener(queues = "task_queue") public void receive1(String msg){ System.out.println("消費者1-收到: " + msg); } @RabbitListener(queues = "task_queue") public void receive2(String msg){ System.out.println("消費者2-收到: " + msg); } }
在 spring boot 中提供了三種確認模式:maven
默認的 AUTO
模式中, 處理消息的方法拋出異常, 則表示消息沒有被正確處理, 該消息會被從新發送.
spring: rabbitmq: listener: simple: # acknowledgeMode: NONE # rabbitmq的自動確認 acknowledgeMode: AUTO # rabbitmq的手動確認, springboot會自動發送確認回執 (默認) # acknowledgeMode: MANUAL # rabbitmq的手動確認, springboot不發送回執, 必須本身編碼
若是設置爲 MANUAL
模式,必須手動執行確認操做
@RabbitListener(queues="task_queue") public void receive1(String s, Channel c, @Header(name=AmqpHeaders.DELIVERY_TAG) long tag) throws Exception { System.out.println("receiver1 - 收到: "+s); // 手動發送確認回執 c.basicAck(tag, false); }
工做模式中, 爲了合理地分發數據, 須要將 qos 設置成 1, 每次只接收一條消息, 處理完成後才接收下一條消息.
spring boot 中是經過 prefetch
屬性進行設置, 改屬性的默認值是 250.
spring: rabbitmq: listener: simple: prefetch: 1 # qos=1, 默認250
建立 FanoutExcnahge
實例, 封裝 fanout
類型交換機定義信息.
spring boot 的自動配置類會自動發現交換機實例, 並在 RabbitMQ 服務器中定義該交換機.
package cn.tedu.rabbitmqspringboot.m3; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import javax.annotation.PostConstruct; @SpringBootApplication public class Main { @Autowired private Producer producer; public static void main(String[] args) { SpringApplication.run(Main.class, args); } @Bean public FanoutExchange logsExchange(){ return new FanoutExchange("logs",false,false);//非持久,不自動刪除 } @PostConstruct public void test(){ producer.send(); } }
生產者向指定的交換機 logs
發送數據.
不須要指定隊列名或路由鍵, 即便指定也無效, 由於 fanout
交換機會向全部綁定的隊列發送數據, 而不是有選擇的發送.
package cn.tedu.rabbitmqspringboot.m3; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Scanner; @Component public class Producer { @Autowired private AmqpTemplate amqpTemplate; public void send(){ new Thread(() ->{ while (true){ System.out.println("輸入消息: "); String msg = new Scanner(System.in).nextLine(); amqpTemplate.convertAndSend("logs","",msg);//向交換機發送消息 } }).start(); } }
消費者須要執行如下操做:
spring boot 經過註解完成以上操做:
@RabbitListener(bindings = @QueueBinding( //這裏進行綁定設置 value = @Queue, //這裏定義隨機隊列,默認屬性: 隨機命名,非持久,排他,自動刪除 exchange = @Exchange(name = "logs", declare = "false") //指定 logs 交換機,由於主程序中已經定義,這裏不進行定義 ))
package cn.tedu.rabbitmqspringboot.m3; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class Consumer { //1.建立隨機隊列 2,指定交換機logs 3.綁定 @RabbitListener(bindings = @QueueBinding( value = @Queue(),//隊列,隨即名,非持久,獨佔,自動刪除隊列 exchange = @Exchange(name = "logs",declare = "false")//交換機,declare表示不定義交換機,只是使用 )) public void receive1(String msg){ System.out.println("消費者1-收到: " + msg); } //1.建立隨機隊列 2,指定交換機logs 3.綁定 @RabbitListener(bindings = @QueueBinding( value = @Queue(),//隊列,隨即名,非持久,獨佔,自動刪除隊列 exchange = @Exchange(name = "logs",declare = "false")//交換機,declare(false)表示不定義交換機,只是使用 )) public void receive2(String msg){ System.out.println("消費者2-收到: " + msg); } }
與發佈和訂閱模式代碼相似, 只是作如下三點調整:
direct
交換機主程序中使用 DirectExcnahge
對象封裝交換機信息, spring boot 自動配置類會自動發現這個對象, 並在 RabbitMQ 服務器上定義這個交換機.
package cn.tedu.rabbitmqspringboot.m4; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.FanoutExchange; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import javax.annotation.PostConstruct; @SpringBootApplication public class Main { @Autowired private Producer producer; public static void main(String[] args) { SpringApplication.run(Main.class, args); } @Bean public DirectExchange directExchange(){ return new DirectExchange("direct_logs",false,false);//非持久,不自動刪除 } @PostConstruct public void test(){ producer.send(); } }
生產者向指定的交換機發送消息, 並指定路由鍵.
package cn.tedu.rabbitmqspringboot.m4; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Scanner; @Component public class Producer { @Autowired private AmqpTemplate amqpTemplate; public void send(){ new Thread(() ->{ while (true){ System.out.println("輸入消息: "); String msg = new Scanner(System.in).nextLine(); System.out.println("輸入路由鍵: "); String key = new Scanner(System.in).nextLine(); amqpTemplate.convertAndSend("direct_logs",key,msg); } }).start(); } }
消費者經過註解來定義隨機隊列, 綁定到交換機, 並指定綁定鍵:
@RabbitListener(bindings = @QueueBinding( // 這裏作綁定設置 value = @Queue, // 定義隊列, 隨機命名,非持久,排他,自動刪除 exchange = @Exchange(name = "direct_logs", declare = "false"), // 指定綁定的交換機,主程序中已經定義過隊列,這裏不進行定義 key = {"error","info","warning"} // 設置綁定鍵 ))
package cn.tedu.rabbitmqspringboot.m4; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class Consumer { //1.建立隨機隊列 2,指定交換機logs 3.綁定 @RabbitListener(bindings = @QueueBinding( value = @Queue(),//隊列,隨即名,非持久,獨佔,自動刪除隊列 exchange = @Exchange(name = "direct_logs",declare = "false"),//交換機,declare表示不定義交換機,只是使用 key = {"error"} //設置綁定鍵 )) public void receive1(String msg){ System.out.println("消費者1-收到: " + msg); } //1.建立隨機隊列 2,指定交換機logs 3.綁定 @RabbitListener(bindings = @QueueBinding( value = @Queue(),//隊列,隨即名,非持久,獨佔,自動刪除隊列 exchange = @Exchange(name = "direct_logs",declare = "false"),//交換機,declare(false)表示不定義交換機,只是使用 key = {"error","info","warning"} //設置綁定鍵 )) public void receive2(String msg){ System.out.println("消費者2-收到: " + msg); } }
主題模式不過是具備特殊規則的路由模式, 代碼與路由模式基本相同, 只作以下調整:
topic
交換機package cn.tedu.rabbitmqspringboot.m5; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.TopicExchange; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import javax.annotation.PostConstruct; @SpringBootApplication public class Main { @Autowired private Producer producer; public static void main(String[] args) { SpringApplication.run(Main.class, args); } @Bean public TopicExchange directExchange(){ return new TopicExchange("topic_logs",false,false);//非持久,不自動刪除 } @PostConstruct public void test(){ producer.send(); } }
package cn.tedu.rabbitmqspringboot.m5; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Scanner; @Component public class Producer { @Autowired private AmqpTemplate amqpTemplate; public void send(){ new Thread(() ->{ while (true){ System.out.println("輸入消息: "); String msg = new Scanner(System.in).nextLine(); System.out.println("輸入路由鍵: "); String key = new Scanner(System.in).nextLine(); amqpTemplate.convertAndSend("topic_logs",key,msg); } }).start(); } }
package cn.tedu.rabbitmqspringboot.m5; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class Consumer { //1.建立隨機隊列 2,指定交換機logs 3.綁定 @RabbitListener(bindings = @QueueBinding( value = @Queue(),//隊列,隨即名,非持久,獨佔,自動刪除隊列 exchange = @Exchange(name = "topic_logs",declare = "false"),//交換機,declare表示不定義交換機,只是使用 key = {"*.orange.*"} //設置綁定鍵 )) public void receive1(String msg){ System.out.println("消費者1-收到: " + msg); } //1.建立隨機隊列 2,指定交換機logs 3.綁定 @RabbitListener(bindings = @QueueBinding( value = @Queue(),//隊列,隨即名,非持久,獨佔,自動刪除隊列 exchange = @Exchange(name = "topic_logs",declare = "false"),//交換機,declare(false)表示不定義交換機,只是使用 key = {"*.*.rabbit","lazy.#"} //設置綁定鍵 )) public void receive2(String msg){ System.out.println("消費者2-收到: " + msg); } }
主程序中定義兩個隊列
rpc_queue
package cn.tedu.m6; import java.util.UUID; import org.springframework.amqp.core.Queue; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; @SpringBootApplication public class Main { public static void main(String[] args) { SpringApplication.run(Main.class, args); } @Bean public Queue sendQueue() { return new Queue("rpc_queue",false); } @Bean public Queue rndQueue() { return new Queue(UUID.randomUUID().toString(), false); } }
從rpc_queue
接收調用數據, 執行運算求斐波那契數,並返回計算結果.@Rabbitlistener
註解對於具備返回值的方法:
replyTo
屬性correlationId
屬性replyTo
屬性指定的隊列發送計算結果, 並攜帶 correlationId
屬性package cn.tedu.m6; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class RpcServer { @RabbitListener(queues = "rpc_queue") public long getFbnq(int n) { return f(n); } private long f(int n) { if (n==1 || n==2) { return 1; } return f(n-1) + f(n-2); } }
使用 SPEL 表達式獲取隨機隊列名: "#{rndQueue.name}"
發送調用數據時, 攜帶隨機隊列名和correlationId
從隨機隊列接收調用結果, 並獲取correlationId
package cn.tedu.m6; import java.util.UUID; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; @Component public class RpcClient { @Autowired AmqpTemplate t; @Value("#{rndQueue.name}") String rndQueue; public void send(int n) { // 發送調用信息時, 經過前置消息處理器, 對消息屬性進行設置, 添加返回隊列名和關聯id t.convertAndSend("rpc_queue", (Object)n, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { MessageProperties p = message.getMessageProperties(); p.setReplyTo(rndQueue); p.setCorrelationId(UUID.randomUUID().toString()); return message; } }); } //從隨機隊列接收計算結果 @RabbitListener(queues = "#{rndQueue.name}") public void receive(long r, @Header(name=AmqpHeaders.CORRELATION_ID) String correlationId) { System.out.println("nn"+correlationId+" - 收到: "+r); } }