實戰|固然我仍是更建議你用MQ搞定超時訂單的-(2)

1、依然用三根雞毛作引言

  • 真的! 不騙大家的喔~ 相信你們都遇到相似於:訂單30min後未支付自動取消的開發任務

2、MQ 延遲消息實現原理

傲嬌的RabbitMQ官網赫然寫着:git

RabbitMQ is the most widely deployed open source message broker.
複製代碼

因而可知,RabbitMQ是一個消息中間件,生產者生成消息,消費者消費消息,它遵循AMQP(高級消息隊列協議),是最普遍部署的開源消息代理。 因此,今天我用RabbitMQ爲你們搗鼓一下延遲隊列。程序員

使用RabbitMQ來實現延遲任務必須先了解RabbitMQ的兩個概念:消息的TTL和死信Exchange,經過這二者的組合來實現上述需求。github

  • 消息的TTL(Time To Live)

消息的TTL就是消息的存活時間。RabbitMQ 能夠對隊列和消息分別設置TTL。對隊列設置就是隊列沒有消費者連着的保留時間,也能夠對每個單獨的消息作單獨的設置。超過了這個時間,咱們認爲這個消息就死了,稱之爲死信。若是隊列設置了,消息也設置了,那麼會取小的(誰小誰尷尬)。因此一個消息若是被路由到不一樣的隊列中,這個消息死亡的時間有可能不同(不一樣的隊列設置)。這裏單講單個消息的TTL,由於它纔是實現延遲任務的關鍵。windows

那麼,如何設置這個TTL值呢?有兩種方式,第一種是在建立隊列的時候設置隊列的"x-message-ttl"屬性,以下:api

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
複製代碼

這樣全部被投遞到該隊列的消息都最多不會存活超過6s。bash

另外一種方式即是針對每條消息設置TTL,代碼以下:app

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.expiration("6000");
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg body".getBytes());
複製代碼

這樣這條消息的過時時間也被設置成了6s。異步

但這兩種方式是有區別的,若是設置了隊列的TTL屬性,那麼一旦消息過時,就會被隊列丟棄,而第二種方式,消息即便過時,也不必定會被立刻丟棄,由於消息是否過時是在即將投遞到消費者以前斷定的,若是當前隊列有嚴重的消息積壓狀況,則已過時的消息也許還能存活較長時間。 另外,還須要注意的一點是,若是不設置TTL,表示消息永遠不會過時,若是將TTL設置爲0,則表示除非此時能夠直接投遞該消息到消費者,不然該消息將會被丟棄。

單靠死信還不能實現延遲任務,還要靠Dead Letter Exchange

  • Dead Letter Exchanges

Exchage的概念在這裏就不在贅述。一個消息在知足以下條件下,會進死信路由,記住這裏是路由而不是隊列,一個路由能夠對應不少隊列。

  1. 一個消息被Consumer拒收了,而且reject方法的參數裏requeuefalse。也就是說不會被再次放在隊列裏,被其餘消費者使用。
  2. 上面的消息的TTL到了,消息就過時了。
  3. 隊列的長度限制滿了。排在前面的消息會被丟棄或者扔到死信路由上。

Dead Letter Exchange其實就是一種普通的exchange,和建立其餘exchange沒有兩樣。只是在某一個設置Dead Letter Exchange的隊列中有消息過時了,會自動觸發消息的轉發,發送到Dead Letter Exchange中去。

  • 原理圖

延遲任務經過消息的TTLDead Letter Exchange來實現。咱們須要創建2個隊列,一個用於發送消息,一個用於消息過時後的轉發目標隊列。

生產者生產一條延時消息,根據須要延時時間的不一樣,利用不一樣的routingkey將消息路由到不一樣的延時隊列,每一個隊列都設置了不一樣的TTL屬性,並綁定在同一個死信交換機中,消息過時後,根據routingkey的不一樣,又會被路由到不一樣的死信隊列中,消費者只須要監聽對應的死信隊列進行處理便可。

3、實戰演練

  • 下載安裝 windows示例
  1. 下載RabbitMQ,須要ErLang環境的支持
  2. 運行命令
rabbitmq-plugins enable rabbitmq_management
複製代碼

開啓Web管理插件,而後啓動rabbitmq-server訪問http://localhost:15672/#/,輸入密令後你能看到就能夠啦.

  • 插件安裝

在 RabbitMQ 3.6.x 以前咱們通常採用死信隊列(DLX)+TTL過時時間來實現延遲隊列,咱們這裏不作過多介紹,能夠參考其餘道友的:TTL+DLX實現方式。

在 RabbitMQ 3.6.x開始(如今都3.8.+了),RabbitMQ 官方提供了延遲隊列的插件,能夠下載放置到 RabbitMQ 根目錄下的 plugins 下。延遲隊列插件下載地址:

  1. 官方地址 2. JFrog Bintray地址 我安裝的時候在官網沒找到3.7.x的,可是3.8.0是向下兼容3.7.x的,而後我又在Bintray找到了3.7.x,你們信不過就找對應的版本插件哈....

下載好,放到 plugins的目錄中,運行以下命令:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange
複製代碼
  • 搭建SpringBoot環境
  1. yml配置以下
