目錄java
consumer.nextDelivery
方法進行獲取下一條消息,而後進行消費處理!DefaultConsumer
類,重寫 handleDelivery
方法便可
public class Producer { public static void main(String[] args) throws Exception { //1 建立ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.244.11"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setHandshakeTimeout(20000); //2 獲取Connection Connection connection = connectionFactory.newConnection(); //3 經過Connection建立一個新的Channel Channel channel = connection.createChannel(); String exchange = "test_consumer_exchange"; String routingKey = "consumer.save"; String msg = "Hello RabbitMQ Consumer Message"; //4 發送消息 for(int i =0; i<5; i ++){ channel.basicPublish(exchange, routingKey, true, null, msg.getBytes()); } } }
public class MyConsumer extends DefaultConsumer { public MyConsumer(Channel channel) { super(channel); } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //consumerTag: 內部生成的消費標籤 properties: 消息屬性 body: 消息內容 System.err.println("-----------consume message----------"); System.err.println("consumerTag: " + consumerTag); //envelope包含屬性:deliveryTag(標籤), redeliver, exchange, routingKey //redeliver是一個標記,若是設爲true,表示消息以前可能已經投遞過了,如今是從新投遞消息到監聽隊列的消費者 System.err.println("envelope: " + envelope); System.err.println("properties: " + properties); System.err.println("body: " + new String(body)); } }
public class Consumer { public static void main(String[] args) throws Exception { //1 建立ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.244.11"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setHandshakeTimeout(20000); //2 獲取Connection Connection connection = connectionFactory.newConnection(); //3 經過Connection建立一個新的Channel Channel channel = connection.createChannel(); String exchangeName = "test_consumer_exchange"; String routingKey = "consumer.#"; String queueName = "test_consumer_queue"; //4 聲明交換機和隊列,而後進行綁定設置路由Key channel.exchangeDeclare(exchangeName, "topic", true, false, null); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); //5 設置channel,使用自定義消費者 channel.basicConsume(queueName, true, new MyConsumer(channel)); } }
運行說明api
先啓動消費端,訪問管控臺:http://ip:15672,檢查Exchange和Queue是否設置OK,而後啓動生產端。消費端打印內容以下服務器
消費端限流機制ide
RabbitMQ提供了一種qos
(服務質量保證)功能,即在非自動確認消息的前提下,若是必定數目的消息 (經過基於consume或者channel設置Qos的值) 未被確認前,不進行消費新的消息。性能
須要注意:測試
1.不能設置自動簽收功能(autoAck = false)fetch
2.若是消息沒被確認,就不會到達消費端,目的就是給消費端減壓ui
限流設置 - BasicQos()this
void BasicQos(uint prefetchSize, ushort prefetchCount, bool global);
prefetchSize:
單條消息的大小限制,消費端一般設置爲0,表示不作限制
prefetchCount:
一次最多能處理多少條消息,一般設置爲1
global:
是否將上面設置應用於channel,false表明consumer級別日誌
注意事項
prefetchSize
和global
這兩項,rabbitmq沒有實現,暫且不研究
prefetchCount
在 autoAck=false
的狀況下生效,即在自動應答的狀況下這個值是不生效的
手工ACK - basicAck()
void basicAck(Integer deliveryTag,boolean multiple)
手工ACK,調用這個方法就會主動回送給Broker一個應答,表示這條消息我處理完了,你能夠給我下一條了。參數multiple
表示是否批量簽收,因爲咱們是一次處理一條消息,因此設置爲false
生產端
生產端就是正常的邏輯
public class Producer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.244.11"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setHandshakeTimeout(20000); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchange = "test_qos_exchange"; String routingKey = "qos.save"; String msg = "Hello RabbitMQ QOS Message"; // 發送消息 for (int i = 0; i < 5; i++) { channel.basicPublish(exchange, routingKey, true, null, msg.getBytes()); } } }
自定義消費者
爲了看到限流效果,這裏不進行ACK
public class MyConsumer extends DefaultConsumer { //接收channel private Channel channel ; public MyConsumer(Channel channel) { super(channel); this.channel = channel; } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.err.println("-----------consume message----------"); System.err.println("consumerTag: " + consumerTag); System.err.println("envelope: " + envelope); //System.err.println("properties: " + properties); System.err.println("body: " + new String(body)); //手工ACK,參數multiple表示不批量簽收 //channel.basicAck(envelope.getDeliveryTag(), false); } }
消費端
關閉autoACK,進行限流設置
public class Consumer { public static void main(String[] args) throws Exception { //1 建立ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.244.11"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setHandshakeTimeout(20000); //2 獲取Connection Connection connection = connectionFactory.newConnection(); //3 經過Connection建立一個新的Channel Channel channel = connection.createChannel(); String exchangeName = "test_qos_exchange"; String queueName = "test_qos_queue"; String routingKey = "qos.#"; //4 聲明交換機和隊列,而後進行綁定設置路由Key channel.exchangeDeclare(exchangeName, "topic", true, false, null); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); //進行參數設置:單條消息的大小限制,一次最多能處理多少條消息,是否將上面設置應用於channel channel.basicQos(0, 1, false); //限流: autoAck設置爲 false channel.basicConsume(queueName, false, new MyConsumer(channel)); } }
運行說明
咱們先註釋掉手工ACK方法,而後啓動消費端和生產端,此時消費端只打印了一條消息
這是由於咱們設置了手工簽收,而且設置了一次只處理一條消息,當咱們沒有回送ack應答時,Broker端就認爲消費端尚未處理完這條消息,基於這種限流機制就不會給消費端發送新的消息了,因此消費端只打印了一條消息。
經過管控臺也能夠看到隊列總共收到了5條消息,有一條消息沒有ack。
將手工簽收代碼取消註釋,再次運行消費端,此時就會打印5條消息的內容。
當咱們設置 autoACK=false
時,就可使用手工ACK方式了,那麼其實手工方式包括了手工ACK與NACK。
當咱們手工 ACK
時,會發送給Broker一個應答,表明消息成功處理了,Broker就能夠回送響應給生產端了。NACK
則表示消息處理失敗了,若是設置重回隊列,Broker端就會將沒有成功處理的消息從新發送。
使用方式
NACK
並進行日誌的記錄,而後進行補償!void basicNack(long deliveryTag, boolean multiple, boolean requeue)
ACK
保障消費端消費成功!void basicAck(long deliveryTag, boolean multiple)
生產端
對消息設置自定義屬性以便進行區分
public class Producer { public static void main(String[] args) throws Exception { //1 建立ConnectionFactorys ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.244.11"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setHandshakeTimeout(20000); //2 獲取Connection Connection connection = connectionFactory.newConnection(); //3 經過Connection建立一個新的Channel Channel channel = connection.createChannel(); String exchange = "test_ack_exchange"; String routingKey = "ack.save"; for(int i =0; i<5; i ++){ //設置消息屬性 Map<String, Object> headers = new HashMap<String, Object>(); headers.put("num", i); AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .deliveryMode(2) .contentEncoding("UTF-8") .headers(headers) .build(); //發送消息 String msg = "Hello RabbitMQ ACK Message " + i; channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes()); } } }
自定義消費
對第一條消息進行NACK,並設置重回隊列
public class MyConsumer extends DefaultConsumer { private Channel channel ; public MyConsumer(Channel channel) { super(channel); this.channel = channel; } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.err.println("-----------consume message----------"); System.err.println("body: " + new String(body)); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } if((Integer)properties.getHeaders().get("num") == 0) { //NACK,參數三requeue:是否重回隊列 channel.basicNack(envelope.getDeliveryTag(), false, true); } else { channel.basicAck(envelope.getDeliveryTag(), false); } } }
消費端
關閉自動簽收功能
public class Consumer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.244.11"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setHandshakeTimeout(20000); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_ack_exchange"; String queueName = "test_ack_queue"; String routingKey = "ack.#"; //聲明交換機和隊列,而後進行綁定設置路由Key channel.exchangeDeclare(exchangeName, "topic", true, false, null); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); //手工簽收 必需要設置 autoAck = false channel.basicConsume(queueName, false, new MyConsumer(channel)); } }
運行說明
先啓動消費端,而後啓動生產端,消費端打印以下,顯然第一條消息因爲咱們調用了NACK,而且設置了重回隊列,因此會致使該條消息一直重複發送,消費端就會一直循環消費。
通常工做中不會設置重回隊列這個屬性,都是本身去作補償或者投遞到延遲隊列裏的,而後指定時間去處理便可。
TTL說明
Time To Live
的縮寫,也就是生存時間
此次演示咱們不寫代碼,只經過管控臺進行操做,實際測試也會更爲方便一些。
選擇Exchange菜單,找到下面的Add a new exchange
選擇Queue菜單,找到下面的Add a new queue
點擊Exchange表格中的test002_exchange
,在下面添加綁定規則
點擊Exchange表格中的test002_exchange
,在下面找到Publish message
,設置消息進行發送
點擊Queue菜單,查看錶格中test002已經有了一條消息,10秒後表格顯示0條,說明過時時間到了消息被自動清除了。
點擊Exchange表格中的test002_exchange
,在下面找到Publish message
,設置消息的過時時間並進行發送,此時觀察test002隊列,發現消息5s後就過時被清除了,即便隊列設置的過時時間是10s。
TTL代碼設置過時時間
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .expiration("10000") //10s過時 .build(); //發送消息 channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
隊列過時時間設置
//設置隊列的過時時間10s Map<String,Object> param = new HashMap<>(); param.put("x-message-ttl", 10000); //聲明隊列 channel.queueDeclare(queueName, true, false, false, null);
注意事項
死信隊列介紹
dead-letter-exchange
(dead message)
以後,它能被從新publish到另外一個Exchange,這個Exchange就是DLX
消息變成死信有如下幾種狀況
死信處理過程
死信隊列設置
arguments.put(" x-dead-letter-exchange","dlx.exchange");
,這樣消息在過時、requeue、 隊列在達到最大長度時,消息就能夠直接路由到死信隊列!
生產端
public class Producer { public static void main(String[] args) throws Exception { //1 建立ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.244.11"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setHandshakeTimeout(20000); //2 獲取Connection Connection connection = connectionFactory.newConnection(); //3 經過Connection建立一個新的Channel Channel channel = connection.createChannel(); String exchange = "test_dlx_exchange"; String routingKey = "dlx.save"; String msg = "Hello RabbitMQ DLX Message"; AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .deliveryMode(2) .contentEncoding("UTF-8") .expiration("10000") .build(); //發送消息 channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes()); } }
自定義消費者
public class MyConsumer extends DefaultConsumer { public MyConsumer(Channel channel) { super(channel); } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.err.println("-----------consume message----------"); System.err.println("consumerTag: " + consumerTag); System.err.println("envelope: " + envelope); System.err.println("properties: " + properties); System.err.println("body: " + new String(body)); } }
消費端
public class Consumer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.244.11"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setHandshakeTimeout(20000); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); // 聲明一個普通的交換機 和 隊列 以及路由 String exchangeName = "test_dlx_exchange"; String routingKey = "dlx.#"; String queueName = "test_dlx_queue"; String deadQueueName = "dlx.queue"; channel.exchangeDeclare(exchangeName, "topic", true, false, null); // 指定死信發送的Exchange Map<String, Object> agruments = new HashMap<String, Object>(); agruments.put("x-dead-letter-exchange", "dlx.exchange"); // 這個agruments屬性,要設置到聲明隊列上 channel.queueDeclare(queueName, true, false, false, agruments); channel.queueBind(queueName, exchangeName, routingKey); // 要進行死信隊列的聲明 channel.exchangeDeclare("dlx.exchange", "topic", true, false, null); channel.queueDeclare(deadQueueName, true, false, false, null); channel.queueBind(deadQueueName, "dlx.exchange", "#"); channel.basicConsume(queueName, true, new MyConsumer(channel)); //channel.basicConsume(deadQueueName, true, new MyConsumer(channel)); } }
運行說明
啓動消費端,此時查看管控臺,新增了兩個Exchange,兩個Queue。在test_dlx_queue
上咱們設置了DLX,也就表明死信消息會發送到指定的Exchange上,最終其實會路由到dlx.queue
上。
此時關閉消費端,而後啓動生產端,查看管控臺隊列的消息狀況,test_dlx_queue
的值爲1,而dlx_queue
的值爲0。
10s後的隊列結果如圖,因爲生產端發送消息時指定了消息的過時時間爲10s,而此時沒有消費端進行消費,消息便被路由到死信隊列中。
實際環境咱們還須要對死信隊列進行一個監聽和處理,固然具體的處理邏輯和業務相關,這裏只是簡單演示死信隊列是否生效。