RabbitMQ基礎

一、RabbitMQ

  1. 消息隊列解決了什麼問題

    • 異步處理
    • 應用解耦
    • 流量削鋒
    • 日誌處理

運行rabbitmq鏡像

# docker run --name rabbitmq -tid -p 5672:5672 -p 15672:15672 -p 25672:25672 rabbitmq

修改rabbitmq設置

# docker exec -it 容器號 /bin/bash
新增用戶
# rabbitmqctl add_user [user_name] [pwd]
查看用戶
# rabbitmqctl list_users
設置[user_name]用戶在vhost "/"中的權限(依次爲配置、寫、讀權限)
# rabbitmqctl set_permissions -p "/" [user_name] ".*" ".*" ".*"

# rabbitmqctl list_permissions -p /
將[user_name]用戶設置爲administrator角色
# rabbitmqctl set_user_tags [user_name] administrator
刪除guest用戶
# rabbitmqctl delete_user guest

開啓web界面

# rabbitmq-plugins enable rabbitmq_management

web訪問

# http://IP:15672
  2. Java操做RabbitMQ

    1. simple 簡單隊列
    2. work queues 工做隊列 公平分發 輪詢分發
    3. publish/subscribe 發佈訂閱
    4. routing 路由選擇 通配符模式
    5. Topics 主題
    6. 手動和自動確認消息
    7. 隊列的持久化和非持久化
    8. rabbitMQ的延遲隊列

    依賴

    <dependencies>
        <!-- RabbitMQ Java client -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>4.0.2</version>
        </dependency>

        <!-- Logging: slf4j-api and its log4j binding are kept on the same version -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.10</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.10</version>
        </dependency>

        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

        <!-- Needed for unit tests only -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

二、簡單隊列

Y9U1G6.png

2.二、定義鏈接MQ的工具

/**
 * Static helper that opens connections to the RabbitMQ broker used by the examples.
 *
 * <p>NOTE(review): host, virtual host and credentials are hard-coded for the tutorial
 * environment; externalize them before reusing this class anywhere else.
 */
public class connectionUtil {

    /** Address of the machine running RabbitMQ. */
    private static final String HOST = "192.168.168.130";
    /** AMQP port of the broker. */
    private static final int PORT = 5672;
    private static final String VIRTUAL_HOST = "lgz";
    private static final String USERNAME = "lgz";
    private static final String PASSWORD = "pwd123456";

    /** Utility holder with static members only; not meant to be instantiated. */
    private connectionUtil() {
    }

    /**
     * Builds a connection factory with the tutorial settings and opens a new connection.
     *
     * @return a newly opened connection; the caller is responsible for closing it
     * @throws IOException if opening the connection fails with an I/O error
     * @throws TimeoutException if opening the connection times out
     */
    public static Connection getConnection() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setVirtualHost(VIRTUAL_HOST);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        return factory.newConnection();
    }
}

2.三、生產者發送消息

/**
 * Simple-queue producer: declares the queue and publishes one text message to it
 * through the default exchange.
 */
public class ProducerSend {
    private static final String QUEUE_NAME="test_simple_queue";

    /**
     * Publishes a single message and releases the connection.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if opening the connection or closing the channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("cs");
        Connection connection = connectionUtil.getConnection();
        try {
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);

            String msg="hello simple";
            // Encode explicitly as UTF-8: the consumer decodes with "utf-8", and the
            // platform default charset is not guaranteed to match.
            channel.basicPublish("",QUEUE_NAME,null,msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            System.out.println("test success");
            channel.close();
        } finally {
            // Always release the connection, even when declaring or publishing fails.
            connection.close();
        }
    }

}

2.四、消費者獲取信息

/**
 * Simple-queue consumer: prints every message delivered from the queue, using
 * automatic acknowledgement.
 */
public class comsumerGain {

    private static final String QUE_NAME="test_simple_queue";

