本篇隨筆將彙總一些我對消息隊列 RabbitMQ 的認識,順便談談其在高併發和秒殺系統中的具體應用。
html
想了下,仍是先拋出一個簡單示例,隨後再根據其具體應用場景進行擴展,我以爲這樣表述條理更清晰些。java
RabbitConfig:緩存
@Configuration public class RabbitConfig { @Bean public Queue callQueue() { return new Queue(MQConstant.CALL); } }
Client:服務器
@Component public class Client { @Autowired private RabbitTemplate rabbitTemplate; public void sendCall(String content) { for (int i = 0; i < 10000; i++) { String message = i + "-" + content; System.out.println(String.format("Sender: %s", message)); rabbitTemplate.convertAndSend(MQConstant.CALL, message); } } }
Server:微信
@Component public class Server { @RabbitHandler @RabbitListener(queues = MQConstant.CALL) public void callProcess(String message) throws InterruptedException { Thread.sleep(1000); System.out.println(String.format("Receiver: reply(\"%s\") Yes, I just saw your message!", message)); } }
Result:多線程
Sender: Hello, are you there! Receiver: reply("Hello, are you there!") Yes, I just saw your message!
以上示例會在 rabbitmq 中建立一條隊列 CALL, 消息在其中等待消費:
併發
在此基礎上的簡單擴展我就再也不寫案例了,好比領域模塊完成了其核心業務規則以後可能須要更新緩存、寫個郵件、記個複雜日誌、作個統計報表等等,這些不須要及時反饋或者耗時的附屬業務均可以經過異步隊列分發,以此來提高核心業務的響應速度,同時如此處理能讓領域邊界更加清晰,代碼的可維護性和持續拓展的能力也會有所提高。異步
上個示例中我提到的應用場景是解耦和通知,再接着擴展,因其具有良好的緩衝性質,因此還有一個很是適合的應用場景那就是削峯。對於突如其來的極高併發請求,咱們能夠先瞬速地將其加入隊列並回複用戶一個友好提示,而後服務器可在其能承受的範圍內慢慢處理,以此來防止突發的 CPU 和內存 「爆表」。async
改造以後對於發送方來講固然是比較爽的,他只是將請求加入消息隊列而已,處理壓力都歸到了消費端。接着思考,這樣處理有沒有反作用?若是這個請求恰好是線程阻塞的,那還要加入隊列慢慢排隊處理,那不是完蛋了,用戶要猴年馬月才能獲得反饋?因此針對此,我以爲應該將消費端的方法改成異步調用(即多線程)以提高吞吐量,在 Spring Boot 中的寫法也很是簡單:ide
@Component public class Server { @Async @RabbitHandler @RabbitListener(queues = MQConstant.CALL) public void callProcess(String message) throws InterruptedException { Thread.sleep(100); System.out.println(String.format("Receiver: reply(\"%s\") Yes, I just saw your message!", message)); } }
參照示例一的方法,我發佈了 10000 條消息加入隊列,且消費端的調用每次阻塞一秒,那可有意思了,何時能處理完?但若是開幾百個線程同時處理的話,那幾十秒就夠了,固然具體多少合適還應根據具體的業務場景和服務器配置酌情考慮。另外,別忘了配線程池:
@Configuration public class AsyncConfig { @Bean public Executor asyncExecutor(){ ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); executor.setMaxPoolSize(500); executor.setQueueCapacity(10); executor.setThreadNamePrefix("MyExecutor-"); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; } }
RabbitMQ 可能爲 N 個應用同時提供服務,要是你和你的藍顏知己忽然心有靈犀,在不一樣的業務上使用了同一個 routingKey,想一想就刺激。所以,隊列多了天然要進行分組管理,限定好 Exchange 的規則,接下來就能夠獨自玩耍了。
MQConstant:
public class MQConstant { public static final String EXCHANGE = "YOUCLK-MESSAGE-EXCHANGE"; public static final String CALL = MQConstant.EXCHANGE + ".CALL"; public static final String ALL = MQConstant.EXCHANGE + ".#"; }
RabbitConfig:
@Configuration public class RabbitConfig { @Bean public Queue callQueue() { return new Queue(MQConstant.CALL); } @Bean TopicExchange exchange() { return new TopicExchange(MQConstant.EXCHANGE); } @Bean Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) { return BindingBuilder.bind(queueMessage).to(exchange).with(MQConstant.ALL); } }
此時咱們再去查隊列 CALL,能夠看到已經綁定了Exchange:
固然 Exchange 的做用遠不止如此,以上示例爲 Topic 模式,除此以外還有 Direct、Headers 和 Fanout 模式,寫法都差很少,感興趣的童鞋能夠去查看 「官方文檔」 進行更深刻了解。
延時任務的場景相信小夥伴們都接觸過,特別是搶購的時候,在規定時間內未付款訂單就被回收了。微信支付的 API 裏面也有一個支付完成後的延時再確認消息推送,實現原理應該都差很少。
利用 RabbitMQ 實現該功能首先要了解他的兩個特性,分別是 Time-To-Live Extensions 和 Dead Letter Exchanges,字面意思上就能理解個大概,一個是生存時間,一個是死信。整個過程也很容易理解,TTL 至關於一個緩衝隊列,等待其過時以後消息會由 DLX 轉發到實際消費隊列,如此便實現了他的延時過程。
MQConstant:
public class MQConstant { public static final String PER_DELAY_EXCHANGE = "PER_DELAY_EXCHANGE"; public static final String DELAY_EXCHANGE = "DELAY_EXCHANGE"; public static final String DELAY_CALL_TTL = "DELAY_CALL_TTL"; public static final String CALL = "CALL"; }
ExpirationMessagePostProcessor:
public class ExpirationMessagePostProcessor implements MessagePostProcessor { private final Long ttl; public ExpirationMessagePostProcessor(Long ttl) { this.ttl = ttl; } @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties() .setExpiration(ttl.toString()); return message; } }
Client:
@Component public class Client { @Autowired private RabbitTemplate rabbitTemplate; public void sendCall(String content) { for (int i = 1; i <= 3; i++) { long expiration = i * 5000; String message = i + "-" + content; System.out.println(String.format("Sender: %s", message)); rabbitTemplate.convertAndSend(MQConstant.DELAY_CALL_TTL, (Object) message, new ExpirationMessagePostProcessor(expiration)); } } }
Server:
@Component public class Server { @Async @RabbitHandler @RabbitListener(queues = MQConstant.CALL) public void callProcess(String message) throws InterruptedException { String date = (new SimpleDateFormat("HH:mm:ss")).format(new Date()); System.out.println(String.format("Receiver: reply(\"%s\") Yes, I just saw your message!- %s", message, date)); } }
Result:
Sender: 1-Hello, are you there! Sender: 2-Hello, are you there! Sender: 3-Hello, are you there! Receiver: reply("1-Hello, are you there!") Yes, I just saw your message!- 23:04:12 Receiver: reply("2-Hello, are you there!") Yes, I just saw your message!- 23:04:17 Receiver: reply("3-Hello, are you there!") Yes, I just saw your message!- 23:04:22
結果一目瞭然,分別在隊列中延遲了 5秒,10秒,15秒,固然,以上只是個人簡單示例,童鞋們可翻閱官方文檔(「 ttl 」 && 「 dlx 」)進一步深刻學習。
本篇隨筆不應就這麼結束,但晚上心情很差,百感交集,沒法繼續寫做,無奈至此。近期正在尋覓新的工做機會,個人微信:youclk,不管有沒有推薦的,給我點鼓勵,謝謝!
個人公衆號《有刻》,咱們共同成長!