Java 異步處理 RabbitMQ

attachments-2020-03-VvHYQs7u5e7afa36afaea.jpg

不少開發人員說,將應用程序切換到異步處理很複雜。由於他們有一個自然須要同步通訊的Web應用程序。在這篇文章中,我想介紹一種方法來達到異步通訊的目的:使用一些衆所周知的庫和工具來設計他們的系統。 下面的例子是用Java編寫的,但我相信它更多的是基本原理,同一個應用程序能夠用任何語言來從新寫。web

所需的工具和庫:spring

  1. Spring Boot
  2. RabbitMQ

1.Web應用程序

一個用Spring MVC編寫的Web應用程序並運行在Tomcat上。 它所作的只是將一個字符串發送到一個隊列中 (異步通訊的開始) 並等待另外一個隊列中的消息做爲HTTP響應發送回來。服務器

首先,咱們須要定義幾個依賴項,而後等待Spring Boot執行全部必要的自動配置。app

  1. <dependencies>dom

  2. <dependency>異步

  3. <groupId>org.springframework.boot</groupId>ide

  4. <artifactId>spring-boot-starter-web</artifactId>工具

  5. </dependency>this

  6. <dependency>spa

  7. <groupId>org.springframework.boot</groupId>

  8. <artifactId>spring-boot-starter-amqp</artifactId>

  9. </dependency>

  10. <dependency>

  11. <groupId>com.thedeanda</groupId>

  12. <artifactId>lorem</artifactId>

  13. </dependency>

  14. </dependencies>

  1.  

  2. @SpringBootApplication

  3. publicclassBlockingApplication{

  4. publicstaticvoid main(String[] args){

  5. SpringApplication.run(BlockingApplication.class, args);

  6. }

  7. @RestController

  8. publicstaticclassMessageController{

  9. privatefinalRabbitTemplate rabbitTemplate;

  10. publicMessageController(CachingConnectionFactory connectionFactory){

  11. this.rabbitTemplate =newRabbitTemplate(connectionFactory);

  12. }

  13. @GetMapping("invoke")

  14. publicString sendMessage(){

  15. Message response = rabbitTemplate.sendAndReceive("uppercase",null, request());

  16. returnnewString(response.getBody());

  17. }

  18. privatestaticMessage request(){

  19. Lorem LOREM =LoremIpsum.getInstance();

  20. String name = LOREM.getFirstName()+" "+ LOREM.getLastName();

  21. returnnewMessage(name.getBytes(),newMessageProperties());

  22. }

  23. }

  24. @Bean

  25. publicCachingConnectionFactory connectionFactory(){

  26. CachingConnectionFactory factory =newCachingConnectionFactory();

  27. factory.setAddresses("localhost:5672");

  28. factory.setUsername("admin");

  29. factory.setPassword("admin");

  30. return factory;

  31. }

  32. }

2.消費端應用程序

第二個應用程序僅僅是一個等待消息的RabbitMQ的消費端,將拿到的字符串轉換爲大寫,而後將此結果發送到輸出隊列中。

  1. <dependencies>

  2. <dependency>

  3. <groupId>org.springframework.boot</groupId>

  4. <artifactId>spring-boot-starter-amqp</artifactId>

  5. </dependency>

  6. </dependencies>

 

  1. @SpringBootApplication

  2. publicclassServiceApplication{

  3. publicstaticvoid main(String[] args){

  4. SpringApplication.run(ServiceApplication.class, args);

  5. }

  6. publicstaticclassMessageListener{

  7. publicString handleMessage(byte[] message){

  8. Random rand =newRandom();

  9. // Obtain a number between [0 - 49] + 50 = [50 - 99]

  10. int n = rand.nextInt(50)+50;

  11. String content =newString(message);

  12. try{

  13. Thread.sleep(n);

  14. }catch(InterruptedException e){

  15. e.printStackTrace();

  16. }

  17. return content.toUpperCase();

  18. }

  19. }

  20. @Bean

  21. publicCachingConnectionFactory connectionFactory(){

  22. CachingConnectionFactory factory =newCachingConnectionFactory();

  23. factory.setAddresses("localhost:5672");

  24. factory.setUsername("admin");

  25. factory.setPassword("admin");

  26. return factory;

  27. }

  28. @Bean

  29. publicSimpleMessageListenerContainer serviceListenerContainer(){

  30. SimpleMessageListenerContainer container =newSimpleMessageListenerContainer();

  31. container.setConnectionFactory(connectionFactory());

  32. container.setConcurrentConsumers(20);

  33. container.setMaxConcurrentConsumers(40);

  34. container.setQueueNames("uppercase_messages");

  35. container.setMessageListener(newMessageListenerAdapter(newMessageListener()));

  36. return container;

  37. }

  38. }

3.底層如何執行的?

程序啓動並首次調用sendMessage()方法後,咱們能夠看到Spring AMQP支持自動建立了一個新的回覆隊列並等待來自咱們的服務應用程序的響應。

  1. 2019-05-1217:23:21.451 INFO 4574---[nio-8080-exec-1].l.DirectReplyToMessageListenerContainer:Container initialized for queues:[amq.rabbitmq.reply-to]

  2. 2019-05-1217:23:21.457 INFO 4574---[nio-8080-exec-1].l.DirectReplyToMessageListenerContainer:SimpleConsumer[queue=amq.rabbitmq.reply-to, consumerTag=amq.ctag-VF-iqD9rLEuljIBstbCI1A identity=10e58093] started

若是咱們在消費端應用程序中查看消息,咱們能夠看到Spring自動傳播有關回覆隊列的信息以及相關ID,用於將其傳遞迴Web應用程序以便可以將請求和響應配對在一塊兒。

這就是發生魔術的地方。 固然,若是您想使其更復雜,您能夠在協做中包含更多服務,而後將Web應用程序的最終響應放入與自動生成的隊列不一樣的隊列中, 該隊列只具備正確的關聯ID。 另外,不要忘記設置合理的超時。

這個解決方案還有一個很大的缺點 - 應用程序吞吐量。 我故意這樣作,以便我能夠跟進這篇文章,進一步深刻調查 AsyncProfiler! 可是目前,咱們使用Tomcat做爲主HTTP服務器,默認爲200個線程,這意味着咱們的應用程序沒法同時處理200多條消息,由於咱們的服務器線程正在等待RabbitMQ 回覆隊列的響應,直到有消息進入或發生超時。

 

attachments-2020-03-618oUL7h5e7afa4f5f672.jpg

相關文章
相關標籤/搜索