RabbitMQ服務器崩了致使的消息數據丟失,已經持久化的消息數據咱們能夠經過消息持久化來預防。可是,若是消息從生產者發送到vhosts過程當中出現了問題,持久化消息數據的方案就無效了。 RabbitMQ爲咱們提供了兩種解決方案:java
實現方法以及測試結果以下: 生產者一:服務器
package com.example.demo.queue.confirm.amqp; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.example.demo.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Producer01 { private static final String QUEUE_NAME = "message_confirm_ampq_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, true, false, false, null); String msg = "msg from producer:"; try { channel.txSelect(); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); System.out.println("send msg : "+msg); channel.txCommit(); } catch (Exception e1) { channel.txRollback(); } channel.close(); connection.close(); } }
生產者二:異步
package com.example.demo.queue.confirm.amqp; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.example.demo.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Producer02 { private static final String QUEUE_NAME = "message_confirm_ampq_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, true, false, false, null); String msg = "msg from producer:"; try { channel.txSelect(); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); System.out.println("send msg : "+msg); int k = 2/0;// 觸發rollback事件 channel.txCommit(); } catch (Exception e1) { channel.txRollback(); } channel.close(); connection.close(); } }
消費者:ide
package com.example.demo.queue.confirm.amqp; import java.io.IOException; import com.example.demo.utils.ConnectionUtil; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class Consumer01 { // 隊列名稱 private static final String QUEUE_NAME = "message_confirm_ampq_queue"; public static void main(String[] args) { try { // 獲取鏈接 Connection connection = ConnectionUtil.getConnection(); // 建立通道 final Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 定義消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"UTF-8"); System.out.println("[1]:receive msg:"+msg); System.out.println("[1]:deal msg successful."); } }; // 接收信息 channel.basicConsume(QUEUE_NAME, true, consumer); } catch (Exception e) { e.printStackTrace(); } } }
下面咱們開始測試: 先運行測試類的main方法,接下來運行生產者1的main方法,結果以下: 再來運行運行生產者2的main方法,結果以下:
能夠看到消費者只接收到了生產者1發送的消息,消息2的沒有收到。至於消息有沒發送到message broker,或者說 緣由就是:在發送者發送消息過程當中,執行性能
channel.txCommit();
以前,若是出現了什麼問題,就行執行測試
channel.txRollback();
回滾事務。 可是事務影響性能比較嚴重,因此仍是建議使用方案二。spa
package com.example.demo.queue.confirm.confirm.single; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.example.demo.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Producer { private static final String QUEUE_NAME = "message_confirm_single_queue"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, true, false, false, null); String msg = "Producer發出的信息:"; channel.confirmSelect(); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); System.out.println("send msg : "+msg); if (channel.waitForConfirms()) { System.out.println("信息發送成功."); } else { System.out.println("信息發送失敗."); } channel.close(); connection.close(); } }
confirm:批量發送消息後確認3d
package com.example.demo.queue.confirm.confirm.batch; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.example.demo.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Producer { private static final String QUEUE_NAME = "message_confirm_batch_queue"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, true, false, false, null); String msg = "msg from producer:"; channel.confirmSelect(); for(int i=0;i<10;i++) { channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); System.out.println("send msg["+i+"] : "+msg); } if (channel.waitForConfirms()) { System.out.println("msg send successfully"); } else { System.out.println("msg send fail"); } channel.close(); connection.close(); } }
confirm:發送信息,異步確認code
package com.example.demo.queue.confirm.confirm.synch; import java.io.IOException; import java.util.Collections; import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.TimeoutException; import com.example.demo.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConfirmListener; import com.rabbitmq.client.Connection; public class Producer { private static final String QUEUE_NAME = "message_confirm_synch_queue"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, true, false, false, null); channel.confirmSelect(); // 存放信息的序列化 SortedSet<Long> longTreeSet = Collections.synchronizedSortedSet(new TreeSet<Long>()); // 添加監聽器 channel.addConfirmListener(new ConfirmListener() { /** * 收到消費者已經處理完消息以後發出的反饋,觸發該方法 */ @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("handleNack() deliveryTag="+deliveryTag+",multiple="+multiple); if(multiple) { longTreeSet.headSet(deliveryTag+1).clear(); } else { longTreeSet.remove(deliveryTag); } } /** * 長時間沒收到消費者已經處理完消息以後發出的反饋,觸發該方法 */ @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("handleAck() deliveryTag="+deliveryTag+",multiple="+multiple); if(multiple) { longTreeSet.headSet(deliveryTag+1).clear(); } else { longTreeSet.remove(deliveryTag); } } }); while(true) { long seqNo = channel.getNextPublishSeqNo(); channel.basicPublish("", QUEUE_NAME, null, ("seqNo:"+seqNo).getBytes()); longTreeSet.add(seqNo); } } }