[官網翻譯]RabbitMQ基本消息隊列使用

RabbitMQ 更像是一個郵件服務器,用戶發送郵件(消息),到郵箱服務器(exchange),其餘用戶可以保證收到消息發送者的郵件(消息).java

AMQP 服務器相似與郵件服務器, 每一個交換器(exchange)都扮演了消息傳送代理,每一個消息隊列(queue)都做爲郵箱,而綁定(binding)則定義了每一個傳送代理中的路由表.發佈者(producer)發送消息給獨立的傳送代理,而後傳送代理(exchange)再路由(binding)消息到郵箱(queue)中.消費者(customer)從郵箱(queue)中收取消息.git

RabbitMQ 有三個主要概念,生產者,隊列,消費者github

生產者 單純的發送消息正則表達式

隊列 依賴主機的內存和磁盤,(這個能夠經過配置文件修改參數)能夠理解爲一個緩存.緩存

消費者 一直等待接收消息服務器

rabbitmq默認配置virtual host 爲 "/", exchange默認AMQP default,沒有默認queue. 若是exchange不指定,則exchange爲默認exchange.使用virtualHost和exchange能夠方便實現"分區"的概念. 好比我想有交易和會員兩個系統,我能夠建立兩個virtualhost,分別表示兩個系統,這兩個系統是相互隔離的. 跟activemq不一樣的是,rabbitmq更加靈活一點.網絡

官方文檔 1 hello world

建立rabbitmq的流程跟通常的模板同樣,經過ConnectionFactory--Connection--Channel--獲取客戶端通道. 拿到channel以後,就能夠進行發佈或消費.對應的jms中的點對點隊列.負載均衡

在編碼的時候,咱們能夠制定一端建立信息的規則,好比消費端進行聲明exchange或者queuedom

下面是消費端代碼,這個代碼中沒有exchange,默認爲類型爲direct的exchange是AMQP default,這裏綁定了一個隊列.異步

//1 工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
//2 鏈接
Connection connection = factory.newConnection();
//3 渠道
Channel channel = connection.createChannel();
//4 建立隊列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//5 設置消費
channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){
    [@Override](https://my.oschina.net/u/1162528)
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String msg = new String(body,"UTF-8");
        System.out.println(name + " 接收到消息 msg = " + msg);
    }
});
System.out.println("客戶端啓動.");
latch.countDown();
} catch (TimeoutException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}

下面是生產者服務端代碼,注意,這裏的basicPublish()的第二個參數,routingkey是消費端的queue的名稱. 這一點,其實讓人有點迷惑的.

//1 工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
//2 鏈接
try (Connection connection = factory.newConnection()
) {
    //3 渠道
    Channel channel = connection.createChannel();
    //4 發佈消息
    String msg = "hello rabbitmq";
    channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
    System.out.println(name + " 發送消息 msg = " + msg);
    channel.close();
} catch (TimeoutException e) {
    e.printStackTrace();
} catch (IOException e) {
    e.printStackTrace();
}

若是按照生產者/消費者--exchange--routingkey綁定--queue 的標準模式, 在下面的例子中exchange爲hello.world,routingkey爲key-hello,隊列爲QUEUE_NAME

消費端代碼:

//1 工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
//2 鏈接
try (Connection connection = factory.newConnection()
) {
    //3 渠道
    Channel channel = connection.createChannel();
    //4 發佈消息
    String msg = "hello rabbitmq";
    channel.basicPublish("hello.world","key-hello",null,msg.getBytes());
    System.out.println(name + " 發送消息 msg = " + msg);
    channel.close();
} catch (TimeoutException e) {
    e.printStackTrace();
} catch (IOException e) {
    e.printStackTrace();
}

生產端代碼:

try {
    //1 工廠
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    //2 鏈接
    Connection connection = factory.newConnection();
    //3 渠道
    Channel channel = connection.createChannel();
    channel.exchangeDeclare("hello.world","direct");
    //4 建立隊列
    channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    channel.queueBind(QUEUE_NAME,"hello.world","key-hello");
    //5 設置消費
    channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){
        [@Override](https://my.oschina.net/u/1162528)
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String msg = new String(body,"UTF-8");
            System.out.println(name + " 接收到消息 msg = " + msg);
        }
    });
    System.out.println("客戶端啓動.");
    latch.countDown();

