消息中間件系列三:使用RabbitMq原生Java客戶端進行消息通訊(消費者(接收方)自動確認模式、消費者(接收方)自行確認模式、生產者(發送方)確認模式)

準備工做:html

1)安裝RabbitMQ,參考文章:消息中間件系列二:RabbitMQ入門(基本概念、RabbitMQ的安裝和運行)java

2.)分別新建名爲OriginalRabbitMQProducer和OriginalRabbitMQConsumer的maven工程git

在pom.xml文件裏面引入以下依賴:github

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.0.0</version>
    </dependency>

說明:5系列的版本最好使用JDK8及以上, 低於JDK8能夠使用4.x(具體的版本號到Maven的中央倉庫查)的版本服務器

1、消費者(接收方)自動確認模式

 前面有談到消費者收到的每一條消息都必須進行確認,消息的確認機制分爲自動確認和消費者自行確認。下面咱們來看一下自動確認的示例:異步

示例1:交換器是direct

1. 在工程OriginalRabbitMQProducer新建一個一個direct的生產者

package study.demo.normal;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 
 * @Description: 交換器是direct的生產者 路由鍵徹底匹配時,消息才投放到對應隊列
 * @author leeSmall
 * @date 2018年9月15日
 *
 */
public class DirectProducer {

    //定義交換器的名字
    private final static String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {

        //1.建立一個鏈接工廠
        ConnectionFactory factory = new ConnectionFactory();
        
        //設置要鏈接的RabbitMQ服務器的地址
        factory.setHost("127.0.0.1");
        
        //設置用戶名 這裏使用缺省的
        //factory.setUsername(..);
        
        //設置鏈接斷開  這裏使用缺省的
        //factory.setPort();
        
        //設置虛擬主機 這裏使用缺省的
        //factory.setVirtualHost();


        //2.經過鏈接工廠建立一個鏈接
        Connection connection = factory.newConnection();

        //3.經過鏈接建立一個信道 信道是用來傳送數據的
        Channel channel = connection.createChannel();

        //4.經過信道聲明一個交換器 第一個參數時交換器的名字 第二個參數時交換器的種類
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        //定義一組路由鍵
        String[]routingKeys = {"error","info","warning"};

        //發佈消息的交換器上
        for(int i=0;i<3;i++){
            //路由鍵
            String routingKey = routingKeys[i];
            
            //要發送的消息
            String message = "Hello world_"+(i+1);

            /**
             * 發送消息到交換器上
             * 參數1:交換器的名字
             * 參數2:路由鍵
             * 參數3:BasicProperties
             * 參數4:要發送的消息
             */
            channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes());
            System.out.println("Sent "+routingKey+":"+message);

        }

        channel.close();
        connection.close();

    }

}

2. 在工程OriginalRabbitMQConsumer新建一個direct的只消費error日誌的消費者

package study.demo.normal;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 
 * @Description: 交換器是direct 只消費error日誌的消費者
 * @author leeSmall
 * @date 2018年9月15日
 *
 */
public class ConsumerError {

    //定義交換器的名字
    private static final String EXCHANGE_NAME = "direct_logs";
    // private static final String EXCHANGE_NAME = "fanout_logs_1";

    public static void main(String[] argv) throws IOException, TimeoutException {
        //1.建立一個鏈接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設置要鏈接的RabbitMQ服務器的地址
        factory.setHost("127.0.0.1");
        
        //2.經過鏈接工廠建立一個鏈接
        Connection connection = factory.newConnection();
        
        //3.經過鏈接建立一個信道 信道是用來傳送數據的
        Channel channel = connection.createChannel();
        
        //4.經過信道聲明一個交換器 第一個參數時交換器的名字 第二個參數時交換器的種類
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        
        //5.聲明隨機隊列
        String queueName = channel.queueDeclare().getQueue();
        
        //6.聲明一個只消費錯誤日誌的路由鍵error
        String routingKey = "error";
        
        //7.隊列經過路由鍵綁定到交換器上
        channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
        
        System.out.println("Waiting message.......");

        //8.設置一個監聽器監聽消費消息
        Consumer consumerB = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body,"UTF-8");
                System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);
            }
        };
     //9.自動確認:autoAck參數爲true
        channel.basicConsume(queueName,true,consumerB);
    }

}

3. 啓動消費者ConsumerError,再啓動生產者DirectProducer,查看效果

