以前的文章裏面,我都是在消費端的代碼裏面編寫 while 循環,進行 consumer.nextDelivery 方法進行獲取下一條消息,而後進行消費處理,這種方式太 low 了,耦合性過高,因此要使用自定義的 consumer 來解耦,這種方式更方便一些,也是在實際工做中最經常使用的使用方式git
下面來看看具體的代碼實現, 代碼地址:github
https://github.com/hmilyos/rabbitmqdemo.git rabbitmq-api 項目下
複製代碼
如圖所示,先來實現咱們的自定義消費者api
public class MyConsumer extends DefaultConsumer {
private static final Logger log = LoggerFactory.getLogger(MyConsumer.class);
public MyConsumer(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String consumerTag, //消費者標籤
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
log.info("------MyConsumer-----consume message----------");
log.info("consumerTag: " + consumerTag);
log.info("envelope: " + envelope);
log.info("properties: " + properties);
log.info("body: " + new String(body));
}
}
複製代碼
接着,重點來了,在聲明消費者的代碼裏面使用剛纔的自定義消費者bash
/**
* 使用自定義消費者
*/
public class Consumer {
private static final Logger log = LoggerFactory.getLogger(Consumer.class);
public static final String EXCHANGE_NAME = "test_consumer_exchange";
public static final String EXCHANGE_TYPE = "topic";
public static final String ROUTING_KEY_TYPE = "consumer.#";
public static final String ROUTING_KEY = "consumer.save";
public static final String QUEUE_NAME = "test_consumer_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//1 建立ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
//2 獲取C onnection
Connection connection = connectionFactory.newConnection();
//3 經過Connection建立一個新的Channel
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, null);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_TYPE);
//使用自定義消費者
channel.basicConsume(QUEUE_NAME, true, new MyConsumer(channel));
log.info("消費端啓動成功");
}
}
複製代碼
生產端代碼基本不須要修改ide
public class Procuder {
private static final Logger log = LoggerFactory.getLogger(Procuder.class);
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String msg = "Hello RabbitMQ Consumer Message";
for(int i = 0; i < 5; i ++){
log.info("生產端發送:{}", msg + i);
channel.basicPublish(Consumer.EXCHANGE_NAME, Consumer.ROUTING_KEY, true, null, (msg + i).getBytes());
}
}
}
複製代碼
先啓動消費端,再啓動生產端,查看運行結果:注意看消費端的日誌,打印出了咱們自定義消費者裏面的東西了。ui