不少開發人員說,將應用程序切換到異步處理很複雜。由於他們有一個自然須要同步通訊的Web應用程序。在這篇文章中,我想介紹一種方法來達到異步通訊的目的:使用一些衆所周知的庫和工具來設計他們的系統。 下面的例子是用Java編寫的,但我相信它更多的是基本原理,同一個應用程序能夠用任何語言來從新寫。java
所需的工具和庫:git
一個用Spring MVC編寫的Web應用程序並運行在Tomcat上。 它所作的只是將一個字符串發送到一個隊列中 (異步通訊的開始) 並等待另外一個隊列中的消息做爲HTTP響應發送回來。github
首先,咱們須要定義幾個依賴項,而後等待Spring Boot執行全部必要的自動配置。web
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>com.thedeanda</groupId>
<artifactId>lorem</artifactId>
</dependency>
</dependencies>
複製代碼
@SpringBootApplication
public class BlockingApplication {
public static void main(String[] args) {
SpringApplication.run(BlockingApplication.class, args);
}
@RestController
public static class MessageController {
private final RabbitTemplate rabbitTemplate;
public MessageController(CachingConnectionFactory connectionFactory) {
this.rabbitTemplate = new RabbitTemplate(connectionFactory);
}
@GetMapping("invoke")
public String sendMessage() {
Message response = rabbitTemplate.sendAndReceive("uppercase", null, request());
return new String(response.getBody());
}
private static Message request() {
Lorem LOREM = LoremIpsum.getInstance();
String name = LOREM.getFirstName() + " " + LOREM.getLastName();
return new Message(name.getBytes(), new MessageProperties());
}
}
@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setAddresses("localhost:5672");
factory.setUsername("admin");
factory.setPassword("admin");
return factory;
}
}
複製代碼
第二個應用程序僅僅是一個等待消息的RabbitMQ的消費端,將拿到的字符串轉換爲大寫,而後將此結果發送到輸出隊列中。spring
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
複製代碼
@SpringBootApplication
public class ServiceApplication {
public static void main(String[] args) {
SpringApplication.run(ServiceApplication.class, args);
}
public static class MessageListener {
public String handleMessage(byte[] message) {
Random rand = new Random();
// Obtain a number between [0 - 49] + 50 = [50 - 99]
int n = rand.nextInt(50) + 50;
String content = new String(message);
try {
Thread.sleep(n);
} catch (InterruptedException e) {
e.printStackTrace();
}
return content.toUpperCase();
}
}
@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setAddresses("localhost:5672");
factory.setUsername("admin");
factory.setPassword("admin");
return factory;
}
@Bean
public SimpleMessageListenerContainer serviceListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setConcurrentConsumers(20);
container.setMaxConcurrentConsumers(40);
container.setQueueNames("uppercase_messages");
container.setMessageListener(new MessageListenerAdapter(new MessageListener()));
return container;
}
}
複製代碼
程序啓動並首次調用sendMessage()方法後,咱們能夠看到Spring AMQP支持自動建立了一個新的回覆隊列並等待來自咱們的服務應用程序的響應。bash
2019-05-12 17:23:21.451 INFO 4574 --- [nio-8080-exec-1] .l.DirectReplyToMessageListenerContainer : Container initialized for queues: [amq.rabbitmq.reply-to]
2019-05-12 17:23:21.457 INFO 4574 --- [nio-8080-exec-1] .l.DirectReplyToMessageListenerContainer : SimpleConsumer [queue=amq.rabbitmq.reply-to, consumerTag=amq.ctag-VF-iqD9rLEuljIBstbCI1A identity=10e58093] started
複製代碼
若是咱們在消費端應用程序中查看消息,咱們能夠看到Spring自動傳播有關回覆隊列的信息以及**相關ID,**用於將其傳遞迴Web應用程序以便可以將請求和響應配對在一塊兒。服務器
這就是發生魔術的地方。 固然,若是您想使其更復雜,您能夠在協做中包含更多服務,而後將Web應用程序的最終響應放入與自動生成的隊列不一樣的隊列中, 該隊列只具備正確的關聯ID。 另外,不要忘記設置合理的超時。app
這個解決方案還有一個很大的缺點 - 應用程序吞吐量。 我故意這樣作,以便我能夠跟進這篇文章,進一步深刻調查AsyncProfiler
! 可是目前,咱們使用Tomcat做爲主HTTP服務器,默認爲200個線程,這意味着咱們的應用程序沒法同時處理200多條消息,由於咱們的服務器線程正在等待RabbitMQ 回覆隊列的響應,直到有消息進入或發生超時。dom
感謝您閱讀本文,敬請關注後續內容! 若是您想本身嘗試一下,請查看個人GitHub存儲庫。異步