4.rabbitmq--路由模式

rabbitmq--路由模式

想象以前的訂閱發佈模式: 一個生產者,多個消費者,每個消費者都有本身的一個隊列,生產者沒有將消息直接發送到隊列,而是發送到了交換機,每一個隊列綁定交換機,生產者發送的消息通過交換機,到達隊列,實現一個消息被多個消費者獲取的目的。java

須要注意的是,若是將消息發送到一個沒有隊列綁定的exchange上面,那麼該消息將會丟失,這是由於在rabbitMQ中exchange不具有存儲消息的能力,只有隊列具有存儲消息的能力。web

可是若是想象下這樣的一個場景:生產者有新增商品,修改商品,刪除商品的消息,消費者包含前臺系統和搜索系統,要求前臺系統接收修改和刪除商品的消息,搜索系統接收新增商品、修改商品和刪除商品的消息。因此使用這種訂閱模式實現商品數據的同步並不合理。所以咱們介紹下一種模式:路由模式。spring

這種模式添加了一個路由鍵,生產者發佈消息的時候添加路由鍵,消費者綁定隊列到交換機時添加鍵值,這樣就能夠接收到須要接收的消息。apache

 生產者:app

 1 package com.maozw.mq.routing;
 2 
 3 import com.maozw.mq.config.RabbitConfig;
 4 import com.rabbitmq.client.Channel;
 5 import org.slf4j.Logger;
 6 import org.slf4j.LoggerFactory;
 7 import org.springframework.amqp.rabbit.connection.Connection;
 8 import org.springframework.amqp.rabbit.connection.ConnectionFactory;
 9 import org.springframework.beans.factory.annotation.Autowired;
10 import org.springframework.web.bind.annotation.PathVariable;
11 import org.springframework.web.bind.annotation.RequestMapping;
12 import org.springframework.web.bind.annotation.RestController;
13 
14 import java.io.IOException;
15 import java.util.concurrent.TimeoutException;
16 
17 import static org.apache.log4j.varia.ExternallyRolledFileAppender.OK;
18 
19 /**
20  * work 模式
21  * 兩種分發: 輪詢分發 + 公平分發
22  * 輪詢分發:消費端:自動確認消息;boolean autoAck = true;
23  * 公平分發: 消費端:手動確認消息 boolean autoAck = false; channel.basicAck(envelope.getDeliveryTag(),false);
24  *
25  * @author MAOZW
26  * @Description: ${todo}
27  * @date 2018/11/26 15:06
28  */
29 @RestController
30 @RequestMapping("/routing")
31 public class RoutingProducer {
32     private static final Logger LOGGER = LoggerFactory.getLogger(RoutingProducer.class);
33     @Autowired
34     RabbitConfig rabbitConfig;
35 
36 
37     @RequestMapping("/send")
38     public String send() throws IOException, TimeoutException {
39         Connection connection = null;
40         Channel channel= null;
41         try {
42             ConnectionFactory connectionFactory = rabbitConfig.connectionFactory();
43             connection = connectionFactory.createConnection();
44             channel = connection.createChannel(false);
45 
46             /**
47              * 申明交換機 以及type
48              */
49             channel.exchangeDeclare(RabbitConfig.EXCHANGE_ROUTE,"direct");
50 
51             /**
52              * 發送消息
53              * 每一個消費者 發送確認消息以前,消息隊列不會發送下一個消息給消費者,一次只處理一個消息
54              * 自動模式無需設置下面設置
55              */
56             int prefetchCount = 1;
57             channel.basicQos(prefetchCount);
58 
59             String Hello = ">>>> Hello EXCHANGE_ROUTE <<<<";
60             for (int i = 0; i < 5; i++) {
61                 String message = Hello + i;
62                 if (i == 3){
63                     channel.basicPublish(RabbitConfig.EXCHANGE_ROUTE, RabbitConfig.ROUTINGKEY_A, null, message.getBytes());
64                 }else{
65                     channel.basicPublish(RabbitConfig.EXCHANGE_ROUTE, RabbitConfig.ROUTINGKEY_B, null, message.getBytes());
66                 }
67                 LOGGER.info("生產消息: " + message);
68             }
69             return "OK";
70         }catch (Exception e) {
71 
72         } finally {
73             connection.close();
74             channel.close();
75             return OK;
76         }
77     }
78 }

消費者1 ide

 1 package com.maozw.mq.routing;
 2 
 3 import com.maozw.mq.config.RabbitConfig;
 4 import com.rabbitmq.client.AMQP;
 5 import com.rabbitmq.client.Channel;
 6 import com.rabbitmq.client.DefaultConsumer;
 7 import com.rabbitmq.client.Envelope;
 8 import org.slf4j.Logger;
 9 import org.slf4j.LoggerFactory;