啓動消費者ConsumerError:maven

啓動生產者DirectProducer:ide

 

查看消費者ConsumerError如今的情況:性能

 

能夠看到消費者只消費了error級別的消息,這是由於在direct模式下,消費者只定義了路由鍵routingKey = "error";ui

 4. 在工程OriginalRabbitMQConsumer新建一個direct的消費全部日誌的消費者

package study.demo.normal;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 
 * @Description: 交換器是direct 消費全部日誌的消費者
 * @author leeSmall
 * @date 2018年9月15日
 *
 */
public class ConsumerAll {
    //定義交換器的名字
    private static final String EXCHANGE_NAME = "direct_logs";
    // private static final String EXCHANGE_NAME = "fanout_logs_1";

    public static void main(String[] argv) throws IOException,
            InterruptedException, TimeoutException {

        //1.建立一個鏈接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設置要鏈接的RabbitMQ服務器的地址
        factory.setHost("127.0.0.1");
        
        //2.經過鏈接工廠建立一個鏈接
        Connection connection = factory.newConnection();
        
        //3.經過鏈接建立一個信道 信道是用來傳送數據的
        Channel channel = connection.createChannel();
        
        //4.經過信道聲明一個交換器 第一個參數時交換器的名字 第二個參數時交換器的種類
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        
        
        //5.聲明隨機隊列
        String queueName = channel.queueDeclare().getQueue();
        
        //6.定義一組路由鍵消費全部日誌
        String[]routingKeys = {"error","info","warning"};
        
        //7.隊列經過路由鍵綁定到交換器上
        for(String routingKey:routingKeys){
            //隊列和交換器的綁定
            channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
        }
        System.out.println("Waiting message.......");

        //8.設置一個監聽器監聽消費消息
        Consumer consumerA = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body,"UTF-8");
                System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);
            }
        };
         //9.自動確認:autoAck參數爲true
         channel.basicConsume(queueName,true,consumerA);

    }

  }

5. 啓動消費者ConsumerAll,再啓動生產者DirectProducer,查看效果

啓動消費者ConsumerAll:

啓動生產者DirectProducer:

 查看消費者ConsumerAll的狀態:

 能夠看到消費者ConsumerAll消費了生產者DirectProducer產生的全部消息,這是由於在direct模式下,消費者定義了和生產者同樣個數的路由鍵String[]routingKeys = {"error","info","warning"};

示例2:交換器是fanout

1. 在工程OriginalRabbitMQProducer新建一個交換器是fanout的生產者

package study.demo.normal;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 
 * @Description: 交換器是fanout的生產者 能夠理解爲廣播,會把全部消息投放到綁定到這個交換器上的隊列上
 * @author leeSmall
 * @date 2018年9月15日
 *
 */
public class FanoutProducer {

    private final static String EXCHANGE_NAME = "fanout_logs_1";

    public static void main(String[] args) throws IOException, TimeoutException {

        //1.建立一個鏈接工廠
        ConnectionFactory factory = new ConnectionFactory();
        
        //設置要鏈接的RabbitMQ服務器的地址
        factory.setHost("127.0.0.1");
        
        //設置用戶名 這裏使用缺省的
        //factory.setUsername(..);
        
        //設置鏈接斷開  這裏使用缺省的
        //factory.setPort();
        
        //設置虛擬主機 這裏使用缺省的
        //factory.setVirtualHost();


        //2.經過鏈接工廠建立一個鏈接
        Connection connection = factory.newConnection();

        //3.經過鏈接建立一個信道 信道是用來傳送數據的
        Channel channel = connection.createChannel();

        //4.經過信道聲明一個交換器 第一個參數時交換器的名字 第二個參數時交換器的種類
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

        //定義一組路由鍵
        String[]routingKeys = {"error","info","warning"};

        //發佈消息的交換器上
        for(int i=0;i<3;i++){
            //路由鍵
            String routingKey = routingKeys[i];
            
            //要發送的消息
            String message = "Hello world_"+(i+1);

            /**
             * 發送消息到交換器上
             * 參數1:交換器的名字
             * 參數2:路由鍵
             * 參數3:BasicProperties
             * 參數4:要發送的消息
             */
            channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes());
            System.out.println("Sent "+routingKey+":"+message);

        }

        channel.close();
        connection.close();

    }

}

2. 在工程OriginalRabbitMQConsumer新建一個fanout的只消費error日誌的消費者