官方文檔 2 扇出模式(訂閱模式)

用過jms的都知道topic訂閱隊列,在rabbitmq的對應的是exchange的"fanout". 實現原理,直接使用exchange,隊列自動建立,不在經過routingkey綁定exchange和queue,從而實如今exchange下的queue均可以接收到消息.

下面是消費端代碼,建立一個exchange,類型爲fanout,而後獲取一個系統的queue,而後將queue和exchange綁定在一塊兒.

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_LOG, ExchangeTypes.FANOUT);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName,EXCHANGE_LOG,"");
Consumer consumer = new DefaultConsumer(channel){
    [@Override](https://my.oschina.net/u/1162528)
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String log = new String(body);
        System.out.println(name+"<<<<  " + log);
    }
};
channel.basicConsume(queueName,true,consumer);
System.out.println(name +" 客戶端等待中....." );
latch.countDown();

下面是生產端代碼

Channel channel = connection.createChannel();
//channel.exchangeDeclare(EXCHANGE_LOG, ExchangeTypes.FANOUT);

for (int i = 0; i < MSG_NUM; i++) {

    String logMsg =name +">>>> 日誌.... "+i;

    channel.basicPublish(EXCHANGE_LOG,"",null,logMsg.getBytes());
}


channel.close();

官方文檔 3 主題模式

首先區別正則表達式,只有2種通配符,*表示一個字(一個英文word,不是字母),#表示多個英文字. 這個實現是經過帶有通配符的綁定關係,經過綁定關係,將不一樣的消息分發到不一樣的queue.

消費端代碼,建立一個exchange,類型爲topic,使用通配符綁定關係綁定exchange和自身的隊列.

try {
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare(TOPIC,ExchangeTypes.TOPIC);
    String queueName = channel.queueDeclare().getQueue();
    for (int i = 0; i < routingKeys.length; i++) {
        String routingKey = routingKeys[i];
        channel.queueBind(queueName, TOPIC, routingKey);
    }
    channel.basicConsume(queueName,new DefaultConsumer(channel){
        [@Override](https://my.oschina.net/u/1162528)
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println(name+ " : " +new String(body) );
        }
    });
    System.out.println(name+" 等待中.....");
    latch.countDown();
} catch (IOException e) {
    e.printStackTrace();
} catch (TimeoutException e) {
    e.printStackTrace();
}

生成端代碼

try (Connection connection = factory.newConnection()){
    Channel channel = connection.createChannel();
    channel.exchangeDeclare(TOPIC, ExchangeTypes.TOPIC);
    for (String routingKey:routingKeys) {
        channel.basicPublish(TOPIC,routingKey,null,routingKey.getBytes());
    }
    channel.close();
} catch (IOException e) {
    e.printStackTrace();
} catch (TimeoutException e) {
    e.printStackTrace();
}

官方文檔 4 自定義路由

看了上面的簡單,訂閱,主題,你會更加理解exchange,routingkey,queue之間的. 上面的都是一些比較特殊場景的應用.

消費端代碼,建立一個exchange,默認類型爲direct,而後經過routingkey綁定不一樣的queue,值得注意的是消費端能夠將exchange綁定不一樣的queue.

try {
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare(ROUNTING, ExchangeTypes.DIRECT);
    String queueNm = channel.queueDeclare().getQueue();
    channel.queueBind(queueNm,ROUNTING,routingKey);
    if(routingKey.contains("info")){
        channel.queueBind(queueNm,ROUNTING,"error");
    }
    DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
        [@Override](https://my.oschina.net/u/1162528)
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String msg = new String(body);
            System.out.println(routingKey+" : " + msg);
        }
    };
    channel.basicConsume(queueNm,true,defaultConsumer);
    System.out.println(routingKey+"客戶端等待中....");
    latch.countDown();
} catch (IOException e) {
    e.printStackTrace();
} catch (TimeoutException e) {
    e.printStackTrace();
}

生產端代碼

try (Connection connection = factory.newConnection()){
    Channel channel = connection.createChannel();
//                channel.exchangeDeclare(ROUNTING, ExchangeTypes.DIRECT);
    if (name.contains("1")){
        channel.basicPublish(ROUNTING,"error",null,(name+">>error").getBytes());
        channel.basicPublish(ROUNTING,"warning",null,(name+">>warning").getBytes());
    }else {
        channel.basicPublish(ROUNTING,"info",null,(name+">>infoinfo").getBytes());
    }
    channel.close();
} catch (IOException e) {
    e.printStackTrace();
} catch (TimeoutException e) {
    e.printStackTrace();
}

官方文檔 5 使用mq來實現rpc

在分佈式環境中,遠程調用rpc有不少實現方式,比較流行的,非跨語言速度極快的java RMI, google的基於protobuf/http2的GRPC ,facebook的IO多路複用/tcp的Thrift,使用WSDL的Web Service等. MQ一樣也能夠作RPC實現,這源於MQ的自然負載均衡,以及rpc的非實時性要求. 使用rabbitmq實現rpc,用到了三點,第一是connection屬性的BasicProperties,須要設置一個 應答隊列replyTo,這個是在publish時帶入的;第二 使用默認exchange,不須要設定exchange; 第三,應答隊列的屬性應當是排他自動刪除的,這個使用默認無數方法生成的隊列就能夠,默認爲排他, 自動刪除,非持久隊列.關於這點,能夠看源碼:

AutorecoveringChannel.java

@Override
public AMQP.Queue.DeclareOk queueDeclare() throws IOException {
    return queueDeclare("", false, true, true, null);
}

下面是rpc服務端代碼

try {
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.basicQos(1);
    channel.queueDeclare(REQUEST_QUEUE,false,false,true,null);
    System.out.println("RPC 服務器等待....");
    channel.basicConsume(REQUEST_QUEUE,false,new DefaultConsumer(channel){
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String replyQueue = properties.getReplyTo();
            AMQP.BasicProperties replyProp = new AMQP.BasicProperties().builder().correlationId(properties.getCorrelationId()).build();
            String message = new String(body);
            int n = Integer.parseInt(message);
            String responseBody =String.valueOf(fibonacci(n));
            channel.basicPublish("",replyQueue,replyProp,responseBody.getBytes());
            channel.basicAck(envelope.getDeliveryTag(),false);
            System.out.println("計算 Fibonacci ["+message+"] = "+responseBody);
        }
    });
    latch.countDown();
} catch (IOException e) {
    e.printStackTrace();
} catch (TimeoutException e) {
    e.printStackTrace();
}
private int fibonacci(int value){
    if(value == 0 || value == 1){
        return value;
    }else {
        return fibonacci(value-1)+fibonacci(value-2);
    }
}

下面是rpc客戶端代碼,注意看沒有設置exchange,隊列也是使用默認的queueDeclare()

try {
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    //聲明應答隊列,默認是排他,自動刪除,非持久隊列,也就是說,當客戶端中止了,隊列就好消失
    String queueName = channel.queueDeclare().getQueue();
    String correlationId = UUID.randomUUID().toString();
    AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().correlationId(correlationId).replyTo(queueName).build();
    channel.basicPublish("",REQUEST_QUEUE,properties,message.getBytes());
    BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
    channel.basicConsume(queueName,true,new DefaultConsumer(channel){
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            if (correlationId.equalsIgnoreCase(properties.getCorrelationId())){
                response.offer(new String(body));
            }
        }
    });
    System.out.println("接收到消息:"+response.take());
} catch (IOException e) {
    e.printStackTrace();
} catch (TimeoutException e) {
    e.printStackTrace();
} catch (InterruptedException e) {
    e.printStackTrace();
}

非官方 6 被拋棄冷落的direct同胞兄弟headers 相似主題+訂閱模式結合

上面的例子都是使用routingkey來進行綁定關係,在一些狀況下,可能仍是不能知足業務場景, 好比我想要"張三",電話"123456789"的全部消息,轉到一個特殊處理(僅舉例,無心義).

