RabbitMQ 經常使用模式快速上手

注意配置

鏈接工廠基本配置

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
/**
 * @author yezixun
 * @date 2019/11/7 14:35
 */
public class ConnectionUtil {
    public static Connection getConnection() throws Exception {
        //定義鏈接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設置服務地址
        factory.setHost("localhost");
        //端口
        factory.setPort(5672);
        //設置帳號信息,用戶名、密碼、vhost 
        //默認帳號爲
        factory.setVirtualHost("testhost"); 
        factory.setUsername("admin");
        factory.setPassword("admin");
        // 經過工廠獲取鏈接
        return factory.newConnection();
    }
}

Queue

發送

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zhonglouguairen.util.ConnectionUtil;

/**
 * @author yezixun
 * @date 2019/11/7 14:38
 */
public class Send {
    //隊列名稱
    private final static String QUEUE_NAME = "q_test_01";

    public static void main(String[] argv) throws Exception {
        // 獲取到鏈接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        // 從鏈接中建立通道
        Channel channel = connection.createChannel();

        // 聲明(建立)隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 消息內容
        String message = "Hello World!";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
        //關閉通道和鏈接
        channel.close();
        connection.close();
    }
}

接收

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.zhonglouguairen.util.ConnectionUtil;

/**
 * @author yezixun
 * @date 2019/11/7 14:48
 */
public class Recv {
    //隊列名稱
    private final static String QUEUE_NAME = "q_test_01"; 

    public static void main(String[] argv) throws Exception {

        // 獲取到鏈接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        // 從鏈接中建立通道
        Channel channel = connection.createChannel();
        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 定義隊列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);

        // 監聽隊列
        channel.basicConsume(QUEUE_NAME, true, consumer);

        // 獲取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
        }
    }
}

work

發送

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zhonglouguairen.util.ConnectionUtil;

/**
 * @author yezixun
 * @date 2019/11/7 15:26
 */
public class Send {
    private final static String QUEUE_NAME = "test_queue_work";

    public static void main(String[] argv) throws Exception {
        // 獲取到鏈接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //發送0-99的數字
        for (int i = 0; i < 100; i++) {
            // 消息內容
            String message = "" + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
            //線程沉睡 數字越大睡得越久
            Thread.sleep(i * 10);
        }

        channel.close();
        connection.close();
    }
}

接收

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.zhonglouguairen.util.ConnectionUtil;

/**
 * @author yezixun
 * @date 2019/11/7 15:25
 */
public class Recv2 {

    private final static String QUEUE_NAME = "test_queue_work";

    public static void main(String[] argv) throws Exception {

        // 獲取到鏈接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 同一時刻服務器只會發一條消息給消費者
        channel.basicQos(1);

        // 定義隊列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);

        // 監聽隊列
        // true 自動確認:不管是否成功 都認爲成功消費
        // false 手動確認:消費者成功消費後反饋,不反饋消息將一直處於不可用狀態
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 獲取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            // 休眠1秒  模擬數據量 接收間隔比Recv1長 模擬性能(壓力)弱的服務器
            Thread.sleep(1000);

            //下面這行註釋掉表示使用自動確認模式 開啓表示手動
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.zhonglouguairen.util.ConnectionUtil;

/**
 * @author yezixun
 * @date 2019/11/7 15:23
 */
public class Recv {
    private final static String QUEUE_NAME = "test_queue_work";

    public static void main(String[] argv) throws Exception {

        // 獲取到鏈接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 同一時刻服務器只會發一條消息給消費者
        channel.basicQos(1);

        // 定義隊列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);

        // 監聽隊列
        // true 自動確認:不管是否成功 都認爲成功消費
        // false 手動確認:消費者成功消費後反饋,不反饋消息將一直處於不可用狀態
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 獲取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [y] Received '" + message + "'");
            //休眠 模擬數據量  接收間隔比Recv2短 模擬性能(壓力)強的服務器
            Thread.sleep(10);

            // 返回確認狀態,註釋掉表示使用自動確認模式 開啓表示手動
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

消息的確認模式

消費者從隊列中獲取消息,服務端如何知道消息已經被消費呢?java

模式1:自動確認 只要消息從隊列中獲取,不管消費者獲取到消息後是否成功消息,都認爲是消息已經成功消費。服務器

模式2:手動確認 消費者從隊列中獲取消息後,服務器會將該消息標記爲不可用狀態,等待消費者的反饋,若是消費者一直沒有反饋,那麼該消息將一直處於不可用狀態。性能

發佈訂閱模式

發送

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zhonglouguairen.util.ConnectionUtil;

/**
 * 發佈訂閱模式
 * @author yezixun
 * @date 2019/11/7 15:54
 */
public class Send {
    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] argv) throws Exception {
        // 獲取到鏈接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        // 消息內容
        String message = "Hello World!";
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}

接收

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.zhonglouguairen.util.ConnectionUtil;

/**
 * @author yezixun
 * @date 2019/11/7 15:55
 */
public class Recv {
    private final static String QUEUE_NAME = "test_queue_work1";

    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] argv) throws Exception {

        // 獲取到鏈接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 綁定隊列到交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // 同一時刻服務器只會發一條消息給消費者
        channel.basicQos(1);

        // 定義隊列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監聽隊列,手動返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 獲取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [Recv] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.zhonglouguairen.util.ConnectionUtil;

/**
 * @author yezixun
 * @date 2019/11/7 15:55
 */
public class Recv2 {
    private final static String QUEUE_NAME = "test_queue_work2";

    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] argv) throws Exception {

        // 獲取到鏈接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 綁定隊列到交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // 同一時刻服務器只會發一條消息給消費者
        channel.basicQos(1);

        // 定義隊列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監聽隊列,手動返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 獲取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [Recv2] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

路由模式

發送

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zhonglouguairen.util.ConnectionUtil;

/**
 * 路由模式
 * @author yezixun
 * @date 2019/11/7 16:26
 */
public class Send {
    private final static String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] argv) throws Exception {
        // 獲取到鏈接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明隊列
        channel.exchangeDeclare(EXCHANGE_NAME,  "direct");


            // 消息內容
            String message = "添加";
            channel.basicPublish(EXCHANGE_NAME, "select",null, message.getBytes());
            //String message = "刪除";
            //channel.basicPublish(EXCHANGE_NAME, "delete",null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
            
            
        channel.close();
        connection.close();
    }
}

接收

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.zhonglouguairen.util.ConnectionUtil;

/**
 * @author yezixun
 * @date 2019/11/7 15:23
 */
public class Recv {
    private final static String QUEUE_NAME = "test_queue_direct_1";

