rabbitmq學習記錄(八)消息發佈確認機制

RabbitMQ服務器崩了致使的消息數據丟失,已經持久化的消息數據咱們能夠經過消息持久化來預防。可是,若是消息從生產者發送到vhosts過程當中出現了問題,持久化消息數據的方案就無效了。 RabbitMQ爲咱們提供了兩種解決方案:java

方案一:經過AMQP事務機制實現,這也是AMQP協議層面提供的解決方案;

實現方法以及測試結果以下: 生產者一:服務器

package com.example.demo.queue.confirm.amqp;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.example.demo.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Producer01 {

	private static final String QUEUE_NAME = "message_confirm_ampq_queue";
	
	public static void main(String[] args) throws IOException, TimeoutException {
		Connection connection = ConnectionUtil.getConnection();
		Channel channel = connection.createChannel();
		channel.queueDeclare(QUEUE_NAME, true, false, false, null);
		String msg = "msg from producer:";
		try {
			channel.txSelect();
			channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
			System.out.println("send msg : "+msg);
			channel.txCommit();
		} catch (Exception e1) {
			channel.txRollback();
		}
		channel.close();
		connection.close();
	}
	
}

生產者二:異步

package com.example.demo.queue.confirm.amqp;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.example.demo.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Producer02 {

	private static final String QUEUE_NAME = "message_confirm_ampq_queue";
	
	public static void main(String[] args) throws IOException, TimeoutException {
		Connection connection = ConnectionUtil.getConnection();
		Channel channel = connection.createChannel();
		channel.queueDeclare(QUEUE_NAME, true, false, false, null);
		String msg = "msg from producer:";
		try {
			channel.txSelect();
			channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
			System.out.println("send msg : "+msg);
			int k = 2/0;// 觸發rollback事件
			channel.txCommit();
		} catch (Exception e1) {
			channel.txRollback();
		}
		channel.close();
		connection.close();
	}
	
}

消費者:ide

package com.example.demo.queue.confirm.amqp;

import java.io.IOException;

import com.example.demo.utils.ConnectionUtil;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Consumer01 {

	// 隊列名稱
	private static final String QUEUE_NAME = "message_confirm_ampq_queue";
	
	public static void main(String[] args) {
		try {
			// 獲取鏈接
			Connection connection = ConnectionUtil.getConnection();
			// 建立通道
			final Channel channel = connection.createChannel();
			// 聲明隊列
			channel.queueDeclare(QUEUE_NAME, true, false, false, null);
			// 定義消費者
			DefaultConsumer consumer = new DefaultConsumer(channel) {
				@Override
				public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
					String msg = new String(body,"UTF-8");
					System.out.println("[1]:receive msg:"+msg);
					System.out.println("[1]:deal msg successful.");
				}
			};
			// 接收信息
			channel.basicConsume(QUEUE_NAME, true, consumer);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
}

下面咱們開始測試: 先運行測試類的main方法,接下來運行生產者1的main方法,結果以下: 再來運行運行生產者2的main方法,結果以下: 能夠看到消費者只接收到了生產者1發送的消息,消息2的沒有收到。至於消息有沒發送到message broker,或者說 緣由就是:在發送者發送消息過程當中,執行性能

channel.txCommit();

以前,若是出現了什麼問題,就行執行測試

channel.txRollback();

回滾事務。 可是事務影響性能比較嚴重,因此仍是建議使用方案二。spa

方案二:經過將channel設置成confirm模式來實現; confirm:發送單條消息後確認

package com.example.demo.queue.confirm.confirm.single;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.example.demo.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Producer {

	private static final String QUEUE_NAME = "message_confirm_single_queue";
	
	public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
		Connection connection = ConnectionUtil.getConnection();
		Channel channel = connection.createChannel();
		channel.queueDeclare(QUEUE_NAME, true, false, false, null);
		String msg = "Producer發出的信息:";
		channel.confirmSelect();
		channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
		System.out.println("send msg : "+msg);
		if (channel.waitForConfirms()) {
			System.out.println("信息發送成功.");
		} else {
			System.out.println("信息發送失敗.");
		}
		channel.close();
		connection.close();
	}
	
}

confirm:批量發送消息後確認3d

package com.example.demo.queue.confirm.confirm.batch;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.example.demo.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Producer {

	private static final String QUEUE_NAME = "message_confirm_batch_queue";
	
	public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
		Connection connection = ConnectionUtil.getConnection();
		Channel channel = connection.createChannel();
		channel.queueDeclare(QUEUE_NAME, true, false, false, null);
		String msg = "msg from producer:";
		channel.confirmSelect();
		for(int i=0;i<10;i++) {
			channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
			System.out.println("send msg["+i+"] : "+msg);
		}
		if (channel.waitForConfirms()) {
			System.out.println("msg send successfully");
		} else {
			System.out.println("msg send fail");
		}
		channel.close();
		connection.close();
	}
	
}

confirm:發送信息,異步確認code

package com.example.demo.queue.confirm.confirm.synch;

import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;

import com.example.demo.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;

public class Producer {

	private static final String QUEUE_NAME = "message_confirm_synch_queue";
	
	public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
		Connection connection = ConnectionUtil.getConnection();
		Channel channel = connection.createChannel();
		channel.queueDeclare(QUEUE_NAME, true, false, false, null);
		channel.confirmSelect();
		// 存放信息的序列化
		SortedSet<Long> longTreeSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
		// 添加監聽器
		channel.addConfirmListener(new ConfirmListener() {
			
			/**
			 * 收到消費者已經處理完消息以後發出的反饋,觸發該方法
			 */
			@Override
			public void handleNack(long deliveryTag, boolean multiple) throws IOException {
				System.out.println("handleNack() deliveryTag="+deliveryTag+",multiple="+multiple);
				if(multiple) {
					longTreeSet.headSet(deliveryTag+1).clear();
				} else {
					longTreeSet.remove(deliveryTag);
				}
			}
			
			/**
			 * 長時間沒收到消費者已經處理完消息以後發出的反饋,觸發該方法
			 */
			@Override
			public void handleAck(long deliveryTag, boolean multiple) throws IOException {
				System.out.println("handleAck() deliveryTag="+deliveryTag+",multiple="+multiple);
				if(multiple) {
					longTreeSet.headSet(deliveryTag+1).clear();
				} else {
					longTreeSet.remove(deliveryTag);
				}
			}
		});
		while(true) {
			long seqNo = channel.getNextPublishSeqNo();
			channel.basicPublish("", QUEUE_NAME, null, ("seqNo:"+seqNo).getBytes());
			longTreeSet.add(seqNo);
		}
	}
	
}
相關文章
相關標籤/搜索