#集成 rabbitmq
 rabbitmq:
 host: localhost
 port: 5672
 username: guest
 password: guest
 virtual-host: /
 connection-timeout: 150000
 publisher-confirms: true    #開啓確認機制 採用消息確認模式,
 publisher-returns: true     #開啓return確認機制
 template:                   #消息發出去後,異步等待響應
 mandatory: true           #設置爲 true 後,消費者在消息沒有被路由到合適隊列狀況下會被return監聽,而不會自動刪除
複製代碼
  1. 啓動配置聲明幾個Bean
@Configuration
public class MQConfig {
    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }

    public static final String DELAY_EXCHANGE = "Ex.DelayExchange";
    public static final String DELAY_QUEUE = "MQ.DelayQueue";
    public static final String DELAY_KEY = "delay.#";

    /** * 延時交換機 * * @return TopicExchange */
    @Bean
    public TopicExchange delayExchange() {
        Map<String, Object> pros = new HashMap<>(3);
        //設置交換機支持延遲消息推送
        pros.put("x-delayed-message", "topic");
        TopicExchange exchange = new TopicExchange(DELAY_EXCHANGE, true, false, pros);
        //咱們在也能夠在 Exchange 的聲明中能夠設置exchange.setDelayed(true)來開啓延遲隊列
        exchange.setDelayed(true);
        return exchange;
    }

    /** * 延時隊列 * * @return Queue */
    @Bean
    public Queue delayQueue() {
        return new Queue(DELAY_QUEUE, true);
    }

    /** * 綁定隊列和交換機,以及設定路由規則key * * @return Binding */
    @Bean
    public Binding delayBinding() {
        return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(DELAY_KEY);
    }
}
複製代碼
  1. 建立一個生產者
/** * @author LiJing * @ClassName: MQSender * @Description: MQ發送 生產者 * @date 2019/10/9 11:50 */
@Component
public class MQSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.out.println("correlationData: " + correlationData);
            System.out.println("ack: " + ack);
            if (!ack) {
                System.out.println("異常處理....");
            }
        }
    };

    final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange , String routingKey) {
            System.out.println("return exchange: " + exchange + ", routingKey: "
                    + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
        }
    };

    public void sendDelay(Object message, int delayTime) {
        //採用消息確認模式,消息發出去後,異步等待響應
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        //id + 時間戳 全局惟一
        CorrelationData correlationData = new CorrelationData("delay" + System.nanoTime());
        //發送消息時指定 header 延遲時間
        rabbitTemplate.convertAndSend(MQConfig.DELAY_EXCHANGE, "delay.boot", message,
                new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        //設置消息持久化
                        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                        // 兩種方式 都可
                        //message.getMessageProperties().setHeader("x-delay", "6000");
                        message.getMessageProperties().setDelay(delayTime);
                        return message;
                    }
                }, correlationData);
    }
}
複製代碼
  1. 建立一個消費者
/** * @author LiJing * @ClassName: MQReceiver * @Description: 消費者 * @date 2019/10/9 11:51 */
@Component
@Slf4j
public class MQReceiver {
    @RabbitListener(queues = MQConfig.DELAY_QUEUE)
    @RabbitHandler
    public void onDelayMessage(Message msg, Channel channel) throws IOException {
        long deliveryTag = msg.getMessageProperties().getDeliveryTag();
        channel.basicAck(deliveryTag, true);
        System.out.println("延遲隊列在" + LocalDateTime.now()+"時間," + "延遲後收到消息:" + new String(msg.getBody()));
    }
}
複製代碼

5.建立一個mq的測試控制器

@RestController
@RequestMapping("/mq")
public class MqController extends AbstractController {

    @Autowired
    private MQSender mqSender;

    @GetMapping(value = "/send/delay")
    public void sendDelay(int delayTime) {
        String msg = "hello delay";
        System.out.println("發送開始時間:" + LocalDateTime.now() + "測試發送delay消息====>" + msg);
        mqSender.sendDelay(msg, delayTime);
    }
}
複製代碼
  1. 啓動,測試一把
http://localhost:8080/api/mq/send/delay?delayTime=6000
 http://localhost:8080/api/mq/send/delay?delayTime=10000
複製代碼

果真,名不虛傳..... 意思就是:你已經成功引發了個人注意...小小的演練,你們有收穫就點個愛心

4、小結來了

延時隊列在須要延時處理的場景下很是有用,使用RabbitMQ來實現延時隊列,能夠很好的利用RabbitMQ的特性,如:消息可靠發送、消息可靠投遞、死信隊列來保障消息至少被消費一次以及未被正確處理的消息不會被丟棄。

另外,經過RabbitMQ集羣的特性,能夠很好的解決單點故障問題,不會由於單個節點掛掉致使延時隊列不可用或者消息丟失。

固然,延時隊列還有不少其它選擇,好比利用Redis的zset,Quartz或者利用kafka的時間輪,這些方式各有特色,但就像爐石傳說通常,這些知識就比如手裏的卡牌,知道的越多,能夠用的卡牌也就越多,遇到問題便能遊刃有餘,因此須要大量的知識儲備和經驗積累才能打造出更出色的卡牌組合,讓本身解決問題的能力獲得更好的提高。

5、結束語

肥朝告訴我說:聞道有前後,術業有專攻,達者爲師。

那今日份的講解就到此結束,具體的代碼請移步個人gitHub的mybot項目888分支查閱,fork體驗一把,或者評論區留言探討,寫的很差,請多多指教~~

相關文章
相關標籤/搜索