RabbitMQ消費端自定義監聽器DefaultConsumer

生產者

package com.example.demo.produce;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Minimal RabbitMQ producer: connects to the broker, publishes one text
 * message to exchange {@code exchange001} with routing key
 * {@code routingkey001}, then releases the channel and the connection.
 */
public class MyProcuder {
    /**
     * Publishes a single message and exits.
     *
     * @param args unused
     * @throws Exception if connecting, publishing or closing fails
     */
    public static void main(String[] args) throws Exception {
        // 1. Create and configure the ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("39.106.128.***");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        // 2. Open a connection through the factory
        Connection connection = connectionFactory.newConnection();
        try {
            // 3. Open a channel on the connection
            Channel channel = connection.createChannel();
            try {
                // 4. Publish the payload through the channel
                String exchange = "exchange001";
                String routingKey = "routingkey001";
                String body = "this is a message";
                // Encode explicitly as UTF-8 instead of relying on the platform
                // default charset, so the bytes on the wire do not depend on
                // the machine the producer runs on.
                byte[] payload = body.getBytes(java.nio.charset.StandardCharsets.UTF_8);
                channel.basicPublish(exchange, routingKey, null, payload);
            } finally {
                // 5. Release the channel even when publishing failed
                channel.close();
            }
        } finally {
            // Release the connection even when channel creation or publishing
            // failed; otherwise it stays open after main() throws.
            connection.close();
        }
    }
}

消費者

package com.example.demo.constumer;

import com.rabbitmq.client.*;

/**
 * Entry point for the consuming side: declares exchange {@code exchange001}
 * and queue {@code queue001}, binds them with {@code routingkey001}, and
 * registers a {@link MyConsumer} on the queue.
 */
public class Consumer {
    /**
     * Sets up the broker topology and starts consuming.
     *
     * <p>No close call is made on the channel or the connection in this
     * method; both are left open after the consumer is registered.
     *
     * @param args unused
     * @throws Exception if connecting, declaring, binding or subscribing fails
     */
    public static void main(String[] args) throws Exception{
        // Broker connection settings
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("39.106.128.***");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("admin");
        factory.setPassword("admin");

        // Connection first, then a channel on top of it
        Connection conn = factory.newConnection();
        Channel ch = conn.createChannel();

        // Names shared with the producer side
        final String exchangeName = "exchange001";
        final String queueName = "queue001";
        final String bindingKey = "routingkey001";

        // Topic exchange: durable = true, autoDelete = false, no extra arguments
        ch.exchangeDeclare(exchangeName, "topic", true, false, null);
        // Queue: durable = true, exclusive = false, autoDelete = false, no extra arguments
        ch.queueDeclare(queueName, true, false, false, null);
        ch.queueBind(queueName, exchangeName, bindingKey);

        // Subscribe with autoAck = true; deliveries are handed to MyConsumer
        ch.basicConsume(queueName, true, new MyConsumer(ch));
    }
}

自定義消費者

package com.example.demo.constumer;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 * Custom consumer callback that prints the body of every delivered message,
 * decoded as UTF-8 text, to standard output.
 */
public class MyConsumer extends DefaultConsumer {
    /**
     * Creates a consumer associated with the given channel.
     *
     * @param channel the channel passed on to {@link DefaultConsumer}
     */
    public MyConsumer(Channel channel) {
        super(channel);
    }

    /**
     * Decodes the delivered body as UTF-8 and prints it.
     *
     * @param consumerTag unused
     * @param envelope    unused
     * @param properties  unused
     * @param body        raw message bytes; a {@code null} body is printed as {@code null}
     */
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
        // The Charset overload throws no checked exception, so the previous
        // catch-all (which swallowed errors and printed a stack trace) is not needed.
        String message = (body == null)
                ? null
                : new String(body, java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("消費者接收到的消息:" + message);
    }
}
相關文章
相關標籤/搜索