    /**
     * Registers the consumer; the open connection keeps the process alive to receive messages.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = connectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // Declare the queue with the same arguments as the producer so that this
        // consumer can be started first without failing on a missing queue.
        channel.queueDeclare(QUE_NAME,false,false,false,null);

        DefaultConsumer  consumer = new DefaultConsumer(channel) {
            // Invoked for each message that arrives.
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                super.handleDelivery(consumerTag, envelope, properties, body);
                String msg = new String(body, "utf-8");
                System.out.println(msg);
            }
        };
        // Start listening on the queue (autoAck = true).
        channel.basicConsume(QUE_NAME,true,consumer);

    }
}

2.五、簡單隊列的缺點

  1. 耦合性高:生產者與消費者一一對應,沒法讓多個消費者同時消費隊列中的消息

三、工做隊列

3.一、工做隊列模型

Y9U6sg.png

3.二、生產者演示

/**
 * Work-queue producer: publishes 50 numbered messages to one queue, pausing a little
 * longer after each one.
 */
public class WorkQueue {
    public static  final  String QUEUE_NAME="work_queue";

    /**
     * Publishes the messages "No0" .. "No49" and releases the connection.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if opening the connection or closing the channel times out
     * @throws InterruptedException if the thread is interrupted while pausing between messages
     */
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = connectionUtil.getConnection();
        try {
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);

            for (int i = 0; i <50 ; i++) {
                String msg="No"+i;
                System.out.println(msg);
                // Explicit UTF-8 so the bytes match what the consumers decode.
                channel.basicPublish("",QUEUE_NAME,null,msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                Thread.sleep(i*20);
            }
            channel.close();
        } finally {
            // Release the connection even when publishing fails or the sleep is interrupted.
            connection.close();
        }
    }
}

3.三、消費者1

public class WorkRecv {
    private final static String QUEUE_NAME="work_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        //獲取連接
        Connection connection = connectionUtil.getConnection();
        //獲取channel
        Channel channel = connection.createChannel();
        //聲明隊列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //定義一個消費者
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            //消息觸發

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                super.handleDelivery(consumerTag, envelope, properties, body);
                String msg=new String(body,"utf-8");

                System.out.println("receive"+msg);

                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    System.out.println("結束");
                }
            }
        };

        channel.basicConsume(QUEUE_NAME,true,defaultConsumer);

    }

3.四、消費者2

/**
 * Work-queue consumer 2: logs every delivery from the shared queue and pauses for one
 * second per message. Deliveries are auto-acknowledged.
 */
public class WorkRecv {
    private final static String QUEUE_NAME="work_queue";

    /**
     * Opens a channel, declares the queue and starts consuming.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel ch = connectionUtil.getConnection().createChannel();
        ch.queueDeclare(QUEUE_NAME, false, false, false, null);

        boolean autoAck = true;
        ch.basicConsume(QUEUE_NAME, autoAck, printingConsumer(ch));
    }

    /** Builds the consumer that prints each message body and then simulates work. */
    private static DefaultConsumer printingConsumer(Channel ch) {
        return new DefaultConsumer(ch) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                super.handleDelivery(consumerTag, envelope, properties, body);
                String text = new String(body, "utf-8");
                System.out.println("receive" + text);
                simulateWork();
            }
        };
    }

    /** Sleeps one second and always reports completion afterwards. */
    private static void simulateWork() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        } finally {
            System.out.println("結束");
        }
    }
}

3.五、總結

  1. 默認採用輪詢(round-robin)分發機制:不論消費者處理得快仍是慢,消息都會被平均分配給每一個消費者

四、公平分發

4.一、生產者

/**
 * Fair-dispatch producer: publishes 50 numbered messages to the work queue, pausing a
 * little longer after each one.
 */
public class WorkQueue {
    public static  final  String QUEUE_NAME="work_queue";

    /**
     * Publishes the messages "No0" .. "No49" and releases the connection.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if opening the connection or closing the channel times out
     * @throws InterruptedException if the thread is interrupted while pausing between messages
     */
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = connectionUtil.getConnection();
        try {
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);

            // Original intent: the broker should not send a consumer its next message
            // until the previous one has been acknowledged (one message at a time).
            // NOTE(review): prefetch applies to consumers on a channel; this channel only
            // publishes, so this call is probably a no-op here - confirm and consider
            // removing. The consumers set their own basicQos(1).
            int prefetchCount=1;
            channel.basicQos(prefetchCount);