    private final static String EXCHANGE_NAME = "test_exchange_direct";
    public static void main(String[] argv) throws Exception {

        // 獲取到鏈接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //綁定隊列到交換機 刪除和修改
      channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"update");
      channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"delete");

        // 同一時刻服務器只會發一條消息給消費者
        channel.basicQos(1);

        //定義隊列消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);

        // 監聽隊列
        // true 自動確認:不管是否成功 都認爲成功消費
        // false 手動確認:消費者成功消費後反饋,不反饋消息將一直處於不可用狀態
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 獲取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [y] Received '" + message + "'");

            // 返回確認狀態,註釋掉表示使用自動確認模式 開啓表示手動
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.zhonglouguairen.util.ConnectionUtil;

/**
 * @author yezixun
 * @date 2019/11/7 15:23
 */
public class Recv2 {
    private final static String QUEUE_NAME = "test_queue_direct_2";
    private final static String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] argv) throws Exception {

        // 獲取到鏈接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //綁定隊列到交換機 查詢和增長
      channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"select");
      channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"insert");

        // 同一時刻服務器只會發一條消息給消費者
        channel.basicQos(1);

        //定義隊列消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);

        // 監聽隊列
        // true 自動確認:不管是否成功 都認爲成功消費
        // false 手動確認:消費者成功消費後反饋,不反饋消息將一直處於不可用狀態
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 獲取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [y] Received '" + message + "'");

            // 返回確認狀態,註釋掉表示使用自動確認模式 開啓表示手動
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

主題模式

發送

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zhonglouguairen.util.ConnectionUtil;

/**
 * @author yezixun
 * @date 2019/11/7 16:45
 */
public class Send {
    private final static String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] argv) throws Exception {
        // 獲取到鏈接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        // 消息內容
        String message = "Hello World!!";
        channel.basicPublish(EXCHANGE_NAME, "routekey.", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}

接收

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.zhonglouguairen.util.ConnectionUtil;

/**
 * @author yezixun
 * @date 2019/11/7 16:46
 */
public class Recv {

    private final static String QUEUE_NAME = "test_queue_topic_work_1";

    private final static String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] argv) throws Exception {

        // 獲取到鏈接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 綁定隊列到交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routekey.*");

        // 同一時刻服務器只會發一條消息給消費者
        channel.basicQos(1);

        // 定義隊列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監聽隊列,手動返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 獲取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [Recv_x] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.zhonglouguairen.util.ConnectionUtil;

/**
 * @author yezixun
 * @date 2019/11/7 16:46
 */
public class Recv2 {
    private final static String QUEUE_NAME = "test_queue_topic_work_2";

    private final static String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] argv) throws Exception {

        // 獲取到鏈接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 綁定隊列到交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*");

        // 同一時刻服務器只會發一條消息給消費者
        channel.basicQos(1);

        // 定義隊列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監聽隊列,手動返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 獲取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [Recv2_x] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

————————————————.net

版權聲明:本文中大部分爲CSDN博主「niaobirdfly」的原創 詳細信息參考原做者線程

原文連接:https://blog.csdn.net/hellozpc/article/details/814369803d

相關文章
相關標籤/搜索