package study.demo.normal;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 
 * @Description: 交換器是fanout,只消費error日誌的消費者
 * @author leeSmall
 * @date 2018年9月15日
 *
 */
public class ConsumerError {

    //定義交換器的名字
    //private static final String EXCHANGE_NAME = "direct_logs";
     private static final String EXCHANGE_NAME = "fanout_logs_1";

    public static void main(String[] argv) throws IOException, TimeoutException {
        //1.建立一個鏈接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設置要鏈接的RabbitMQ服務器的地址
        factory.setHost("127.0.0.1");
        
        //2.經過鏈接工廠建立一個鏈接
        Connection connection = factory.newConnection();
        
        //3.經過鏈接建立一個信道 信道是用來傳送數據的
        Channel channel = connection.createChannel();
        
        //4.經過信道聲明一個交換器 第一個參數時交換器的名字 第二個參數時交換器的種類
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        
        //5.聲明隨機隊列
        String queueName = channel.queueDeclare().getQueue();
        
        //6.聲明一個只消費錯誤日誌的路由鍵error
        String routingKey = "error";
        
        //7.隊列經過路由鍵綁定到交換器上
        channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
        
        System.out.println("Waiting message.......");

        //8.設置一個監聽器監聽消費消息
        Consumer consumerB = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body,"UTF-8");
                System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);
            }
        };
      //9.自動確認:autoAck參數爲true
     channel.basicConsume(queueName,true,consumerA);


    }

}

3. 啓動消費者ConsumerError,再啓動生產者DirectProducer,查看效果

啓動消費者ConsumerError:

啓動生產者FanoutProducer:

 

查看消費者ConsumerError如今的情況:

 

能夠看到消費者消費了全部級別的消息,這是由於在fanout模式下,雖然消費者只定義了路由鍵routingKey = "error",可是由於fanut是廣播模式,會把全部消息投放到綁定到這個交換器上的隊列上

 4. 在工程OriginalRabbitMQConsumer新建一個fanout的消費全部日誌的消費者

package study.demo.normal;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 
 * @Description: 交換器是fanout 消費全部日誌的消費者
 * @author leeSmall
 * @date 2018年9月15日
 *
 */
public class ConsumerAll {
    //定義交換器的名字
    //private static final String EXCHANGE_NAME = "direct_logs";
    private static final String EXCHANGE_NAME = "fanout_logs_1";

    public static void main(String[] argv) throws IOException,
            InterruptedException, TimeoutException {

        //1.建立一個鏈接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設置要鏈接的RabbitMQ服務器的地址
        factory.setHost("127.0.0.1");
        
        //2.經過鏈接工廠建立一個鏈接
        Connection connection = factory.newConnection();
        
        //3.經過鏈接建立一個信道 信道是用來傳送數據的
        Channel channel = connection.createChannel();
        
        //4.經過信道聲明一個交換器 第一個參數時交換器的名字 第二個參數時交換器的種類
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        
        
        //5.聲明隨機隊列
        String queueName = channel.queueDeclare().getQueue();
        
        //6.定義一組路由鍵消費全部日誌
        String[]routingKeys = {"error","info","warning"};
        
        //7.隊列經過路由鍵綁定到交換器上
        for(String routingKey:routingKeys){
            //隊列和交換器的綁定
            channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
        }
        System.out.println("Waiting message.......");

        //8.設置一個監聽器監聽消費消息
        Consumer consumerA = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body,"UTF-8");
                System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);
            }
        };
       //9.自動確認:autoAck參數爲true
       channel.basicConsume(queueName,true,consumerA);



    }
}

5. 啓動消費者ConsumerAll,再啓動生產者DirectProducer,查看效果

啓動消費者ConsumerAll:

啓動生產者FanoutProducer:

 查看消費者ConsumerAll的狀態:

 能夠看到消費者ConsumerAll消費了生產者DirectProducer產生的全部消息,這是由於在fanout模式下,消費者定義了和生產者同樣個數的路由鍵String[]routingKeys = {"error","info","warning"};

3、消費者(接收方)自行確認模式

1. 在工程OriginalRabbitMQProducer新建一個消費者自行確認生產者

package study.demo.consumerconfirm;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 
 * @Description: 消費者自行確認生產者
 * @author leeSmall
 * @date 2018年9月15日
 *
 */
public class ConsumerConfirmProducer {

