1.RabbitMq - Work 模式

RabbitMq - Work 模式java

1、什麼是Work模式web

  若是有幾個消息都須要處理,且每一個消息的處理時間很長,僅有一個消費者,那麼當它在處理一個消息的時候,其餘消息就只有等待。spring

等待有時候是好的,但在程序中並不那麼好,當隊列中有多個消息待處理,將其分發給多個消費者,當一個消費者在處理的時候,有其餘消費者繼續消費隊列中的消息,便緩解了等待的尷尬。app

那麼這篇文章將實現一個生產者,多個消費者的模式,實現任務分發:work模式,如圖所示。

ide

 

2、消息確認機制

問題:怎樣保證消息不因消費者gg而丟失

處理一個消息可能會花必定的時間,萬一還沒處理完消費者就gg了…生產者一發送消息,便會將其標記爲已刪除,故最終的結果是:這條消息沒有獲得正確的處理。並且,指派給該消費者且還沒有處理的全部消息都會gg。spring-boot

解決策略:取消自動回覆機制
爲了解決消息的丟失問題,RabbitMQ提供了消息確認機制:message acknowledgments,一個消費者處理完成後,將會回傳一個ack給生產者,以表示處理成功,這樣生產者才能夠將消息刪除。fetch

這樣即便一個消費者gg了,沒有回傳ack,那麼發送者便會重發消息到隊列,若是這時候有其餘的消費者服務該隊列,那麼便會從隊列中取出消息並處理。這就保證了消息的不丟失。spa

自動回覆機制:無論是否處理成功,仍是失敗,都會回覆ack。code

1 channel.basicConsume(QUEUE_NAME, true, consumer);

自動恢復機制默認是打開的,在接收端的代碼最後:第二個參數爲true,表示會自動回覆,只要生產發送消息,就會標記刪除。因此咱們須要將自動回覆設置爲false。blog

1 boolean autoAck = false;
2 channel.basicConsume(QUEUE_NAME, autoAck, consumer);

這樣來保證消息不會由於消費者的gg而丟失了。

那麼取消自動回覆之後,咱們須要手動回覆一次:

1 channel.basicAck(envelope.getDeliveryTag(), false);

注意當前的消息確認機制只適用於同一個channel。
---------------------

application.yml

#############################################################
############## rabbitmq config ##############################
#############################################################
spring.rabbitmq.host: 127.0.0.1
spring.rabbitmq.port: 5672
spring.rabbitmq.username: admin
spring.rabbitmq.password: admin

 

 1 package com.maozw.mq.config;
 2 
 3 import org.slf4j.Logger;
 4 import org.slf4j.LoggerFactory;
 5 import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
 6 import org.springframework.amqp.rabbit.connection.ConnectionFactory;
 7 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 8 import org.springframework.beans.factory.annotation.Value;
 9 import org.springframework.beans.factory.config.ConfigurableBeanFactory;
10 import org.springframework.context.annotation.Bean;
11 import org.springframework.context.annotation.Configuration;
12 import org.springframework.context.annotation.Scope;
13 
14 /**
15  * @author MAOZW
16  * @Description: Broker:它提供一種傳輸服務,它的角色就是維護一條從生產者到消費者的路線,保證數據能按照指定的方式進行傳輸,
17  * Exchange:消息交換機,它指定消息按什麼規則,路由到哪一個隊列。
18  * Queue:消息的載體,每一個消息都會被投到一個或多個隊列。
19  * Binding:綁定,它的做用就是把exchange和queue按照路由規則綁定起來.
20  * Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
21  * vhost:虛擬主機,一個broker裏能夠有多個vhost,用做不一樣用戶的權限分離。
22  * Producer:消息生產者,就是投遞消息的程序.
23  * Consumer:消息消費者,就是接受消息的程序.
24  * Channel:消息通道,在客戶端的每一個鏈接裏,可創建多個channel.
25  * @date 2018/11/26 14:33
26  */
27 @Configuration
28 public class RabbitConfig {
29 
30     private static final Logger LOGGER = LoggerFactory.getLogger(RabbitConfig.class);
31 
32     @Value("${spring.rabbitmq.host}")
33     private String rabbitmqHost;
34 
35     @Value("${spring.rabbitmq.port}")
36     private int rabbitmqPort;
37 
38     @Value("${spring.rabbitmq.username}")
39     private String userName;
40 
41     @Value("${spring.rabbitmq.password}")
42     private String password;
43 
44 
45     public static final String EXCHANGE_A = "my-mq-exchange_A";
46     public static final String EXCHANGE_B = "my-mq-exchange_B";
47     public static final String EXCHANGE_C = "my-mq-exchange_C";
48 
49 
50     public static final String QUEUE_A = "QUEUE_A";
51     public static final String QUEUE_WORK = "QUEUE_WORK";
52     public static final String QUEUE_C = "QUEUE_C";
53 
54     public static final String ROUTINGKEY_A = "spring-boot-routingKey_A";
55     public static final String ROUTINGKEY_B = "spring-boot-routingKey_B";
56     public static final String ROUTINGKEY_C = "spring-boot-routingKey_C";
57 
58 
59     @Bean
60     public ConnectionFactory connectionFactory() {
61         CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
62         connectionFactory.setUsername(userName);
63         connectionFactory.setPassword(password);
64         connectionFactory.setVirtualHost("/vir_simple");
65         connectionFactory.setPublisherConfirms(false);
66         return connectionFactory;
67     }
68 
69     public static ConnectionFactory getConnectionFactory() {
70         CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
71         connectionFactory.setUsername("admin");
72         connectionFactory.setPassword("admin");
73         connectionFactory.setVirtualHost("/vir_simple");
74         connectionFactory.setPublisherConfirms(false);
75         return connectionFactory;
76     }
77 }

