想象以前的訂閱發佈模式: 一個生產者,多個消費者,每個消費者都有本身的一個隊列,生產者沒有將消息直接發送到隊列,而是發送到了交換機,每一個隊列綁定交換機,生產者發送的消息通過交換機,到達隊列,實現一個消息被多個消費者獲取的目的。java
須要注意的是,若是將消息發送到一個沒有隊列綁定的exchange上面,那麼該消息將會丟失,這是由於在rabbitMQ中exchange不具有存儲消息的能力,只有隊列具有存儲消息的能力。web
可是若是想象下這樣的一個場景:生產者有新增商品,修改商品,刪除商品的消息,消費者包含前臺系統和搜索系統,要求前臺系統接收修改和刪除商品的消息,搜索系統接收新增商品、修改商品和刪除商品的消息。因此使用這種訂閱模式實現商品數據的同步並不合理。所以咱們介紹下一種模式:路由模式。spring
這種模式添加了一個路由鍵,生產者發佈消息的時候添加路由鍵,消費者綁定隊列到交換機時添加鍵值,這樣就能夠接收到須要接收的消息。apache
生產者:app
1 package com.maozw.mq.routing; 2 3 import com.maozw.mq.config.RabbitConfig; 4 import com.rabbitmq.client.Channel; 5 import org.slf4j.Logger; 6 import org.slf4j.LoggerFactory; 7 import org.springframework.amqp.rabbit.connection.Connection; 8 import org.springframework.amqp.rabbit.connection.ConnectionFactory; 9 import org.springframework.beans.factory.annotation.Autowired; 10 import org.springframework.web.bind.annotation.PathVariable; 11 import org.springframework.web.bind.annotation.RequestMapping; 12 import org.springframework.web.bind.annotation.RestController; 13 14 import java.io.IOException; 15 import java.util.concurrent.TimeoutException; 16 17 import static org.apache.log4j.varia.ExternallyRolledFileAppender.OK; 18 19 /** 20 * work 模式 21 * 兩種分發: 輪詢分發 + 公平分發 22 * 輪詢分發:消費端:自動確認消息;boolean autoAck = true; 23 * 公平分發: 消費端:手動確認消息 boolean autoAck = false; channel.basicAck(envelope.getDeliveryTag(),false); 24 * 25 * @author MAOZW 26 * @Description: ${todo} 27 * @date 2018/11/26 15:06 28 */ 29 @RestController 30 @RequestMapping("/routing") 31 public class RoutingProducer { 32 private static final Logger LOGGER = LoggerFactory.getLogger(RoutingProducer.class); 33 @Autowired 34 RabbitConfig rabbitConfig; 35 36 37 @RequestMapping("/send") 38 public String send() throws IOException, TimeoutException { 39 Connection connection = null; 40 Channel channel= null; 41 try { 42 ConnectionFactory connectionFactory = rabbitConfig.connectionFactory(); 43 connection = connectionFactory.createConnection(); 44 channel = connection.createChannel(false); 45 46 /** 47 * 申明交換機 以及type 48 */ 49 channel.exchangeDeclare(RabbitConfig.EXCHANGE_ROUTE,"direct"); 50 51 /** 52 * 發送消息 53 * 每一個消費者 發送確認消息以前,消息隊列不會發送下一個消息給消費者,一次只處理一個消息 54 * 自動模式無需設置下面設置 55 */ 56 int prefetchCount = 1; 57 channel.basicQos(prefetchCount); 58 59 String Hello = ">>>> Hello EXCHANGE_ROUTE <<<<"; 60 for (int i = 0; i < 5; i++) { 61 String message = Hello + i; 62 if (i == 3){ 63 channel.basicPublish(RabbitConfig.EXCHANGE_ROUTE, RabbitConfig.ROUTINGKEY_A, null, message.getBytes()); 64 }else{ 65 channel.basicPublish(RabbitConfig.EXCHANGE_ROUTE, RabbitConfig.ROUTINGKEY_B, null, message.getBytes()); 66 } 67 LOGGER.info("生產消息: " + message); 68 } 69 return "OK"; 70 }catch (Exception e) { 71 72 } finally { 73 connection.close(); 74 channel.close(); 75 return OK; 76 } 77 } 78 }
消費者1 ide
1 package com.maozw.mq.routing; 2 3 import com.maozw.mq.config.RabbitConfig; 4 import com.rabbitmq.client.AMQP; 5 import com.rabbitmq.client.Channel; 6 import com.rabbitmq.client.DefaultConsumer; 7 import com.rabbitmq.client.Envelope; 8 import org.slf4j.Logger; 9 import org.slf4j.LoggerFactory; 10 import org.springframework.amqp.rabbit.connection.Connection; 11 import org.springframework.amqp.rabbit.connection.ConnectionFactory; 12 13 import java.io.IOException; 14 15 /** 16 * @author MAOZW 17 * @Description: ${todo} 18 * @date 2018/11/26 15:06 19 */ 20 21 public class RoutingConsumer { 22 private static final Logger LOGGER = LoggerFactory.getLogger(RoutingConsumer.class); 23 24 public static void main(String[] args) throws IOException { 25 ConnectionFactory connectionFactory = RabbitConfig.getConnectionFactory(); 26 Connection connection = connectionFactory.createConnection(); 27 Channel channel = connection.createChannel(false); 28 /** 29 * 建立隊列申明 30 */ 31 boolean durable = true; 32 channel.queueDeclare(RabbitConfig.QUEUE_ROUTE, durable, false, false, null); 33 /** 34 * 綁定隊列到交換機 35 */ 36 channel.queueBind(RabbitConfig.QUEUE_ROUTE, RabbitConfig.EXCHANGE_ROUTE,RabbitConfig.ROUTINGKEY_A); 37 38 /** 39 * 改變分發規則 40 */ 41 channel.basicQos(1); 42 DefaultConsumer consumer = new DefaultConsumer(channel) { 43 @Override 44 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 45 super.handleDelivery(consumerTag, envelope, properties, body); 46 System.out.println("[1] 接口數據 : " + new String(body, "utf-8")); 47 try { 48 Thread.sleep(100); 49 } catch (InterruptedException e) { 50 e.printStackTrace(); 51 } finally { 52 System.out.println("[1] done!"); 53 //消息應答:手動回執,手動確認消息 54 channel.basicAck(envelope.getDeliveryTag(),false); 55 } 56 } 57 }; 58 //監聽隊列 59 /** 60 * autoAck 消息應答 61 * 默認輪詢分發打開:true :這種模式一旦rabbitmq將消息發送給消費者,就會從內存中刪除該消息,不關心客戶端是否消費正常。 62 * 使用公平分發須要關閉autoAck:false 須要手動發送回執 63 */ 64 boolean autoAck = false; 65 channel.basicConsume(RabbitConfig.QUEUE_ROUTE,autoAck, consumer); 66 } 67 68 }
1 package com.maozw.mq.routing; 2 3 import com.maozw.mq.config.RabbitConfig; 4 import com.rabbitmq.client.AMQP; 5 import com.rabbitmq.client.Channel; 6 import com.rabbitmq.client.DefaultConsumer; 7 import com.rabbitmq.client.Envelope; 8 import org.slf4j.Logger; 9 import org.slf4j.LoggerFactory; 10 import org.springframework.amqp.rabbit.connection.Connection; 11 import org.springframework.amqp.rabbit.connection.ConnectionFactory; 12 13 import java.io.IOException; 14 15 /** 16 * @author MAOZW 17 * @Description: ${todo} 18 * @date 2018/11/26 15:06 19 */ 20 21 public class RoutingConsumer2 { 22 private static final Logger LOGGER = LoggerFactory.getLogger(RoutingConsumer2.class); 23 24 public static void main(String[] args) throws IOException { 25 ConnectionFactory connectionFactory = RabbitConfig.getConnectionFactory(); 26 Connection connection = connectionFactory.createConnection(); 27 Channel channel = connection.createChannel(false); 28 /** 29 * 建立隊列申明 30 */ 31 boolean durable = true; 32 channel.queueDeclare(RabbitConfig.QUEUE_ROUTE2, durable, false, false, null); 33 /** 34 * 綁定隊列到交換機 35 */ 36 channel.queueBind(RabbitConfig.QUEUE_ROUTE2, RabbitConfig.EXCHANGE_ROUTE,RabbitConfig.ROUTINGKEY_B); 37 38 /** 39 * 改變分發規則 40 */ 41 channel.basicQos(1); 42 DefaultConsumer consumer = new DefaultConsumer(channel) { 43 @Override 44 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 45 super.handleDelivery(consumerTag, envelope, properties, body); 46 System.out.println("[2] 接口數據 : " + new String(body, "utf-8")); 47 try { 48 Thread.sleep(100); 49 } catch (InterruptedException e) { 50 e.printStackTrace(); 51 } finally { 52 System.out.println("[2] done!"); 53 //消息應答:手動回執,手動確認消息 54 channel.basicAck(envelope.getDeliveryTag(),false); 55 } 56 } 57 }; 58 //監聽隊列 59 /** 60 * autoAck 消息應答 61 * 默認輪詢分發打開:true :這種模式一旦rabbitmq將消息發送給消費者,就會從內存中刪除該消息,不關心客戶端是否消費正常。 62 * 使用公平分發須要關閉autoAck:false 須要手動發送回執 63 */ 64 boolean autoAck = false; 65 channel.basicConsume(RabbitConfig.QUEUE_ROUTE2,autoAck, consumer); 66 } 67 }