            for (int i = 0; i <50 ; i++) {
                String msg="No"+i;
                System.out.println(msg);
                // Explicit UTF-8 so the bytes match what the consumers decode.
                channel.basicPublish("",QUEUE_NAME,null,msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                Thread.sleep(i*20);
            }
            channel.close();
        } finally {
            // Release the connection even when publishing fails or the sleep is interrupted.
            connection.close();
        }
    }
}

4.二、消費者1

/**
 * Fair-dispatch consumer 1: takes one message at a time, spends one second on it and
 * then acknowledges it manually.
 */
public class WorkRecv {
    private final static String QUEUE_NAME="work_queue";

    /**
     * Declares the queue, limits prefetch to one and starts consuming with manual acks.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel ch = connectionUtil.getConnection().createChannel();
        ch.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Dispatch only one message at a time to this consumer.
        ch.basicQos(1);

        boolean autoAck = false;
        ch.basicConsume(QUEUE_NAME, autoAck, ackingConsumer(ch));
    }

    /** Builds the consumer that prints, pauses one second and then acks the delivery. */
    private static DefaultConsumer ackingConsumer(final Channel ch) {
        return new DefaultConsumer(ch) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                super.handleDelivery(consumerTag, envelope, properties, body);
                String text = new String(body, "utf-8");
                System.out.println("receive" + text);

                try {
                    Thread.sleep(1000);
                } catch (InterruptedException interrupted) {
                    interrupted.printStackTrace();
                } finally {
                    System.out.println("結束");
                    // Ack this single delivery (multiple = false).
                    long tag = envelope.getDeliveryTag();
                    ch.basicAck(tag, false);
                }
            }
        };
    }
}

4.二、消費者2

/**
 * Fair-dispatch consumer 2: the slower worker. It takes one message at a time, spends
 * two seconds on it and acknowledges it manually.
 */
public class WorkReceive {
    private final static String QUEUE_NAME="work_queue";

    /**
     * Declares the queue, limits prefetch to one and starts consuming with manual acks.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel ch = connectionUtil.getConnection().createChannel();
        ch.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Make sure only one message is dispatched at a time.
        ch.basicQos(1);
        ch.basicConsume(QUEUE_NAME, false, new SlowWorker(ch));
    }

    /** Consumer that prints the body, sleeps two seconds and then acks the delivery. */
    private static final class SlowWorker extends DefaultConsumer {
        private final Channel ackChannel;

        SlowWorker(Channel channel) {
            super(channel);
            this.ackChannel = channel;
        }

        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            super.handleDelivery(consumerTag, envelope, properties, body);
            System.out.println("receive" + new String(body, "utf-8"));

            try {
                Thread.sleep(2000);
            } catch (InterruptedException interrupted) {
                interrupted.printStackTrace();
            } finally {
                System.out.println("【start】:");
                // Manually acknowledge this one message.
                ackChannel.basicAck(envelope.getDeliveryTag(), false);
            }
        }
    }
}

4.三、疑難問題

  1. basicQos(1);用於限制rabbitMQ一次只分發一個消息
  2. 使用公平分發必須關閉自動應答ack,改爲手動的

五、消息應答與消息持久化

5.一、消息應答

  1. boolean autoack=false (手動確認模式),若是有消費者宕機,則會將信息交付給其餘的消費者。【rabbitmq支持消息應答,消費者發送一個消息確認,則rabbitmq會進行刪除內存數據】
  2. Boolean autoAck=true (自動確認模式),一旦rabbitmq將消息分發給消費者就會從內存中刪除

若是在自動確認模式下殺死正在執行的消費者,就會形成正在處理的信息丟失。

  1. 默認狀況下autoAck是false(手動確認)
  2. 若是rabbitMQ宕機,則服務器數據仍然丟失

5.二、消息持久化

//聲明隊列
channel.queueDeclare(QUEUE_NAME,[durable]false,false,false,null);
//durable:持久化,
  1. 直接將程序中durable的false改成true是不行的。由於名爲QUEUE_NAME的隊列已經以未持久化的方式定義過,rabbitmq不允許用不一樣的參數從新定義一個已存在的隊列

    • 在控制檯【訪問mq的界面】進行刪除這個隊列

