高併發的核心技術 - 消息中間件(MQ)

MQ簡介

  • 什麼是MQ 跨進程的消息隊列,主要角色包括生產者與消費者。 生產者只負責生產信息,沒法感知消費者是誰,消息怎麼處理,處理結果是什麼。 消費者負責接收及處理消息,沒法感知生產者是誰,怎麼產生的。html

  • Mq能作什麼? MQ 特性通常有異步,吞吐量大 ,延時低; 適合作:安全

    1. 投遞異步通知。
    2. 限流,削峯谷。
    3. 可靠事件,處理數據一致性。
    4. 利用一些特性,能夠作定時任務。 等….

因爲MQ是異步處理消息的,因此MQ不適合作同步處理操做,若是須要及時的返回處理結果請不要用MQ;bash

  • MQ 個系統帶來了什麼?微信

    缺點:增長了系統的複雜性,除了代碼組件接入之外還須要考慮,高可用,集羣,消息的可靠性等問題!markdown

    生產者:消息發送怎麼保證可靠性,怎麼保證不重複!app

    消費者:怎麼保證冪等性,接收到重複消息怎麼處理!異步

    還有會帶來的處理延時等問題!分佈式

優勢: 解耦,利用MQ咱們能夠很好的給咱們系統解耦,特別是分佈式/微服系統! 原來的同步操做,能夠用異步處理,也能夠帶來更快的響應速度;oop

  • 哪些場景可使用MQ

場景 (1) 系統解耦,用戶系統或者其餘系統須要發送短信能夠經過 MQ 執行;很好的將 用戶系統 和 短信系統進行解耦;post

場景(2)

順序執行的任務場景,假設 A B C 三個任務,B須要等待 A完成纔去執行,C須要等待B完成纔去執行;

我見過一些同窗的作法是 ,用 三個定時器 錯開時間去執行的,假設 A定時器 9 點執行, B 定時器 10 點執行 , C 11 點執行 , 相似這樣子;

這樣作實際上是 不安全的, 由於 後一個任務 沒法知道 前一個任務是否 真的執行了! 假設 A 宕機了, 到 10 點 B 定時去 執行,這時候 數據就會產生異常!

當咱們 引入 MQ 後 能夠這麼作, A執行完了 發送 消息給 B ,B收到消息後 執行,C 相似,收到 B消息後執行;

場景(3)

支付網關的通知,咱們的系統經常須要接入支付功能,微信或者支付寶一般會以回調的形式通知咱們系統支付結果。

咱們能夠將咱們的支付網關獨立出來,經過MQ通知咱們業務系統進行處理,這樣處理有利於系統的解耦和擴展!

假設咱們還有一個積分系統,用戶支付成功,給用戶添加積分。只須要積分系統監聽這個消息,並處理積分就好,無需去修改再去修改網關層代碼!

若是沒有使用MQ ,我是否是還得去修改網關係統的代碼,遠程調用增長積分的接口?

這就是使用了MQ的好處,解耦和擴展!

固然咱們的轉發規則也要保證每一個感興趣的隊列能獲取到消息!

場景(4)

微服/分佈式系統,分佈式事務 - 最終一致性 處理方案!

詳情: 分佈式事務處理方案,微服事務處理方案

場景(5)

  • 消息延時隊列,可作些定時任務,不固定時間執行的定時任務。
  • 例如:用戶下單後若是24小時未支付訂單取消;
  • 確認收貨後2天后沒有評價自動好評; 等...

咱們之前的作法是 一般啓用一個定時器,每分鐘或者每小時,去跑一次取出須要處理的訂單或其餘數據進行處理。 這種作法一個是 效率比較低,若是數據量大的話,每次都要掃庫,很是要命! 再者時效性不是很高,最差的時候可能須要等待一輪時長! 還有可能出現重複執行的結果,時效和輪詢的頻率難以平衡!

利用MQ(Rabbitmq),DLX (Dead Letter Exchanges)和 消息的 TTL (Time-To-Live Extensions)特性。咱們能夠高效的完成這個任務場景!不須要掃庫,時效性更好!

DLX:www.rabbitmq.com/dlx.html,

TTL:www.rabbitmq.com/ttl.html#pe…

原理: 發送到隊列的消息,能夠設置一個存活時間 TTL,在存活時間內沒有被消費,能夠設置這個消息轉發到其餘隊列裏面去;而後咱們從這個其餘隊列裏面消費執行咱們的任務,這樣就能夠達到一個消息延時的效果!