消費端代碼,一樣是建立一個exchange,類型headers,而後構建一個map,經過BasicProperties, 傳遞參數.注意這裏的map的value能夠爲java的一些基本類型(能夠查閱Frame.fieldValueSize()), 可是不能是用戶自定義的類型.rabbitmq對於不存在queue,發送的消息會丟失,因此從消息持久化的角度, 服務端和客戶端都應當declare,可是隻有消費端declare,並不會報錯,若是消息比客戶端啓動更早到達,則會丟失消息.

我作了一個測試,any能夠有多個,正常接收消息,相似訂閱模式fanout,可是注意all只能有一個接收.

try {
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare(HEADERS_QUEUE,ExchangeTypes.HEADERS);
    String queueName = channel.queueDeclare().getQueue();
//                channel.queueDeclare(HEADERS_QUEUE,false,false,true,null);
    Map<String,Object> headers = new HashMap<>();
    if(name.endsWith("all")){
        headers.put("x-match","all");
        headers.put("name","張三");
        headers.put("phone","123456789");
    }else if (name.endsWith("any1")){
        headers.put("x-match","any");
        headers.put("name","張三");
        headers.put("phone","0000");
    }else{
        headers.put("x-match","any");
        headers.put("name","張三1");
        headers.put("phone","123456789");
    }
    channel.queueBind(queueName,HEADERS,"",headers);
    channel.basicConsume(queueName,new DefaultConsumer(channel){
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("接收消息>>"+new String(body));
        }
    });
    System.out.println("客戶啓動.");
    latch.countDown();
;            } catch (IOException e) {
    e.printStackTrace();
} catch (TimeoutException e) {
    e.printStackTrace();
}

生產端代碼,服務端很簡單,只須要將過濾條件添加便可.

try {
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
//  channel.exchangeDeclare(HEADERS, ExchangeTypes.HEADERS);
    Map<String,Object> headers = new HashMap<>();
    headers.put("name","張三");
    headers.put("phone","123456789");
    AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().headers(headers).build();
    channel.basicPublish(HEADERS,"",properties,"hello rabbitmq  ".getBytes());
    channel.close();
    connection.close();
} catch (IOException e) {
    e.printStackTrace();
} catch (TimeoutException e) {
    e.printStackTrace();
}

System.out.println("服務端啓動.");

非官方 7 事務

事務幾乎無處不在,而如今談及事務毫不是簡單的事務,而是分佈式事務.遺憾的是這裏的事務跟分佈式事務沒有必然聯繫. 這裏單純的談及rabbitmq的事務.首先說一下,rabbitmq是基於tcp協議的,tcp三次握手四次揮手,這裏就涉及到消息的確認 機制.而rabbitmq的事務也是依賴這個確認機制的.再來講一下確認機制,咱們在使用rabbitmq或者jms默認都是 有確認機制的,只不過是默認實現,咱們能夠經過一些ack的參數或接口設置.通常都是默認批量自動ack, 何時ack呢?rabbitmq中沒有消息過時的概念,只有消息被正常處理了,客戶端發送了ack,纔會刪除. 批量ack,則是在ack到必定數量以後才一塊發送ack,減小帶寬,可是失敗則影響較大.傳統的事務,是先 開啓事務,進行操做,事務提交,事務回滾,速度將減慢到原來的2倍(通過本地測試,差很少這個數). rabbitmq提供了一個高級的Publisher Confirm機制,跟傳統不太同樣,其實是將事務的提交拆分了, 等全部事務提交完畢,在最終確認.速度介於並接近非事務速度(可能測試用例問題,跟傳統tx差很少?!). 當開啓publisher confirm時,該信道上會爲每個消息分配一個id,當消息被髮送到消費端時,rabbitmq就 會發確認到生產端,消息的發送和確認是異步.

無事務消息代碼

static class NoPublisher implements Runnable {

