Java 小記 — RabbitMQ 的實踐與思考

前言

本篇隨筆將彙總一些我對消息隊列 RabbitMQ 的認識,順便談談其在高併發和秒殺系統中的具體應用。
html

1. 預備示例

想了下,仍是先拋出一個簡單示例,隨後再根據其具體應用場景進行擴展,我以爲這樣表述條理更清晰些。java

RabbitConfig:緩存

@Configuration
public class RabbitConfig {

    @Bean
    public Queue callQueue() {
        return new Queue(MQConstant.CALL);
    }
}

Client:服務器

@Component
public class Client {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendCall(String content) {
        for (int i = 0; i < 10000; i++) {
            String message = i + "-" + content;
            System.out.println(String.format("Sender: %s", message));
            rabbitTemplate.convertAndSend(MQConstant.CALL, message);
        }
    }
}

Server:微信

@Component
public class Server {

    @RabbitHandler
    @RabbitListener(queues = MQConstant.CALL)
    public void callProcess(String message) throws InterruptedException {
        Thread.sleep(1000);
        System.out.println(String.format("Receiver: reply(\"%s\") Yes, I just saw your message!", message));
    }

}

Result:多線程

Sender: Hello, are you there!
Receiver: reply("Hello, are you there!") Yes, I just saw your message!

以上示例會在 rabbitmq 中建立一條隊列 CALL, 消息在其中等待消費:
併發

在此基礎上的簡單擴展我就再也不寫案例了,好比領域模塊完成了其核心業務規則以後可能須要更新緩存、寫個郵件、記個複雜日誌、作個統計報表等等,這些不須要及時反饋或者耗時的附屬業務均可以經過異步隊列分發,以此來提高核心業務的響應速度,同時如此處理能讓領域邊界更加清晰,代碼的可維護性和持續拓展的能力也會有所提高。異步

2. 削峯

上個示例中我提到的應用場景是解耦和通知,再接着擴展,因其具有良好的緩衝性質,因此還有一個很是適合的應用場景那就是削峯。對於突如其來的極高併發請求,咱們能夠先瞬速地將其加入隊列並回複用戶一個友好提示,而後服務器可在其能承受的範圍內慢慢處理,以此來防止突發的 CPU 和內存 「爆表」。async

改造以後對於發送方來講固然是比較爽的,他只是將請求加入消息隊列而已,處理壓力都歸到了消費端。接着思考,這樣處理有沒有反作用?若是這個請求恰好是線程阻塞的,那還要加入隊列慢慢排隊處理,那不是完蛋了,用戶要猴年馬月才能獲得反饋?因此針對此,我以爲應該將消費端的方法改成異步調用(即多線程)以提高吞吐量,在 Spring Boot 中的寫法也很是簡單:ide

@Component
public class Server {

    @Async
    @RabbitHandler
    @RabbitListener(queues = MQConstant.CALL)
    public void callProcess(String message) throws InterruptedException {
        Thread.sleep(100);
        System.out.println(String.format("Receiver: reply(\"%s\") Yes, I just saw your message!", message));
    }

}

參照示例一的方法,我發佈了 10000 條消息加入隊列,且消費端的調用每次阻塞一秒,那可有意思了,何時能處理完?但若是開幾百個線程同時處理的話,那幾十秒就夠了,固然具體多少合適還應根據具體的業務場景和服務器配置酌情考慮。另外,別忘了配線程池:

@Configuration
public class AsyncConfig {

    @Bean
    public Executor asyncExecutor(){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(500);
        executor.setQueueCapacity(10);

        executor.setThreadNamePrefix("MyExecutor-");

        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}

3. Exchange

RabbitMQ 可能爲 N 個應用同時提供服務,要是你和你的藍顏知己忽然心有靈犀,在不一樣的業務上使用了同一個 routingKey,想一想就刺激。所以,隊列多了天然要進行分組管理,限定好 Exchange 的規則,接下來就能夠獨自玩耍了。

MQConstant:

public class MQConstant {

