MQ在全部項目裏面都很常見,java
一、減小非緊急性任務對整個業務流程形成的延時;spring
二、減小高併發對系統所形成的性能上的影響;docker
舉例幾個場景:springboot
一、給註冊完成的用戶派發優惠券、加積分、發消息等(派發優惠券、加積分、發消息這些屬於非緊急性任務,可交由MQ進行處理,先讓用戶完成註冊)併發
二、實時收集用戶運動數據,而且收集數據後還須要比較複雜和耗時的操做才能完成業務處理(實時的數據採集任務通常併發量都是很高的,咱們就應該先發送到MQ,再進行有序的處理)app
另外說明一下,高併發的問題有不少種處理手段,而MQ是我認爲的最穩健、簡單的手段之一,因此我會優先使用分佈式
首先用docker安裝RabbitMQ,快捷ide
進入控制檯,下圖簡單介紹下各個功能模塊高併發
大體流程性能
一、發送消息到MQ
二、MQ接收並保存消息等待消費
三、消費者有序地進行消息處理
springboot中使用MQ超級簡單
一、配置
#RabbitMq
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=test
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=test
spring.rabbitmq.listener.simple.retry.max-attempts=5
spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.initial-interval=5000
spring.rabbitmq.listener.simple.default-requeue-rejected=false
spring.rabbitmq.listener.simple.prefetch=1 #這個根據本身業務狀況來定
二、發送
@Autowired private AmqpTemplate amqpTemplate; @Test public void testMq(){ User user = new User(); user.setName("cjh陳"); amqpTemplate.convertAndSend("exchange_01","que_01", JSON.toJSONString(JSONResult.SUCC("",user))); }
三、消費
@RabbitListener(containerFactory = "containerFactory",concurrency = "10",bindings = {@QueueBinding(declare = "false",value = @Queue(value = "que_01"), exchange = @Exchange(declare = "false",value = "exchange_01",type = "fanout"))}) public void process(Message message, com.rabbitmq.client.Channel channel) { Long deliveryTag = null; String data = null; try { deliveryTag = message.getMessageProperties().getDeliveryTag(); data = new String(message.getBody(), "UTF-8"); //你的業務處理 test01(data); } catch (Exception e) { logger.warn("MQ異常 {} , {} , {}" , e.getMessage(),e.getCause(),data); }finally { logger.warn("======消息處理結束"); } try { channel.basicAck(deliveryTag, false); // 確認消息成功消費(配置須要開啓消費確認模式) } catch (IOException e) { logger.warn("======MQ應答出錯,請檢查"); } }
3步完成使用
特別說明幾點
一、AmqpTemplate 好像作不到發佈確認,要用RabbitTemplate,發佈確認我主要用在分佈式事務的場景
二、containerFactory能夠不配置,根據實際狀況來,下面再說明
三、concurrency指併發量,實時性要求很高的話,prefetch應該設低點或者設置成1,concurrency的值調高點
簡單畫了一下,,,留着本身看,,
四、declare = "false",很經常使用,意思就是說加入你MQ控制檯裏面已經新建好隊列或者交換機了,這裏就應該配false表示程序再也不進行從新定義,否則容易發生報錯(當從新定義的參數與已定義的參數不一致時就會報錯)
五、權限方面的處理之後再記錄,,
回到上面第2點,containerFactory何時會用到?
某些配置須要自定義,好比線程池的大小,
當concurrency數值放大的時候,好比100,我發現大部分的消費者並無工做,這是由於被線程池的大小所限制,網上的人說線程池大小默認是50,我也沒去查估計差很少也就這個數,那麼這個時候咱們就須要自定義的containerFactory:
package xxx; import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.converter.MappingJackson2MessageConverter; import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @author cjh * @Package xxx * @Description: * @date: 2019/6/30 20:19 */ @Configuration @EnableRabbit public class MqContainerFactory implements RabbitListenerConfigurer { /** * containerFactory * @Description: 自定義配置 * @param * @return */ @Bean public SimpleRabbitListenerContainerFactory containerFactory(ConnectionFactory connectionFactory) throws Exception{ SimpleRabbitListenerContainerFactory factory=new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); //必須是concurrency的兩倍以上 ExecutorService service=Executors.newFixedThreadPool(200); factory.setTaskExecutor(service); factory.setPrefetchCount(1); return factory; } @Bean public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() { DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory(); factory.setMessageConverter(new MappingJackson2MessageConverter()); return factory; } @Override public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) { registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory()); } }
有錯歡迎指正,轉載請註明博客出處:http://www.cnblogs.com/cjh-notes/