消息中間件RabbitMQ的使用

 

原理場景

MQ在全部項目裏面都很常見,java

一、減小非緊急性任務對整個業務流程形成的延時;spring

二、減小高併發對系統所形成的性能上的影響;docker

 

舉例幾個場景:springboot

一、給註冊完成的用戶派發優惠券、加積分、發消息等(派發優惠券、加積分、發消息這些屬於非緊急性任務,可交由MQ進行處理,先讓用戶完成註冊)併發

二、實時收集用戶運動數據,而且收集數據後還須要比較複雜和耗時的操做才能完成業務處理(實時的數據採集任務通常併發量都是很高的,咱們就應該先發送到MQ,再進行有序的處理)app

另外說明一下,高併發的問題有不少種處理手段,而MQ是我認爲的最穩健、簡單的手段之一,因此我會優先使用分佈式

 

大概流程

首先用docker安裝RabbitMQ,快捷ide

進入控制檯,下圖簡單介紹下各個功能模塊高併發

大體流程性能

一、發送消息到MQ  

二、MQ接收並保存消息等待消費

三、消費者有序地進行消息處理

 

Springboot中的使用

springboot中使用MQ超級簡單

一、配置

#RabbitMq
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=test
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=test
spring.rabbitmq.listener.simple.retry.max-attempts=5
spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.initial-interval=5000
spring.rabbitmq.listener.simple.default-requeue-rejected=false
spring.rabbitmq.listener.simple.prefetch=1 #這個根據本身業務狀況來定

 

二、發送

 @Autowired
    private AmqpTemplate amqpTemplate;


 @Test
    public void testMq(){
        User user = new User();
        user.setName("cjh陳");

        amqpTemplate.convertAndSend("exchange_01","que_01", JSON.toJSONString(JSONResult.SUCC("",user)));
    }

 

三、消費

  @RabbitListener(containerFactory = "containerFactory",concurrency = "10",bindings = {@QueueBinding(declare = "false",value = @Queue(value = "que_01"), exchange = @Exchange(declare = "false",value = "exchange_01",type = "fanout"))})
    public void process(Message message, com.rabbitmq.client.Channel channel) {

        Long deliveryTag = null;
        String data = null;
        try {

            deliveryTag = message.getMessageProperties().getDeliveryTag();
            data = new String(message.getBody(), "UTF-8");

            //你的業務處理
            test01(data);

        } catch (Exception e) {
            logger.warn("MQ異常 {} , {} , {}" , e.getMessage(),e.getCause(),data);
        }finally {
            logger.warn("======消息處理結束");
        }

            try {
                channel.basicAck(deliveryTag, false); // 確認消息成功消費(配置須要開啓消費確認模式)
            } catch (IOException e) {
                logger.warn("======MQ應答出錯,請檢查");
            }
    }

 

3步完成使用

特別說明幾點

一、AmqpTemplate 好像作不到發佈確認,要用RabbitTemplate,發佈確認我主要用在分佈式事務的場景

二、containerFactory能夠不配置,根據實際狀況來,下面再說明

三、concurrency指併發量,實時性要求很高的話,prefetch應該設低點或者設置成1,concurrency的值調高點

簡單畫了一下,,,留着本身看,,

四、declare = "false",很經常使用,意思就是說加入你MQ控制檯裏面已經新建好隊列或者交換機了,這裏就應該配false表示程序再也不進行從新定義,否則容易發生報錯(當從新定義的參數與已定義的參數不一致時就會報錯)

五、權限方面的處理之後再記錄,,

 

回到上面第2點,containerFactory何時會用到?

某些配置須要自定義,好比線程池的大小,

當concurrency數值放大的時候,好比100,我發現大部分的消費者並無工做,這是由於被線程池的大小所限制,網上的人說線程池大小默認是50,我也沒去查估計差很少也就這個數,那麼這個時候咱們就須要自定義的containerFactory:

package xxx;

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author cjh
 * @Package xxx
 * @Description:
 * @date: 2019/6/30 20:19
 */
@Configuration
@EnableRabbit
public class MqContainerFactory implements RabbitListenerConfigurer {

    /**
     * containerFactory
     * @Description: 自定義配置
     * @param
     * @return
     */
    @Bean
    public SimpleRabbitListenerContainerFactory containerFactory(ConnectionFactory connectionFactory) throws Exception{
        SimpleRabbitListenerContainerFactory factory=new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        //必須是concurrency的兩倍以上
        ExecutorService service=Executors.newFixedThreadPool(200);
        factory.setTaskExecutor(service);
        factory.setPrefetchCount(1);
        return factory;
    }

    @Bean
    public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        factory.setMessageConverter(new MappingJackson2MessageConverter());
        return factory;
    }

    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
    }

}

 

記錄完成

有錯歡迎指正,轉載請註明博客出處:http://www.cnblogs.com/cjh-notes/

相關文章
相關標籤/搜索