    //交換器
    private final static String EXCHANGE_NAME = "direct_cc_confirm_1";
    
    //路由鍵
    private final static String ROUTE_KEY = "error";

    public static void main(String[] args) throws IOException, TimeoutException,
            InterruptedException {

        //1.建立一個鏈接工廠
        ConnectionFactory factory = new ConnectionFactory();
        
        //設置要鏈接的RabbitMQ服務器的地址
        factory.setHost("127.0.0.1");
        
        //設置用戶名 這裏使用缺省的
        //factory.setUsername(..);
        
        //設置鏈接斷開  這裏使用缺省的
        //factory.setPort();
        
        //設置虛擬主機 這裏使用缺省的
        //factory.setVirtualHost();


        //2.經過鏈接工廠建立一個鏈接
        Connection connection = factory.newConnection();

        //3.經過鏈接建立一個信道 信道是用來傳送數據的
        Channel channel = connection.createChannel();

        //4.經過信道聲明一個交換器 第一個參數時交換器的名字 第二個參數時交換器的種類
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        //發佈消息的交換器上
        for(int i=0;i<10;i++){
            //要發送的消息
            String message = "Hello world_"+(i+1);

            /**
             * 發送消息到交換器上
             * 參數1:交換器的名字
             * 參數2:路由鍵
             * 參數3:BasicProperties
             * 參數4:要發送的消息
             */
            channel.basicPublish(EXCHANGE_NAME,ROUTE_KEY,null,message.getBytes());
            System.out.println("Sent "+ROUTE_KEY+":"+message);

        }

        channel.close();
        connection.close();

    }

}

2. 在工程OriginalRabbitMQConsumer新建一個消費者自行確認消費者

package study.demo.consumerconfirm;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 
 * @Description: 消費者自行確認消費者
 * @author leeSmall
 * @date 2018年9月15日
 *
 */
public class ClientConsumerAck {

    private static final String EXCHANGE_NAME = "direct_cc_confirm_1";

    public static void main(String[] argv) throws IOException, TimeoutException {
        //1.建立一個鏈接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設置要鏈接的RabbitMQ服務器的地址
        factory.setHost("127.0.0.1");
        
        //2.經過鏈接工廠建立一個鏈接
        Connection connection = factory.newConnection();
        
        //3.經過鏈接建立一個信道 信道是用來傳送數據的
        Channel channel = connection.createChannel();
        
        //4.經過信道聲明一個交換器 第一個參數時交換器的名字 第二個參數時交換器的種類
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        //5.聲明隊列
        String queueName = "consumer_confirm";
        channel.queueDeclare(queueName,false,false,
                false,null);

        //6.聲明一個只消費錯誤日誌的路由鍵error
        String routingKey = "error";
        
        //7.隊列經過路由鍵綁定到交換器上
        channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
        System.out.println("Waiting message.......");

        //8.設置一個監聽器監聽消費消息
        Consumer consumerB = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body,"UTF-8");
                System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);
                //消費者自行確認
                this.getChannel().basicAck(envelope.getDeliveryTag(),false);
            }
        };

        //9.消費者自行確認:autoAck參數爲false
        channel.basicConsume(queueName,false,consumerB);
    }

} 

3. 在工程OriginalRabbitMQConsumer新建一個消費者自行確認休眠不回覆ack的消費者

package study.demo.consumerconfirm;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 
 * @Description: 消費者自行確認休眠不回覆ack的消費者
 * @author leeSmall
 * @date 2018年9月15日
 *
 */
public class ClientConsumerSlowAck {

    private static final String EXCHANGE_NAME = "direct_cc_confirm_1";

    public static void main(String[] argv) throws IOException, TimeoutException {
        //1.建立一個鏈接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設置要鏈接的RabbitMQ服務器的地址
        factory.setHost("127.0.0.1");
        
        //2.經過鏈接工廠建立一個鏈接
        Connection connection = factory.newConnection();
        
        //3.經過鏈接建立一個信道 信道是用來傳送數據的
        Channel channel = connection.createChannel();
        
        //4.經過信道聲明一個交換器 第一個參數時交換器的名字 第二個參數時交換器的種類
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        //5.聲明隊列
        String queueName = "consumer_confirm";
        channel.queueDeclare(queueName,false,false,
                false,null);

        //6.聲明一個只消費錯誤日誌的路由鍵error
        String routingKey = "error";
        
        //7.隊列經過路由鍵綁定到交換器上
        channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
        System.out.println("Waiting message.......");

        //8.設置一個監聽器監聽消費消息
        Consumer consumerB = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                try {
                    //消費者自行確認時不回覆ack,一直休眠
                    Thread.sleep(20000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                String message = new String(body,"UTF-8");
                System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);
                //this.getChannel().basicAck(envelope.getDeliveryTag(),false);
            }
        };

