topic模式也稱爲主題模式,其實他相對於routing模式最大的好處就是他多了一種匹配模式的路由,怎麼理解匹配呢,其實就至關於咱們以前正則的.*這種,不過他的匹配機制可能不是這種(其實除了匹配規則外,他的做用就和routing模式同樣 ),而他的工做流程圖以下:java
OK! 先說一下他的匹配規則:web
綁定鍵binding key也必須是這種形式。以特定路由鍵發送的消息將會發送到全部綁定鍵與之匹配的隊列中。但綁定鍵有兩種特殊的狀況:
①*(星號)僅表明一個單詞
②#(井號)表明任意個單詞spring
示例:apache
以上圖爲例:app
*.orange.* : 匹配以 任意一個單詞字符開頭中間包含 .orange. 以任意一個單詞字符結尾 的字符串。好比 a.orange.b, sdfsd.orange.fdsfsdf 等(注意是一個單詞)。ide
lay.# :只要一lay.開頭的都匹配,他能夠匹配lay.a, lay.a.b, lay.b.c等。fetch
這樣是否是很方便,好比咱們想將log的發給q1隊列,其餘的發給q2,那麼咱們只須要定義log.#、或者log.*,那麼你發送給q1隊列的數據就是log日誌的消息。 spa
1 package com.maozw.mq.topic; 2 3 import com.maozw.mq.config.RabbitConfig; 4 import com.rabbitmq.client.Channel; 5 import org.slf4j.Logger; 6 import org.slf4j.LoggerFactory; 7 import org.springframework.amqp.rabbit.connection.Connection; 8 import org.springframework.amqp.rabbit.connection.ConnectionFactory; 9 import org.springframework.beans.factory.annotation.Autowired; 10 import org.springframework.web.bind.annotation.RequestMapping; 11 import org.springframework.web.bind.annotation.RestController; 12 13 import java.io.IOException; 14 import java.util.concurrent.TimeoutException; 15 16 import static org.apache.log4j.varia.ExternallyRolledFileAppender.OK; 17 18 /** 19 * work 模式 20 * 兩種分發: 輪詢分發 + 公平分發 21 * 輪詢分發:消費端:自動確認消息;boolean autoAck = true; 22 * 公平分發: 消費端:手動確認消息 boolean autoAck = false; channel.basicAck(envelope.getDeliveryTag(),false); 23 * 24 * @author MAOZW 25 * @Description: 商品 26 * @date 2018/11/26 15:06 27 */ 28 @RestController 29 @RequestMapping("/topic") 30 public class TopicProducer { 31 private static final Logger LOGGER = LoggerFactory.getLogger(TopicProducer.class); 32 @Autowired 33 RabbitConfig rabbitConfig; 34 35 public static final String EXCHANGE_TOPIC = "EXCHANGE_TOPIC"; 36 private static final String[] routingKeys = {"commodity.add", "commodity.update", "commodity.select", "commodity.delete"}; 37 38 @RequestMapping("/send") 39 public String send() throws IOException, TimeoutException { 40 Connection connection = null; 41 Channel channel = null; 42 try { 43 ConnectionFactory connectionFactory = rabbitConfig.connectionFactory(); 44 connection = connectionFactory.createConnection(); 45 channel = connection.createChannel(false); 46 47 /** 48 * 申明交換機 以及type 49 */ 50 channel.exchangeDeclare(EXCHANGE_TOPIC, "topic"); 51 /** 52 * 發送消息 53 * 每一個消費者 發送確認消息以前,消息隊列不會發送下一個消息給消費者,一次只處理一個消息 54 * 自動模式無需設置下面設置 55 */ 56 int prefetchCount = 1; 57 channel.basicQos(prefetchCount); 58 59 String Hello = ">>>> Hello EXCHANGE_TOPIC <<<<"; 60 String message = Hello; 61 for (String routingKey : routingKeys) { 62 channel.basicPublish(EXCHANGE_TOPIC, routingKey, null, message.getBytes()); 63 } 64 LOGGER.info("生產消息: " + message); 65 return "OK"; 66 } catch (Exception e) { 67 68 } finally { 69 connection.close(); 70 channel.close(); 71 return OK; 72 } 73 } 74 }
1 package com.maozw.mq.topic; 2 3 import com.maozw.mq.config.RabbitConfig; 4 import com.rabbitmq.client.AMQP; 5 import com.rabbitmq.client.Channel; 6 import com.rabbitmq.client.DefaultConsumer; 7 import com.rabbitmq.client.Envelope; 8 import org.slf4j.Logger; 9 import org.slf4j.LoggerFactory; 10 import org.springframework.amqp.rabbit.connection.Connection; 11 import org.springframework.amqp.rabbit.connection.ConnectionFactory; 12 13 import java.io.IOException; 14 15 /** 16 * @author MAOZW 17 * @Description: ${todo} 18 * @date 2018/11/26 15:06 19 */ 20 21 public class TopicConsumer { 22 private static final Logger LOGGER = LoggerFactory.getLogger(TopicConsumer.class); 23 public static final String EXCHANGE_TOPIC = "EXCHANGE_TOPIC"; 24 private static final String[] routingKeys = {"commodity.add", "commodity.update", "commodity.delete"}; 25 26 public static void main(String[] args) throws IOException { 27 ConnectionFactory connectionFactory = RabbitConfig.getConnectionFactory(); 28 Connection connection = connectionFactory.createConnection(); 29 Channel channel = connection.createChannel(false); 30 /** 31 * 建立隊列申明 32 */ 33 boolean durable = true; 34 channel.queueDeclare(RabbitConfig.QUEUE_TOPIC, durable, false, false, null); 35 /** 36 * 綁定隊列到交換機 37 */ 38 channel.queueBind(RabbitConfig.QUEUE_TOPIC, EXCHANGE_TOPIC, "commodity.add"); 39 40 41 /** 42 * 改變分發規則 43 */ 44 channel.basicQos(1); 45 DefaultConsumer consumer = new DefaultConsumer(channel) { 46 @Override 47 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 48 super.handleDelivery(consumerTag, envelope, properties, body); 49 System.out.println("[1] 接口數據 : " + new String(body, "utf-8")); 50 try { 51 Thread.sleep(300); 52 } catch (InterruptedException e) { 53 e.printStackTrace(); 54 } finally { 55 System.out.println("[1] done!"); 56 //消息應答:手動回執,手動確認消息 57 channel.basicAck(envelope.getDeliveryTag(), false); 58 } 59 } 60 }; 61 //監聽隊列 62 /** 63 * autoAck 消息應答 64 * 默認輪詢分發打開:true :這種模式一旦rabbitmq將消息發送給消費者,就會從內存中刪除該消息,不關心客戶端是否消費正常。 65 * 使用公平分發須要關閉autoAck:false 須要手動發送回執 66 */ 67 boolean autoAck = false; 68 channel.basicConsume(RabbitConfig.QUEUE_TOPIC, autoAck, consumer); 69 } 70 71 }
1 package com.maozw.mq.topic; 2 3 import com.maozw.mq.config.RabbitConfig; 4 import com.rabbitmq.client.AMQP; 5 import com.rabbitmq.client.Channel; 6 import com.rabbitmq.client.DefaultConsumer; 7 import com.rabbitmq.client.Envelope; 8 import org.slf4j.Logger; 9 import org.slf4j.LoggerFactory; 10 import org.springframework.amqp.rabbit.connection.Connection; 11 import org.springframework.amqp.rabbit.connection.ConnectionFactory; 12 13 import java.io.IOException; 14 15 /** 16 * @author MAOZW 17 * @Description: ${todo} 18 * @date 2018/11/26 15:06 19 */ 20 21 public class TopicConsumer2 { 22 private static final Logger LOGGER = LoggerFactory.getLogger(TopicConsumer2.class); 23 public static final String EXCHANGE_TOPIC = "EXCHANGE_TOPIC"; 24 private static final String[] routingKeys = {"commodity.add", "commodity.update", "commodity.delete", "commodity.select"}; 25 26 public static void main(String[] args) throws IOException { 27 ConnectionFactory connectionFactory = RabbitConfig.getConnectionFactory(); 28 Connection connection = connectionFactory.createConnection(); 29 Channel channel = connection.createChannel(false); 30 /** 31 * 建立隊列申明 32 */ 33 boolean durable = true; 34 channel.queueDeclare(RabbitConfig.QUEUE_TOPIC2, durable, false, false, null); 35 36 /** 37 * 綁定隊列到交換機 38 */ 39 channel.queueBind(RabbitConfig.QUEUE_TOPIC2, EXCHANGE_TOPIC, "commodity.*"); 40 41 42 /** 43 * 改變分發規則 44 */ 45 channel.basicQos(1); 46 DefaultConsumer consumer = new DefaultConsumer(channel) { 47 @Override 48 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 49 super.handleDelivery(consumerTag, envelope, properties, body); 50 System.out.println("[2] 接口數據 : " + new String(body, "utf-8")); 51 try { 52 Thread.sleep(200); 53 } catch (InterruptedException e) { 54 e.printStackTrace(); 55 } finally { 56 System.out.println("[2] done! "); 57 //消息應答:手動回執,手動確認消息 58 channel.basicAck(envelope.getDeliveryTag(), false); 59 } 60 } 61 }; 62 //監聽隊列 63 /** 64 * autoAck 消息應答 65 * 默認輪詢分發打開:true :這種模式一旦rabbitmq將消息發送給消費者,就會從內存中刪除該消息,不關心客戶端是否消費正常。 66 * 使用公平分發須要關閉autoAck:false 須要手動發送回執 67 */ 68 boolean autoAck = false; 69 channel.basicConsume(RabbitConfig.QUEUE_TOPIC2, autoAck, consumer); 70 } 71 }