六、訂閱模式【fanout】

Y9UIzT.png

  1. 一個生產者,多個消費者
  2. 每一個消費者都有本身的隊列
  3. 生產者將消息發送到交換機【轉發器】,而不是直接發送到隊列中
  4. 每一個隊列都綁定在交換機
  5. 生產者發送的消息,通過交換機到達隊列【實現一個消息被多個消費者消費】
  • 交換機沒法存儲數據【只有隊列具備存儲能力】;若是發送消息時沒有任何隊列綁定到交換機,消息就會丟失,因此想要保存消息必須先將隊列綁定到交換機

6.一、生產者

/**
 * Publish/subscribe producer: declares a fanout exchange and publishes one message to it.
 */
public class Send {
    public static final String EXCHANGE_NAME="test_exchange_fanout";

    /**
     * Publishes a single message to the fanout exchange and releases the connection.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if opening the connection or closing the channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        try {
            Channel channel = connection.createChannel();

            // Declare the exchange as type "fanout".
            channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

            String msg="hello_exchange";
            // Empty routing key; payload encoded explicitly as UTF-8 to match the consumers.
            channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            System.out.println("send:"+msg);

            channel.close();
        } finally {
            // Release the connection even when declaring or publishing fails.
            connection.close();
        }
    }
}

6.二、消費者

/**
 * Fanout subscriber 1: binds its own queue to the fanout exchange and manually
 * acknowledges each message after one second of simulated work.
 */
public class Receive1 {
    public static final String QUEUE_NAME="test_queue_exchange";
    public static final String EXCHANGE_NAME="test_exchange_fanout";

    /**
     * Declares exchange and queue, binds them and starts consuming.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();

        // Declare the exchange here too (same type as the producer) so that the bind
        // below does not fail when this consumer is started before the producer.
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        // Bind the queue to the exchange.
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");

        channel.basicQos(1);// dispatch only one message at a time

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                super.handleDelivery(consumerTag, envelope, properties, body);
                String msg=new String(body,"utf-8");
                System.out.println("Receive1"+msg);

                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the calling thread instead of swallowing it.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }finally {
                    System.out.println("[ok]");
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };

        boolean autoAck=false;
        channel.basicConsume(QUEUE_NAME,autoAck,defaultConsumer);
    }
}

6.三、消費者2

/**
 * Fanout subscriber 2: binds a second queue to the same fanout exchange and manually
 * acknowledges each message after one second of simulated work.
 */
public class Receive2{
    public static final String QUEUE_NAME="email_queue_exchange";
    public static final String EXCHANGE_NAME="test_exchange_fanout";

    /**
     * Declares exchange and queue, binds them and starts consuming.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();

        // Declare the exchange here too (same type as the producer) so that the bind
        // below does not fail when this consumer is started before the producer.
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        // Bind the queue to the exchange.
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");

        channel.basicQos(1);// dispatch only one message at a time

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                super.handleDelivery(consumerTag, envelope, properties, body);
                String msg=new String(body,"utf-8");
                System.out.println("Receive2"+msg);

                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the calling thread instead of swallowing it.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }finally {
                    System.out.println("[ok2]");
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };

        boolean autoAck=false;
        channel.basicConsume(QUEUE_NAME,autoAck,defaultConsumer);
    }
}

七、Routing【direct】

交換機(Exchange):一方面接收生產者的消息,另外一方面向隊列推送消息

Fanout(不處理路由鍵)

Y9U7yF.png

Direct(處理路由鍵)

7.一、路由模式

Y9ULw9.png

7.二、生產者

/**
 * Routing producer: declares a direct exchange and publishes one message with the
 * routing key "warning".
 */
public class Send {
    public static final String EXCHANGE_NAME="test_exchange_direct";