        //9.消費者自行確認:autoAck參數爲false
        channel.basicConsume(queueName,false,consumerB);
    }

} 

4. 分別啓動消費者自行確認消費者ClientConsumerAck和消費者自行確認休眠不回覆ack的消費者ClientConsumerSlowAck,再啓動消費者自行確認生產者ConsumerConfirmProducer查看狀態

啓動消費者自行確認消費者ClientConsumerAck:

啓動消費者自行確認休眠不回覆ack的消費者ClientConsumerSlowAck

 

啓動消費者自行確認生產者ConsumerConfirmProducer

 

查看消費者自行確認消費者ClientConsumerAck的狀態:

查看消費者自行確認休眠不回覆ack的消費者ClientConsumerSlowAck的狀態:

查看RabbitMQ服務器上的隊列狀況:

能夠看到隊列consumer_confirm裏面有5條消息未消費,這是由於消費者自行確認休眠不回覆ack的消費者ClientConsumerSlowAck收到了這5條消息,可是沒有向RabbitMQ服務器發送確認消息,RabbitMQMQ認爲這5條消息尚未被消費就一直存在隊列裏面

下面停掉ClientConsumerSlowAck,查看ClientConsumerAck和RabbitMQ服務器裏面隊列consumer_confirm的狀態

 

能夠看到停掉ClientConsumerSlowAck之後,以前的5條消息被ClientConsumerAck消費了

5. 在工程OriginalRabbitMQConsumer 新建一個消費者自行確認拒絕消息的消費者

package study.demo.consumerconfirm;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 
 * @Description: 消費者自行確認拒絕消息的消費者
 * @author leeSmall
 * @date 2018年9月15日
 *
 */
public class ClientConsumerReject {

    private static final String EXCHANGE_NAME = "direct_cc_confirm_1";

    public static void main(String[] argv) throws IOException, TimeoutException {
        //1.建立一個鏈接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設置要鏈接的RabbitMQ服務器的地址
        factory.setHost("127.0.0.1");
        
        //2.經過鏈接工廠建立一個鏈接
        Connection connection = factory.newConnection();
        
        //3.經過鏈接建立一個信道 信道是用來傳送數據的
        Channel channel = connection.createChannel();
        
        //4.經過信道聲明一個交換器 第一個參數時交換器的名字 第二個參數時交換器的種類
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        //5.聲明隊列
        String queueName = "consumer_confirm";
        channel.queueDeclare(queueName,false,false,
                false,null);

        //6.聲明一個只消費錯誤日誌的路由鍵error
        String routingKey = "error";
        
        //7.隊列經過路由鍵綁定到交換器上
        channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
        System.out.println("Waiting message.......");

        //8.設置一個監聽器監聽消費消息
        Consumer consumerB = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                //消費者自行拒絕消息 參數requeue=true,讓RabbitMQ服務器從新分發消息,requeue=false讓RabbitMQ服務器移除消息
                this.getChannel().basicReject(envelope.getDeliveryTag(),true);
                System.out.println("Reject:"+envelope.getRoutingKey()
                        +":"+new String(body,"UTF-8"));
            }
        };

        //9.消費者自行確認:autoAck參數爲false
        channel.basicConsume(queueName,false,consumerB);
    }

} 

 6. 分別啓動消費者自行確認消費者ClientConsumerAck和消費者自行確認拒絕消息的消費者ClientConsumerReject,再啓動消費者自行確認生產者ConsumerConfirmProducer查看狀態

 啓動消費者自行確認消費者ClientConsumerAck:

 

 啓動消費者自行確認拒絕消息的消費者ClientConsumerReject:

 

 

啓動消費者自行確認生產者ConsumerConfirmProducer

 

 

 查看消費者自行確認消費者ClientConsumerAck的狀態:

查看消費者自行確認拒絕消息的消費者ClientConsumerReject的狀態:

能夠看到消息都被ClientConsumerAck消費了,這是由於消費者ClientConsumerReject拒絕了全部消息,這裏要注意

