Rabbitmq 實現延時任務

一、需要用到插件 rabbitmq_delayed_message_exchange 來實現,插件下載地址:https://www.rabbitmq.com/community-plugins.html

 

 

 

二、下載後把插件放到 plugins 目錄裏面,然後到 sbin 目錄裏面打開 cmd,執行 rabbitmq-plugins enable rabbitmq_delayed_message_exchange 命令。

 

 

 三、插件裝好後,重新啓動 mq,然後集成 mq。

  首先,導包:

    <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

 

  

  然後,在配置文件中配置連接信息:

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.listener.simple.acknowledge-mode=manual

 

  

  mq 配置:

  

package com.rrg.gz.mq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * RabbitMQ configuration for delayed messages: declares the delayed exchange,
 * the queue that receives messages once their delay has elapsed, and the
 * binding between the two.
 *
 * <p>The exchange type {@code x-delayed-message} is only available when the
 * {@code rabbitmq_delayed_message_exchange} plugin is enabled on the broker.
 *
 * @author huangsz  2019/4/25 0025
 */
@Configuration
public class RabbitPluginConfig {

    /** Name of the delayed exchange that producers publish to. */
    public static final String DELAY_EXCHANGE = "rrg_delay_exchange";

    /** Name of the queue that consumers listen on. */
    public static final String DELAY_QUEUE = "rrg_delay_queue";

    /** Routing key that binds {@link #DELAY_QUEUE} to {@link #DELAY_EXCHANGE}. */
    public static final String DELAY_ROUTING_KEY = "rrg_delay_key";

    /**
     * Declares the delayed exchange.
     *
     * <p>Note the exchange class: it has to be a {@link CustomExchange} because
     * {@code x-delayed-message} is not one of the built-in exchange types. The
     * {@code x-delayed-type} argument is set to {@code direct}.
     *
     * @return a durable, non-auto-delete exchange of type {@code x-delayed-message}
     */
    @Bean
    public CustomExchange delayExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAY_EXCHANGE, "x-delayed-message", true, false, args);
    }

    /**
     * Declares the queue that delayed messages are routed to.
     *
     * @return a durable queue named {@link #DELAY_QUEUE}
     */
    @Bean
    public Queue delayQueue() {
        return new Queue(DELAY_QUEUE, true);
    }

    /**
     * Binds the delay queue to the delayed exchange using {@link #DELAY_ROUTING_KEY}.
     *
     * <p>The parameter names deliberately equal the bean names declared above
     * ({@code delayQueue}, {@code delayExchange}). With the previous names the
     * injection only worked as long as the context held exactly one
     * {@link Queue} and one {@link CustomExchange}; matching names let Spring
     * pick the intended beans when more are added (this relies on parameter
     * names being available at runtime, e.g. compiling with {@code -parameters}).
     *
     * @param delayQueue    the queue declared by {@link #delayQueue()}
     * @param delayExchange the exchange declared by {@link #delayExchange()}
     * @return the binding between the queue and the exchange
     */
    @Bean
    public Binding cfgDelayBinding(Queue delayQueue, CustomExchange delayExchange) {
        return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_ROUTING_KEY).noargs();
    }
}
View Code

 

 

  發送消息類、接收類和信息類,信息類是我們根據自己的實際業務封裝的、需要消費的信息。

package com.rrg.gz.mq;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Message producer: publishes {@link MqEntity} payloads to the delayed
 * exchange together with the delay the broker should apply.
 *
 * @author huangsz  2019/3/7 0007
 */
@Component
public class Sender {
    private static final Logger log = LoggerFactory.getLogger(Sender.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes {@code entity} to the delayed exchange with an {@code x-delay}
     * header of {@code time}.
     *
     * @param entity payload to send; it is handed to the template's message
     *               converter as-is
     * @param time   value written to the {@code x-delay} header (logged as
     *               milliseconds)
     */
    public void sendDelayMessage(MqEntity entity, long time) {
        final String exchange = "rrg_delay_exchange";
        final String routingKey = "rrg_delay_key";

        log.info("延時隊列生產消息");
        // The delay travels as a message header, not in the payload; a long value is accepted here.
        rabbitTemplate.convertAndSend(exchange, routingKey, entity, outgoing -> {
            outgoing.getMessageProperties().setHeader("x-delay", time);
            return outgoing;
        });
        log.info("{}ms後執行", time);
    }

}
View Code

 

package com.rrg.gz.mq;

import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Consumer for the delayed queue.
 *
 * <p>The listener acknowledges deliveries itself, so it only makes sense with
 * {@code spring.rabbitmq.listener.simple.acknowledge-mode=manual}.
 *
 * @author huangsz  2019/3/7 0007
 */

@Component
public class Receiver {
    private static final Logger log = LoggerFactory.getLogger(Receiver.class);

    /**
     * Handles one delivery from {@code rrg_delay_queue}.
     *
     * <p>The message is acknowledged only after it has been handled, so a
     * failure during handling does not silently lose it. If handling throws,
     * the delivery is negatively acknowledged without requeue, so a message
     * that keeps failing is not redelivered in an endless loop.
     *
     * @param entity  the converted message payload
     * @param message the raw AMQP message; supplies the delivery tag
     * @param channel the channel the message arrived on; used to ack/nack
     * @throws Exception if acknowledging on the channel fails
     */
    @RabbitListener(queues = "rrg_delay_queue")
    public void cfgUserReceiveDealy(MqEntity entity, Message message, Channel channel) throws Exception{
        final long deliveryTag = message.getMessageProperties().getDeliveryTag();
        log.info("開始接受消息!");
        try {
            handle(entity);
        } catch (RuntimeException ex) {
            log.error("Failed to handle delayed message, rejecting without requeue: {}", entity, ex);
            channel.basicNack(deliveryTag, false, false);
            return;
        }
        // Tell the broker the message has been handled and can be removed from the queue.
        channel.basicAck(deliveryTag, false);
    }

    /** Business handling of a received payload; here it is only logged. */
    private void handle(MqEntity entity) {
        log.info("接收消息並打印");
        log.info("{}", entity);
    }
}
View Code
package com.rrg.gz.mq;


import java.io.Serializable;

/**
 * Payload carried by the delayed messages.
 *
 * <p>The class must implement {@link Serializable}. An explicit
 * {@code serialVersionUID} is declared so that the serialized form does not
 * depend on a compiler-computed default. Note: messages serialized by a build
 * of this class that had no explicit {@code serialVersionUID} will most likely
 * fail to deserialize with this version, so drain the queue before rolling the
 * change out.
 *
 * @author huangsz  2019/3/7 0007
 */
public class MqEntity implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Identifier of the user the message concerns. */
    private Integer userId;
    /** Free-text message body. */
    private String msg;

    /** Creates an empty entity; both fields start as {@code null}. */
    public MqEntity() {
    }

    /**
     * Creates a fully populated entity.
     *
     * @param userId identifier of the user the message concerns
     * @param msg    message text
     */
    public MqEntity(Integer userId, String msg) {
        this.userId = userId;
        this.msg = msg;
    }

    /** @return the user id, or {@code null} if not set */
    public Integer getUserId() {
        return userId;
    }

    /** @param userId the user id to store */
    public void setUserId(Integer userId) {
        this.userId = userId;
    }

    /** @return the message text, or {@code null} if not set */
    public String getMsg() {
        return msg;
    }

    /** @param msg the message text to store */
    public void setMsg(String msg) {
        this.msg = msg;
    }

    /** @return a string of the form {@code MqEntity{userId=1, msg='text'}} */
    @Override
    public String toString() {
        return "MqEntity{" +
                "userId=" + userId +
                ", msg='" + msg + '\'' +
                '}';
    }
}
View Code

 

 

四、寫一個 controller 測試:

  

@RequestMapping("/test1")
    public void test(){
        // Publish a message with a 30000 ms delay.
        final long delayMillis = 30000;
        sender.sendDelayMessage(new MqEntity(1,"30秒後消費"), delayMillis);
    }

    @RequestMapping("/test2")
    public void test2(){
        // Publish a message with a 10000 ms delay.
        final long delayMillis = 10000;
        sender.sendDelayMessage(new MqEntity(1,"10秒後消費"), delayMillis);
    }

 

 

先執行 test1,然後執行 test2。這個時候,test2 的消息(延時 10 秒)不需要等 test1 的消息(延時 30 秒)消費完之後才被消費,而是會先被消費。

 

相關文章
相關標籤/搜索