RabbitMQ(二):Java 操做隊列

1. 簡單模式

模型:java

  • P:消息的生產者
  • 隊列:rabbitmq
  • C:消息的消費者

獲取 MQ 鏈接ide

public static Connection getConnection() throws IOException, TimeoutException {
        // 定義一個鏈接工廠
        ConnectionFactory factory = new ConnectionFactory();
        // 設置服務地址
        factory.setHost("127.0.0.1");
        // AMQP 5672
        factory.setPort(5672);
        // vhost
        factory.setVirtualHost("/vhost_ljf");
        // 用戶名
        factory.setUsername("ljf");
        // 密碼
        factory.setPassword("123456");
        return factory.newConnection();
    }

生產者生產消息fetch

public class Send {
    private static final String QUEUE_NAME = "test_simple_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 獲取一個鏈接
        Connection connection = ConnectionUtils.getConnection();
        // 從鏈接中獲取一個通道
        Channel channel = connection.createChannel();
        // 建立隊列聲明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        String msg = "hello simple!";
        channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        System.out.println("--send msg: " + msg);
        channel.close();
        connection.close();
    }
}

消費者接收消息spa

public class Recv {
    private static final String QUEUE_NAME = "test_simple_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 獲取鏈接
        Connection connection = ConnectionUtils.getConnection();
        // 建立通道
        Channel channel = connection.createChannel();
        // 隊列聲明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 定義消費者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 獲取到達的消息
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("recv: " + msg);
            }
        };
        // 監聽隊列
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

簡單隊列的不足code

耦合性高,生產者一一對應消費者(若是我想有多個消費者消費隊列中的消息,這時候就不行了);blog

隊列名變動,這時候得同時變動。rabbitmq

2. 工做隊列模式(Work Queue)

模型隊列

爲何會出現工做隊列?ip

simple 隊列是一一對應的,並且咱們實際開發,生產者發送消息是絕不費力的,而消費者通常是要跟業務相結合的,消費者接收到消息以後就須要處理,可能須要花費時間,這時候隊列就會積壓了不少消息。內存

生產者

/**
 *                 |----C1
 *   P----Queue----|
 *                 |----C2
 */
public class Send {
    private static final String QUEUE_NAME = "test_work_queue";

    public static void main(String[] args) throws IOException, TimeoutException ,InterruptedException{
        // 獲取鏈接
        Connection connection = ConnectionUtils.getConnection();

        // 獲取 channel
        Channel channel = connection.createChannel();

        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        for (int i = 0; i < 50; i++) {
            String msg = "hello" + i;
            System.out.println("[WQ] send: " + msg);
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            Thread.sleep(i*20);
        }
        channel.close();
        connection.close();
    }
}

消費者

  • 消費者1
public class Recv1 {
    private static final String QUEUE_NAME = "test_work_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 獲取鏈接
        Connection connection = ConnectionUtils.getConnection();
        // 獲取 channel
        Channel channel = connection.createChannel();
        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 定義一個消費者
        Consumer consumer = new DefaultConsumer(channel) {
            // 消息到達 觸發方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("[1] Recv msg: " + msg);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("[1] done.");
                }
            }
        };
        boolean autoAck = true;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}
  • 消費者2
public class Recv2 {
    private static final String QUEUE_NAME = "test_work_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 獲取鏈接
        Connection connection = ConnectionUtils.getConnection();
        // 獲取 channel
        Channel channel = connection.createChannel();
        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 定義一個消費者
        Consumer consumer = new DefaultConsumer(channel) {
            // 消息到達 觸發方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("[2] Recv msg: " + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("[2] done.");
                }
            }
        };
        boolean autoAck = true;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}

現象

先運行消費者1和消費者2,再運行生產者

消費者1 和 消費者2 處理的消息數量是同樣多的。

消費者1:偶數

消費者2:奇數

這種方式叫作輪詢分發(round-robin),結果就是無論誰忙誰悠閒,都不會多給一個消息。

3. 公平分發(fair dipatch)

生產者

public class Send {
    private static final String QUEUE_NAME = "test_work_queue";
    public static void main(String[] args) throws IOException, TimeoutException ,InterruptedException{
        // 獲取鏈接
        Connection connection = ConnectionUtils.getConnection();
        // 獲取 channel
        Channel channel = connection.createChannel();
        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        /**
         * 每一個消費者:發送確認消息以前,消息隊列不發送下一個消息到消費者,一次只處理一個消息
         */
        int prefetchCount = 1;
        channel.basicQos(prefetchCount);
        for (int i = 0; i < 50; i++) {
            String msg = "hello" + i;
            System.out.println("[WQ] send: " + msg);
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            Thread.sleep(i*5);
        }
        channel.close();
        connection.close();
    }
}

消費者

  • 消費者1
public class Recv1 {
    private static final String QUEUE_NAME = "test_work_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 獲取鏈接
        Connection connection = ConnectionUtils.getConnection();
        // 獲取 channel
        final Channel channel = connection.createChannel();
        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.basicQos(1);  // 保證一次只發送一個
        // 定義一個消費者
        Consumer consumer = new DefaultConsumer(channel) {
            // 消息到達 觸發方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("[1] Recv msg: " + msg);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("[1] done.");
                    // 手動回執
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        boolean autoAck = false; // 自動應答 false
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}
  • 消費者2
public class Recv2 {
    private static final String QUEUE_NAME = "test_work_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 獲取鏈接
        Connection connection = ConnectionUtils.getConnection();
        // 獲取 channel
        final Channel channel = connection.createChannel();
        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.basicQos(1);  // 保證一次只發送一個
        // 定義一個消費者
        Consumer consumer = new DefaultConsumer(channel) {
            // 消息到達 觸發方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("[2] Recv msg: " + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("[2] done.");
                    // 手動回執
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        boolean autoAck = false; // 自動應答 false
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}

現象

消費者2 處理的消息比 消費者1 多,能者多勞。

4. 消息應答與消息持久化

消息應答

boolean autoAck = false; // 自動應答 false
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
  • boolean autoAck = true;(自動確認模式)

一旦 rabbitmq 將消息分發給消費者,就會從內存中刪除;

這種狀況下,若是殺死正在執行的消費者,就會丟失正在處理的消息。

  • boolean autoAck = false;(手動模式)

若是一個消費者掛掉,就會交付給其餘消費者;

rabbitmq 支持消息應答,消費者發送一個消息應答,告訴 rabbitmq 這個消息我已經處理完成,能夠刪掉,而後 rabbitmq 就刪除內存中的消息。

消息應答默認是打開的,即爲 false;

若是 rabbitmq 掛了,消息任然會丟失。

消息持久化

// 聲明隊列
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);

注意:rabbitmq 不容許從新定義(不一樣參數)一個已存在的隊列

相關文章
相關標籤/搜索