    /**
     * Publishes a single message to the direct exchange and releases the connection.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if opening the connection or closing the channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        try {
            Channel channel = connection.createChannel();

            // Declare the exchange as type "direct".
            channel.exchangeDeclare(EXCHANGE_NAME,"direct");

            String msg="Hello direct";

            String routingKey="warning";
            // Payload encoded explicitly as UTF-8 to match what the consumers decode.
            channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            System.out.println("send"+msg);

            channel.close();
        } finally {
            // Release the connection even when declaring or publishing fails.
            connection.close();
        }
    }
}

7.三、消費者1

/**
 * Routing consumer 1: binds its queue to the direct exchange with the key "error" only
 * and manually acknowledges each message after one second of simulated work.
 */
public class Receive1 {
    public static final String EXCHANGE_NAME="test_exchange_direct";
    public static final String QUEUE_NAME="queue_direct_1";

    /**
     * Declares exchange and queue, binds them and starts consuming.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();

        final Channel channel = connection.createChannel();

        // Declare the exchange here too (same type as the producer) so that the bind
        // below does not fail when this consumer is started before the producer.
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        channel.basicQos(1);
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                super.handleDelivery(consumerTag, envelope, properties, body);
                String msg=new String(body,"utf-8");

                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the calling thread instead of swallowing it.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }finally {
                    System.out.println("[done]"+msg);
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };
        boolean autoAck=false;// manual acknowledgement
        channel.basicConsume(QUEUE_NAME,autoAck,consumer);
    }
}

7.三、消費者2

/**
 * Routing consumer 2: binds its own queue to the direct exchange with the keys
 * "error", "warning" and "info", and manually acknowledges each message after one
 * second of simulated work.
 */
public class Receive2 {
    public static final String EXCHANGE_NAME="test_exchange_direct";
    // Must differ from Receive1's queue ("queue_direct_1"). With one shared queue both
    // consumers would compete for the same messages and the per-key routing could not
    // be observed.
    public static final String QUEUE_NAME="queue_direct_2";

    /**
     * Declares exchange and queue, binds the three routing keys and starts consuming.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();

        final Channel channel = connection.createChannel();

        // Declare the exchange here too (same type as the producer) so that the binds
        // below do not fail when this consumer is started before the producer.
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        channel.basicQos(1);
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"warning");
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                super.handleDelivery(consumerTag, envelope, properties, body);
                String msg=new String(body,"utf-8");

                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the calling thread instead of swallowing it.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }finally {
                    System.out.println("[done]"+msg);
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };
        boolean autoAck=false;// manual acknowledgement
        channel.basicConsume(QUEUE_NAME,autoAck,consumer);
    }
}

八、Topics Exchange

相似於sql中的模糊查詢。

將路由鍵routing key和某個模式進行匹配

  1. #匹配零個或者多個單詞

  2. *匹配恰好一個單詞

Y9UOoR.png

8.一、生產者

/**
 * Topic producer: declares a topic exchange and publishes one message with the routing
 * key "goods.delete".
 */
public class Send {

    private static final String EXCHANGE_NAME="test_exchange_topic";

    /**
     * Publishes a single message to the topic exchange and releases the connection.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if opening the connection or closing the channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        try {
            Channel channel = connection.createChannel();

            channel.exchangeDeclare(EXCHANGE_NAME,"topic");

            String msg="商品";
            // The payload is non-ASCII, so it must be encoded as UTF-8 explicitly: the
            // consumers decode with "utf-8" and the platform default charset may differ.
            channel.basicPublish(EXCHANGE_NAME,"goods.delete",null,msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            System.out.println("---send"+msg);

            channel.close();
        } finally {
            // Release the connection even when declaring or publishing fails.
            connection.close();
        }
    }

}

8.二、消費者1

/**
 * Topic consumer 1: binds its queue with the exact key "goods.add" and manually
 * acknowledges each message after one second of simulated work.
 */
public class Receive1 {
    public static final String QUEUE_NAME="test_queue_topic_1";
    private static final String EXCHANGE_NAME="test_exchange_topic";

