RabbitMQ 的自定義消費者使用

以前的文章裏面,我都是在消費端的代碼裏面編寫 while 循環,進行 consumer.nextDelivery 方法進行獲取下一條消息,而後進行消費處理,這種方式太 low 了,耦合性過高,因此要使用自定義的 consumer 來解耦,這種方式更方便一些,也是在實際工做中最經常使用的使用方式git

下面來看看具體的代碼實現, 代碼地址:github

https://github.com/hmilyos/rabbitmqdemo.git  rabbitmq-api 項目下
複製代碼

如圖所示,先來實現咱們的自定義消費者api

public class MyConsumer extends DefaultConsumer {

	private static final Logger log = LoggerFactory.getLogger(MyConsumer.class);
	
	public MyConsumer(Channel channel) {
		super(channel);
	}

	@Override
    public void handleDelivery(String consumerTag,  //消費者標籤
                               Envelope envelope,
                               AMQP.BasicProperties properties,
                               byte[] body) throws IOException {
        log.info("------MyConsumer-----consume message----------");
        log.info("consumerTag: " + consumerTag);
        log.info("envelope: " + envelope);
        log.info("properties: " + properties);
        log.info("body: " + new String(body));
    }
}
複製代碼

接着,重點來了,在聲明消費者的代碼裏面使用剛纔的自定義消費者bash

/**
 * 使用自定義消費者
 */
public class Consumer {

	private static final Logger log = LoggerFactory.getLogger(Consumer.class);
	
	public static final String EXCHANGE_NAME = "test_consumer_exchange";
	public static final String EXCHANGE_TYPE = "topic";
	public static final String ROUTING_KEY_TYPE = "consumer.#";
	public static final String ROUTING_KEY = "consumer.save";
	public static final String QUEUE_NAME = "test_consumer_queue";
	
	public static void main(String[] args) throws IOException, TimeoutException {
		//1 建立ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
        connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
        connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
        //2 獲取C	onnection
        Connection connection = connectionFactory.newConnection();
        //3 經過Connection建立一個新的Channel
        Channel channel = connection.createChannel();
        
        channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, null);
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_TYPE);
        
      //使用自定義消費者
        channel.basicConsume(QUEUE_NAME, true, new MyConsumer(channel));
        log.info("消費端啓動成功");
	}
}

複製代碼

生產端代碼基本不須要修改ide

public class Procuder {

	private static final Logger log = LoggerFactory.getLogger(Procuder.class);

	public static void main(String[] args) throws IOException, TimeoutException {
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
		connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
		connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);

		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();

		String msg = "Hello RabbitMQ Consumer Message";
        for(int i = 0; i < 5; i ++){
            log.info("生產端發送:{}", msg + i);
            channel.basicPublish(Consumer.EXCHANGE_NAME, Consumer.ROUTING_KEY, true, null, (msg + i).getBytes());
        }
	}
}
複製代碼

先啓動消費端,再啓動生產端,查看運行結果:注意看消費端的日誌,打印出了咱們自定義消費者裏面的東西了。ui

至此,簡單的使用自定義消費者demo就完成了。
相關文章
相關標籤/搜索