10 import org.springframework.amqp.rabbit.connection.Connection;
11 import org.springframework.amqp.rabbit.connection.ConnectionFactory;
12 
13 import java.io.IOException;
14 
15 /**
16  * @author MAOZW
17  * @Description: ${todo}
18  * @date 2018/11/26 15:06
19  */
20 
21 public class RoutingConsumer {
22     private static final Logger LOGGER = LoggerFactory.getLogger(RoutingConsumer.class);
23 
24     public static void main(String[] args) throws IOException {
25         ConnectionFactory connectionFactory = RabbitConfig.getConnectionFactory();
26         Connection connection = connectionFactory.createConnection();
27         Channel channel = connection.createChannel(false);
28         /**
29          * 建立隊列申明
30          */
31         boolean durable = true;
32         channel.queueDeclare(RabbitConfig.QUEUE_ROUTE, durable, false, false, null);
33         /**
34          * 綁定隊列到交換機
35          */
36         channel.queueBind(RabbitConfig.QUEUE_ROUTE, RabbitConfig.EXCHANGE_ROUTE,RabbitConfig.ROUTINGKEY_A);
37 
38         /**
39          * 改變分發規則
40          */
41         channel.basicQos(1);
42         DefaultConsumer consumer = new DefaultConsumer(channel) {
43             @Override
44             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
45                 super.handleDelivery(consumerTag, envelope, properties, body);
46                 System.out.println("[1] 接口數據 : " + new String(body, "utf-8"));
47                 try {
48                     Thread.sleep(100);
49                 } catch (InterruptedException e) {
50                     e.printStackTrace();
51                 } finally {
52                     System.out.println("[1] done!");
53                     //消息應答:手動回執,手動確認消息
54                     channel.basicAck(envelope.getDeliveryTag(),false);
55                 }
56             }
57         };
58         //監聽隊列
59         /**
60          * autoAck 消息應答
61          *  默認輪詢分發打開:true :這種模式一旦rabbitmq將消息發送給消費者,就會從內存中刪除該消息,不關心客戶端是否消費正常。
62          *  使用公平分發須要關閉autoAck:false  須要手動發送回執
63          */
64         boolean autoAck = false;
65         channel.basicConsume(RabbitConfig.QUEUE_ROUTE,autoAck, consumer);
66     }
67     
68 }
 1 package com.maozw.mq.routing;
 2 
 3 import com.maozw.mq.config.RabbitConfig;
 4 import com.rabbitmq.client.AMQP;
 5 import com.rabbitmq.client.Channel;
 6 import com.rabbitmq.client.DefaultConsumer;
 7 import com.rabbitmq.client.Envelope;
 8 import org.slf4j.Logger;
 9 import org.slf4j.LoggerFactory;
10 import org.springframework.amqp.rabbit.connection.Connection;
11 import org.springframework.amqp.rabbit.connection.ConnectionFactory;
12 
13 import java.io.IOException;
14 
15 /**
16  * @author MAOZW
17  * @Description: ${todo}
18  * @date 2018/11/26 15:06
19  */
20 
21 public class RoutingConsumer2 {
22     private static final Logger LOGGER = LoggerFactory.getLogger(RoutingConsumer2.class);
23 
24     public static void main(String[] args) throws IOException {
25         ConnectionFactory connectionFactory = RabbitConfig.getConnectionFactory();
26         Connection connection = connectionFactory.createConnection();
27         Channel channel = connection.createChannel(false);
28         /**
29          * 建立隊列申明
30          */
31         boolean durable = true;
32         channel.queueDeclare(RabbitConfig.QUEUE_ROUTE2, durable, false, false, null);
33         /**
34          * 綁定隊列到交換機
35          */
36         channel.queueBind(RabbitConfig.QUEUE_ROUTE2, RabbitConfig.EXCHANGE_ROUTE,RabbitConfig.ROUTINGKEY_B);
37 
38         /**
39          * 改變分發規則
40          */
41         channel.basicQos(1);
42         DefaultConsumer consumer = new DefaultConsumer(channel) {
43             @Override
44             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
45                 super.handleDelivery(consumerTag, envelope, properties, body);
46                 System.out.println("[2] 接口數據 : " + new String(body, "utf-8"));
47                 try {
48                     Thread.sleep(100);
49                 } catch (InterruptedException e) {
50                     e.printStackTrace();
51                 } finally {
52                     System.out.println("[2] done!");
53                     //消息應答:手動回執,手動確認消息
54                     channel.basicAck(envelope.getDeliveryTag(),false);
55                 }
56             }
57         };
58         //監聽隊列
59         /**
60          * autoAck 消息應答
61          *  默認輪詢分發打開:true :這種模式一旦rabbitmq將消息發送給消費者,就會從內存中刪除該消息,不關心客戶端是否消費正常。
62          *  使用公平分發須要關閉autoAck:false  須要手動發送回執
63          */
64         boolean autoAck = false;
65         channel.basicConsume(RabbitConfig.QUEUE_ROUTE2,autoAck, consumer);
66     }
67 }
相關文章
相關標籤/搜索