生產者

 1 package com.maozw.mq.work;
 2 
 3 import com.maozw.mq.config.RabbitConfig;
 4 import com.rabbitmq.client.Channel;
 5 import org.slf4j.Logger;
 6 import org.slf4j.LoggerFactory;
 7 import org.springframework.amqp.rabbit.connection.Connection;
 8 import org.springframework.amqp.rabbit.connection.ConnectionFactory;
 9 import org.springframework.beans.factory.annotation.Autowired;
10 import org.springframework.web.bind.annotation.RequestMapping;
11 import org.springframework.web.bind.annotation.RestController;
12 
13 import java.io.IOException;
14 
15 /**
16  * work 模式
17  * 兩種分發: 輪詢分發 + 公平分發
18  * 輪詢分發:消費端:自動確認消息;boolean autoAck = true;
19  * 公平分發: 消費端:手動確認消息 boolean autoAck = false; channel.basicAck(envelope.getDeliveryTag(),false);
20  * @author MAOZW
21  * @Description: ${todo}
22  * @date 2018/11/26 15:06
23  */
24 @RestController
25 @RequestMapping("/work")
26 public class WorkProducer {
27     private static final Logger LOGGER = LoggerFactory.getLogger(WorkProducer.class);
28     @Autowired
29     RabbitConfig rabbitConfig;
30 
31     @RequestMapping("/send")
32     public String send() throws IOException {
33         ConnectionFactory connectionFactory = rabbitConfig.connectionFactory();
34         Connection connection = connectionFactory.createConnection();
35         Channel channel = connection.createChannel(false);
36         //建立隊列申明
37         channel.queueDeclare(RabbitConfig.QUEUE_WORK, false, false, false, null);
38 
39         /**
40          * 每一個消費者 發送確認消息以前,消息隊列不會發送下一個消息給消費者,一次只處理一個消息
41          * 自動模式無需設置下面設置
42          */
43         int prefetchCount = 1;
44         channel.basicQos(prefetchCount);
45 
46         String Hello = ">>>> Hello Simple <<<<";
47         for (int i = 0; i < 50; i++) {
48             String message = Hello + i;
49             channel.basicPublish("", RabbitConfig.QUEUE_WORK, null, message.getBytes());
50             LOGGER.info("生產消息: " + message);
51         }
52         return "OK";
53     }
54 }

消費者

 1 package com.maozw.mq.work;
 2 
 3 import com.maozw.mq.config.RabbitConfig;
 4 import com.rabbitmq.client.AMQP;
 5 import com.rabbitmq.client.Channel;
 6 import com.rabbitmq.client.DefaultConsumer;
 7 import com.rabbitmq.client.Envelope;
 8 import org.slf4j.Logger;
 9 import org.slf4j.LoggerFactory;
10 import org.springframework.amqp.rabbit.connection.Connection;
11 import org.springframework.amqp.rabbit.connection.ConnectionFactory;
12 
13 import java.io.IOException;
14 
15 /**
16  * @author MAOZW
17  * @Description: ${todo}
18  * @date 2018/11/26 15:06
19  */
20 
21 public class WorkConsumer2 {
22     private static final Logger LOGGER = LoggerFactory.getLogger(WorkConsumer2.class);
23 
24     public static void main(String[] args) throws IOException {
25         ConnectionFactory connectionFactory = RabbitConfig.getConnectionFactory();
26         Connection connection = connectionFactory.createConnection();
27         Channel channel = connection.createChannel(false);
28         //建立隊列申明
29         channel.queueDeclare(RabbitConfig.QUEUE_WORK, false, false, false, null);
30 
31         /**
32          * 改變分發規則
33          */
34         channel.basicQos(1);
35 
36         DefaultConsumer consumer = new DefaultConsumer(channel) {
37             @Override
38             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
39                 super.handleDelivery(consumerTag, envelope, properties, body);
40                 System.out.println("[2] 接口數據 : " + new String(body, "utf-8"));
41 
42                 try {
43                     Thread.sleep(200);
44                 } catch (InterruptedException e) {
45                     e.printStackTrace();
46                 } finally {
47                     System.out.println("[2] done!");
48                     //消息應答:手動回執,手動確認消息
49                     channel.basicAck(envelope.getDeliveryTag(),false);
50                 }
51             }
52         };
53         //監聽隊列
54         /**
55          * autoAck 消息應答
56          *  默認輪詢分發打開:true :這種模式一旦rabbitmq將消息發送給消費者,就會從內存中刪除該消息,不關心客戶端是否消費正常。
57          *  使用公平分發須要關閉autoAck:false  須要手動發送回執
58          */
59         boolean autoAck = false;
60         channel.basicConsume(RabbitConfig.QUEUE_WORK,autoAck, consumer);
61     }
62     
63 }
相關文章
相關標籤/搜索