    public static final String EXCHANGE = "YOUCLK-MESSAGE-EXCHANGE";

    public static final String CALL = MQConstant.EXCHANGE + ".CALL";

    public static final String ALL = MQConstant.EXCHANGE + ".#";
}

RabbitConfig:

@Configuration
public class RabbitConfig {

    @Bean
    public Queue callQueue() {
        return new Queue(MQConstant.CALL);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange(MQConstant.EXCHANGE);
    }

    @Bean
    Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessage).to(exchange).with(MQConstant.ALL);
    }
}

此時咱們再去查隊列 CALL,能夠看到已經綁定了Exchange:

固然 Exchange 的做用遠不止如此,以上示例爲 Topic 模式,除此以外還有 Direct、Headers 和 Fanout 模式,寫法都差很少,感興趣的童鞋能夠去查看 「官方文檔」 進行更深刻了解。

4. 延時隊列

延時任務的場景相信小夥伴們都接觸過,特別是搶購的時候,在規定時間內未付款訂單就被回收了。微信支付的 API 裏面也有一個支付完成後的延時再確認消息推送,實現原理應該都差很少。

利用 RabbitMQ 實現該功能首先要了解他的兩個特性,分別是 Time-To-Live Extensions 和 Dead Letter Exchanges,字面意思上就能理解個大概,一個是生存時間,一個是死信。整個過程也很容易理解,TTL 至關於一個緩衝隊列,等待其過時以後消息會由 DLX 轉發到實際消費隊列,如此便實現了他的延時過程。

MQConstant:

public class MQConstant {

    public static final String PER_DELAY_EXCHANGE = "PER_DELAY_EXCHANGE";

    public static final String DELAY_EXCHANGE = "DELAY_EXCHANGE";

    public static final String DELAY_CALL_TTL = "DELAY_CALL_TTL";

    public static final String CALL = "CALL";

}

ExpirationMessagePostProcessor:

public class ExpirationMessagePostProcessor implements MessagePostProcessor {
    private final Long ttl;

    public ExpirationMessagePostProcessor(Long ttl) {
        this.ttl = ttl;
    }

    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        message.getMessageProperties()
                .setExpiration(ttl.toString());
        return message;
    }
}

Client:

@Component
public class Client {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendCall(String content) {
        for (int i = 1; i <= 3; i++) {
            long expiration = i * 5000;
            String message = i + "-" + content;
            System.out.println(String.format("Sender: %s", message));
            rabbitTemplate.convertAndSend(MQConstant.DELAY_CALL_TTL, (Object) message, new ExpirationMessagePostProcessor(expiration));

        }
    }
}

Server:

@Component
public class Server {

    @Async
    @RabbitHandler
    @RabbitListener(queues = MQConstant.CALL)
    public void callProcess(String message) throws InterruptedException {
        String date = (new SimpleDateFormat("HH:mm:ss")).format(new Date());
        System.out.println(String.format("Receiver: reply(\"%s\") Yes, I just saw your message!- %s", message, date));
    }

}

Result:

Sender: 1-Hello, are you there!
Sender: 2-Hello, are you there!
Sender: 3-Hello, are you there!
Receiver: reply("1-Hello, are you there!") Yes, I just saw your message!- 23:04:12
Receiver: reply("2-Hello, are you there!") Yes, I just saw your message!- 23:04:17
Receiver: reply("3-Hello, are you there!") Yes, I just saw your message!- 23:04:22

結果一目瞭然,分別在隊列中延遲了 5秒,10秒,15秒,固然,以上只是個人簡單示例,童鞋們可翻閱官方文檔(「 ttl 」 && 「 dlx 」)進一步深刻學習。

結語

本篇隨筆不應就這麼結束,但晚上心情很差,百感交集,沒法繼續寫做,無奈至此。近期正在尋覓新的工做機會,個人微信:youclk,不管有沒有推薦的,給我點鼓勵,謝謝!


個人公衆號《有刻》,咱們共同成長!

相關文章
相關標籤/搜索