在上一篇博文中,咱們寫了程序來發送和接受消息從一個隊列中。html
在這篇博文中咱們將建立一個工做隊列,用於在多個工做人員之間分配耗時的任務。java
Work Queues 工做隊列(又稱:任務隊列)背後的主要思想是避免當即執行資源密集型任務,而且必須等待它完成。相反,咱們安排任務稍後完成。咱們將任務封裝 爲消息並將其發送到隊列。在後臺運行的工做進程將彈出任務並最終執行做業。當您運行許多工做程序時,它們之間將共享任務。spring
這個概念在Web應用程序中特別有用,由於在短的HTTP請求窗口中沒法處理複雜的任務。app
咱們沒有真實的業務場景,所以接下來咱們將會用Thread.sleep()方法來模擬一個耗時比較久的任務。框架
編寫application.propertieside
咱們將在生成的項目中找到application.properties文件,其中沒有任何內容。ui
添加application.properties 配置以下:this
spring.profiles.active=usage_message
logging.level.org=ERROR
tutorial.client.duration=10000
# 當declareExchange爲true時,將持久標誌設置爲此值
spring.rabbitmq.durable=true
# PERSISTENT或NON_PERSISTENT肯定RabbitMQ是否應該保留消息
spring.rabbitmq.deliveryMode=PERSISTENT
# 更多屬性設置 https://docs.spring.io/spring-amqp/reference/htmlsingle/#_common_properties
編寫Java配置類spa
剛纔配置文件中咱們配置了一個code
tutorial.client.duration=10000
可是這個配置字段不存在於任何框架jar包裏,所以咱們須要編寫一個類來處理這個屬性
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.CommandLineRunner; import org.springframework.context.ConfigurableApplicationContext; public class RabbitAmqpTutorialsRunner implements CommandLineRunner { @Value("${tutorial.client.duration:0}") private int duration; @Autowired private ConfigurableApplicationContext ctx; @Override public void run(String... args) throws Exception { // TODO Auto-generated method stub System.out.println("Ready ... running for " + duration + "ms"); Thread.sleep(duration); ctx.close(); } }
咱們仍然和以前教程同樣須要一個Java配置類:
import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Profile; import com.xingyun.springamqp.business.Tut2Receiver; import com.xingyun.springamqp.business.Tut2Sender; @Profile({"tut2", "work-queues"}) @Configuration public class Tut2Config { @Bean public Queue hello() { return new Queue("hello"); } @Profile("receiver") private static class ReceiverConfig { @Bean public Tut2Receiver receiver1() { return new Tut2Receiver(1); } @Bean public Tut2Receiver receiver2() { return new Tut2Receiver(2); } } @Profile("sender") @Bean public Tut2Sender sender() { return new Tut2Sender(); } }
經過上面這個配置類,咱們作了四件事
爲何要有這兩個配置文件? 由於咱們待會運行生產者和消費者的時候,能夠經過動態加載不一樣的配置文件來啓動不一樣的類。
好比咱們啓動生產者發佈信息就能夠調用這個配置:
--spring.profiles.active=tut2,sender
當咱們想啓動消費者就動態調用這個配置
--spring.profiles.active=tut2,receiver
接下來咱們須要修改下整個應用程序的啓動類:
import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Profile; import org.springframework.scheduling.annotation.EnableScheduling; import com.xingyun.springamqp.config.RabbitAmqpTutorialsRunner; @SpringBootApplication @EnableScheduling public class RabbitMq0x02SpringAmqpWorkQueuesSampleApplication { public static void main(String[] args) { SpringApplication.run(RabbitMq0x02SpringAmqpWorkQueuesSampleApplication.class, args); } @Profile("usage_message") @Bean public CommandLineRunner usage() { return new CommandLineRunner() { @Override public void run(String... arg0) throws Exception { System.out.println("This app uses Spring Profiles to control its behavior.\n"); System.out.println("Sample usage: java -jar " + "RabbitMQ_0x02_SpringAMQP_WorkQueues_Sample-0.0.1-SNAPSHOT.jar " + "--spring.profiles.active=work-queues,sender"); } }; } @Profile("!usage_message") @Bean public CommandLineRunner tutorial() { return new RabbitAmqpTutorialsRunner(); } }
當執行這個項目的jar 文件時會自動加載這個usage_message 配置,打印用法信息。
咱們在啓動類上添加@EnableScheduling,以便於開啓對定時任務的支持.
生產者
咱們將修改發送方以經過在RabbitTemplate上使用相同的方法發佈消息convertAndSend,以很是人爲的方式在消息中附加一個點來提供識別其是否爲更長時間運行任務的方法。該文檔將此定義爲「將Java對象轉換爲Amqp消息並將其發送到具備默認路由密鑰的默認交換」。
import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; public class Tut2Sender { @Autowired private RabbitTemplate template; @Autowired private Queue queue; int dots = 0; int count = 0; @Scheduled(fixedDelay = 1000, initialDelay = 500) public void send() { StringBuilder builder = new StringBuilder("Hello"); if (dots++ == 3) { dots = 1; } for (int i = 0; i < dots; i++) { builder.append('.'); } builder.append(Integer.toString(++count)); String message = builder.toString(); template.convertAndSend(queue.getName(), message); System.out.println(" [x] Sent '" + message + "'"); } }
消費者
咱們的接收器Tut2Receiver模擬doWork()方法中僞造任務的任意長度,其中點數轉換爲工做所需的秒數。一樣,咱們利用「hello」隊列上的@RabbitListener和@RabbitHandler來接收消息。消耗該消息的實例將添加到咱們的監視器中,以顯示處理消息的實例,消息和時間長度。
import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.util.StopWatch; @RabbitListener(queues = "hello") public class Tut2Receiver { private final int instance; public Tut2Receiver(int i) { this.instance = i; } @RabbitHandler public void receive(String in) throws InterruptedException { StopWatch watch = new StopWatch(); watch.start(); System.out.println("instance " + this.instance + " [x] Received '" + in + "'"); doWork(in); watch.stop(); System.out.println("instance " + this.instance + " [x] Done in " + watch.getTotalTimeSeconds() + "s"); } private void doWork(String in) throws InterruptedException { for (char ch : in.toCharArray()) { if (ch == '.') { Thread.sleep(1000); } } } }
查看用法
java -jar RabbitMQ_0x02_SpringAMQP_WorkQueues_Sample-0.0.1-SNAPSHOT.jar
運行生產者
java -jar RabbitMQ_0x02_SpringAMQP_WorkQueues_Sample-0.0.1-SNAPSHOT.jar --spring.profiles.active=work-queues,sender
運行消費者
java -jar RabbitMQ_0x02_SpringAMQP_WorkQueues_Sample-0.0.1-SNAPSHOT.jar --spring.profiles.active=work-queues,receiver