    /**
     * When an object implementing interface <code>Runnable</code> is used
     * to create a thread, starting the thread causes the object's
     * <code>run</code> method to be called in that separately executing
     * thread.
     * <p>
     * The general contract of the method <code>run</code> is that it may
     * take any action whatsoever.
     *
     * @see Thread#run()
     */
    @Override
    public void run() {
        try {
            try (Connection connection = factory.newConnection()) {
                Channel channel = connection.createChannel();
                channel.queueDeclare(NO_TRANSACTION, false, false, false, null);
                long start = System.currentTimeMillis();
                try {
                    for (int i = 0; i < MSG_NUM; i++) {
                        String msg = "rabbitmq msg!";
                        channel.basicPublish("", NO_TRANSACTION, null, msg.getBytes());
                    }
                    channel.basicPublish("", NO_TRANSACTION, null, "end".getBytes());
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    channel.close();
                }
                long end = System.currentTimeMillis();
                System.out.println("[發送方]發送方耗時:" + (end - start));
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

static class NoConsumer implements Runnable {

    /**
     * When an object implementing interface <code>Runnable</code> is used
     * to create a thread, starting the thread causes the object's
     * <code>run</code> method to be called in that separately executing
     * thread.
     * <p>
     * The general contract of the method <code>run</code> is that it may
     * take any action whatsoever.
     *
     * @see Thread#run()
     */
    @Override
    public void run() {
        try {
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(NO_TRANSACTION, false, false, false, null);
            //每次1條
            channel.basicQos(1);
            long start = System.currentTimeMillis();
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body);
                    channel.basicAck(envelope.getDeliveryTag(),false);
                    if (msg.equalsIgnoreCase("end")){
                        long end = System.currentTimeMillis();
                        System.out.println("[接收方]接收完畢"+(end-start));
                        try {
                            channel.close();
                            connection.close();
                        } catch (TimeoutException e) {
                            e.printStackTrace();
                        }
                    }
                }
            };
            //手動ack
            channel.basicConsume(NO_TRANSACTION, false, consumer);
            System.out.println("[接收方]客戶端等待中......");
            latch.countDown();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

輸出:

[接收方]客戶端等待中......
[發送方]發送方耗時:4080
[接收方]接收完畢16904

事務消息代碼

static class TranPublisher implements Runnable {

    /**
     * When an object implementing interface <code>Runnable</code> is used
     * to create a thread, starting the thread causes the object's
     * <code>run</code> method to be called in that separately executing
     * thread.
     * <p>
     * The general contract of the method <code>run</code> is that it may
     * take any action whatsoever.
     *
     * @see Thread#run()
     */
    @Override
    public void run() {
        try {
            try (Connection connection = factory.newConnection()) {
                Channel channel = connection.createChannel();
                channel.queueDeclare(TRANSACTION, false, false, false, null);
                long start = System.currentTimeMillis();
                try {
                    for (int i = 0; i < MSG_NUM;) {
                        if (i%BATCH ==0){
                            //開啓事務
                            channel.txSelect();
                            for (int j = 0; j < BATCH; j++) {
                                String msg = "rabbitmq msg!";
                                if(i + j != MSG_NUM -1 ){
                                    channel.basicPublish("", TRANSACTION, null, msg.getBytes());
                                }else{
                                    channel.basicPublish("", TRANSACTION, null, "end".getBytes());
                                }
                            }
                            i += BATCH;
                            //commit
                            channel.txCommit();
                        }
                    }

                } catch (Exception e) {
                    //回滾事務
                    channel.txRollback();
                    e.printStackTrace();
                } finally {
                    channel.close();
                }
                long end = System.currentTimeMillis();
                System.out.println("[tx發送方]發送方耗時:" + (end - start)+" 批量大小="+BATCH);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

static class TranConsumer implements Runnable {

    /**
     * When an object implementing interface <code>Runnable</code> is used
     * to create a thread, starting the thread causes the object's
     * <code>run</code> method to be called in that separately executing
     * thread.
     * <p>
     * The general contract of the method <code>run</code> is that it may
     * take any action whatsoever.
     *
     * @see Thread#run()
     */
    @Override
    public void run() {
        try {
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(TRANSACTION, false, false, false, null);
            //每次1條
            channel.basicQos(1);
            long start = System.currentTimeMillis();
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body);
                    //發送ack
                    channel.basicAck(envelope.getDeliveryTag(), false);
                    if (msg.equalsIgnoreCase("end")){
                        long end = System.currentTimeMillis();
                        System.out.println("[tx接收方]接收完畢"+(end-start));
                        try {
                            channel.close();
                            connection.close();
                        } catch (TimeoutException e) {
                            e.printStackTrace();
                        }
                    }
                }
            };
            //手動ack
            channel.basicConsume(TRANSACTION, false, consumer);
            System.out.println("[tx接收方]客戶端等待中......");
            latch.countDown();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

輸出:

[tx接收方]客戶端等待中......
[tx發送方]發送方耗時:8703 批量大小=100
[tx接收方]接收完畢22160

消息確認代碼

static class ConfirmPublisher implements Runnable {

    /**
     * When an object implementing interface <code>Runnable</code> is used
     * to create a thread, starting the thread causes the object's
     * <code>run</code> method to be called in that separately executing
     * thread.
     * <p>
     * The general contract of the method <code>run</code> is that it may
     * take any action whatsoever.
     *
     * @see Thread#run()
     */
    @Override
    public void run() {
        try {
            try (Connection connection = factory.newConnection()) {
                Channel channel = connection.createChannel();
                long start = System.currentTimeMillis();
                try {
                    for (int i = 0; i < MSG_NUM; ) {
                        if (i%BATCH ==0){
                            //開啓confirm3
                            channel.confirmSelect();
                            for (int j = 0; j < BATCH; j++) {
                                String msg = "rabbitmq msg!";
                                if(i + j != MSG_NUM -1){
                                    channel.basicPublish("", CONFIRM, null, msg.getBytes());
                                }else{
                                    channel.basicPublish("", CONFIRM, null, "end".getBytes());
                                }
                            }
                            i += BATCH;
                            //confirm
//                                  waitForConfirmsOrDie 相對於 waitForConfirms 來講,只要有nack就好拋出異常,同時也是一種阻塞式
                            channel.waitForConfirmsOrDie();
                            //channel.addConfirmListener(new ConfirmListener() {
//                                    @Override
//                                    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
////                                        System.out.println("ack deliveryTag = " + deliveryTag);
//                                    }
//
//                                    @Override
//                                    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
////                                        System.out.println("nack deliveryTag = " + deliveryTag);
//                                    }
//                                });
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    channel.close();
                }
                long end = System.currentTimeMillis();
                System.out.println("[confirm發送方]發送方耗時:" + (end - start)+" 批量大小="+BATCH);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

static class ConfirmConsumer implements Runnable {

    /**
     * When an object implementing interface <code>Runnable</code> is used
     * to create a thread, starting the thread causes the object's
     * <code>run</code> method to be called in that separately executing
     * thread.
     * <p>
     * The general contract of the method <code>run</code> is that it may
     * take any action whatsoever.
     *
     * @see Thread#run()
     */
    @Override
    public void run() {
        try {
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(CONFIRM, false, false, false, null);
            //每次1條
            channel.basicQos(1);
            long start = System.currentTimeMillis();
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body);
                    //發送ack
                    channel.basicAck(envelope.getDeliveryTag(), false);
//                     System.out.println("確認"+msg);
                    if (msg.equals("end")){
                        long end = System.currentTimeMillis();
                        System.out.println("[confirm接收方]接收完畢"+(end-start));
                        try {
                            channel.close();
                            connection.close();
                        } catch (TimeoutException e) {
                            e.printStackTrace();
                        }
                    }
                }
            };
            //手動ack
            channel.basicConsume(CONFIRM, false, consumer);
            System.out.println("[confirm接收方]客戶端等待中......");
            latch.countDown();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

輸出:

[confirm接收方]客戶端等待中......
[confirm發送方]發送方耗時:5358 批量大小=100
[confirm接收方]接收完畢22502

10w簡單消息發送時間

無事務:15s左右

tx事務:20s左右

confirm事務:20s左右

本地測試,因此這裏沒有網絡的延遲.

這裏有個疑問,confirm事務沒有像官方說明的同樣,接近無事務的效率.

因爲我的水平有限,若有問題請指出。 https://fansinzhao.github.io/page/image/weixin.png

相關文章
相關標籤/搜索