咱們如今有兩個主要的系統一個是活動系統一個是獎品系統,活動系統會調用獎品系統發放獎勵。git
最開始兩個之間只經過http直接調用,優勢:開發成本低,沒有多餘組件引入;發放獎勵實時返回;活動系統不須要管獎品是否還有剩餘庫存。缺點:這樣就致使上游活動系統強依賴於下游的獎品系統,若是一旦獎品系統掛掉,咱們活動系統也就不可用了;這裏還有個bug,在調用獎品系統發放獎勵,獎品系統發放成功了,可是活動系統請求超時了,就會致使提示客戶的沒有獎品了,可是實際獎品又發放了。github
訪問量上來後發直接走http確定是不行的,因此引入了MQ將將兩個系統隔離開,優勢:全部發放流程異步執行,活動系統響應更快了;這兩個系統就變成弱引用關係,即便獎品系統掛掉,活動系統仍能正常運行;不會出現上面說的bug了;缺點:發放獎勵將會有延遲;引入MQ增長了項目複雜度,咱們必須去考慮消息的丟失,重複消費等問題;活動系統須要知道獎品的庫存狀況。redis
針對上面使用MQ發放獎勵會遇到的問題,咱們能夠經過面的方案來解決。spring
在數據庫建立一張異常消息表。數據庫
這樣子雖然仍是不能徹底杜絕消息丟失,可是絕大部分狀況下是沒有問題的。dom
爲每一個消息生成業務流水號,將流水號和發放裏的參數一塊兒發送到獎品系統,獎品系統在發放獎勵的時候先判斷這個流水號是否存在,存在就表示該獎品已經發過來直接返回發放成功,若是沒有就進行發放獎勵操做。異步
咱們在配置活動的時候會將獎品的庫存放到咱們活動系統,在發MQ消息以前回去判斷是否有剩餘庫存,若是沒有直接返回獎勵領完了,若是有才回去發MQ消息。扣減庫存能夠參考基於redis實現的扣減庫存。ide
下面是引入MQ事後咱們系統的流程圖 spring-boot
/** * Rabbit 發送消息 * * @author yuhao.wang */ @Service public class RabbitSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback, InitializingBean { private final Logger logger = LoggerFactory.getLogger(RabbitSender.class); /** * Rabbit MQ 客戶端 */ @Autowired private RabbitTemplate rabbitTemplate; /** * 系統配置 */ @Autowired private SystemConfig systemConfig; /** * 發送MQ消息 * * @param exchangeName 交換機名稱 * @param routingKey 路由名稱 * @param message 發送消息體 */ public void sendMessage(String exchangeName, String routingKey, Object message) { Assert.notNull(message, "message 消息體不能爲NULL"); Assert.notNull(exchangeName, "exchangeName 不能爲NULL"); Assert.notNull(routingKey, "routingKey 不能爲NULL"); // 獲取CorrelationData對象 CorrelationData correlationData = this.correlationData(message); correlationData.setExchange(exchangeName); correlationData.setRoutingKey(routingKey); correlationData.setMessage(message); logger.info("發送MQ消息,消息ID:{},消息體:{}, exchangeName:{}, routingKey:{}", correlationData.getId(), JSON.toJSONString(message), exchangeName, routingKey); // 發送消息 this.convertAndSend(exchangeName, routingKey, message, correlationData); } /** * 用於實現消息發送到RabbitMQ交換器後接收ack回調。 * 若是消息發送確認失敗就進行重試。 * * @param correlationData * @param ack * @param cause */ @Override public void confirm(org.springframework.amqp.rabbit.support.CorrelationData correlationData, boolean ack, String cause) { // 消息回調確認失敗處理 if (!ack && correlationData instanceof CorrelationData) { CorrelationData correlationDataExtends = (CorrelationData) correlationData; //消息發送失敗,就進行重試,重試事後還不能成功就記錄到數據庫 if (correlationDataExtends.getRetryCount() < systemConfig.getMqRetryCount()) { logger.info("MQ消息發送失敗,消息重發,消息ID:{},重發次數:{},消息體:{}", correlationDataExtends.getId(), correlationDataExtends.getRetryCount(), JSON.toJSONString(correlationDataExtends.getMessage())); // 將重試次數加一 correlationDataExtends.setRetryCount(correlationDataExtends.getRetryCount() + 1); // 重發發消息 this.convertAndSend(correlationDataExtends.getExchange(), correlationDataExtends.getRoutingKey(), correlationDataExtends.getMessage(), correlationDataExtends); } else { //消息重試發送失敗,將消息放到數據庫等待補發 logger.warn("MQ消息重發失敗,消息入庫,消息ID:{},消息體:{}", correlationData.getId(), JSON.toJSONString(correlationDataExtends.getMessage())); // TODO 保存消息到數據庫 } } else { logger.info("消息發送成功,消息ID:{}", correlationData.getId()); } } /** * 用於實現消息發送到RabbitMQ交換器,但無相應隊列與交換器綁定時的回調。 * 基本上來講線上不可能出現這種狀況,除非手動將已經存在的隊列刪掉,不然在測試階段確定能測試出來。 */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { logger.error("MQ消息發送失敗,replyCode:{}, replyText:{},exchange:{},routingKey:{},消息體:{}", replyCode, replyText, exchange, routingKey, JSON.toJSONString(message.getBody())); // TODO 保存消息到數據庫 } /** * 消息相關數據(消息ID) * * @param message * @return */ private CorrelationData correlationData(Object message) { return new CorrelationData(UUID.randomUUID().toString(), message); } /** * 發送消息 * * @param exchange 交換機名稱 * @param routingKey 路由key * @param message 消息內容 * @param correlationData 消息相關數據(消息ID) * @throws AmqpException */ private void convertAndSend(String exchange, String routingKey, final Object message, CorrelationData correlationData) throws AmqpException { try { rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData); } catch (Exception e) { logger.error("MQ消息發送異常,消息ID:{},消息體:{}, exchangeName:{}, routingKey:{}", correlationData.getId(), JSON.toJSONString(message), exchange, routingKey, e); // TODO 保存消息到數據庫 } } @Override public void afterPropertiesSet() throws Exception { rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnCallback(this); } }
生產者端使用ConfirmCallback和ReturnCallback回調機制,最大限度的保證消息不丟失,對原有CorrelationData類進行擴展,來實現消息的重發,具體請看源碼。測試
/** * 發放優惠券的MQ處理 * * @author yuhao.wang */ @Service @ConditionalOnClass({RabbitTemplate.class}) public class SendMessageListener { private final Logger logger = LoggerFactory.getLogger(SendMessageListener.class); @RabbitListener(queues = RabbitConstants.QUEUE_NAME_SEND_COUPON) public void process(SendMessage sendMessage, Channel channel, Message message) throws Exception { logger.info("[{}]處理髮放優惠券獎勵消息隊列接收數據,消息ID:{},消息體:{}", RabbitConstants.QUEUE_NAME_SEND_COUPON, message.getMessageProperties().getCorrelationIdString(), JSON.toJSONString(sendMessage)); try { // 參數校驗 Assert.notNull(sendMessage, "sendMessage 消息體不能爲NULL"); // TODO 處理消息 // 確認消息已經消費成功 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { logger.error("MQ消息處理異常,消息ID:{},消息體:{}", message.getMessageProperties().getCorrelationIdString(), JSON.toJSONString(sendMessage), e); try { // TODO 保存消息到數據庫 // 確認消息已經消費成功 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e1) { logger.error("保存異常MQ消息到數據庫異常,放到死性隊列,消息ID:{}", message.getMessageProperties().getCorrelationIdString()); // 確認消息將消息放到死信隊列 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } } } }
消費者端主要作了消息消費失敗的容錯處理。
https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases
spring-boot-student-rabbitmq 工程