高併發場景下,生產者會產生大量的消息;如果巨量的消息瞬間全部推送過來,消費端沒法處理。
RabbitMQ 提供了服務質量保障(QoS)功能,即在非自動確認消息的前提下,如果一定數目的消息尚未被確認,就不再消費新的消息。
使用 basicQos 方法,在消費端進行設置,例如 channel.basicQos(0, 1, false)。三個參數依次爲:
prefetchSize:0(0 表示不限制消息內容的大小)
prefetchCount:這個值在設置爲非自動 ack 的情況下才生效,一般設置爲 1
global: true是channel級別, false是消費者級別
注意:我們必須使用非自動 ack(即 basicConsume 的 autoAck 參數設置爲 false)。
消費者:
package com.flying.rabbitmq.api.limit; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Consumer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_qos_exchange"; String queueName = "test_qos_queue"; String routingKey = "qos.#"; channel.exchangeDeclare(exchangeName, "topic", true, false, null); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); //1 限流方式 第一件事就是 autoAck設置爲 false channel.basicQos(0, 1, false); channel.basicConsume(queueName, false, new MyConsumer(channel)); } }
自定義消費監聽:
package com.flying.rabbitmq.api.limit;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Consumer callback for the QoS demo: prints each delivery to stderr and then
 * acknowledges it manually on the channel it was constructed with.
 */
public class MyConsumer extends DefaultConsumer {

    /** Channel used to send the manual acknowledgements. */
    private final Channel channel;

    /**
     * @param channel channel this consumer is registered on; also used for acks
     */
    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    /**
     * Prints the delivery details and acknowledges the single message.
     *
     * @param consumerTag tag of the consumer the delivery is addressed to
     * @param envelope    delivery metadata; supplies the delivery tag for the ack
     * @param properties  message properties
     * @param body        raw message payload, decoded as UTF-8 for printing
     * @throws IOException if sending the acknowledgement fails
     */
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("-----------consume message----------");
        System.err.println("consumerTag: " + consumerTag);
        System.err.println("envelope: " + envelope);
        System.err.println("properties: " + properties);
        // Decode with an explicit charset instead of the platform default.
        System.err.println("body: " + new String(body, StandardCharsets.UTF_8));

        // Step 2: manual acknowledgement. The second argument (multiple) is
        // false because the consumer is set up with prefetchCount = 1, so
        // messages are acknowledged one at a time.
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
}
生產者:
package com.flying.rabbitmq.api.limit;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

/**
 * Producer side of the QoS demo: publishes five identical messages to
 * {@code test_qos_exchange} with routing key {@code qos.save}, then closes
 * the channel and connection.
 */
public class Producer {

    /**
     * Connects to the broker on 127.0.0.1:5672, publishes the messages and
     * releases the connection.
     *
     * @param args unused
     * @throws Exception if connecting, publishing or closing fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();
        try {
            Channel channel = connection.createChannel();

            String exchange = "test_qos_exchange";
            String routingKey = "qos.save";
            String msg = "Hello RabbitMQ QOS Message";
            // Encode once with an explicit charset instead of the platform default.
            byte[] payload = msg.getBytes(StandardCharsets.UTF_8);

            for (int i = 0; i < 5; i++) {
                // mandatory = true: the broker returns unroutable messages.
                // NOTE(review): no return listener is registered here, so such
                // returns go unobserved by this program.
                channel.basicPublish(exchange, routingKey, true, null, payload);
            }

            channel.close();
        } finally {
            // Always release the connection so the client's I/O thread stops
            // and the JVM can exit once publishing is done.
            connection.close();
        }
    }
}