假設一個場景,首先,咱們 Rabbitmq 服務器積壓了有上萬條未處理的消息,咱們隨便打開一個消費者客戶端,會出現這樣狀況: 巨量的消息瞬間所有推送過來,可是咱們單個客戶端沒法同時處理這麼多數據!api
當數據量特別大的時候,咱們對生產端限流確定是不科學的,由於有時候併發量就是特別大,有時候併發量又特別少,咱們沒法約束生產端,這是用戶的行爲。因此咱們應該對消費端限流,用於保持消費端的穩定,當消息數量激增的時候頗有可能形成資源耗盡,以及影響服務的性能,致使系統的卡頓甚至直接崩潰。服務器
RabbitMQ 提供了一種 qos (服務質量保證)功能,即在非自動確認消息的前提下,若是必定數目的消息(經過基於 consume 或者 channel 設置 Qos 的值)未被確認前,不進行消費新的消息。併發
/** * Request specific "quality of service" settings. * These settings impose limits on the amount of data the server * will deliver to consumers before requiring acknowledgements. * Thus they provide a means of consumer-initiated flow control. * @param prefetchSize maximum amount of content (measured in * octets) that the server will deliver, 0 if unlimited * @param prefetchCount maximum number of messages that the server * will deliver, 0 if unlimited * @param global true if the settings should be applied to the * entire channel rather than each consumer * @throws java.io.IOException if an error is encountered */ void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
prefetchSize:0,單條消息大小限制,0表明不限制app
prefetchCount:一次性消費的消息數量。會告訴 RabbitMQ 不要同時給一個消費者推送多於 N 個消息,即一旦有 N 個消息尚未 ack,則該 consumer 將 block 掉,直到有消息 ack。ide
注意:prefetchSize 和 global 這兩項,rabbitmq 沒有實現,暫且不研究。特別注意一點,prefetchCount 在 no_ask=false 的狀況下才生效,即在自動應答的狀況下這兩個值是不生效的。工具
首先第一步,咱們既然要使用消費端限流,咱們須要關閉自動 ack,將 autoAck 設置爲 falsechannel.basicConsume(queueName, false, consumer);
性能
第二步咱們來設置具體的限流大小以及數量。channel.basicQos(0, 15, false);
fetch
第三步在消費者的 handleDelivery 消費方法中手動 ack,而且設置批量處理 ack 迴應爲 truechannel.basicAck(envelope.getDeliveryTag(), true);
ui
這是生產端代碼,與前幾章的生產端代碼沒有作任何改變,主要的操做集中在消費端。
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class QosProducer { public static void main(String[] args) throws Exception { //1. 建立一個 ConnectionFactory 並進行設置 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); //2. 經過鏈接工廠來建立鏈接 Connection connection = factory.newConnection(); //3. 經過 Connection 來建立 Channel Channel channel = connection.createChannel(); //4. 聲明 String exchangeName = "test_qos_exchange"; String routingKey = "item.add"; //5. 發送 String msg = "this is qos msg"; for (int i = 0; i < 10; i++) { String tem = msg + " : " + i; channel.basicPublish(exchangeName, routingKey, null, tem.getBytes()); System.out.println("Send message : " + tem); } //6. 關閉鏈接 channel.close(); connection.close(); } }
這裏咱們建立一個消費者,經過如下代碼來驗證限流效果以及 global
參數設置爲 true
時不起做用.。咱們經過Thread.sleep(5000);
來讓 ack 即處理消息的過程慢一些,這樣咱們就能夠從後臺管理工具中清晰觀察到限流狀況。
import com.rabbitmq.client.*; import java.io.IOException; public class QosConsumer { public static void main(String[] args) throws Exception { //1. 建立一個 ConnectionFactory 並進行設置 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); factory.setAutomaticRecoveryEnabled(true); factory.setNetworkRecoveryInterval(3000); //2. 經過鏈接工廠來建立鏈接 Connection connection = factory.newConnection(); //3. 經過 Connection 來建立 Channel final Channel channel = connection.createChannel(); //4. 聲明 String exchangeName = "test_qos_exchange"; String queueName = "test_qos_queue"; String routingKey = "item.#"; channel.exchangeDeclare(exchangeName, "topic", true, false, null); channel.queueDeclare(queueName, true, false, false, null); channel.basicQos(0, 3, false); //通常不用代碼綁定,在管理界面手動綁定 channel.queueBind(queueName, exchangeName, routingKey); //5. 建立消費者並接收消息 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } String message = new String(body, "UTF-8"); System.out.println("[x] Received '" + message + "'"); channel.basicAck(envelope.getDeliveryTag(), true); } }; //6. 設置 Channel 消費者綁定隊列 channel.basicConsume(queueName, false, consumer); channel.basicConsume(queueName, false, consumer1); } }
咱們從下圖中發現 Unacked
值一直都是 3 ,每過 5 秒 消費一條消息即 Ready 和 Total 都減小 3,而 Unacked
的值在這裏表明消費者正在處理的消息,經過咱們的實驗發現了消費者一次性最多處理 3 條消息,達到了消費者限流的預期功能。
當咱們將void basicQos(int prefetchSize, int prefetchCount, boolean global)
中的 global 設置爲 true
的時候咱們發現並無了限流的做用。
TTL是Time To Live的縮寫,也就是生存時間。RabbitMQ支持消息的過時時間,在消息發送時能夠進行指定。
RabbitMQ支持隊列的過時時間,從消息入隊列開始計算,只要超過了隊列的超時時間配置,那麼消息會自動的清除。
這與 Redis 中的過時時間概念相似。咱們應該合理使用 TTL 技術,能夠有效的處理過時垃圾消息,從而下降服務器的負載,最大化的發揮服務器的性能。
RabbitMQ allows you to set TTL (time to live) for both messages and queues. This can be done using optional queue arguments or policies (the latter option is recommended). Message TTL can be enforced for a single queue, a group of queues or applied for individual messages.
RabbitMQ容許您爲消息和隊列設置TTL(生存時間)。 這可使用可選的隊列參數或策略來完成(建議使用後一個選項)。 能夠對單個隊列,一組隊列強制執行消息TTL,也能夠爲單個消息應用消息TTL。
——摘自 RabbitMQ 官方文檔
咱們在生產端發送消息的時候能夠在 properties 中指定 expiration
屬性來對消息過時時間進行設置,單位爲毫秒(ms)。
/** * deliverMode 設置爲 2 的時候表明持久化消息 * expiration 意思是設置消息的有效期,超過10秒沒有被消費者接收後會被自動刪除 * headers 自定義的一些屬性 * */ //5. 發送 Map<String, Object> headers = new HashMap<String, Object>(); headers.put("myhead1", "111"); headers.put("myhead2", "222"); AMQP.BasicProperties properties = new AMQP.BasicProperties().builder() .deliveryMode(2) .contentEncoding("UTF-8") .expiration("100000") .headers(headers) .build(); String msg = "test message"; channel.basicPublish("", queueName, properties, msg.getBytes());
咱們也能夠後臺管理頁面中進入 Exchange 發送消息指定expiration
咱們也能夠在後臺管理界面中新增一個 queue,建立時能夠設置 ttl,對於隊列中超過該時間的消息將會被移除。
死信隊列:沒有被及時消費的消息存放的隊列
消息沒有被及時消費的緣由:
a.消息被拒絕(basic.reject/ basic.nack)而且再也不從新投遞 requeue=false
b.TTL(time-to-live) 消息超時未消費
c.達到最大隊列長度
首先須要設置死信隊列的 exchange 和 queue,而後進行綁定:
Exchange: dlx.exchange Queue: dlx.queue RoutingKey: # 表明接收全部路由 key
arguments.put("x-dead-letter-exchange",' dlx.exchange' )
這樣消息在過時、requeue失敗、 隊列在達到最大長度時,消息就能夠直接路由到死信隊列!
import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class DlxProducer { public static void main(String[] args) throws Exception { //設置鏈接以及建立 channel 湖綠 String exchangeName = "test_dlx_exchange"; String routingKey = "item.update"; String msg = "this is dlx msg"; //咱們設置消息過時時間,10秒後再消費 讓消息進入死信隊列 AMQP.BasicProperties properties = new AMQP.BasicProperties().builder() .deliveryMode(2) .expiration("10000") .build(); channel.basicPublish(exchangeName, routingKey, true, properties, msg.getBytes()); System.out.println("Send message : " + msg); channel.close(); connection.close(); } }
import com.rabbitmq.client.*; import java.io.IOException; import java.util.HashMap; import java.util.Map; public class DlxConsumer { public static void main(String[] args) throws Exception { //建立鏈接、建立channel忽略 內容能夠在上面代碼中獲取 String exchangeName = "test_dlx_exchange"; String queueName = "test_dlx_queue"; String routingKey = "item.#"; //必須設置參數到 arguments 中 Map<String, Object> arguments = new HashMap<String, Object>(); arguments.put("x-dead-letter-exchange", "dlx.exchange"); channel.exchangeDeclare(exchangeName, "topic", true, false, null); //將 arguments 放入隊列的聲明中 channel.queueDeclare(queueName, true, false, false, arguments); //通常不用代碼綁定,在管理界面手動綁定 channel.queueBind(queueName, exchangeName, routingKey); //聲明死信隊列 channel.exchangeDeclare("dlx.exchange", "topic", true, false, null); channel.queueDeclare("dlx.queue", true, false, false, null); //路由鍵爲 # 表明能夠路由到全部消息 channel.queueBind("dlx.queue", "dlx.exchange", "#"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; //6. 設置 Channel 消費者綁定隊列 channel.basicConsume(queueName, true, consumer); } }
DLX也是一個正常的 Exchange,和通常的 Exchange 沒有區別,它能在任何的隊列上被指定,實際上就是設置某個隊列的屬性。當這個隊列中有死信時,RabbitMQ 就會自動的將這個消息從新發布到設置的 Exchange 上去,進而被路由到另外一個隊列。能夠監聽這個隊列中消息作相應的處理。