RabbitMQ的消息可靠性能夠分爲三部分,分別是消息的接收、消息的保存(消息持久化,前面文章已經講了)、消息的發送。java
RabbitMQ消息的接收如同在以前講消息隊列時說的有三種狀況,分別是同步可靠發送、異步可靠發送、異步發送。RabbitMQ提供一下兩種方式實現消息的發送的可靠:spring
事務機制服務器
RabbitMQ提供事務機制(同步可靠)保證消息的可靠發送。事務主要有txSelect()、txCommit()和txRollback()三個方法。txSelect()用於開啓事務,txCommit()用於提交事務,若是在txCommit()以前Broker出現錯誤拋出異常則須要txRollback()回滾事務。示例代碼以下:異步
Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); try { channel.txSelect(); channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, "hello".getBytes(StandardCharsets.UTF_8)); channel.txCommit(); } catch (Exception e) { channel.txRollback(); }
RabbitMQ事務的AMQP消息以下:ide
Confirm模式函數
Confirm模式主要解決消息可靠而且比事務模式更高效。生產者將信道設置成confirm模式,一旦信道進入confirm模式,全部在該信道上面發佈的消息都會被指派一個惟一的ID(從1開始),一旦消息被投遞到全部匹配的隊列以後,broker就會發送一個確認給生產者(包含消息的惟一ID),這就使得生產者知道消息已經正確到達目的隊列了,若是消息和隊列是可持久化的,那麼確認消息會將消息寫入磁盤以後發出,broker回傳給生產者的確認消息中deliver-tag域包含了確認消息的序列號,此外broker也能夠設置basic.ack的multiple域,表示到這個序列號以前的全部消息都已經獲得了處理。spring-boot
Confirm模式主要有三種模式:ui
普通模式的Confirm是同步可靠模式(也能夠超時等待),有點類型事務模式,代碼以下:spa
channel.confirmSelect(); channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, "hello".getBytes(StandardCharsets.UTF_8)); if (channel.waitForConfirms()) { System.out.println("message send Success"); }
異步模式的Confirm是異步可靠模式,該模式就是前面文章將的註冊回調函數,服務器成功接收後將會回調該函數。RabbitMQ的異步Confirm基本也是這種,只能有少量不一樣。RabbitMQ在客戶端須要維護沒有成功發送消息Id集,一些參數的意義以下:.net
代碼示例以下:
SortedSet<Long> unconfirmSet = Collections.synchronizedSortedSet(new TreeSet<>()); channel.confirmSelect(); channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println(deliveryTag + " :Ack"); if (multiple) { unconfirmSet.headSet(deliveryTag + 1).clear(); } else { unconfirmSet.remove(deliveryTag); } } @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println(deliveryTag + " :Nack"); if (multiple) { unconfirmSet.headSet(deliveryTag + 1).clear(); } else { unconfirmSet.remove(deliveryTag); } } }); long nextPublishSeqNo = channel.getNextPublishSeqNo(); channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, "hello".getBytes(StandardCharsets.UTF_8)); unconfirmSet.add(nextPublishSeqNo);
RabbitMQ提供消息確認機制(Message Ack),RabbitMQ只有在Consumer發送Ack後纔會刪除消息。
Consumer發送Ack有下面兩種方式:
自動發送Ack,當Consumer接收到消息後自動發送Ack給RabbitMQ
channel.basicConsume(queueName, true, new DefaultConsumer(consumer) { public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { } });
手動發送Ack,當Consumer處理完這條消息後手動發送Ack給RabbitMQ
channel.basicConsume(queueName, true, new DefaultConsumer(consumer) { public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { channel.basicAck(envelope.getDeliveryTag(), false); } });
Spring Framework有一套使用RabbitMQ的API,因此咱們僅僅使用Spring API便可操做RabbitMQ。
Spring Boot使用須要導入一下Jar包:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
而後在yml或者properties文件中配置以下屬性:
spring: rabbitmq: host: 47.100.20.186 username: guest password: guest
Spring Boot會根據屬性配置經過RabbitAutoConfiguration注入相關對象進行使用。
RabbitMQ配置類,主要配置RabbitMQ的Queue、Exchange等
@Configuration public class RabbitConfig { public static final String QUEUE = "spring_queue_hello"; public static final String ROUTING_KEY = "spring_key_hello"; @Bean public Exchange exchange() { return new DirectExchange(EXCHANGE_NAME, true, false, null); } @Bean public Queue queue() { return new Queue(QUEUE, true, false, false, null); } @Bean public Binding binding(Exchange exchange, @Qualifier("queue") Queue queue) { return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY).noargs(); } }
生成者,Spring經過AmqpTemplate發送消息
@Component public class Producer { @Autowired private AmqpTemplate amqpTemplate; // 這裏使用Spring Scheduled每3s發送消息 @Scheduled(fixedDelay = 3000L) public void send() { String message = "hello"; amqpTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, RabbitConfig.ROUTING_KEY, message); } }
生成者,經過註解消費
@Component @RabbitListener(queues = RabbitConfig.QUEUE) public class Consumer { @RabbitHandler public void process(String message) { System.out.println(message); } }
由上面能夠,Spring幫助咱們編解碼了,Spring經過MessageConverter進行消息的編解碼,Spring默認注入SimpleMessageConverter消息解析器進行消息的編解碼,其支持Java序列化編解碼,若是咱們想使用Json,能夠注入JacksonMessageConverter進行編解碼,代碼以下:
@Bean public Jackson2JsonMessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); }
http://blog.didispace.com/spring-boot-rabbitmq/ http://blog.csdn.net/u013256816/article/details/55515234