RabbitMQ能夠設置隊列的優先級,在隊列中的高優先級消息會被優先消費。在設置優先級時,首先須要設置隊列的最高優先級,而後在生產者發送消息時設置該條消息的優先級,最後在隊列中的高優先級的消息會被先發送給消費者消費java
設置隊列的最高優先級在聲明隊列時進行設置,代碼以下:設計模式
Map<String, Object> queueArgs = new HashMap<>(1); queueArgs.put("x-max-priority", 10); channel.queueDeclare(QUEUE_NAME, false, false, false, queueArgs);
設置消息的優先級在生產者生成消息時進行設置,代碼以下:服務器
BasicProperties properties = new BasicProperties.Builder() .priority(i) .build(); channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties, message);
注意:當消費者消費速度大於生產端,且Broker中沒有消息堆積的話,也就是說當生產者生產一條消息就被消費者消費,消息隊列中沒有消息堆積的話,設置消息優先級是沒有意義的ide
生產者ui
Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); for (int i = 0; i < 10; i++) { BasicProperties properties = new BasicProperties.Builder() .priority(i) .build(); channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties, String.valueOf(i).getBytes(StandardCharsets.UTF_8)); }
消費者.net
Channel channel = connection.createChannel(); channel.exchangeDeclare(PriorityProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT); Map<String, Object> queueArgs = new HashMap<>(1); queueArgs.put("x-max-priority", 10); channel.queueDeclare(QUEUE_NAME, false, false, false, queueArgs); channel.queueBind(QUEUE_NAME, PriorityProducer.EXCHANGE_NAME, PriorityProducer.ROUTING_KEY); channel.basicQos(1); channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String message = new String(body, StandardCharsets.UTF_8); System.out.print(message + " "); channel.basicAck(envelope.getDeliveryTag(), false); } });
輸出以下: 0 9 8 7 6 5 4 3 2 1設計
因爲消費者設置了消息預取數量爲1,因此會先取0消費,而後形成消息在消息隊列中的積壓,後面取的話就會先取優先級高的消息code
RabbitMQ使用AMQP協議,在AMQP協議中沒有直接實現延遲消息,因此咱們使用死信隊列(DLX)和消息存活時間(TTL)模擬出延遲隊列blog
當消息在隊列中變爲死信消息(Dead Message)後,該消息會被Publish到該隊列的DLX(Dead-Letter-Exchange)中。DLX就是一個Exchange,當消息被髮送到DLX後能夠路由到隊列中進行從新消費接口
消息在消息隊列中變爲死信消息的幾種狀況:
在聲明隊列時設置該隊列的死信隊列以及發送消息到死信隊列的Routing Key,代碼以下:
Map<String, Object> queueArgs = new HashMap<>(2); // 設置死信隊列 queueArgs.put("x-dead-letter-exchange", DelayProducer.DLX_EXCHANGE_NAME); // 設置死信Roting Key,不設置默認使用該Queue的Routing Key queueArgs.put("x-dead-letter-routing-key", DelayProducer.DLX_ROUTING_KEY); channel.queueDeclare(PLAIN_QUEUE_NAME, false, false, false, queueArgs);
能夠經過DDL和DLX實現延遲隊列,具體實現邏輯以下:
把消息發送到普通的隊列中(該隊列設置死信隊列),當消息DDL到期後會發送到死信隊列中,而後經過消費死信隊列中的消息實現延遲隊列,示例代碼以下:
生產者
Channel channel = connection.createChannel(); channel.exchangeDeclare(PLAIN_EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DLX_EXCHANGE_NAME, BuiltinExchangeType.DIRECT); BasicProperties properties = new BasicProperties.Builder() // 設置消息的TTL .expiration("60000") .build(); channel.basicPublish(PLAIN_EXCHANGE_NAME, PLAIN_ROUTING_KEY, properties, "Hello".getBytes(StandardCharsets.UTF_8));
消費者
Channel channel = connection.createChannel(); channel.exchangeDeclare(DelayProducer.PLAIN_EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DelayProducer.DLX_EXCHANGE_NAME, BuiltinExchangeType.DIRECT); Map<String, Object> queueArgs = new HashMap<>(2); // 設置死信隊列 queueArgs.put("x-dead-letter-exchange", DelayProducer.DLX_EXCHANGE_NAME); // 設置死信Roting Key,不設置默認使用該Queue的Routing Key queueArgs.put("x-dead-letter-routing-key", DelayProducer.DLX_ROUTING_KEY); channel.queueDeclare(PLAIN_QUEUE_NAME, false, false, false, queueArgs); channel.queueBind(PLAIN_QUEUE_NAME, DelayProducer.PLAIN_EXCHANGE_NAME, DelayProducer.PLAIN_ROUTING_KEY); channel.queueDeclare(DLX_QUEUE_NAME, false, false, false, null); channel.queueBind(DLX_QUEUE_NAME, DelayProducer.DLX_EXCHANGE_NAME, DelayProducer.DLX_ROUTING_KEY); // 因爲消息到普通隊列中TTL時間內沒有消費,因此該消息會被髮送到死信隊列中,因此咱們經過消費死信隊列來實現延遲消息 channel.basicConsume(DLX_QUEUE_NAME, false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String message = new String(body, StandardCharsets.UTF_8); System.out.println(message); channel.basicAck(envelope.getDeliveryTag(), false); } });
由前面的消息隊列系列文章能夠看出來,消費者能夠獲取消息有Pull、Push模型。RabbitMQ兩種模型都支持,可是其對Pull模型支持不太好,須要本身實現輪詢查詢是否有消息。下面是兩種模型的簡單使用
Push模型是RabbitMQ服務器主動推送消息給Consumer。這種模型有點像設計模式中的時間驅動模式,須要Consumer註冊回調接口到RabbitMQ服務器中,當RabbitMQ服務器有消息時會主動回調接口發送消息。Push模型有慢消費的缺點,RabbitMQ經過設置消費者預取消息數量來控制服務器發送消息的速度。咱們常常用到就是這種模式,Consumer示例代碼以下:
Channel channel = connection.createChannel(); channel.exchangeDeclare(PullProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.queueDeclare(QUEUE_NAME, false,false, false, null); channel.queueBind(QUEUE_NAME, PullProducer.EXCHANGE_NAME, PullProducer.ROUTING_KEY); channel.basicConsumer(QUEUE_NAME, false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { // 處理消息邏輯 channel.basicAck(envelope.getDeliveryTag(), false); } });
Pull模型Consumer主動去RabbitMQ服務器拉消息。這種模式的缺點是消息延遲和忙等,須要本身設計輪詢方案。Consumer示例代碼以下,沒有實現輪詢方案:
Connection connection = Basic.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(PullProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.queueDeclare(QUEUE_NAME, false,false, false, null); channel.queueBind(QUEUE_NAME, PullProducer.EXCHANGE_NAME, PullProducer.ROUTING_KEY); while (true) { GetResponse response = channel.basicGet(QUEUE_NAME, false); if (response == null) { continue; } String message = new String(response.getBody(), StandardCharsets.UTF_8); // 處理消息邏輯 channel.basicAck(response.getEnvelope().getDeliveryTag(), false); }
http://blog.csdn.net/u013256816/article/details/55105495 http://blog.csdn.net/u013256816/article/details/62890189