設置過時時間: 過時時間能夠統一設置到消息隊列裏面,也能夠單獨設置到某個消息!

PS 若是消息設置了過時時間,發生到了設置有過時時間的隊列,已隊列設置的過時時間爲準!

已 SpringBoot 爲例:

配置轉發隊列和被轉發隊列:

@Component
@Configuration
public class RabbitMqConfig {
    @Bean
    public Queue curQueue() {
        Map<String, Object> args = new HashMap<String, Object>();
        //超時後的轉發器 過時轉發到 delay_queue_exchange
        args.put("x-dead-letter-exchange", "delay_queue_exchange");
        //routingKey 轉發規則
        args.put("x-dead-letter-routing-key", "user.#");
        //過時時間 20 秒
        args.put("x-message-ttl", 20000);
        return new Queue("cur_queue", false, false, false, args);
    }
    @Bean
    public Queue delayQueue() {
        return new Queue("delay_queue");
    }
    @Bean
    TopicExchange exchange() {
        //當前隊列
        return new TopicExchange("cur_queue_exchange");
    }
    @Bean
    TopicExchange exchange2() {
        //被轉發的隊列
        return new TopicExchange("delay_queue_exchange");
    }
    @Bean
    Binding bindingHelloQueue(Queue curQueue, TopicExchange exchange) {
         //綁定隊列到轉發器
        return BindingBuilder.bind(curQueue).to(exchange).with("user.#");
    }
    @Bean
    Binding bindingHelloQueue2(Queue delayQueue, TopicExchange exchange2) {
        return BindingBuilder.bind(delayQueue).to(exchange2).with("user.#");
    }
}
複製代碼

發生消息:

@Component
public class MqEventSender {
    Logger logger = LoggerFactory.getLogger(MqEventSender.class);
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 消息沒有設置 時間
     *  發生到隊列 cur_queue_exchange
     * @param msg
     */
    public void sendMsg(String msg) {
        logger.info("發送消息: " + msg);
        rabbitTemplate.convertAndSend("cur_queue_exchange", "user.ss", msg);
    }
    /**
     * 消息設置時間
     *  發生到隊列 cur_queue_exchange
     * @param msg
     */
    public void sendMsgWithTime(String msg) {
        logger.info("發送消息: " + msg);
        MessageProperties messageProperties = new MessageProperties();
        //過時時間設置 10 秒
        messageProperties.setExpiration("10000");
        Message message = rabbitTemplate.getMessageConverter().toMessage(msg, messageProperties);
        rabbitTemplate.convertAndSend("cur_queue_exchange", "user.ss", message);
    }
}
複製代碼

消息監聽:

監聽 的隊列是 delay_queue 而不是 cur_queue;

PS cur_queue 不該該有監聽者,不然消息被消費達不到想要的延時消息效果!

/**
 * Created by linli on 2017/8/21.
 * 監聽 被丟到 超時隊列內容
 */
@Component
@RabbitListener(queues = "delay_queue")
public class DelayQueueListener {
    public static Logger logger = LoggerFactory.getLogger(AddCommentsEventListener.class);
    @RabbitHandler
    public void process(@Payload String msg) {
        logger.info("收到消息 "+msg);
    }
}
複製代碼

測試:

/**
 * Created by linli on 2017/8/21.
 */
@RestController
@RequestMapping("/test")
public class TestContorller {
    @Autowired
    MqEventSender sender;
    @RequestMapping("/mq/delay")
    public String test() {
        sender.sendMsg("隊列延時消息!");
        sender.sendMsgWithTime("消息延時消息!");
        return "";
    }
}
複製代碼

結果:

觀察結果發現:發送時間 和 收到時間 間隔 20秒 ;

咱們給消息設置的 10 秒 TTL 時間沒有生效!驗證了 : 若是消息設置了過時時間,發生到了設置有過時時間的隊列,已隊列設置的過時時間爲準!

若是但願每一個消息都要本身的存活時間,發送到隊列 不要設置

args.put(「x-message-ttl」, 20000);
複製代碼

消息的過時時間 設置在隊列仍是消息,根據本身的業務場景去定!

  • 總結

MQ 是一個跨進程的消息隊列,咱們能夠很好的利用他進行系統的解耦; 引入MQ會給系統帶來必定的複雜度,須要評估! MQ 適合作異步任務,不適合作同步任務!

相關文章
相關標籤/搜索