this.getChannel().basicReject(envelope.getDeliveryTag(),true);

 

這段代碼的basicReject的第二個參數requeue,參數requeue=true,讓RabbitMQ服務器從新分發消息,requeue=false讓RabbitMQ服務器移除消息

 requeue=false時,RabbitMQ服務器會刪掉被ClientConsumerReject拒絕的消息,消費者ClientConsumerAck就不能消費全部消息了

4、生產者(發送方)確認模式

 爲何要有個發送方確認模式?

生產者不知道消息是否真正到達RabbitMq,也就是說發佈操做不返回任何消息給生產者。
AMQP協議層面爲咱們提供的事務機制解決了這個問題,可是事務機制自己也會帶來問題:
1)嚴重的性能問題
2)使生產者應用程序產生同步
RabbitMQ團隊爲咱們拿出了更好的方案,即採用發送方確認模式,該模式比事務更輕量,性能影響幾乎能夠忽略不計。

1. 在OriginalRabbitMQProducer工程新建一個生產者(發送方)確認同步模式的類

package study.demo.producerconfirm;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 
 * @Description: 生產者(發送方)確認同步模式
 * @author leeSmall
 * @date 2018年9月16日
 *
 */
public class ProducerConfirm {

    private final static String EXCHANGE_NAME = "producer_confirm";
    private final static String ROUTE_KEY = "error";

    public static void main(String[] args) throws IOException, TimeoutException,
            InterruptedException {
        //1.建立一個鏈接工廠
        ConnectionFactory factory = new ConnectionFactory();
        
        //設置要鏈接的RabbitMQ服務器的地址
        factory.setHost("127.0.0.1");
        
        //設置用戶名 這裏使用缺省的
        //factory.setUsername(..);
        
        //設置鏈接斷開  這裏使用缺省的
        //factory.setPort();
        
        //設置虛擬主機 這裏使用缺省的
        //factory.setVirtualHost();


        //2.經過鏈接工廠建立一個鏈接
        Connection connection = factory.newConnection();

        //3.經過鏈接建立一個信道 信道是用來傳送數據的
        Channel channel = connection.createChannel();
        //將信道設置爲發送方確認
        channel.confirmSelect();

        //發佈消息的交換器上
        for(int i=0;i<2;i++){
            String msg = "Hello "+(i+1);
            channel.basicPublish(EXCHANGE_NAME,ROUTE_KEY,null,msg.getBytes());
            //等待RabbitMQ返回消息確認消息已送達RabbitMQ服務器
            if (channel.waitForConfirms()){
                System.out.println("發送方同步確認: "+ROUTE_KEY+":"+msg);
            }
        }


        // 關閉頻道和鏈接
        channel.close();
        connection.close();
    }

}

2. 在OriginalRabbitMQProducer工程新建一個生產者(發送方)確認異步模式的類

package study.demo.producerconfirm;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;



/**
 * 
 * @Description: 生產者(發送方)確認異步模式
 * @author leeSmall
 * @date 2018年9月16日
 *
 */
public class ProducerConfirmAsync {

    private final static String EXCHANGE_NAME = "producer_confirm";

    public static void main(String[] args) throws IOException, TimeoutException,
            InterruptedException {
        //1.建立一個鏈接工廠
        ConnectionFactory factory = new ConnectionFactory();
        
        //設置要鏈接的RabbitMQ服務器的地址
        factory.setHost("127.0.0.1");
        
        //設置用戶名 這裏使用缺省的
        //factory.setUsername(..);
        
        //設置鏈接斷開  這裏使用缺省的
        //factory.setPort();
        
        //設置虛擬主機 這裏使用缺省的
        //factory.setVirtualHost();


        //2.經過鏈接工廠建立一個鏈接
        Connection connection = factory.newConnection();

        //3.經過鏈接建立一個信道 信道是用來傳送數據的
        Channel channel = connection.createChannel();

        //4.經過信道聲明一個交換器 第一個參數時交換器的名字 第二個參數時交換器的種類
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        //將信道設置爲發送方確認
        channel.confirmSelect();

        //信道被關閉監聽器 用於RabbitMQ服務器斷線重連
        //channel.addShutdownListener();

        /**
         * 生產者異步確認監聽
         * 參數deliveryTag表明了當前channel惟一的投遞
         * 參數multiple:false
         * 
         */
        channel.addConfirmListener(new ConfirmListener() {
            //RabbitMQ服務器確認收到消息
            public void handleAck(long deliveryTag, boolean multiple)
                    throws IOException {
                System.out.println("RabbitMQ服務器確認收到消息Ack deliveryTag="+deliveryTag
                        +"multiple:"+multiple);
            }

            //RabbitMQ服務器因爲本身內部出現故障沒有收到消息
            public void handleNack(long deliveryTag, boolean multiple)
                    throws IOException {
                System.out.println("RabbitMQ服務沒有收到消息Ack deliveryTag="+deliveryTag
                        +"multiple:"+multiple);
            }
        });

        //生產者異步返回監聽 這裏和發佈消息時的mandatory參數有關
        //參數mandatory:mandatory=true,投遞消息時沒法找到一個合適的隊列,把消息返回給生產者,mandatory=false 丟棄消息(缺省)
        channel.addReturnListener(new ReturnListener() {
            public void handleReturn(int replyCode, String replyText,
                                     String exchange, String routingKey,
                                     AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body);
                System.out.println("replyText:"+replyText);
                System.out.println("exchange:"+exchange);
                System.out.println("routingKey:"+routingKey);
                System.out.println("msg:"+msg);
            }
        });


