背景
你可能在你的項目中用過Spring的@Async註解,以此來將部分方法轉化爲異步執行,從而提升請求的響應效率java
但在服務架構不斷的演進之中,這種丟入線程池處理的方式帶來的缺陷也愈發明顯:git
- 不利於監控
- 若是意外停機,還沒有處理的任務會盡數丟失
- 在集羣中的某個節點要處理大量異步任務時,沒法將壓力分擔到集羣中其餘節點
- 項目中若集成了使用ThreadLocal特性的模塊或第三方組件,須要注意上下文丟失的問題
思路
使用消息隊列做爲異步任務的實現方式,這樣咱們就能夠:github
- 大量成熟的MQ中間件都提供了可視化管理平臺,監控更加方便
- 能夠用消息隊列Header來保存上下文,如用戶信息、token等
- 消息隊列的發佈-訂閱模式能夠最大程度利用集羣的業務處理能力
- 更容易保證任務的順序性
- 若是有服務節點宕機,能夠利用消息確認、消息重試等機制保證任務執行的正確性
實現
爲了保證業務代碼和實現方案解耦,相似於@Aync方案,咱們一樣採用註解+攔截器的方式進行邏輯注入spring
@Around("@annotation(org.springframework.amqp.rabbit.annotation.RabbitListener)") public Object cut(ProceedingJoinPoint pjp) throws Throwable { ... }
實現思路大同小異,就是讀取註解中的隊列聲明確認發佈-訂閱關係,而後以丟入消息隊列來替換丟入線程池 架構
private String resolveKey(Queue[] queues) { String s = this.beanFactory.resolveEmbeddedValue(queues[0].value()); return (String) resolver.evaluate(s, evalContext); }
rabbitTemplate.convertAndSend(resolveKey(queues), args[0]);
爲消息隊列注入Json轉換器,方便對象傳輸app
@Bean public Jackson2JsonMessageConverter producerJackson2MessageConverter() { return new Jackson2JsonMessageConverter(); }
若有須要,咱們能夠將上下文的用戶信息、token等寫入消息的Header中異步
private MessagePostProcessor beforePublishPostProcessor() { return message -> { // setting up context to message header return message; }; }
被異步調用的service代碼:this
@Service @Slf4j public class DemoService { @RabbitListener(queuesToDeclare = @Queue("mytestqueue")) public void checkSome(List<String> tagTuple) { log.warn("check here {}", tagTuple); } }
調用service的controller:lua
@RestController public class DemoController { @Autowired private DemoService demoService; @RequestMapping("check") public Integer checkSome() { ArrayList<String> tagTuple = new ArrayList<>(); tagTuple.add("bar"); tagTuple.add("foo"); demoService.checkSome(tagTuple); return 0; } }
執行查看效果spa
17:00:14.584TRACE[AbstractHandlerMapping.java:411]Mapped to org.smop.duplex.sample.DemoController#checkSome() 17:00:21.263WARN [DemoService.java:16]check here [bar, foo]
若有幫助或啓發,還請點個👍
附代碼倉庫: