SpringBoot實戰電商項目mall(35k+star)地址: https://github.com/macrozheng/mall
之前看過的關於RabbitMQ核心消息模式的文章都是基於JavaAPI的,最近看了下官方文檔,發現這些核心消息模式均可以經過Spring AMQP來實現。因而總結了下RabbitMQ的實用技巧,包括RabbitMQ在Windows和Linux下的安裝、5種核心消息模式的Spring AMQP實現,相信對於想要學習和回顧RabbitMQ的朋友都會有所幫助。html
RabbitMQ是最受歡迎的開源消息中間件之一,在全球範圍內被普遍應用。RabbitMQ是輕量級且易於部署的,能支持多種消息協議。RabbitMQ能夠部署在分佈式系統中,以知足大規模、高可用的要求。java
咱們先來了解下RabbitMQ中的相關概念,這裏以5種消息模式中的路由模式
爲例。git
標誌 | 中文名 | 英文名 | 描述 |
---|---|---|---|
P | 生產者 | Producer | 消息的發送者,能夠將消息發送到交換機 |
C | 消費者 | Consumer | 消息的接收者,從隊列中獲取消息並進行消費 |
X | 交換機 | Exchange | 接收生產者發送的消息,並根據路由鍵發送給指定隊列 |
Q | 隊列 | Queue | 存儲從交換機發來的消息 |
type | 交換機類型 | type | 不一樣類型的交換機轉發消息方式不一樣 |
fanout | 發佈/訂閱模式 | fanout | 廣播消息給全部綁定交換機的隊列 |
direct | 路由模式 | direct | 根據路由鍵發送消息 |
topic | 通配符模式 | topic | 根據路由鍵的匹配規則發送消息 |
接下來咱們介紹下RabbitMQ的安裝和配置,提供Windows和Linux兩種安裝方式。
rabbitmq-plugins enable rabbitmq_management
rabbitmq 3.7.15
的Docker鏡像;docker pull rabbitmq:3.7.15
docker run -p 5672:5672 -p 15672:15672 --name rabbitmq \ -d rabbitmq:3.7.15
docker exec -it rabbitmq /bin/bash rabbitmq-plugins enable rabbitmq_management
firewall-cmd --zone=public --add-port=15672/tcp --permanent firewall-cmd --zone=public --add-port=5672/tcp --permanent firewall-cmd --reload
這5種消息模式是構建基於RabbitMQ的消息應用的基礎,必定要緊緊掌握它們。學過RabbitMQ的朋友應該瞭解過這些消息模式的Java實現,這裏咱們使用Spring AMQP的形式來實現它們。
簡單模式是最簡單的消息模式,它包含一個生產者、一個消費者和一個隊列。生產者向隊列裏發送消息,消費者從隊列中獲取消息並消費。
pom.xml
中添加Spring AMQP的相關依賴;<!--Spring AMQP依賴--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
application.yml
,添加RabbitMQ的相關配置;spring: rabbitmq: host: localhost port: 5672 virtual-host: /mall username: mall password: mall publisher-confirms: true #消息發送到交換器確認 publisher-returns: true #消息發送到隊列確認
簡單模式
相關Java配置,建立一個名爲simple.hello
的隊列、一個生產者和一個消費者;/** * Created by macro on 2020/5/19. */ @Configuration public class SimpleRabbitConfig { @Bean public Queue hello() { return new Queue("simple.hello"); } @Bean public SimpleSender simpleSender(){ return new SimpleSender(); } @Bean public SimpleReceiver simpleReceiver(){ return new SimpleReceiver(); } }
send方法
向隊列simple.hello
中發送消息;/** * Created by macro on 2020/5/19. */ public class SimpleSender { private static final Logger LOGGER = LoggerFactory.getLogger(SimpleSender.class); @Autowired private RabbitTemplate template; private static final String queueName="simple.hello"; public void send() { String message = "Hello World!"; this.template.convertAndSend(queueName, message); LOGGER.info(" [x] Sent '{}'", message); } }
simple.hello
中獲取消息;/** * Created by macro on 2020/5/19. */ @RabbitListener(queues = "simple.hello") public class SimpleReceiver { private static final Logger LOGGER = LoggerFactory.getLogger(SimpleReceiver.class); @RabbitHandler public void receive(String in) { LOGGER.info(" [x] Received '{}'", in); } }
/** * Created by macro on 2020/5/19. */ @Api(tags = "RabbitController", description = "RabbitMQ功能測試") @Controller @RequestMapping("/rabbit") public class RabbitController { @Autowired private SimpleSender simpleSender; @ApiOperation("簡單模式") @RequestMapping(value = "/simple", method = RequestMethod.GET) @ResponseBody public CommonResult simpleTest() { for(int i=0;i<10;i++){ simpleSender.send(); ThreadUtil.sleep(1000); } return CommonResult.success(null); } }
工做模式是指向多個互相競爭的消費者發送消息的模式,它包含一個生產者、兩個消費者和一個隊列。兩個消費者同時綁定到一個隊列上去,當消費者獲取消息處理耗時任務時,空閒的消費者從隊列中獲取並消費消息。
工做模式
相關Java配置,建立一個名爲work.hello
的隊列、一個生產者和兩個消費者;/** * Created by macro on 2020/5/19. */ @Configuration public class WorkRabbitConfig { @Bean public Queue workQueue() { return new Queue("work.hello"); } @Bean public WorkReceiver workReceiver1() { return new WorkReceiver(1); } @Bean public WorkReceiver workReceiver2() { return new WorkReceiver(2); } @Bean public WorkSender workSender() { return new WorkSender(); } }
send方法
向隊列work.hello
中發送消息,消息中包含必定數量的.
號;/** * Created by macro on 2020/5/19. */ public class WorkSender { private static final Logger LOGGER = LoggerFactory.getLogger(WorkSender.class); @Autowired private RabbitTemplate template; private static final String queueName = "work.hello"; public void send(int index) { StringBuilder builder = new StringBuilder("Hello"); int limitIndex = index % 3+1; for (int i = 0; i < limitIndex; i++) { builder.append('.'); } builder.append(index+1); String message = builder.toString(); template.convertAndSend(queueName, message); LOGGER.info(" [x] Sent '{}'", message); } }
work.hello
中獲取消息,名稱分別爲instance 1
和instance 2
,消息中包含.
號越多,耗時越長;/** * Created by macro on 2020/5/19. */ @RabbitListener(queues = "work.hello") public class WorkReceiver { private static final Logger LOGGER = LoggerFactory.getLogger(WorkReceiver.class); private final int instance; public WorkReceiver(int i) { this.instance = i; } @RabbitHandler public void receive(String in) { StopWatch watch = new StopWatch(); watch.start(); LOGGER.info("instance {} [x] Received '{}'", this.instance, in); doWork(in); watch.stop(); LOGGER.info("instance {} [x] Done in {}s", this.instance, watch.getTotalTimeSeconds()); } private void doWork(String in) { for (char ch : in.toCharArray()) { if (ch == '.') { ThreadUtil.sleep(1000); } } } }
/** * Created by macro on 2020/5/19. */ @Api(tags = "RabbitController", description = "RabbitMQ功能測試") @Controller @RequestMapping("/rabbit") public class RabbitController { @Autowired private WorkSender workSender; @ApiOperation("工做模式") @RequestMapping(value = "/work", method = RequestMethod.GET) @ResponseBody public CommonResult workTest() { for(int i=0;i<10;i++){ workSender.send(i); ThreadUtil.sleep(1000); } return CommonResult.success(null); } }
.
號的消息,instance 1
和instance 2
消費者互相競爭,分別消費了一部分消息。
發佈/訂閱模式是指同時向多個消費者發送消息的模式(相似廣播的形式),它包含一個生產者、兩個消費者、兩個隊列和一個交換機。兩個消費者同時綁定到不一樣的隊列上去,兩個隊列綁定到交換機上去,生產者經過發送消息到交換機,全部消費者接收並消費消息。
發佈/訂閱模式
相關Java配置,建立一個名爲exchange.fanout
的交換機、一個生產者、兩個消費者和兩個匿名隊列,將兩個匿名隊列都綁定到交換機;/** * Created by macro on 2020/5/19. */ @Configuration public class FanoutRabbitConfig { @Bean public FanoutExchange fanout() { return new FanoutExchange("exchange.fanout"); } @Bean public Queue fanoutQueue1() { return new AnonymousQueue(); } @Bean public Queue fanoutQueue2() { return new AnonymousQueue(); } @Bean public Binding fanoutBinding1(FanoutExchange fanout, Queue fanoutQueue1) { return BindingBuilder.bind(fanoutQueue1).to(fanout); } @Bean public Binding fanoutBinding2(FanoutExchange fanout, Queue fanoutQueue2) { return BindingBuilder.bind(fanoutQueue2).to(fanout); } @Bean public FanoutReceiver fanoutReceiver() { return new FanoutReceiver(); } @Bean public FanoutSender fanoutSender() { return new FanoutSender(); } }
send方法
向交換機exchange.fanout
中發送消息,消息中包含必定數量的.
號;/** * Created by macro on 2020/5/19. */ public class FanoutSender { private static final Logger LOGGER = LoggerFactory.getLogger(FanoutSender.class); @Autowired private RabbitTemplate template; private static final String exchangeName = "exchange.fanout"; public void send(int index) { StringBuilder builder = new StringBuilder("Hello"); int limitIndex = index % 3 + 1; for (int i = 0; i < limitIndex; i++) { builder.append('.'); } builder.append(index + 1); String message = builder.toString(); template.convertAndSend(exchangeName, "", message); LOGGER.info(" [x] Sent '{}'", message); } }
.
號越多,耗時越長,因爲該消費者能夠從兩個隊列中獲取並消費消息,能夠看作兩個消費者,名稱分別爲instance 1
和instance 2
;/** * Created by macro on 2020/5/19. */ public class FanoutReceiver { private static final Logger LOGGER = LoggerFactory.getLogger(FanoutReceiver.class); @RabbitListener(queues = "#{fanoutQueue1.name}") public void receive1(String in) { receive(in, 1); } @RabbitListener(queues = "#{fanoutQueue2.name}") public void receive2(String in) { receive(in, 2); } private void receive(String in, int receiver) { StopWatch watch = new StopWatch(); watch.start(); LOGGER.info("instance {} [x] Received '{}'", receiver, in); doWork(in); watch.stop(); LOGGER.info("instance {} [x] Done in {}s", receiver, watch.getTotalTimeSeconds()); } private void doWork(String in) { for (char ch : in.toCharArray()) { if (ch == '.') { ThreadUtil.sleep(1000); } } } }
/** * Created by macro on 2020/5/19. */ @Api(tags = "RabbitController", description = "RabbitMQ功能測試") @Controller @RequestMapping("/rabbit") public class RabbitController { @Autowired private FanoutSender fanoutSender; @ApiOperation("發佈/訂閱模式") @RequestMapping(value = "/fanout", method = RequestMethod.GET) @ResponseBody public CommonResult fanoutTest() { for(int i=0;i<10;i++){ fanoutSender.send(i); ThreadUtil.sleep(1000); } return CommonResult.success(null); } }
.
號的消息,instance 1
和instance 2
同時獲取並消費了消息。
路由模式是能夠根據路由鍵
選擇性給多個消費者發送消息的模式,它包含一個生產者、兩個消費者、兩個隊列和一個交換機。兩個消費者同時綁定到不一樣的隊列上去,兩個隊列經過路由鍵
綁定到交換機上去,生產者發送消息到交換機,交換機經過路由鍵
轉發到不一樣隊列,隊列綁定的消費者接收並消費消息。
路由模式
相關Java配置,建立一個名爲exchange.direct
的交換機、一個生產者、兩個消費者和兩個匿名隊列,隊列經過路由鍵
都綁定到交換機,隊列1
的路由鍵爲orange
和black
,隊列2
的路由鍵爲green
和black
;/** * Created by macro on 2020/5/19. */ @Configuration public class DirectRabbitConfig { @Bean public DirectExchange direct() { return new DirectExchange("exchange.direct"); } @Bean public Queue directQueue1() { return new AnonymousQueue(); } @Bean public Queue directQueue2() { return new AnonymousQueue(); } @Bean public Binding directBinding1a(DirectExchange direct, Queue directQueue1) { return BindingBuilder.bind(directQueue1).to(direct).with("orange"); } @Bean public Binding directBinding1b(DirectExchange direct, Queue directQueue1) { return BindingBuilder.bind(directQueue1).to(direct).with("black"); } @Bean public Binding directBinding2a(DirectExchange direct, Queue directQueue2) { return BindingBuilder.bind(directQueue2).to(direct).with("green"); } @Bean public Binding directBinding2b(DirectExchange direct, Queue directQueue2) { return BindingBuilder.bind(directQueue2).to(direct).with("black"); } @Bean public DirectReceiver receiver() { return new DirectReceiver(); } @Bean public DirectSender directSender() { return new DirectSender(); } }
send方法
向交換機exchange.direct
中發送消息,發送時使用不一樣的路由鍵
,根據路由鍵
會被轉發到不一樣的隊列;/** * Created by macro on 2020/5/19. */ public class DirectSender { @Autowired private RabbitTemplate template; private static final String exchangeName = "exchange.direct"; private final String[] keys = {"orange", "black", "green"}; private static final Logger LOGGER = LoggerFactory.getLogger(DirectSender.class); public void send(int index) { StringBuilder builder = new StringBuilder("Hello to "); int limitIndex = index % 3; String key = keys[limitIndex]; builder.append(key).append(' '); builder.append(index+1); String message = builder.toString(); template.convertAndSend(exchangeName, key, message); LOGGER.info(" [x] Sent '{}'", message); } }
instance 1
和instance 2
;/** * Created by macro on 2020/5/19. */ public class DirectReceiver { private static final Logger LOGGER = LoggerFactory.getLogger(DirectReceiver.class); @RabbitListener(queues = "#{directQueue1.name}") public void receive1(String in){ receive(in, 1); } @RabbitListener(queues = "#{directQueue2.name}") public void receive2(String in){ receive(in, 2); } private void receive(String in, int receiver){ StopWatch watch = new StopWatch(); watch.start(); LOGGER.info("instance {} [x] Received '{}'", receiver, in); doWork(in); watch.stop(); LOGGER.info("instance {} [x] Done in {}s", receiver, watch.getTotalTimeSeconds()); } private void doWork(String in){ for (char ch : in.toCharArray()) { if (ch == '.') { ThreadUtil.sleep(1000); } } } }
/** * Created by macro on 2020/5/19. */ @Api(tags = "RabbitController", description = "RabbitMQ功能測試") @Controller @RequestMapping("/rabbit") public class RabbitController { @Autowired private DirectSender directSender; @ApiOperation("路由模式") @RequestMapping(value = "/direct", method = RequestMethod.GET) @ResponseBody public CommonResult directTest() { for(int i=0;i<10;i++){ directSender.send(i); ThreadUtil.sleep(1000); } return CommonResult.success(null); } }
路由鍵
的消息,instance 1
獲取到了orange
和black
消息,instance 2
獲取到了green
和black
消息。
通配符模式是能夠根據路由鍵匹配規則
選擇性給多個消費者發送消息的模式,它包含一個生產者、兩個消費者、兩個隊列和一個交換機。兩個消費者同時綁定到不一樣的隊列上去,兩個隊列經過路由鍵匹配規則
綁定到交換機上去,生產者發送消息到交換機,交換機經過路由鍵匹配規則
轉發到不一樣隊列,隊列綁定的消費者接收並消費消息。
*
:只能匹配一個單詞;#
:能夠匹配零個或多個單詞。
通配符模式
相關Java配置,建立一個名爲exchange.topic
的交換機、一個生產者、兩個消費者和兩個匿名隊列,匹配*.orange.*
和*.*.rabbit
發送到隊列1
,匹配lazy.#
發送到隊列2
;/** * Created by macro on 2020/5/19. */ @Configuration public class TopicRabbitConfig { @Bean public TopicExchange topic() { return new TopicExchange("exchange.topic"); } @Bean public Queue topicQueue1() { return new AnonymousQueue(); } @Bean public Queue topicQueue2() { return new AnonymousQueue(); } @Bean public Binding topicBinding1a(TopicExchange topic, Queue topicQueue1) { return BindingBuilder.bind(topicQueue1).to(topic).with("*.orange.*"); } @Bean public Binding topicBinding1b(TopicExchange topic, Queue topicQueue1) { return BindingBuilder.bind(topicQueue1).to(topic).with("*.*.rabbit"); } @Bean public Binding topicBinding2a(TopicExchange topic, Queue topicQueue2) { return BindingBuilder.bind(topicQueue2).to(topic).with("lazy.#"); } @Bean public TopicReceiver topicReceiver() { return new TopicReceiver(); } @Bean public TopicSender topicSender() { return new TopicSender(); } }
send方法
向交換機exchange.topic
中發送消息,消息中包含不一樣的路由鍵
;/** * Created by macro on 2020/5/19. */ public class TopicSender { @Autowired private RabbitTemplate template; private static final String exchangeName = "exchange.topic"; private static final Logger LOGGER = LoggerFactory.getLogger(TopicSender.class); private final String[] keys = {"quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox", "lazy.brown.fox", "lazy.pink.rabbit", "quick.brown.fox"}; public void send(int index) { StringBuilder builder = new StringBuilder("Hello to "); int limitIndex = index%keys.length; String key = keys[limitIndex]; builder.append(key).append(' '); builder.append(index+1); String message = builder.toString(); template.convertAndSend(exchangeName, key, message); LOGGER.info(" [x] Sent '{}'",message); System.out.println(" [x] Sent '" + message + "'"); } }
instance 1
和instance 2
;/** * Created by macro on 2020/5/19. */ public class TopicReceiver { private static final Logger LOGGER = LoggerFactory.getLogger(TopicReceiver.class); @RabbitListener(queues = "#{topicQueue1.name}") public void receive1(String in){ receive(in, 1); } @RabbitListener(queues = "#{topicQueue2.name}") public void receive2(String in){ receive(in, 2); } public void receive(String in, int receiver){ StopWatch watch = new StopWatch(); watch.start(); LOGGER.info("instance {} [x] Received '{}'", receiver, in); doWork(in); watch.stop(); LOGGER.info("instance {} [x] Done in {}s", receiver, watch.getTotalTimeSeconds()); } private void doWork(String in){ for (char ch : in.toCharArray()) { if (ch == '.') { ThreadUtil.sleep(1000); } } } }
/** * Created by macro on 2020/5/19. */ @Api(tags = "RabbitController", description = "RabbitMQ功能測試") @Controller @RequestMapping("/rabbit") public class RabbitController { @Autowired private TopicSender topicSender; @ApiOperation("通配符模式") @RequestMapping(value = "/topic", method = RequestMethod.GET) @ResponseBody public CommonResult topicTest() { for(int i=0;i<10;i++){ topicSender.send(i); ThreadUtil.sleep(1000); } return CommonResult.success(null); } }
路由鍵
的消息,instance 1
和instance 2
分別獲取到了匹配的消息。
RabbitMQ Tutorials:https://www.rabbitmq.com/gets...github
https://github.com/macrozheng...spring
mall項目全套學習教程連載中,關注公衆號第一時間獲取。docker