RabbitMQ消息的可靠性主要包括兩方面:一方面是通過實現消費的重試機制(通過@Retryable來實現重試,可以設置重試次數和重試頻率,但是要保證冪等性),另一方面就是實現消息生產者的可靠投遞(注意消費端冪等)。下面主要講一下生產者如何實現可靠的消息投遞。
rabbitTemplate的發送流程是這樣的:
1 發送數據並返回(不確認rabbitmq服務器已成功接收)
2 異步地接收從rabbitmq返回的ack確認信息
3 收到ack後調用confirmCallback函數。注意:在confirmCallback中是沒有原message的,所以無法在這個函數中調用重發,confirmCallback只有一個通知的作用。在這種情況下,如果在第2、3步中任何時候切斷連接,我們都無法確認數據是否真的已經成功發送出去,從而造成數據丟失的問題。
最完美的解決方案只有1種:使用rabbitmq的事務機制。但是在這種情況下,rabbitmq的效率極低,每秒鐘處理的message在幾百條左右,實在不可取。
第二種解決方式,使用同步的發送機制,也就是說,客戶端發送數據,rabbitmq收到後返回ack,客戶端收到ack後,send函數才返回。代碼類似這樣:
建立channel → send message → wait for ack(or 超時)→ close channel → 返回成功or失敗
同樣的,由於每次發送message都要重新建立連接,效率很低。
基於上面的分析,我們使用一種新的方式來做到數據的不丟失。
在rabbitTemplate異步確認的基礎上:
1 在redis中緩存已發送的message
2 通過confirmCallback收到被確認的ack後,將被確認的message從緩存(redis)中刪除
3 定時掃描緩存(redis)中的message,如果超過一定時間未被確認,則重發
當然了,這種解決方式也有一定的問題:想象這種場景,rabbitmq接收到了消息,在發送ack確認時,網絡斷了,造成客戶端沒有收到ack,於是重發消息。(相比於丟失消息,重發消息要好解決得多,我們可以在consumer端做到冪等。)自動重試的代碼如下:
package cn.chinotan.service.reliabletransmission; /** * @program: test * @description: rabbitMq常量 * @author: xingcheng * @create: 2018-08-12 12:30 **/ public class MyConstant { public static final String MY_EXCHANGE = "my_exchange"; public static final String ERROR_EXCHANGE = "error_exchange"; public static final String MY_QUEUE_THREE = "my_queue_three"; public final static String KEY_PREFIX = "test:rabbitMq:"; /** * consumer失敗後等待時間(mils) */ public static final int ONE_MINUTE = 1 * 60 * 1000; /** * MQ消息retry時間 */ public static final int RETRY_TIME_INTERVAL = ONE_MINUTE; /** * MQ消息有效時間 */ public static final int VALID_TIME = ONE_MINUTE; }
package cn.chinotan.service.reliabletransmission; import java.io.Serializable; /** * @program: test * @description: 包裝消息 * @author: xingcheng * @create: 2018-09-24 15:32 **/ public class MessageWithTime implements Serializable { private String id; private long time; private String message; public String getId() { return id; } public void setId(String id) { this.id = id; } public long getTime() { return time; } public void setTime(long time) { this.time = time; } public String getMessage() { return message; } public void setMessage(String message) { this.message = message; } }
package cn.chinotan.service.reliabletransmission;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ topology for the reliable-delivery example: one direct exchange,
 * one queue, and the binding that connects them.
 */
@Configuration
public class ReliableRabbitConfig {

