這一篇咱們來把消息中間件整合到springboot中web
=====================================================================spring
首先在服務器上安裝rabbitmq的服務,用docker拉取便可,再也不詳細描述。docker
直接來擼代碼springboot
首先咱們先添加rabbitmq的依賴服務器
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
在配置文件中添加必要的配置信息併發
spring.rabbitmq.host=192.168.0.86
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
好了,基本的配置就已經配置完畢了tcp
rabbitmq有六種模式spring-boot
咱們逐個來看springboot是怎麼實現的呢高併發
P表明生產者,C表明消費者,紅色代碼消息隊列。P將消息發送到消息隊列,C對消息進行處理。測試
咱們先建立一個隊列
@Bean public Queue Queue() { return new Queue("hello"); }
而後我再建立一個生產者
@Controller public class HelloSender { @Autowired private AmqpTemplate rabbitTemplate; public void send() { String context = "hello " + new Date(); System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("hello", context); } }
再建立一個消費者
@Component @RabbitListener(queues = "hello") public class HelloReceiver { @RabbitHandler public void process(String hello) { System.out.println("Receiver : " + hello); } }
再寫一個測試用例看看
@RunWith(SpringRunner.class) @SpringBootTest public class RabbitmqApplicationTests { @Autowired private HelloSender helloSender; @Test public void hello() throws Exception { helloSender.send(); } }
成功!
一個消息產生者,多個消息的消費者。競爭搶消息
咱們先建立一個隊列
@Bean public Queue Queue2() { return new Queue("neo"); }
再建立一個消息生產者
@Controller public class NeoSender { @Autowired private AmqpTemplate rabbitTemplate; public void send(int i) { String context = "spirng boot neo queue"+" ****** "+i; System.out.println("Sender1 : " + context); this.rabbitTemplate.convertAndSend("neo", context); } }
再建立兩個消息的消費者
1 @Component 2 @RabbitListener(queues = "neo") 3 public class NeoReceiver1 { 4 @RabbitHandler 5 public void process(String neo) { 6 System.out.println("Receiver 1: " + neo); 7 } 8 } 9 10 11 12 @Component 13 @RabbitListener(queues = "neo") 14 public class NeoReceiver2 { 15 @RabbitHandler 16 public void process(String neo) { 17 System.out.println("Receiver 2: " + neo); 18 } 19 20 }
咱們寫一個測試用例
@Test public void oneToMany() throws Exception { for (int i=0;i<100;i++){ // Thread.sleep(10); neoSender.send(i); } }
運行
能夠看到消息均勻的被兩個消費者消費了。
經過這個例子咱們能夠看作高併發狀況下的消息產生和消費,這會產生一個消息丟失的問題。萬一客戶端在處理消息的時候掛了,那這條消息就至關於被浪費了,針對這種狀況,rabbitmq推出了消息ack機制,熟悉tcp三次握手的必定不會陌生。
咱們看看springboot是實現ack的
很簡單,在咱們的配置類中,配置一個新的消費者,將原先的消費者先都去掉:
@Bean public SimpleMessageListenerContainer messageContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory()); container.setQueues(Queue()); container.setExposeListenerChannel(true); container.setMaxConcurrentConsumers(1); container.setConcurrentConsumers(1); container.setAcknowledgeMode(AcknowledgeMode.MANUAL);//消息確認後才能刪除 container.setPrefetchCount(5);//每次處理5條消息 container.setMessageListener(new ChannelAwareMessageListener() { public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception { byte[] body = message.getBody(); System.out.println("消費端接收到消息 : " + new String(body)); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }); return container; }
但這裏會有個問題,test模式下消息發送完畢系統就會直接shutdown,因此只能消費部分消息,不過等真正啓動項目,這個問題就不存在了。
生產者將消息不是直接發送到隊列,而是發送到X交換機,而後由交換機發送給兩個隊列,兩個消費者各自監聽一個隊列,來消費消息。
這種方式實現同一個消息被多個消費者消費。工做模式是同一個消息只能有一個消費者。
咱們新建三個隊列
@Bean public Queue AMessage() { return new Queue("fanout.A"); } @Bean public Queue BMessage() { return new Queue("fanout.B"); } @Bean public Queue CMessage() { return new Queue("fanout.C"); }
再新建一個交換機
@Bean FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange"); }
再把這些隊列綁定到交換機上去
@Bean Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(AMessage).to(fanoutExchange); } @Bean Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(BMessage).to(fanoutExchange); } @Bean Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(CMessage).to(fanoutExchange); }
基本的配置完成後,再新建一個消息生產者
@Component public class FanoutSender { @Autowired private AmqpTemplate rabbitTemplate; public void send() { String context = "hi, fanout msg "; System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("fanoutExchange","", context); } }
一樣的,咱們再新建三個消息消費者
1 @Component 2 @RabbitListener(queues = "fanout.A") 3 public class FanoutReceiveA { 4 5 @RabbitHandler 6 public void process(String message) { 7 System.out.println("fanout Receiver A : " + message); 8 } 9 } 10 11 @Component 12 @RabbitListener(queues = "fanout.B") 13 public class FanoutReceiverB { 14 @RabbitHandler 15 public void process(String message) { 16 System.out.println("fanout Receiver B: " + message); 17 } 18 } 19 20 @Component 21 @RabbitListener(queues = "fanout.C") 22 public class FanoutReceiverC { 23 @RabbitHandler 24 public void process(String message) { 25 System.out.println("fanout Receiver C: " + message); 26 } 27 }
三個消費者分別監聽3個隊列的內容
新建一個測試用例:
@RunWith(SpringRunner.class) @SpringBootTest public class FanoutTest { @Autowired private FanoutSender fanoutSender; @Test public void setFanoutSender(){ fanoutSender.send(); } }
三個隊列都接受到了消息
須要將一個隊列綁定到交換機上,要求該消息與一個特定的路由鍵徹底匹配,這是一個完整的匹配。
發送端不僅按固定的routing key發送消息,而是按字符串匹配發送,接收端一樣如此
符號#匹配一個或多個詞,符號*匹配很少很多一個詞。
4/5二者模式很類似,咱們放在一塊兒演示
新建兩個隊列
final static String message = "topic.A"; final static String messages = "topic.B"; @Bean public Queue queueMessage() { return new Queue(TopicRabbitConfig.message); } @Bean public Queue queueMessages() { return new Queue(TopicRabbitConfig.messages); }
新建一個交換機
@Bean TopicExchange exchange() { return new TopicExchange("topicExchange"); }
綁定隊列到交換機上,路由模式,須要完整匹配topic.message,才能接受
@Bean Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) { return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message"); }
topic模式,前綴匹配到topic.便可接受
@Bean Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) { return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#"); }
咱們新建三個消息生產者
@Component public class TopicSend { @Autowired private AmqpTemplate rabbitTemplate; public void send() { String context = "hi, i am message all"; System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("topicExchange", "topic.1", context); } public void send1() { String context = "hi, i am message 1"; System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("topicExchange", "topic.message", context); } public void send2() { String context = "hi, i am messages 2"; System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("topicExchange", "topic.messages", context); } }
send的key是topic.1 send1的key是topic.message,send2的key是topic.messages
因此理論上send會被兩個隊列消費,1.2都應該只有一個隊列消費
咱們再新建兩個消費者
@Component @RabbitListener(queues = "topic.A") public class TopicReceiver { @RabbitHandler public void process(String message) { System.out.println("Topic Receiver1 : " + message); } } @Component @RabbitListener(queues = "topic.B") public class TopicReceiver2 { @RabbitHandler public void process(String message) { System.out.println("Topic Receiver2 : " + message); } }
寫三個測試用例
@RunWith(SpringRunner.class) @SpringBootTest public class TopicTest { @Autowired private TopicSend sender; @Test public void topic() throws Exception { sender.send(); } @Test public void topic1() throws Exception { sender.send1(); } @Test public void topic2() throws Exception { sender.send2(); } }
send的運行結果
send1的運行結果
send2的運行結果
結果符合預期。