使用消息隊列擴展異步執行的實現方式

背景

你可能在你的項目中用過Spring的@Async註解,以此來將部分方法轉化爲異步執行,從而提升請求的響應效率java

但在服務架構不斷的演進之中,這種丟入線程池處理的方式帶來的缺陷也愈發明顯:git

  • 不利於監控
  • 若是意外停機,還沒有處理的任務會盡數丟失
  • 在集羣中的某個節點要處理大量異步任務時,沒法將壓力分擔到集羣中其餘節點
  • 項目中若集成了使用ThreadLocal特性的模塊或第三方組件,須要注意上下文丟失的問題

思路

使用消息隊列做爲異步任務的實現方式,這樣咱們就能夠:github

  • 大量成熟的MQ中間件都提供了可視化管理平臺,監控更加方便
  • 能夠用消息隊列Header來保存上下文,如用戶信息、token等
  • 消息隊列的發佈-訂閱模式能夠最大程度利用集羣的業務處理能力
  • 更容易保證任務的順序性
  • 若是有服務節點宕機,能夠利用消息確認、消息重試等機制保證任務執行的正確性

實現

爲了保證業務代碼和實現方案解耦,相似於@Aync方案,咱們一樣採用註解+攔截器的方式進行邏輯注入spring

@Around("@annotation(org.springframework.amqp.rabbit.annotation.RabbitListener)")
	public Object cut(ProceedingJoinPoint pjp) throws Throwable {
		...
	}

實現思路大同小異,就是讀取註解中的隊列聲明確認發佈-訂閱關係,而後以丟入消息隊列來替換丟入線程池 架構

private String resolveKey(Queue[] queues) {
		String s = this.beanFactory.resolveEmbeddedValue(queues[0].value());
		return (String) resolver.evaluate(s, evalContext);
	}
rabbitTemplate.convertAndSend(resolveKey(queues), args[0]);

爲消息隊列注入Json轉換器,方便對象傳輸app

@Bean
    public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

若有須要,咱們能夠將上下文的用戶信息、token等寫入消息的Header中異步

private MessagePostProcessor beforePublishPostProcessor() {
        return message -> {
//          setting up context to message header
            return message;
        };
    }

被異步調用的service代碼:this

@Service
@Slf4j
public class DemoService {
    
    @RabbitListener(queuesToDeclare = @Queue("mytestqueue"))
    public void checkSome(List<String> tagTuple) {
        log.warn("check here {}", tagTuple);
    }
}

調用service的controller:lua

@RestController
public class DemoController {
    
    @Autowired
    private DemoService demoService;
    
    @RequestMapping("check")
    public Integer checkSome() {
        ArrayList<String> tagTuple = new ArrayList<>();
        tagTuple.add("bar");
        tagTuple.add("foo");
        demoService.checkSome(tagTuple);
        return 0;
    }
}

執行查看效果spa

17:00:14.584TRACE[AbstractHandlerMapping.java:411]Mapped to org.smop.duplex.sample.DemoController#checkSome()
17:00:21.263WARN [DemoService.java:16]check here [bar, foo]

若有幫助或啓發,還請點個👍

附代碼倉庫:

https://github.com/s-mop/homer

相關文章
相關標籤/搜索