    /**
     * Declares exchange and queue, binds them and starts consuming.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();

        // Declare the exchange here too (same type as the producer) so that the bind
        // below does not fail when this consumer is started before the producer.
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        // Bind the queue to the exchange.
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"goods.add");

        channel.basicQos(1);// dispatch only one message at a time

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                super.handleDelivery(consumerTag, envelope, properties, body);
                String msg=new String(body,"utf-8");
                System.out.println("Receive1"+msg);

                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the calling thread instead of swallowing it.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }finally {
                    System.out.println("[ok]");
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };

        boolean autoAck=false;
        channel.basicConsume(QUEUE_NAME,autoAck,defaultConsumer);
    }
}

8.二、消費者2

/**
 * Topic consumer 2: binds its queue with the pattern "goods.#" and manually
 * acknowledges each message after one second of simulated work.
 */
public class Receive2 {
    // NOTE(review): "test_queue_topic_1=2" looks like a typo for "test_queue_topic_2";
    // left unchanged because a queue of this name may already exist on the broker.
    public static final String QUEUE_NAME="test_queue_topic_1=2";
    private static final String EXCHANGE_NAME="test_exchange_topic";

    /**
     * Declares exchange and queue, binds them and starts consuming.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();

        // Declare the exchange here too (same type as the producer) so that the bind
        // below does not fail when this consumer is started before the producer.
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        // Bind the queue to the exchange.
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"goods.#");
        channel.basicQos(1);// dispatch only one message at a time

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                super.handleDelivery(consumerTag, envelope, properties, body);
                String msg=new String(body,"utf-8");
                // Label fixed: this is consumer 2; the original printed "Receive1",
                // making the two consumers' output indistinguishable.
                System.out.println("Receive2"+msg);

                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the calling thread instead of swallowing it.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }finally {
                    System.out.println("[ok]");
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };

        boolean autoAck=false;
        channel.basicConsume(QUEUE_NAME,autoAck,defaultConsumer);
    }
}

九、消息確認機制

在MQ中能夠經過持久化數據解決rabbitmq服務器異常致使的數據丟失問題

  1. 生產者將消息發送出去之後,如何知道消息到底有沒有到達rabbitmq服務器?

    • 方式一:AMQP實現事務機制
    • 方式二:Confirm模式

9.一、事務機制

txSelect:用於將當前channel設置成transaction模式

txCommit:用於提交事務

txRollback:回滾事務

生產者

/**
 * Transactional producer demo: publishes inside an AMQP transaction and rolls the
 * transaction back when anything fails before the commit.
 */
public class TxSend {
    private static final String QUEUE_NAME="tx_queue";

    /**
     * Runs the demo. The deliberate division by zero sits between publish and commit,
     * so the run always ends in a rollback and the message is not committed.
     *
     * @param args unused
     * @throws IOException if a broker operation (including the rollback) fails
     * @throws TimeoutException if opening the connection or closing the channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        try {
            Channel channel = connection.createChannel();

            String msg="hello autoCommit";
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);

            try {
                channel.txSelect();
                channel.basicPublish("",QUEUE_NAME,null,msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                // Simulated failure. It must happen BEFORE txCommit, otherwise the message
                // is already committed and there is nothing left to roll back.
                int i=1/0;
                channel.txCommit();
                System.out.println(msg);
            } catch (IOException | RuntimeException e) {
                // The simulated failure is an ArithmeticException, so catching only
                // IOException would never reach the rollback.
                channel.txRollback();
                System.out.println("error");
                e.printStackTrace();
            }

            channel.close();
        } finally {
            // Release the connection on every path.
            connection.close();
        }
    }
}

消費者

/**
 * Consumer for the transaction demo: prints every message that reaches the queue,
 * using automatic acknowledgement.
 */
public class TxReceive1 {
    private static final String QUEUE_NAME="tx_queue";

    /**
     * Declares the queue and registers the printing consumer.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel txChannel = ConnectionUtil.getConnection().createChannel();
        txChannel.queueDeclare(QUEUE_NAME, false, false, false, null);

        DefaultConsumer printer = new DefaultConsumer(txChannel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                super.handleDelivery(consumerTag, envelope, properties, body);
                String text = new String(body, "utf-8");
                System.out.println("receive" + text);
            }
        };

        boolean autoAck = true;
        txChannel.basicConsume(QUEUE_NAME, autoAck, printer);
    }
}

9.二、生產者Confirm模式

9.2.一、發送單條

/**
 * Publisher-confirm demo (single message): publishes one message in confirm mode and
 * waits for the broker's confirmation.
 */
public class Send1 {
    private static final String QUEUE_NAME="tx_queue";

    /**
     * Publishes one message, reports whether it was confirmed and releases the connection.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if opening the connection or closing the channel times out
     * @throws InterruptedException if the thread is interrupted while waiting for the confirm
     */
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        Connection connection = ConnectionUtil.getConnection();
        try {
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);

            // Switch the channel to confirm mode.
            channel.confirmSelect();

            String msg="confirm_text";

            channel.basicPublish("",QUEUE_NAME,null,msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));

            if (!channel.waitForConfirms()){
                System.out.println("send message failed");
            }else {
                System.out.println("success");
            }
            channel.close();
        } finally {
            // Release the connection even when publishing or waiting fails.
            connection.close();
        }
    }
}

9.2.二、批量發送

/**
 * Publisher-confirm demo (batch): publishes several messages in confirm mode and waits
 * once for all of them to be confirmed.
 */
public class SendMore {
    private static final String QUEUE_NAME="confirm_queue";

    /**
     * Publishes the batch, reports whether it was confirmed and releases the connection.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if opening the connection or closing the channel times out
     * @throws InterruptedException if the thread is interrupted while waiting for the confirms
     */
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        Connection connection = ConnectionUtil.getConnection();
        try {
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);

            // Switch the channel to confirm mode.
            channel.confirmSelect();

            String msg="confirm_text";
            // Encode once, explicitly as UTF-8, and reuse the bytes for every publish.
            byte[] payload = msg.getBytes(java.nio.charset.StandardCharsets.UTF_8);

            // NOTE(review): this extra publish ahead of the loop makes the batch 11
            // messages; it looks like a leftover from the single-message example -
            // confirm whether it is intended.
            channel.basicPublish("",QUEUE_NAME,null,payload);
            // Publish serially, then confirm the whole batch afterwards.
            for (int i = 0; i <10 ; i++) {
                channel.basicPublish("",QUEUE_NAME,null,payload);
            }
            if (!channel.waitForConfirms()){
                System.out.println("send message failed");
            }else {
                System.out.println("success");
            }
            channel.close();
        } finally {
            // Release the connection even when publishing or waiting fails.
            connection.close();
        }
    }
}

9.三、異步模式

十、參數詳解

10.一、queueDeclare

//綁定channel與消息隊列
//參數一:隊列名稱【若是不存在該隊列,則自動建立】
//參數二:durable【是否要進行持久化】
//參數三:exclusive【是否獨佔隊列】
//參數四:是否在消費完成以後是否要當即刪除隊列【true:自動刪除】【false:不自動刪除】
//參數五:額外參數
channel.QueueDeclare(name, durable, autoDelete, exclusive, noWait, args);

channel.queueDeclare("helloWorld",false,false,false,null);

10.二、basicPublish

// Publish a message.
/*
* Argument 1 (exchange): exchange name
* Argument 2 (routingKey): described by the original note as the queue name, which is
*                          what this call passes together with the empty exchange name
* Argument 3 (props): additional settings for the message
* Argument 4: the message content, as bytes
*
* */
channel.basicPublish("","helloWorld",null,"hello rabbitMQ".getBytes());

10.三、basicConsume

// Consume messages.
/*
 * Argument 1: queue name
 * Argument 2: enables automatic acknowledgement of messages
 * Argument 3: callback invoked when messages are consumed
 * */
channel.basicConsume("helloWorld",true,consumer);

十一、整合Boot

11.一、依賴導入

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
     <version>2.2.6.RELEASE</version>
</dependency>

11.二、yml

spring:
  application:
    name: rabbit-springboot
  rabbitmq:
    host: 192.168.168.130
    port: 5672
    virtual-host: /
    username: lgz
    password: pwd123456

11.三、boot test

/** Template used by the test to send messages; injected by Spring. */
@Autowired
private RabbitTemplate rabbitTemplate;

/** Sends "helloWorld" with the key "hello" and prints a marker once the call returns. */
@Test
void contextLoads() {
    final String routingKey = "hello";
    final String payload = "helloWorld";
    rabbitTemplate.convertAndSend(routingKey, payload);
    System.out.println(1);
}

11.四、

/**
 * Producer-side tests, one per messaging model; each simply sends through the
 * injected {@code RabbitTemplate}.
 */
@SpringBootTest
class BootRabbitmqApplicationTests {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /** Simple model: one message with the key "hello". */
    @Test
    void contextLoads() {
        final String routingKey = "hello";
        final String payload = "helloWorld";
        rabbitTemplate.convertAndSend(routingKey, payload);
        System.out.println(1);
    }