    /**
     * Declares the direct exchange named by {@link MyConstant#MY_EXCHANGE}.
     *
     * @return a durable exchange that is not auto-deleted
     */
    @Bean
    public DirectExchange myExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new DirectExchange(MyConstant.MY_EXCHANGE, durable, autoDelete);
    }

    /**
     * Declares the queue named by {@link MyConstant#MY_QUEUE_THREE}.
     *
     * @return a durable queue
     */
    @Bean
    public Queue myQueueOne() {
        final boolean durable = true;
        return new Queue(MyConstant.MY_QUEUE_THREE, durable);
    }

    /**
     * Binds the queue to the exchange, using the queue's own name as the
     * routing key.
     *
     * @return the binding between {@link #myQueueOne()} and {@link #myExchange()}
     */
    @Bean
    public Binding queueOneBinding() {
        final Queue queue = myQueueOne();
        final DirectExchange exchange = myExchange();
        return BindingBuilder.bind(queue).to(exchange).withQueueName();
    }
}
package cn.chinotan.service.reliabletransmission; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Service; import java.util.Map; import java.util.UUID; /** * @program: test * @description: rabbitService * @author: xingcheng * @create: 2018-09-24 14:28 **/ @Service public class RabbitMQService { Logger logger = LoggerFactory.getLogger(RabbitMQService.class); @Autowired StringRedisTemplate redisTemplate; @Autowired private RabbitTemplate rabbitTemplate; public Boolean send(String exchange, String routingKey, Object message) { try { String key = StringUtils.join(MyConstant.KEY_PREFIX, UUID.randomUUID().toString().replace("-", "").toLowerCase()); // 發送前保存消息和時間和id到redis緩存中 MessageWithTime messageWithTime = new MessageWithTime(); messageWithTime.setId(key); messageWithTime.setMessage(JSONObject.toJSONString(message)); messageWithTime.setTime(System.currentTimeMillis()); redisTemplate.opsForValue().set(key, JSONObject.toJSONString(messageWithTime)); // 異步回調通知 rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { logger.info("message send success--id:[{}]", correlationData.getId()); // 發送成功後,刪除redis緩存 redisTemplate.delete(correlationData.getId()); } else { // 發送失敗後打印日誌,進行重試 logger.error("message send fail--id:[{}]", correlationData.getId()); } }); CorrelationData correlationData = new CorrelationData(key); rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData); } catch (Exception e) { logger.error("發送消息異常{}", e); return false; } return true; } Boolean send(String exchange, String routingKey, MessageWithTime message) 
{ try { // 異步回調通知 rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { logger.info("message send success--id:[{}]", correlationData.getId()); // 發送成功後,刪除redis緩存 redisTemplate.delete(correlationData.getId()); } else { // 發送失敗後打印日誌,進行重試 logger.error("message send fail--id:[{}]", correlationData.getId()); } }); CorrelationData correlationData = new CorrelationData(message.getId()); Map map = JSON.parseObject(message.getMessage(), Map.class); rabbitTemplate.convertAndSend(exchange, routingKey, map, correlationData); } catch (Exception e) { logger.error("發送消息異常{}", e); return false; } return true; } }
package cn.chinotan.service.reliabletransmission;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Map;

/**
 * Producer that fixes the destination: every message goes to
 * {@link MyConstant#MY_EXCHANGE} with {@link MyConstant#MY_QUEUE_THREE} as
 * the routing key, and the actual sending is delegated to
 * {@link RabbitMQService}.
 */
@Service
public class ReliableProducr {

    private static final Logger LOGGER = LoggerFactory.getLogger(ReliableProducr.class);

    @Autowired
    private RabbitMQService rabbitMQService;

    /**
     * Sends a new message.
     *
     * @param msg payload to publish
     * @return the result reported by {@link RabbitMQService}
     */
    public Boolean send(Map msg) {
        final String exchange = MyConstant.MY_EXCHANGE;
        final String routingKey = MyConstant.MY_QUEUE_THREE;
        return rabbitMQService.send(exchange, routingKey, msg);
    }

    /**
     * Sends a message that has already been wrapped, keeping its id.
     *
     * @param msg wrapped message to publish
     * @return the result reported by {@link RabbitMQService}
     */
    public Boolean send(MessageWithTime msg) {
        final String exchange = MyConstant.MY_EXCHANGE;
        final String routingKey = MyConstant.MY_QUEUE_THREE;
        return rabbitMQService.send(exchange, routingKey, msg);
    }
}
package cn.chinotan.service.reliabletransmission; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.web.context.WebApplicationContext; import org.springframework.web.context.support.WebApplicationContextUtils; import javax.servlet.ServletContextEvent; import javax.servlet.ServletContextListener; import javax.servlet.annotation.WebListener; /** * @program: test * @description: 可靠投遞監聽器 * @author: xingcheng * @create: 2018-09-24 16:05 **/ @WebListener public class ReliableTransContextListener implements ServletContextListener { Logger logger = LoggerFactory.getLogger(ReliableTransContextListener.class); private WebApplicationContext springContext; @Override public void contextInitialized(ServletContextEvent sce) { logger.info("ReliableTransContextListener init start..........."); springContext = WebApplicationContextUtils.getWebApplicationContext(sce.getServletContext()); if (springContext != null) { RetryCache retryCache = (RetryCache) springContext.getBean("retryCache"); new Thread(() -> retryCache.startRetry()).start(); } } @Override public void contextDestroyed(ServletContextEvent sce) { } }
package cn.chinotan.service.reliabletransmission; import com.alibaba.fastjson.JSON; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; import java.util.List; import java.util.Set; /** * @program: test * @description: 緩存重試 * @author: xingcheng * @create: 2018-09-24 16:12 **/ @Component("retryCache") public class RetryCache { private boolean stop = false; Logger logger = LoggerFactory.getLogger(RetryCache.class); @Autowired private ReliableProducr producr; @Autowired private StringRedisTemplate redisTemplate; private final String STAR = "*"; public void startRetry() { while (!stop) { try { Thread.sleep(MyConstant.RETRY_TIME_INTERVAL); } catch (InterruptedException e) { e.printStackTrace(); } long now = System.currentTimeMillis(); Set<String> keys = redisTemplate.keys(StringUtils.join(MyConstant.KEY_PREFIX, STAR)); if (keys != null && !keys.isEmpty()) { List<String> list = redisTemplate.opsForValue().multiGet(keys); list.forEach(value -> { MessageWithTime messageWithTime = JSON.parseObject(value, MessageWithTime.class); if (null != messageWithTime) { if (messageWithTime.getTime() + 3 * MyConstant.VALID_TIME < now) { logger.error("send message {} failed after 3 min ", messageWithTime); redisTemplate.delete(messageWithTime.getId()); } else if (messageWithTime.getTime() + MyConstant.VALID_TIME < now) { Boolean send = producr.send(messageWithTime); logger.info("進行從新投遞消息"); if (!send) { logger.error("retry send message failed {}", messageWithTime); } } } }); } } } }
package cn.chinotan.service.reliabletransmission; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Map; /** * queueThree消費者 */ @Component public class MyQueueThreeConsumer { private static final Logger LOGGER = LoggerFactory.getLogger(MyQueueThreeConsumer.class); /** * 消費者作好冪等 * * @param content */ @RabbitListener(queues = MyConstant.MY_QUEUE_THREE) @RabbitHandler public void process(Map content) { LOGGER.info("消費者,queueThree開始執行 {}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); LOGGER.info("消費者,queueThree消費內容:[{}]", JSON.toJSONString(content)); } }
import cn.chinotan.service.reliabletransmission.MyConstant; import cn.chinotan.service.reliabletransmission.RabbitMQService; import cn.chinotan.service.reliabletransmission.ReliableProducr; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.util.HashMap; import java.util.Map; /** * @program: test * @description: 可靠投遞測試 * @author: xingcheng * @create: 2018-09-24 15:57 **/ @RunWith(SpringRunner.class) @SpringBootTest(classes = MyApplication.class) public class ReliableTransmissionTest { @Autowired private ReliableProducr producr; @Autowired private RabbitMQService rabbitMQService; /** * 正常狀況測試 * @throws Exception */ @Test public void reliableTransmissionTest() throws Exception { Map<String, String> map = new HashMap<>(); map.put("name", "xingheng"); producr.send(map); } /** * 異常狀況測試 * @throws Exception */ @Test public void reliableTransmissionFailTest() throws Exception { Map<String, String> map = new HashMap<>(); map.put("name", "xingheng"); rabbitMQService.send(MyConstant.ERROR_EXCHANGE, MyConstant.MY_QUEUE_THREE, map); } }
注意事項:
1.配置中要開啓發布者確認,類似這樣:
spring:
  rabbitmq:
    publisher-confirms: true
2.如果要測試異常情況,只需要將消息發送到一個不存在的交換機即可
3.注意消費端冪等
簡單測試結果:
在重試一次後,會將它發送到正確的交換機,於是發送成功