<!-- Spring Boot AMQP starter; the Java samples in this article import org.springframework.amqp classes. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Spring Boot web starter; the sample controller exposes HTTP GET endpoints. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
server:
  port: 8888 # HTTP port of the application
spring:
  rabbitmq:
    host: 127.0.0.1 # host where RabbitMQ runs
    port: 5672 # RabbitMQ service port
    username: guest # RabbitMQ user name
    password: guest # RabbitMQ password
/**
 * Names of the queues, exchanges and routing keys shared by the RabbitMQ samples.
 *
 * <p>All values are compile-time constants, so they can be used inside annotation
 * attributes such as {@code @RabbitListener(queues = ...)}.
 */
public interface RabbitConstant {

    /** Simple mode: the single queue. */
    String SIMPLE_QUEUE_NAME = "simple_queue";

    /** Work mode: the queue shared by several listeners. */
    String WORK_QUEUE_NAME = "work_queue";

    /** Publish/subscribe mode: exchange and the two queues bound to it. */
    String PUBLISH_SUBSCRIBE_EXCHANGE_NAME = "publish_subscribe_exchange";
    String PUBLISH_SUBSCRIBE_FIRST_QUEUE_NAME = "publish_subscribe_first_queue";
    String PUBLISH_SUBSCRIBE_SECOND_QUEUE_NAME = "publish_subscribe_second_queue";

    /** Routing mode: exchange, queues and the routing key of each queue. */
    String ROUTING_EXCHANGE_NAME = "routing_exchange";
    String ROUTING_FIRST_QUEUE_NAME = "routing_first_queue";
    String ROUTING_SECOND_QUEUE_NAME = "routing_second_queue";
    String ROUTING_THIRD_QUEUE_NAME = "routing_third_queue";
    String ROUTING_FIRST_QUEUE_ROUTING_KEY_NAME = "routing_first_queue_routing_key";
    String ROUTING_SECOND_QUEUE_ROUTING_KEY_NAME = "routing_second_queue_routing_key";
    String ROUTING_THIRD_QUEUE_ROUTING_KEY_NAME = "routing_third_queue_routing_key";

    /** Topics mode: exchange, queues, routing keys used when sending, and binding patterns. */
    String TOPICS_EXCHANGE_NAME = "topics_exchange";
    String TOPICS_FIRST_QUEUE_NAME = "topics_first_queue";
    String TOPICS_SECOND_QUEUE_NAME = "topics_second_queue";
    String TOPICS_THIRD_QUEUE_NAME = "topics_third_queue";
    String TOPICS_FIRST_QUEUE_ROUTING_KEY = "topics.first.routing.key";
    String TOPICS_SECOND_QUEUE_ROUTING_KEY = "topics.second.routing.key";
    String TOPICS_THIRD_QUEUE_ROUTING_KEY = "topics.third.routing.key";
    String TOPICS_ROUTING_KEY_FIRST_WILDCARD = "#.first.#";
    String TOPICS_ROUTING_KEY_SECOND_WILDCARD = "*.second.#";

    /** Binding pattern for the third topics queue (correctly spelled name). */
    String TOPICS_ROUTING_KEY_THIRD_WILDCARD = "*.third.*";

    /**
     * Legacy, misspelled alias of {@link #TOPICS_ROUTING_KEY_THIRD_WILDCARD}; kept so that
     * existing references keep compiling. Prefer the correctly spelled constant in new code.
     */
    String TOPICS_ROUTING_KEY_THRID_WILDCARD = TOPICS_ROUTING_KEY_THIRD_WILDCARD;

    /** Header mode: exchange and the two queues bound to it. */
    String HEADER_EXCHANGE_NAME = "header_exchange";
    String HEADER_FIRST_QUEUE_NAME = "header_first_queue";
    String HEADER_SECOND_QUEUE_NAME = "header_second_queue";