    /** Work model: five identical messages with the key "work". */
    @Test
    public void testWork(){
        final String payload = "work模型";
        for (int sent = 0; sent < 5; sent++) {
            rabbitTemplate.convertAndSend("work", payload);
        }
    }

    /** Fanout model: exchange "logs", empty routing key. */
    @Test
    public void testFanout(){
        final String exchange = "logs";
        rabbitTemplate.convertAndSend(exchange, "", "Fanout model");
    }

    /** Routing model: exchange "directs", routing key "info". */
    @Test
    public void testRouting(){
        final String exchange = "directs";
        final String routingKey = "info";
        rabbitTemplate.convertAndSend(exchange, routingKey, "info_key_routing_information");
    }

    /** Topic model: exchange "topics", routing key "user.save". */
    @Test
    public  void  testTopic(){
        final String exchange = "topics";
        final String routingKey = "user.save";
        rabbitTemplate.convertAndSend(exchange, routingKey, "user.save exchange");
    }
}

11.五、測試topic[消費者]

/**
 * Topic listener for the "product" keys on the "topics" exchange; prints each message
 * prefixed with "consumer1".
 *
 * @param msg the received message text
 */
@RabbitListener(bindings = @QueueBinding(
        value = @Queue,
        exchange = @Exchange(type = "topic", name = "topics"),
        key = {"product.save", "product.*"}))
public  void  receive(String msg){
    String line = "consumer1" + msg;
    System.out.println(line);
}

/**
 * Topic listener for the "user" keys on the "topics" exchange; prints each message
 * prefixed with "consumer2".
 *
 * @param msg the received message text
 */
@RabbitListener(bindings = @QueueBinding(
        value = @Queue,
        exchange = @Exchange(type = "topic", name = "topics"),
        key = {"user.save", "user.*"}))
public  void  receive2(String msg){
    String line = "consumer2" + msg;
    System.out.println(line);
}

11.五、測試routing

/**
 * Routing listener bound to the "directs" exchange with the keys "info" and "error";
 * prints each message as received.
 *
 * @param msg the received message text
 */
@RabbitListener(bindings = @QueueBinding(
        value = @Queue,
        exchange = @Exchange(value = "directs", type = "direct"),
        key = {"info", "error"}))
public void receive1(String msg){
    final String received = msg;
    System.out.println(received);
}

11.六、普通的work

/**
 * Work-model listener on the "work" queue; prints each message wrapped in
 * "message:[...]".
 *
 * @param message the received message text
 */
@RabbitListener(queuesToDeclare = @Queue("work"))
public void receive(String message){
    String line = "message:[" + message + "]";
    System.out.println(line);
}

11.七、Fanout模式

/**
 * Fanout listener: binds a temporary queue to the "logs" exchange and prints each
 * message in square brackets.
 *
 * @param msg the received message text
 */
@RabbitListener(bindings = @QueueBinding(
        value = @Queue, // creates a temporary queue
        exchange = @Exchange(value = "logs", type = "fanout"))) // exchange to bind to
public void receive1(String msg){
    String line = "[" + msg + "]";
    System.out.println(line);
}

11.八、Simple

/**
 * Simple-model listener component for the "hello" queue.
 */
@Component
@RabbitListener(queuesToDeclare = @Queue("hello"))
public class Hello {

    /**
     * Prints the received text prefixed with "message".
     *
     * @param message the received message text
     */
    @RabbitHandler
    public void receive1(String message){
        String line = "message" + message;
        System.out.println(line);
    }
}
相關文章
相關標籤/搜索