RabbitMq - Work 模式java
1、什麼是Work模式web
若是有幾個消息都須要處理,且每一個消息的處理時間很長,僅有一個消費者,那麼當它在處理一個消息的時候,其餘消息就只有等待。spring
等待有時候是好的,但在程序中並不那麼好,當隊列中有多個消息待處理,將其分發給多個消費者,當一個消費者在處理的時候,有其餘消費者繼續消費隊列中的消息,便緩解了等待的尷尬。app
那麼這篇文章將實現一個生產者,多個消費者的模式,實現任務分發:work模式,如圖所示。
ide
處理一個消息可能會花必定的時間,萬一還沒處理完消費者就gg了…生產者一發送消息,便會將其標記爲已刪除,故最終的結果是:這條消息沒有獲得正確的處理。並且,指派給該消費者且還沒有處理的全部消息都會gg。spring-boot
解決策略:取消自動回覆機制
爲了解決消息的丟失問題,RabbitMQ提供了消息確認機制:message acknowledgments,一個消費者處理完成後,將會回傳一個ack給生產者,以表示處理成功,這樣生產者才能夠將消息刪除。fetch
這樣即便一個消費者gg了,沒有回傳ack,那麼發送者便會重發消息到隊列,若是這時候有其餘的消費者服務該隊列,那麼便會從隊列中取出消息並處理。這就保證了消息的不丟失。spa
自動回覆機制:無論是否處理成功,仍是失敗,都會回覆ack。code
1 channel.basicConsume(QUEUE_NAME, true, consumer);
自動恢復機制默認是打開的,在接收端的代碼最後:第二個參數爲true,表示會自動回覆,只要生產發送消息,就會標記刪除。因此咱們須要將自動回覆設置爲false。blog
1 boolean autoAck = false; 2 channel.basicConsume(QUEUE_NAME, autoAck, consumer);
這樣來保證消息不會由於消費者的gg而丟失了。
那麼取消自動回覆之後,咱們須要手動回覆一次:
1 channel.basicAck(envelope.getDeliveryTag(), false);
注意當前的消息確認機制只適用於同一個channel。
---------------------
application.yml
############################################################# ############## rabbitmq config ############################## ############################################################# spring.rabbitmq.host: 127.0.0.1 spring.rabbitmq.port: 5672 spring.rabbitmq.username: admin spring.rabbitmq.password: admin
1 package com.maozw.mq.config; 2 3 import org.slf4j.Logger; 4 import org.slf4j.LoggerFactory; 5 import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; 6 import org.springframework.amqp.rabbit.connection.ConnectionFactory; 7 import org.springframework.amqp.rabbit.core.RabbitTemplate; 8 import org.springframework.beans.factory.annotation.Value; 9 import org.springframework.beans.factory.config.ConfigurableBeanFactory; 10 import org.springframework.context.annotation.Bean; 11 import org.springframework.context.annotation.Configuration; 12 import org.springframework.context.annotation.Scope; 13 14 /** 15 * @author MAOZW 16 * @Description: Broker:它提供一種傳輸服務,它的角色就是維護一條從生產者到消費者的路線,保證數據能按照指定的方式進行傳輸, 17 * Exchange:消息交換機,它指定消息按什麼規則,路由到哪一個隊列。 18 * Queue:消息的載體,每一個消息都會被投到一個或多個隊列。 19 * Binding:綁定,它的做用就是把exchange和queue按照路由規則綁定起來. 20 * Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。 21 * vhost:虛擬主機,一個broker裏能夠有多個vhost,用做不一樣用戶的權限分離。 22 * Producer:消息生產者,就是投遞消息的程序. 23 * Consumer:消息消費者,就是接受消息的程序. 24 * Channel:消息通道,在客戶端的每一個鏈接裏,可創建多個channel. 25 * @date 2018/11/26 14:33 26 */ 27 @Configuration 28 public class RabbitConfig { 29 30 private static final Logger LOGGER = LoggerFactory.getLogger(RabbitConfig.class); 31 32 @Value("${spring.rabbitmq.host}") 33 private String rabbitmqHost; 34 35 @Value("${spring.rabbitmq.port}") 36 private int rabbitmqPort; 37 38 @Value("${spring.rabbitmq.username}") 39 private String userName; 40 41 @Value("${spring.rabbitmq.password}") 42 private String password; 43 44 45 public static final String EXCHANGE_A = "my-mq-exchange_A"; 46 public static final String EXCHANGE_B = "my-mq-exchange_B"; 47 public static final String EXCHANGE_C = "my-mq-exchange_C"; 48 49 50 public static final String QUEUE_A = "QUEUE_A"; 51 public static final String QUEUE_WORK = "QUEUE_WORK"; 52 public static final String QUEUE_C = "QUEUE_C"; 53 54 public static final String ROUTINGKEY_A = "spring-boot-routingKey_A"; 55 public static final String ROUTINGKEY_B = "spring-boot-routingKey_B"; 56 public static final String ROUTINGKEY_C = "spring-boot-routingKey_C"; 57 58 59 @Bean 60 public ConnectionFactory connectionFactory() { 61 CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); 62 connectionFactory.setUsername(userName); 63 connectionFactory.setPassword(password); 64 connectionFactory.setVirtualHost("/vir_simple"); 65 connectionFactory.setPublisherConfirms(false); 66 return connectionFactory; 67 } 68 69 public static ConnectionFactory getConnectionFactory() { 70 CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); 71 connectionFactory.setUsername("admin"); 72 connectionFactory.setPassword("admin"); 73 connectionFactory.setVirtualHost("/vir_simple"); 74 connectionFactory.setPublisherConfirms(false); 75 return connectionFactory; 76 } 77 }
生產者
1 package com.maozw.mq.work; 2 3 import com.maozw.mq.config.RabbitConfig; 4 import com.rabbitmq.client.Channel; 5 import org.slf4j.Logger; 6 import org.slf4j.LoggerFactory; 7 import org.springframework.amqp.rabbit.connection.Connection; 8 import org.springframework.amqp.rabbit.connection.ConnectionFactory; 9 import org.springframework.beans.factory.annotation.Autowired; 10 import org.springframework.web.bind.annotation.RequestMapping; 11 import org.springframework.web.bind.annotation.RestController; 12 13 import java.io.IOException; 14 15 /** 16 * work 模式 17 * 兩種分發: 輪詢分發 + 公平分發 18 * 輪詢分發:消費端:自動確認消息;boolean autoAck = true; 19 * 公平分發: 消費端:手動確認消息 boolean autoAck = false; channel.basicAck(envelope.getDeliveryTag(),false); 20 * @author MAOZW 21 * @Description: ${todo} 22 * @date 2018/11/26 15:06 23 */ 24 @RestController 25 @RequestMapping("/work") 26 public class WorkProducer { 27 private static final Logger LOGGER = LoggerFactory.getLogger(WorkProducer.class); 28 @Autowired 29 RabbitConfig rabbitConfig; 30 31 @RequestMapping("/send") 32 public String send() throws IOException { 33 ConnectionFactory connectionFactory = rabbitConfig.connectionFactory(); 34 Connection connection = connectionFactory.createConnection(); 35 Channel channel = connection.createChannel(false); 36 //建立隊列申明 37 channel.queueDeclare(RabbitConfig.QUEUE_WORK, false, false, false, null); 38 39 /** 40 * 每一個消費者 發送確認消息以前,消息隊列不會發送下一個消息給消費者,一次只處理一個消息 41 * 自動模式無需設置下面設置 42 */ 43 int prefetchCount = 1; 44 channel.basicQos(prefetchCount); 45 46 String Hello = ">>>> Hello Simple <<<<"; 47 for (int i = 0; i < 50; i++) { 48 String message = Hello + i; 49 channel.basicPublish("", RabbitConfig.QUEUE_WORK, null, message.getBytes()); 50 LOGGER.info("生產消息: " + message); 51 } 52 return "OK"; 53 } 54 }
消費者
1 package com.maozw.mq.work; 2 3 import com.maozw.mq.config.RabbitConfig; 4 import com.rabbitmq.client.AMQP; 5 import com.rabbitmq.client.Channel; 6 import com.rabbitmq.client.DefaultConsumer; 7 import com.rabbitmq.client.Envelope; 8 import org.slf4j.Logger; 9 import org.slf4j.LoggerFactory; 10 import org.springframework.amqp.rabbit.connection.Connection; 11 import org.springframework.amqp.rabbit.connection.ConnectionFactory; 12 13 import java.io.IOException; 14 15 /** 16 * @author MAOZW 17 * @Description: ${todo} 18 * @date 2018/11/26 15:06 19 */ 20 21 public class WorkConsumer2 { 22 private static final Logger LOGGER = LoggerFactory.getLogger(WorkConsumer2.class); 23 24 public static void main(String[] args) throws IOException { 25 ConnectionFactory connectionFactory = RabbitConfig.getConnectionFactory(); 26 Connection connection = connectionFactory.createConnection(); 27 Channel channel = connection.createChannel(false); 28 //建立隊列申明 29 channel.queueDeclare(RabbitConfig.QUEUE_WORK, false, false, false, null); 30 31 /** 32 * 改變分發規則 33 */ 34 channel.basicQos(1); 35 36 DefaultConsumer consumer = new DefaultConsumer(channel) { 37 @Override 38 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 39 super.handleDelivery(consumerTag, envelope, properties, body); 40 System.out.println("[2] 接口數據 : " + new String(body, "utf-8")); 41 42 try { 43 Thread.sleep(200); 44 } catch (InterruptedException e) { 45 e.printStackTrace(); 46 } finally { 47 System.out.println("[2] done!"); 48 //消息應答:手動回執,手動確認消息 49 channel.basicAck(envelope.getDeliveryTag(),false); 50 } 51 } 52 }; 53 //監聽隊列 54 /** 55 * autoAck 消息應答 56 * 默認輪詢分發打開:true :這種模式一旦rabbitmq將消息發送給消費者,就會從內存中刪除該消息,不關心客戶端是否消費正常。 57 * 使用公平分發須要關閉autoAck:false 須要手動發送回執 58 */ 59 boolean autoAck = false; 60 channel.basicConsume(RabbitConfig.QUEUE_WORK,autoAck, consumer); 61 } 62 63 }