    /** RPC mode: the request queue. */
    String RPC_QUEUE_NAME = "rpc_queue";
}
import com.example.rabbitmq.constant.RabbitConstant; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.nio.charset.StandardCharsets; @RestController public class RabbitController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping(value = "/simple") public void simple() { rabbitTemplate.convertAndSend(RabbitConstant.SIMPLE_QUEUE_NAME, "hello world!"); } @GetMapping(value = "/work") public void work() { rabbitTemplate.convertAndSend(RabbitConstant.WORK_QUEUE_NAME, "work hello!"); } @GetMapping(value = "/pubsub") public void pubsub() { rabbitTemplate.convertAndSend(RabbitConstant.PUBLISH_SUBSCRIBE_EXCHANGE_NAME, null, "publish/subscribe hello"); } @GetMapping(value = "/routing") public void routing() { // 給第一個隊列發送消息 rabbitTemplate.convertAndSend(RabbitConstant.ROUTING_EXCHANGE_NAME, RabbitConstant.ROUTING_FIRST_QUEUE_ROUTING_KEY_NAME, "routing hello"); } @GetMapping(value = "/topics") public void topics() { // 給第一個隊列發送消息,此時隊列能接受到消息,由於隊列通配符爲 #.first.#,而 routing_key 爲 topics.first.routing.key,匹配成功 rabbitTemplate.convertAndSend(RabbitConstant.TOPICS_EXCHANGE_NAME, RabbitConstant.TOPICS_FIRST_QUEUE_ROUTING_KEY, "topics hello"); // 給第二個隊列發送消息,此時隊列也能接受到消息,由於隊列通配符爲 *.second.#,而 routing_key 爲 topics.second.routing.key,匹配成功 rabbitTemplate.convertAndSend(RabbitConstant.TOPICS_EXCHANGE_NAME, RabbitConstant.TOPICS_SECOND_QUEUE_ROUTING_KEY, "topics hello"); // 給第三個隊列發送消息,此時隊列沒法接受到消息,由於隊列通配符爲 *.third.*,而 routing_key 爲 topics.third.routing.key,匹配失敗 rabbitTemplate.convertAndSend(RabbitConstant.TOPICS_EXCHANGE_NAME, RabbitConstant.TOPICS_THIRD_QUEUE_ROUTING_KEY, "topics hello"); } @GetMapping(value = "/header") public void header() { // 這條消息應該能被兩個隊列都接收到,第一個隊列 all 
匹配成功,第二個隊列 hello-value any匹配成功 MessageProperties messageProperties = new MessageProperties(); messageProperties.setHeader("matchAll", "YES"); messageProperties.setHeader("hello", "world"); Message message = new Message("header first hello".getBytes(StandardCharsets.UTF_8), messageProperties); rabbitTemplate.convertAndSend(RabbitConstant.HEADER_EXCHANGE_NAME, null, message); // 這條消息應該只被第二個隊列接受,第一個隊列 all 匹配失敗,第二個隊列 matchAll-NO any匹配成功 MessageProperties messagePropertiesSecond = new MessageProperties(); messagePropertiesSecond.setHeader("matchAll", "NO"); Message messageSecond = new Message("header second hello".getBytes(StandardCharsets.UTF_8), messagePropertiesSecond); rabbitTemplate.convertAndSend(RabbitConstant.HEADER_EXCHANGE_NAME, null, messageSecond); } @GetMapping(value = "/rpc") public void rpc() { Object responseMsg = rabbitTemplate.convertSendAndReceive(RabbitConstant.RPC_QUEUE_NAME, "rpc hello!"); System.out.println("rabbit rpc response message: " + responseMsg); } }
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Configuration for the simple-mode sample: declares its single queue as a bean.
 */
@Configuration
public class RabbitSimpleProvider {

    /**
     * @return a queue named {@link RabbitConstant#SIMPLE_QUEUE_NAME}
     */
    @Bean
    public Queue simpleQueue() {
        final String queueName = RabbitConstant.SIMPLE_QUEUE_NAME;
        return new Queue(queueName);
    }
}
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer for the simple-mode sample: prints each message received from the simple queue.
 */
@Component
public class RabbitSimpleConsumer {

    /**
     * Listener for {@link RabbitConstant#SIMPLE_QUEUE_NAME}.
     *
     * @param context the received message as text
     */
    @RabbitListener(queues = RabbitConstant.SIMPLE_QUEUE_NAME)
    @RabbitHandler
    public void simpleListener(String context) {
        final String line = "rabbit receiver: " + context;
        System.out.println(line);
    }
}
/** Sends one greeting addressed with the simple queue name. */
@Test
public void simple() {
    final String payload = "hello world!";
    rabbitTemplate.convertAndSend(RabbitConstant.SIMPLE_QUEUE_NAME, payload);
}
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Configuration for the work-mode sample: declares the shared work queue as a bean.
 */
@Configuration
public class RabbitWorkProvider {

    /**
     * @return a queue named {@link RabbitConstant#WORK_QUEUE_NAME}
     */
    @Bean
    public Queue workQueue() {
        final String queueName = RabbitConstant.WORK_QUEUE_NAME;
        return new Queue(queueName);
    }
}
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumers for the work-mode sample: two listeners attached to the same work queue,
 * each printing what it receives under its own label.
 */
@Component
public class RabbitWorkConsumer {

    /**
     * First listener on {@link RabbitConstant#WORK_QUEUE_NAME}.
     *
     * @param context the received message as text
     */
    @RabbitHandler
    @RabbitListener(queues = RabbitConstant.WORK_QUEUE_NAME)
    public void workQueueListenerFirst(String context) {
        report("rabbit workQueue listener first receiver: ", context);
    }

    /**
     * Second listener on {@link RabbitConstant#WORK_QUEUE_NAME}.
     *
     * @param context the received message as text
     */
    @RabbitHandler
    @RabbitListener(queues = RabbitConstant.WORK_QUEUE_NAME)
    public void workQueueListenerSecond(String context) {
        report("rabbit workQueue listener second receiver: ", context);
    }

    /** Prints {@code label} followed by {@code payload} on standard output. */
    private static void report(String label, String payload) {
        System.out.println(label + payload);
    }
}
/** Sends one greeting addressed with the work queue name. */
@Test
public void work() {
    final String payload = "work hello!";
    rabbitTemplate.convertAndSend(RabbitConstant.WORK_QUEUE_NAME, payload);
}
import com.example.rabbitmq.constant.RabbitConstant; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitPublishSubscribeProvider { @Bean public Queue pubsubQueueFirst() { return new Queue(RabbitConstant.PUBLISH_SUBSCRIBE_FIRST_QUEUE_NAME); } @Bean public Queue pubsubQueueSecond() { return new Queue(RabbitConstant.PUBLISH_SUBSCRIBE_SECOND_QUEUE_NAME); } @Bean public FanoutExchange fanoutExchange() { // 建立fanout類型交換機,表示與此交換機會將消息發送給全部綁定的隊列 return new FanoutExchange(RabbitConstant.PUBLISH_SUBSCRIBE_EXCHANGE_NAME); } @Bean public Binding pubsubQueueFirstBindFanoutExchange() { // 隊列一綁定交換機 return BindingBuilder.bind(pubsubQueueFirst()).to(fanoutExchange()); } @Bean public Binding pubsubQueueSecondBindFanoutExchange() { // 隊列二綁定交換機 return BindingBuilder.bind(pubsubQueueSecond()).to(fanoutExchange()); } }
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumers for the publish/subscribe sample: one listener per queue, each printing
 * what it receives under its own label.
 */
@Component
public class RabbitPublishSubscribeConsumer {

    /**
     * Listener for {@link RabbitConstant#PUBLISH_SUBSCRIBE_FIRST_QUEUE_NAME}.
     *
     * @param context the received message as text
     */
    @RabbitHandler
    @RabbitListener(queues = RabbitConstant.PUBLISH_SUBSCRIBE_FIRST_QUEUE_NAME)
    public void pubsubQueueFirst(String context) {
        report("rabbit pubsub queue first receiver: ", context);
    }

    /**
     * Listener for {@link RabbitConstant#PUBLISH_SUBSCRIBE_SECOND_QUEUE_NAME}.
     *
     * @param context the received message as text
     */
    @RabbitHandler
    @RabbitListener(queues = RabbitConstant.PUBLISH_SUBSCRIBE_SECOND_QUEUE_NAME)
    public void pubsubQueueSecond(String context) {
        report("rabbit pubsub queue second receiver: ", context);
    }

    /** Prints {@code label} followed by {@code payload} on standard output. */
    private static void report(String label, String payload) {
        System.out.println(label + payload);
    }
}
/** Publishes one greeting to the publish/subscribe exchange with a null routing key. */
@Test
public void pubsub() {
    final String exchange = RabbitConstant.PUBLISH_SUBSCRIBE_EXCHANGE_NAME;
    rabbitTemplate.convertAndSend(exchange, null, "publish/subscribe hello");
}
import com.example.rabbitmq.constant.RabbitConstant; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitRoutingProvider { @Bean public Queue rabbitRoutingFirstQueue() { return new Queue(RabbitConstant.ROUTING_FIRST_QUEUE_NAME); } @Bean public Queue rabbitRoutingSecondQueue() { return new Queue(RabbitConstant.ROUTING_SECOND_QUEUE_NAME); } @Bean public Queue rabbitRoutingThirdQueue() { return new Queue(RabbitConstant.ROUTING_THIRD_QUEUE_NAME); } @Bean public DirectExchange directExchange() { // 建立direct類型交換機,表示與此交換機會將消息發送給 routing_key 徹底相同的隊列 return new DirectExchange(RabbitConstant.ROUTING_EXCHANGE_NAME); } @Bean public Binding routingFirstQueueBindDirectExchange() { // 隊列一綁定direct交換機,並設置 routing_key 爲 routing_first_queue_routing_key return BindingBuilder.bind(rabbitRoutingFirstQueue()).to(directExchange()).with(RabbitConstant.ROUTING_FIRST_QUEUE_ROUTING_KEY_NAME); } @Bean public Binding routingSecondQueueBindDirectExchange() { // 隊列二綁定direct交換機,並設置 routing_key 爲 routing_second_queue_routing_key return BindingBuilder.bind(rabbitRoutingSecondQueue()).to(directExchange()).with(RabbitConstant.ROUTING_SECOND_QUEUE_ROUTING_KEY_NAME); } @Bean public Binding routingThirdQueueBindDirectExchange() { // 隊列三綁定direct交換機,並設置 routing_key 爲 routing_third_queue_routing_key return BindingBuilder.bind(rabbitRoutingThirdQueue()).to(directExchange()).with(RabbitConstant.ROUTING_THIRD_QUEUE_ROUTING_KEY_NAME); } }
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumers for the routing sample: one listener per routing queue, each printing
 * what it receives under a label naming the routing queue it listens on.
 *
 * <p>NOTE(review): {@code @RabbitHandler} is combined with a method-level
 * {@code @RabbitListener} here, as in the sibling consumers; it looks redundant in this
 * arrangement — confirm against the Spring AMQP documentation before removing.
 */
@Component
public class RabbitRoutingConsumer {

    /**
     * Listener for {@link RabbitConstant#ROUTING_FIRST_QUEUE_NAME}.
     *
     * @param context the received message as text
     */
    @RabbitListener(queues = RabbitConstant.ROUTING_FIRST_QUEUE_NAME)
    @RabbitHandler
    public void routingFirstQueueListener(String context) {
        System.out.println("rabbit routing queue first receiver: " + context);
    }

    /**
     * Listener for {@link RabbitConstant#ROUTING_SECOND_QUEUE_NAME}.
     *
     * @param context the received message as text
     */
    @RabbitListener(queues = RabbitConstant.ROUTING_SECOND_QUEUE_NAME)
    @RabbitHandler
    public void routingSecondQueueListener(String context) {
        // Label fixed: this previously said "pubsub", copied from the publish/subscribe consumer.
        System.out.println("rabbit routing queue second receiver: " + context);
    }

    /**
     * Listener for {@link RabbitConstant#ROUTING_THIRD_QUEUE_NAME}.
     *
     * @param context the received message as text
     */
    @RabbitListener(queues = RabbitConstant.ROUTING_THIRD_QUEUE_NAME)
    @RabbitHandler
    public void routingThirdQueueListener(String context) {
        // Label fixed: this previously said "pubsub", copied from the publish/subscribe consumer.
        System.out.println("rabbit routing queue third receiver: " + context);
    }
}
@Test public void routing() { // 給第一個隊列發送消息 rabbitTemplate.convertAndSend(RabbitConstant.ROUTING_EXCHANGE_NAME, RabbitConstant.ROUTING_FIRST_QUEUE_ROUTING_KEY_NAME, "routing hello"); }
import com.example.rabbitmq.constant.RabbitConstant; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitTopicProvider { @Bean public Queue topicFirstQueue() { return new Queue(RabbitConstant.TOPICS_FIRST_QUEUE_NAME); } @Bean public Queue topicSecondQueue() { return new Queue(RabbitConstant.TOPICS_SECOND_QUEUE_NAME); } @Bean public Queue topicThirdQueue() { return new Queue(RabbitConstant.TOPICS_THIRD_QUEUE_NAME); } @Bean public TopicExchange topicExchange() { // 建立topic類型交換機,表示與此交換機會將消息發送給 routing_key 通配符匹配成功的隊列 return new TopicExchange(RabbitConstant.TOPICS_EXCHANGE_NAME); } @Bean public Binding topicFirstQueueBindExchange() { // 隊列一綁定topic類型交換機,並設置 routing_key 通配符爲 #.first.# return BindingBuilder.bind(topicFirstQueue()).to(topicExchange()).with(RabbitConstant.TOPICS_ROUTING_KEY_FIRST_WILDCARD); } @Bean public Binding topicSecondQueueBindExchange() { // 隊列二綁定topic類型交換機,並設置 routing_key 通配符爲 *.second.# return BindingBuilder.bind(topicSecondQueue()).to(topicExchange()).with(RabbitConstant.TOPICS_ROUTING_KEY_SECOND_WILDCARD); } @Bean public Binding topicThirdQueueBindExchange() { // 隊列三綁定topic類型交換機,並設置 routing_key 通配符爲 *.third.* return BindingBuilder.bind(topicThirdQueue()).to(topicExchange()).with(RabbitConstant.TOPICS_ROUTING_KEY_THRID_WILDCARD); } }
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumers for the topics sample: one listener per topics queue, each printing
 * what it receives under its own label.
 */
@Component
public class RabbitTopicsConsumer {

    /**
     * Listener for {@link RabbitConstant#TOPICS_FIRST_QUEUE_NAME}.
     *
     * @param context the received message as text
     */
    @RabbitHandler
    @RabbitListener(queues = RabbitConstant.TOPICS_FIRST_QUEUE_NAME)
    public void topicFirstQueue(String context) {
        report("rabbit topics queue first receiver: ", context);
    }

    /**
     * Listener for {@link RabbitConstant#TOPICS_SECOND_QUEUE_NAME}.
     *
     * @param context the received message as text
     */
    @RabbitHandler
    @RabbitListener(queues = RabbitConstant.TOPICS_SECOND_QUEUE_NAME)
    public void topicSecondQueue(String context) {
        report("rabbit topics queue second receiver: ", context);
    }

    /**
     * Listener for {@link RabbitConstant#TOPICS_THIRD_QUEUE_NAME}.
     *
     * @param context the received message as text
     */
    @RabbitHandler
    @RabbitListener(queues = RabbitConstant.TOPICS_THIRD_QUEUE_NAME)
    public void topicThirdQueue(String context) {
        report("rabbit topics queue third receiver: ", context);
    }

    /** Prints {@code label} followed by {@code payload} on standard output. */
    private static void report(String label, String payload) {
        System.out.println(label + payload);
    }
}
@Test public void topics() { // 給第一個隊列發送消息,此時隊列能接受到消息,由於隊列通配符爲 #.first.#,而 routing_key 爲 topics.first.routing.key,匹配成功 rabbitTemplate.convertAndSend(RabbitConstant.TOPICS_EXCHANGE_NAME, RabbitConstant.TOPICS_FIRST_QUEUE_ROUTING_KEY, "topics hello"); // 給第二個隊列發送消息,此時隊列也能接受到消息,由於隊列通配符爲 *.second.#,而 routing_key 爲 topics.second.routing.key,匹配成功 rabbitTemplate.convertAndSend(RabbitConstant.TOPICS_EXCHANGE_NAME, RabbitConstant.TOPICS_SECOND_QUEUE_ROUTING_KEY, "topics hello"); // 給第三個隊列發送消息,此時隊列沒法接受到消息,由於隊列通配符爲 *.third.*,而 routing_key 爲 topics.third.routing.key,匹配失敗 rabbitTemplate.convertAndSend(RabbitConstant.TOPICS_EXCHANGE_NAME, RabbitConstant.TOPICS_THIRD_QUEUE_ROUTING_KEY, "topics hello"); }
import com.example.rabbitmq.constant.RabbitConstant; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.HeadersExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class RabbitHeaderProvider { @Bean public Queue headerFirstQueue() { return new Queue(RabbitConstant.HEADER_FIRST_QUEUE_NAME); } @Bean public Queue headerSecondQueue() { return new Queue(RabbitConstant.HEADER_SECOND_QUEUE_NAME); } @Bean public HeadersExchange headersExchange() { return new HeadersExchange(RabbitConstant.HEADER_EXCHANGE_NAME); } @Bean public Binding headerFirstQueueBindExchange() { Map<String, Object> headersMap = new HashMap<>(8); headersMap.put("matchAll", "YES"); headersMap.put("hello", "world"); return BindingBuilder.bind(headerFirstQueue()).to(headersExchange()).whereAll(headersMap).match(); } @Bean public Binding headerSecondQueueBindExchange() { Map<String, Object> headersMap = new HashMap<>(8); headersMap.put("matchAll", "NO"); headersMap.put("hello", "world"); return BindingBuilder.bind(headerSecondQueue()).to(headersExchange()).whereAny(headersMap).match(); } }
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumers for the header sample: one listener per header queue, each printing
 * what it receives under its own label.
 */
@Component
public class RabbitHeaderConsumer {

    /**
     * Listener for {@link RabbitConstant#HEADER_FIRST_QUEUE_NAME}.
     *
     * @param context the received message as text
     */
    @RabbitHandler
    @RabbitListener(queues = RabbitConstant.HEADER_FIRST_QUEUE_NAME)
    public void headerFirstQueue(String context) {
        report("rabbit header queue first receiver: ", context);
    }

    /**
     * Listener for {@link RabbitConstant#HEADER_SECOND_QUEUE_NAME}.
     *
     * @param context the received message as text
     */
    @RabbitHandler
    @RabbitListener(queues = RabbitConstant.HEADER_SECOND_QUEUE_NAME)
    public void headerSecondQueue(String context) {
        report("rabbit header queue second receiver: ", context);
    }

    /** Prints {@code label} followed by {@code payload} on standard output. */
    private static void report(String label, String payload) {
        System.out.println(label + payload);
    }
}
@Test public void header() { // 這條消息應該能被兩個隊列都接收到,第一個隊列 all 匹配成功,第二個隊列 hello-value any匹配成功 MessageProperties messageProperties = new MessageProperties(); messageProperties.setHeader("matchAll", "YES"); messageProperties.setHeader("hello", "world"); Message message = new Message("header first hello".getBytes(StandardCharsets.UTF_8), messageProperties); rabbitTemplate.convertAndSend(RabbitConstant.HEADER_EXCHANGE_NAME, null, message); // 這條消息應該只被第二個隊列接受,第一個隊列 all 匹配失敗,第二個隊列 matchAll-NO any匹配成功 MessageProperties messagePropertiesSecond = new MessageProperties(); messagePropertiesSecond.setHeader("matchAll", "NO"); Message messageSecond = new Message("header second hello".getBytes(StandardCharsets.UTF_8), messagePropertiesSecond); rabbitTemplate.convertAndSend(RabbitConstant.HEADER_EXCHANGE_NAME, null, messageSecond); }
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Configuration for the RPC sample: declares the request queue as a bean.
 */
@Configuration
public class RabbitRpcProvider {

    /**
     * @return a queue named {@link RabbitConstant#RPC_QUEUE_NAME}
     */
    @Bean
    public Queue rpcQueue() {
        final String queueName = RabbitConstant.RPC_QUEUE_NAME;
        return new Queue(queueName);
    }
}
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer for the RPC sample: prints each request taken from the RPC queue and
 * answers with a fixed reply.
 */
@Component
public class RabbitRpcConsumer {

    /**
     * Listener for {@link RabbitConstant#RPC_QUEUE_NAME}.
     *
     * @param context the received request as text
     * @return the constant reply {@code "copy that!"}
     */
    @RabbitHandler
    @RabbitListener(queues = RabbitConstant.RPC_QUEUE_NAME)
    public String rpcQueue(String context) {
        final String line = "rabbit rpc queue receiver: " + context;
        System.out.println(line);
        final String reply = "copy that!";
        return reply;
    }
}
/** Sends a request addressed with the RPC queue name, waits for the reply and prints it. */
@Test
public void rpc() {
    final Object reply = rabbitTemplate.convertSendAndReceive(RabbitConstant.RPC_QUEUE_NAME, "rpc hello!");
    final String line = "rabbit rpc response message: " + reply;
    System.out.println(line);
}
文章還有不少不足之處,歡迎各位兄弟姐妹批評指正。代碼倉庫已存放至 Gitee:[SpringBoot RabbitMQ 工作模式倉庫連結](https://gitee.com/BarryMan/spring-boot-rabbitmq.git)
java