        //聲明一組路由鍵
        String[] routingKeys={"error","info","warning"};
        //發送消息到交換器上
        for(int i=0;i<3;i++){
            String routingKey = routingKeys[i%3];
            // 發送的消息
            String message = "Hello World_"+(i+1)+("_"+System.currentTimeMillis());
            
            //經過路由鍵把消息發送到交換器上
            //參數mandatory: mandatory=true,投遞消息時沒法找到一個合適的隊列,把消息返回給生產者,
            //              mandatory=false 丟棄消息(缺省)
            channel.basicPublish(EXCHANGE_NAME, routingKey, false,
                    null, message.getBytes());
            System.out.println("----------------------------------------------------");
            System.out.println(" Sent Message: [" + routingKey +"]:'"+ message + "'");
            //sleep一下讓程序不快速結束 能夠看到RabbitMQ服務器的響應
            Thread.sleep(1000);
        }

        // 關閉信道和鏈接
        channel.close();
        connection.close();
    }


}

3. 在OriginalRabbitMQConsumer工程新建一個發送方確認消費者

package study.demo.producerconfirm;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 
 * @Description: 發送方確認消費者
 * @author leeSmall
 * @date 2018年9月16日
 *
 */
public class ProducerConfirmConsumer {

    private static final String EXCHANGE_NAME = "producer_confirm";

    public static void main(String[] argv) throws IOException, TimeoutException {
        //1.建立一個鏈接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設置要鏈接的RabbitMQ服務器的地址
        factory.setHost("127.0.0.1");
        
        //2.經過鏈接工廠建立一個鏈接
        Connection connection = factory.newConnection();
        
        //3.經過鏈接建立一個信道 信道是用來傳送數據的
        Channel channel = connection.createChannel();
        
        //4.經過信道聲明一個交換器 第一個參數時交換器的名字 第二個參數時交換器的種類
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        //5.聲明隊列
        String queueName = "producer_confirm";
        channel.queueDeclare(queueName,false,false,
                false,null);

        //6.聲明一個只消費錯誤日誌的路由鍵error
        String routingKey = "error";
        
        //7.隊列經過路由鍵綁定到交換器上
        channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
        System.out.println("Waiting message.......");

        // 8.建立隊列消費者 設置一個監聽器監聽消費消息 
        final Consumer consumerB = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println( "Received ["+ envelope.getRoutingKey() + "] "+message);
            }
        };
        //9.消費者自動確認:autoAck參數爲true
        channel.basicConsume(queueName, true, consumerB);
    }

}

4. 啓動發送方確認消費者ProducerConfirmConsumer,再分別啓動生產者(發送方)確認同步模式的類ProducerConfirm和生產者(發送方)確認異步模式ProducerConfirmAsync

啓動發送方確認消費者ProducerConfirmConsumer:

 啓動生產者(發送方)確認同步模式的類ProducerConfirm:

 

查看發送方確認消費者ProducerConfirmConsumer的狀態:

 

 啓動生產者(發送方)確認異步模式ProducerConfirmAsync:

 

查看發送方確認消費者ProducerConfirmConsumer的狀態:

 

 

示例代碼獲取地址

相關文章
相關標籤/搜索