準備工做:html
1)安裝RabbitMQ,參考文章:消息中間件系列二:RabbitMQ入門(基本概念、RabbitMQ的安裝和運行)java
2.)分別新建名爲OriginalRabbitMQProducer和OriginalRabbitMQConsumer的maven工程git
在pom.xml文件裏面引入以下依賴:github
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.0.0</version> </dependency>
說明:5系列的版本最好使用JDK8及以上, 低於JDK8能夠使用4.x(具體的版本號到Maven的中央倉庫查)的版本服務器
前面有談到消費者收到的每一條消息都必須進行確認,消息的確認機制分爲自動確認和消費者自行確認。下面咱們來看一下自動確認的示例:異步
package study.demo.normal; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * * @Description: 交換器是direct的生產者 路由鍵徹底匹配時,消息才投放到對應隊列 * @author leeSmall * @date 2018年9月15日 * */ public class DirectProducer { //定義交換器的名字 private final static String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws IOException, TimeoutException { //1.建立一個鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); //設置要鏈接的RabbitMQ服務器的地址 factory.setHost("127.0.0.1"); //設置用戶名 這裏使用缺省的 //factory.setUsername(..); //設置鏈接斷開 這裏使用缺省的 //factory.setPort(); //設置虛擬主機 這裏使用缺省的 //factory.setVirtualHost(); //2.經過鏈接工廠建立一個鏈接 Connection connection = factory.newConnection(); //3.經過鏈接建立一個信道 信道是用來傳送數據的 Channel channel = connection.createChannel(); //4.經過信道聲明一個交換器 第一個參數時交換器的名字 第二個參數時交換器的種類 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //定義一組路由鍵 String[]routingKeys = {"error","info","warning"}; //發佈消息的交換器上 for(int i=0;i<3;i++){ //路由鍵 String routingKey = routingKeys[i]; //要發送的消息 String message = "Hello world_"+(i+1); /** * 發送消息到交換器上 * 參數1:交換器的名字 * 參數2:路由鍵 * 參數3:BasicProperties * 參數4:要發送的消息 */ channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes()); System.out.println("Sent "+routingKey+":"+message); } channel.close(); connection.close(); } }
package study.demo.normal; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * * @Description: 交換器是direct 只消費error日誌的消費者 * @author leeSmall * @date 2018年9月15日 * */ public class ConsumerError { //定義交換器的名字 private static final String EXCHANGE_NAME = "direct_logs"; // private static final String EXCHANGE_NAME = "fanout_logs_1"; public static void main(String[] argv) throws IOException, TimeoutException { //1.建立一個鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); //設置要鏈接的RabbitMQ服務器的地址 factory.setHost("127.0.0.1"); //2.經過鏈接工廠建立一個鏈接 Connection connection = factory.newConnection(); //3.經過鏈接建立一個信道 信道是用來傳送數據的 Channel channel = connection.createChannel(); //4.經過信道聲明一個交換器 第一個參數時交換器的名字 第二個參數時交換器的種類 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //5.聲明隨機隊列 String queueName = channel.queueDeclare().getQueue(); //6.聲明一個只消費錯誤日誌的路由鍵error String routingKey = "error"; //7.隊列經過路由鍵綁定到交換器上 channel.queueBind(queueName,EXCHANGE_NAME,routingKey); System.out.println("Waiting message......."); //8.設置一個監聽器監聽消費消息 Consumer consumerB = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body,"UTF-8"); System.out.println("Accept:"+envelope.getRoutingKey()+":"+message); } }; //9.自動確認:autoAck參數爲true channel.basicConsume(queueName,true,consumerB); } }
啓動消費者ConsumerError:maven
啓動生產者DirectProducer:ide
查看消費者ConsumerError如今的情況:性能
能夠看到消費者只消費了error級別的消息,這是由於在direct模式下,消費者只定義了路由鍵routingKey = "error";ui
package study.demo.normal; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * * @Description: 交換器是direct 消費全部日誌的消費者 * @author leeSmall * @date 2018年9月15日 * */ public class ConsumerAll { //定義交換器的名字 private static final String EXCHANGE_NAME = "direct_logs"; // private static final String EXCHANGE_NAME = "fanout_logs_1"; public static void main(String[] argv) throws IOException, InterruptedException, TimeoutException { //1.建立一個鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); //設置要鏈接的RabbitMQ服務器的地址 factory.setHost("127.0.0.1"); //2.經過鏈接工廠建立一個鏈接 Connection connection = factory.newConnection(); //3.經過鏈接建立一個信道 信道是用來傳送數據的 Channel channel = connection.createChannel(); //4.經過信道聲明一個交換器 第一個參數時交換器的名字 第二個參數時交換器的種類 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //5.聲明隨機隊列 String queueName = channel.queueDeclare().getQueue(); //6.定義一組路由鍵消費全部日誌 String[]routingKeys = {"error","info","warning"}; //7.隊列經過路由鍵綁定到交換器上 for(String routingKey:routingKeys){ //隊列和交換器的綁定 channel.queueBind(queueName,EXCHANGE_NAME,routingKey); } System.out.println("Waiting message......."); //8.設置一個監聽器監聽消費消息 Consumer consumerA = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body,"UTF-8"); System.out.println("Accept:"+envelope.getRoutingKey()+":"+message); } }; //9.自動確認:autoAck參數爲true channel.basicConsume(queueName,true,consumerA); } }
啓動消費者ConsumerAll:
啓動生產者DirectProducer:
查看消費者ConsumerAll的狀態:
能夠看到消費者ConsumerAll消費了生產者DirectProducer產生的全部消息,這是由於在direct模式下,消費者定義了和生產者同樣個數的路由鍵String[]routingKeys = {"error","info","warning"};
package study.demo.normal; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * * @Description: 交換器是fanout的生產者 能夠理解爲廣播,會把全部消息投放到綁定到這個交換器上的隊列上 * @author leeSmall * @date 2018年9月15日 * */ public class FanoutProducer { private final static String EXCHANGE_NAME = "fanout_logs_1"; public static void main(String[] args) throws IOException, TimeoutException { //1.建立一個鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); //設置要鏈接的RabbitMQ服務器的地址 factory.setHost("127.0.0.1"); //設置用戶名 這裏使用缺省的 //factory.setUsername(..); //設置鏈接斷開 這裏使用缺省的 //factory.setPort(); //設置虛擬主機 這裏使用缺省的 //factory.setVirtualHost(); //2.經過鏈接工廠建立一個鏈接 Connection connection = factory.newConnection(); //3.經過鏈接建立一個信道 信道是用來傳送數據的 Channel channel = connection.createChannel(); //4.經過信道聲明一個交換器 第一個參數時交換器的名字 第二個參數時交換器的種類 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); //定義一組路由鍵 String[]routingKeys = {"error","info","warning"}; //發佈消息的交換器上 for(int i=0;i<3;i++){ //路由鍵 String routingKey = routingKeys[i]; //要發送的消息 String message = "Hello world_"+(i+1); /** * 發送消息到交換器上 * 參數1:交換器的名字 * 參數2:路由鍵 * 參數3:BasicProperties * 參數4:要發送的消息 */ channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes()); System.out.println("Sent "+routingKey+":"+message); } channel.close(); connection.close(); } }
package study.demo.normal; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * * @Description: 交換器是fanout,只消費error日誌的消費者 * @author leeSmall * @date 2018年9月15日 * */ public class ConsumerError { //定義交換器的名字 //private static final String EXCHANGE_NAME = "direct_logs"; private static final String EXCHANGE_NAME = "fanout_logs_1"; public static void main(String[] argv) throws IOException, TimeoutException { //1.建立一個鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); //設置要鏈接的RabbitMQ服務器的地址 factory.setHost("127.0.0.1"); //2.經過鏈接工廠建立一個鏈接 Connection connection = factory.newConnection(); //3.經過鏈接建立一個信道 信道是用來傳送數據的 Channel channel = connection.createChannel(); //4.經過信道聲明一個交換器 第一個參數時交換器的名字 第二個參數時交換器的種類 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); //5.聲明隨機隊列 String queueName = channel.queueDeclare().getQueue(); //6.聲明一個只消費錯誤日誌的路由鍵error String routingKey = "error"; //7.隊列經過路由鍵綁定到交換器上 channel.queueBind(queueName,EXCHANGE_NAME,routingKey); System.out.println("Waiting message......."); //8.設置一個監聽器監聽消費消息 Consumer consumerB = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body,"UTF-8"); System.out.println("Accept:"+envelope.getRoutingKey()+":"+message); } }; //9.自動確認:autoAck參數爲true channel.basicConsume(queueName,true,consumerA); } }
啓動消費者ConsumerError:
啓動生產者FanoutProducer:
查看消費者ConsumerError如今的情況:
能夠看到消費者消費了全部級別的消息,這是由於在fanout模式下,雖然消費者只定義了路由鍵routingKey = "error",可是由於fanut是廣播模式,會把全部消息投放到綁定到這個交換器上的隊列上
package study.demo.normal; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * * @Description: 交換器是fanout 消費全部日誌的消費者 * @author leeSmall * @date 2018年9月15日 * */ public class ConsumerAll { //定義交換器的名字 //private static final String EXCHANGE_NAME = "direct_logs"; private static final String EXCHANGE_NAME = "fanout_logs_1"; public static void main(String[] argv) throws IOException, InterruptedException, TimeoutException { //1.建立一個鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); //設置要鏈接的RabbitMQ服務器的地址 factory.setHost("127.0.0.1"); //2.經過鏈接工廠建立一個鏈接 Connection connection = factory.newConnection(); //3.經過鏈接建立一個信道 信道是用來傳送數據的 Channel channel = connection.createChannel(); //4.經過信道聲明一個交換器 第一個參數時交換器的名字 第二個參數時交換器的種類 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); //5.聲明隨機隊列 String queueName = channel.queueDeclare().getQueue(); //6.定義一組路由鍵消費全部日誌 String[]routingKeys = {"error","info","warning"}; //7.隊列經過路由鍵綁定到交換器上 for(String routingKey:routingKeys){ //隊列和交換器的綁定 channel.queueBind(queueName,EXCHANGE_NAME,routingKey); } System.out.println("Waiting message......."); //8.設置一個監聽器監聽消費消息 Consumer consumerA = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body,"UTF-8"); System.out.println("Accept:"+envelope.getRoutingKey()+":"+message); } }; //9.自動確認:autoAck參數爲true channel.basicConsume(queueName,true,consumerA); } }
啓動消費者ConsumerAll:
啓動生產者FanoutProducer:
查看消費者ConsumerAll的狀態:
能夠看到消費者ConsumerAll消費了生產者DirectProducer產生的全部消息,這是由於在fanout模式下,消費者定義了和生產者同樣個數的路由鍵String[]routingKeys = {"error","info","warning"};
package study.demo.consumerconfirm; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * * @Description: 消費者自行確認生產者 * @author leeSmall * @date 2018年9月15日 * */ public class ConsumerConfirmProducer { //交換器 private final static String EXCHANGE_NAME = "direct_cc_confirm_1"; //路由鍵 private final static String ROUTE_KEY = "error"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //1.建立一個鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); //設置要鏈接的RabbitMQ服務器的地址 factory.setHost("127.0.0.1"); //設置用戶名 這裏使用缺省的 //factory.setUsername(..); //設置鏈接斷開 這裏使用缺省的 //factory.setPort(); //設置虛擬主機 這裏使用缺省的 //factory.setVirtualHost(); //2.經過鏈接工廠建立一個鏈接 Connection connection = factory.newConnection(); //3.經過鏈接建立一個信道 信道是用來傳送數據的 Channel channel = connection.createChannel(); //4.經過信道聲明一個交換器 第一個參數時交換器的名字 第二個參數時交換器的種類 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //發佈消息的交換器上 for(int i=0;i<10;i++){ //要發送的消息 String message = "Hello world_"+(i+1); /** * 發送消息到交換器上 * 參數1:交換器的名字 * 參數2:路由鍵 * 參數3:BasicProperties * 參數4:要發送的消息 */ channel.basicPublish(EXCHANGE_NAME,ROUTE_KEY,null,message.getBytes()); System.out.println("Sent "+ROUTE_KEY+":"+message); } channel.close(); connection.close(); } }
package study.demo.consumerconfirm; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * * @Description: 消費者自行確認消費者 * @author leeSmall * @date 2018年9月15日 * */ public class ClientConsumerAck { private static final String EXCHANGE_NAME = "direct_cc_confirm_1"; public static void main(String[] argv) throws IOException, TimeoutException { //1.建立一個鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); //設置要鏈接的RabbitMQ服務器的地址 factory.setHost("127.0.0.1"); //2.經過鏈接工廠建立一個鏈接 Connection connection = factory.newConnection(); //3.經過鏈接建立一個信道 信道是用來傳送數據的 Channel channel = connection.createChannel(); //4.經過信道聲明一個交換器 第一個參數時交換器的名字 第二個參數時交換器的種類 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //5.聲明隊列 String queueName = "consumer_confirm"; channel.queueDeclare(queueName,false,false, false,null); //6.聲明一個只消費錯誤日誌的路由鍵error String routingKey = "error"; //7.隊列經過路由鍵綁定到交換器上 channel.queueBind(queueName,EXCHANGE_NAME,routingKey); System.out.println("Waiting message......."); //8.設置一個監聽器監聽消費消息 Consumer consumerB = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body,"UTF-8"); System.out.println("Accept:"+envelope.getRoutingKey()+":"+message); //消費者自行確認 this.getChannel().basicAck(envelope.getDeliveryTag(),false); } }; //9.消費者自行確認:autoAck參數爲false channel.basicConsume(queueName,false,consumerB); } }
package study.demo.consumerconfirm; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * * @Description: 消費者自行確認休眠不回覆ack的消費者 * @author leeSmall * @date 2018年9月15日 * */ public class ClientConsumerSlowAck { private static final String EXCHANGE_NAME = "direct_cc_confirm_1"; public static void main(String[] argv) throws IOException, TimeoutException { //1.建立一個鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); //設置要鏈接的RabbitMQ服務器的地址 factory.setHost("127.0.0.1"); //2.經過鏈接工廠建立一個鏈接 Connection connection = factory.newConnection(); //3.經過鏈接建立一個信道 信道是用來傳送數據的 Channel channel = connection.createChannel(); //4.經過信道聲明一個交換器 第一個參數時交換器的名字 第二個參數時交換器的種類 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //5.聲明隊列 String queueName = "consumer_confirm"; channel.queueDeclare(queueName,false,false, false,null); //6.聲明一個只消費錯誤日誌的路由鍵error String routingKey = "error"; //7.隊列經過路由鍵綁定到交換器上 channel.queueBind(queueName,EXCHANGE_NAME,routingKey); System.out.println("Waiting message......."); //8.設置一個監聽器監聽消費消息 Consumer consumerB = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { //消費者自行確認時不回覆ack,一直休眠 Thread.sleep(20000); } catch (InterruptedException e) { e.printStackTrace(); } String message = new String(body,"UTF-8"); System.out.println("Accept:"+envelope.getRoutingKey()+":"+message); //this.getChannel().basicAck(envelope.getDeliveryTag(),false); } }; //9.消費者自行確認:autoAck參數爲false channel.basicConsume(queueName,false,consumerB); } }
啓動消費者自行確認消費者ClientConsumerAck:
啓動消費者自行確認休眠不回覆ack的消費者ClientConsumerSlowAck
啓動消費者自行確認生產者ConsumerConfirmProducer
查看消費者自行確認消費者ClientConsumerAck的狀態:
查看消費者自行確認休眠不回覆ack的消費者ClientConsumerSlowAck的狀態:
查看RabbitMQ服務器上的隊列狀況:
能夠看到隊列consumer_confirm裏面有5條消息未消費,這是由於消費者自行確認休眠不回覆ack的消費者ClientConsumerSlowAck收到了這5條消息,可是沒有向RabbitMQ服務器發送確認消息,RabbitMQMQ認爲這5條消息尚未被消費就一直存在隊列裏面
下面停掉ClientConsumerSlowAck,查看ClientConsumerAck和RabbitMQ服務器裏面隊列consumer_confirm的狀態
能夠看到停掉ClientConsumerSlowAck之後,以前的5條消息被ClientConsumerAck消費了
package study.demo.consumerconfirm; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * * @Description: 消費者自行確認拒絕消息的消費者 * @author leeSmall * @date 2018年9月15日 * */ public class ClientConsumerReject { private static final String EXCHANGE_NAME = "direct_cc_confirm_1"; public static void main(String[] argv) throws IOException, TimeoutException { //1.建立一個鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); //設置要鏈接的RabbitMQ服務器的地址 factory.setHost("127.0.0.1"); //2.經過鏈接工廠建立一個鏈接 Connection connection = factory.newConnection(); //3.經過鏈接建立一個信道 信道是用來傳送數據的 Channel channel = connection.createChannel(); //4.經過信道聲明一個交換器 第一個參數時交換器的名字 第二個參數時交換器的種類 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //5.聲明隊列 String queueName = "consumer_confirm"; channel.queueDeclare(queueName,false,false, false,null); //6.聲明一個只消費錯誤日誌的路由鍵error String routingKey = "error"; //7.隊列經過路由鍵綁定到交換器上 channel.queueBind(queueName,EXCHANGE_NAME,routingKey); System.out.println("Waiting message......."); //8.設置一個監聽器監聽消費消息 Consumer consumerB = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //消費者自行拒絕消息 參數requeue=true,讓RabbitMQ服務器從新分發消息,requeue=false讓RabbitMQ服務器移除消息 this.getChannel().basicReject(envelope.getDeliveryTag(),true); System.out.println("Reject:"+envelope.getRoutingKey() +":"+new String(body,"UTF-8")); } }; //9.消費者自行確認:autoAck參數爲false channel.basicConsume(queueName,false,consumerB); } }
啓動消費者自行確認消費者ClientConsumerAck:
啓動消費者自行確認拒絕消息的消費者ClientConsumerReject:
啓動消費者自行確認生產者ConsumerConfirmProducer
查看消費者自行確認消費者ClientConsumerAck的狀態:
查看消費者自行確認拒絕消息的消費者ClientConsumerReject的狀態:
能夠看到消息都被ClientConsumerAck消費了,這是由於消費者ClientConsumerReject拒絕了全部消息,這裏要注意
this.getChannel().basicReject(envelope.getDeliveryTag(),true);
這段代碼的basicReject的第二個參數requeue,參數requeue=true,讓RabbitMQ服務器從新分發消息,requeue=false讓RabbitMQ服務器移除消息
requeue=false時,RabbitMQ服務器會刪掉被ClientConsumerReject拒絕的消息,消費者ClientConsumerAck就不能消費全部消息了
爲何要有個發送方確認模式?
生產者不知道消息是否真正到達RabbitMq,也就是說發佈操做不返回任何消息給生產者。
AMQP協議層面爲咱們提供的事務機制解決了這個問題,可是事務機制自己也會帶來問題:
1)嚴重的性能問題
2)使生產者應用程序產生同步
RabbitMQ團隊爲咱們拿出了更好的方案,即採用發送方確認模式,該模式比事務更輕量,性能影響幾乎能夠忽略不計。
package study.demo.producerconfirm; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * * @Description: 生產者(發送方)確認同步模式 * @author leeSmall * @date 2018年9月16日 * */ public class ProducerConfirm { private final static String EXCHANGE_NAME = "producer_confirm"; private final static String ROUTE_KEY = "error"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //1.建立一個鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); //設置要鏈接的RabbitMQ服務器的地址 factory.setHost("127.0.0.1"); //設置用戶名 這裏使用缺省的 //factory.setUsername(..); //設置鏈接斷開 這裏使用缺省的 //factory.setPort(); //設置虛擬主機 這裏使用缺省的 //factory.setVirtualHost(); //2.經過鏈接工廠建立一個鏈接 Connection connection = factory.newConnection(); //3.經過鏈接建立一個信道 信道是用來傳送數據的 Channel channel = connection.createChannel(); //將信道設置爲發送方確認 channel.confirmSelect(); //發佈消息的交換器上 for(int i=0;i<2;i++){ String msg = "Hello "+(i+1); channel.basicPublish(EXCHANGE_NAME,ROUTE_KEY,null,msg.getBytes()); //等待RabbitMQ返回消息確認消息已送達RabbitMQ服務器 if (channel.waitForConfirms()){ System.out.println("發送方同步確認: "+ROUTE_KEY+":"+msg); } } // 關閉頻道和鏈接 channel.close(); connection.close(); } }
package study.demo.producerconfirm; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * * @Description: 生產者(發送方)確認異步模式 * @author leeSmall * @date 2018年9月16日 * */ public class ProducerConfirmAsync { private final static String EXCHANGE_NAME = "producer_confirm"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //1.建立一個鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); //設置要鏈接的RabbitMQ服務器的地址 factory.setHost("127.0.0.1"); //設置用戶名 這裏使用缺省的 //factory.setUsername(..); //設置鏈接斷開 這裏使用缺省的 //factory.setPort(); //設置虛擬主機 這裏使用缺省的 //factory.setVirtualHost(); //2.經過鏈接工廠建立一個鏈接 Connection connection = factory.newConnection(); //3.經過鏈接建立一個信道 信道是用來傳送數據的 Channel channel = connection.createChannel(); //4.經過信道聲明一個交換器 第一個參數時交換器的名字 第二個參數時交換器的種類 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //將信道設置爲發送方確認 channel.confirmSelect(); //信道被關閉監聽器 用於RabbitMQ服務器斷線重連 //channel.addShutdownListener(); /** * 生產者異步確認監聽 * 參數deliveryTag表明了當前channel惟一的投遞 * 參數multiple:false * */ channel.addConfirmListener(new ConfirmListener() { //RabbitMQ服務器確認收到消息 public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("RabbitMQ服務器確認收到消息Ack deliveryTag="+deliveryTag +"multiple:"+multiple); } //RabbitMQ服務器因爲本身內部出現故障沒有收到消息 public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("RabbitMQ服務沒有收到消息Ack deliveryTag="+deliveryTag +"multiple:"+multiple); } }); //生產者異步返回監聽 這裏和發佈消息時的mandatory參數有關 //參數mandatory:mandatory=true,投遞消息時沒法找到一個合適的隊列,把消息返回給生產者,mandatory=false 丟棄消息(缺省) channel.addReturnListener(new ReturnListener() { public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body); System.out.println("replyText:"+replyText); System.out.println("exchange:"+exchange); System.out.println("routingKey:"+routingKey); System.out.println("msg:"+msg); } }); //聲明一組路由鍵 String[] routingKeys={"error","info","warning"}; //發送消息到交換器上 for(int i=0;i<3;i++){ String routingKey = routingKeys[i%3]; // 發送的消息 String message = "Hello World_"+(i+1)+("_"+System.currentTimeMillis()); //經過路由鍵把消息發送到交換器上 //參數mandatory: mandatory=true,投遞消息時沒法找到一個合適的隊列,把消息返回給生產者, // mandatory=false 丟棄消息(缺省) channel.basicPublish(EXCHANGE_NAME, routingKey, false, null, message.getBytes()); System.out.println("----------------------------------------------------"); System.out.println(" Sent Message: [" + routingKey +"]:'"+ message + "'"); //sleep一下讓程序不快速結束 能夠看到RabbitMQ服務器的響應 Thread.sleep(1000); } // 關閉信道和鏈接 channel.close(); connection.close(); } }
package study.demo.producerconfirm; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * * @Description: 發送方確認消費者 * @author leeSmall * @date 2018年9月16日 * */ public class ProducerConfirmConsumer { private static final String EXCHANGE_NAME = "producer_confirm"; public static void main(String[] argv) throws IOException, TimeoutException { //1.建立一個鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); //設置要鏈接的RabbitMQ服務器的地址 factory.setHost("127.0.0.1"); //2.經過鏈接工廠建立一個鏈接 Connection connection = factory.newConnection(); //3.經過鏈接建立一個信道 信道是用來傳送數據的 Channel channel = connection.createChannel(); //4.經過信道聲明一個交換器 第一個參數時交換器的名字 第二個參數時交換器的種類 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //5.聲明隊列 String queueName = "producer_confirm"; channel.queueDeclare(queueName,false,false, false,null); //6.聲明一個只消費錯誤日誌的路由鍵error String routingKey = "error"; //7.隊列經過路由鍵綁定到交換器上 channel.queueBind(queueName,EXCHANGE_NAME,routingKey); System.out.println("Waiting message......."); // 8.建立隊列消費者 設置一個監聽器監聽消費消息 final Consumer consumerB = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println( "Received ["+ envelope.getRoutingKey() + "] "+message); } }; //9.消費者自動確認:autoAck參數爲true channel.basicConsume(queueName, true, consumerB); } }
啓動發送方確認消費者ProducerConfirmConsumer:
啓動生產者(發送方)確認同步模式的類ProducerConfirm:
查看發送方確認消費者ProducerConfirmConsumer的狀態:
啓動生產者(發送方)確認異步模式ProducerConfirmAsync:
查看發送方確認消費者ProducerConfirmConsumer的狀態: