延遲消息處理

以前有這樣一個需求:運營在後端配置一條系統消息或營銷活動等類型的消息,到了需要推送的時間之後,系統會自動將消息推送到用戶APP端顯示。一開始採用的是任務調度(定時器)的方式,通過輪詢掃表去做。由於具體什麼時候推送消息沒有固定的頻率和時間,所以需要每分鐘掃一次表,以免消息在指定時間內未能及時推送到APP端。這樣每分鐘掃描一次太過頻繁,因此不太適合(定時器適合固定頻率或固定時間段的處理)。

所以這裏選取了幾種延遲發送的方式:

        1.rabbitMQ

        2.redis

        3.DelayQueue(慎用)

代碼部分(發送端):

    

/**
 * Common contract implemented by every delayed system-message sender.
 */
public interface ISysMessageDelayProcessor {

    /** Five minutes, expressed in milliseconds. */
    long FIVE_MINUTES = 1000L * 60 * 5;

    /**
     * Hands a message over for delivery at a later point in time.
     *
     * @param msg      the message payload; callers wrap it however they need
     * @param pushDate the time at which the message should be pushed
     */
    void sendMessage(Object msg, LocalDateTime pushDate);
}
/**
 * 基於RabbitMQ的實現方式(須要下載rabbitMQ插件)
 * 
 * 
 */
@Slf4j
@EnableBinding(SysMessageSink.class)
public class SysMessageRabbitMQDelayProcessorImpl implements ISysMessageDelayProcessor {

    @Autowired
    private BinderAwareChannelResolver resolver;

    @Override
    public void sendMessage(Object msg LocalDateTime pushDate) {
        resolver.resolveDestination(MQTopicConstant.SYS_MESSAGE_SEND_DELAY_TOPIC_PRODUCER)
                .send(MessageBuilder.withPayload(msg)
                        .setHeader("x-delay",
                                Duration.between(LocalDateTime.now(), pushDate)
                                        .toMillis())
                        .build());
    }

}
# Delayed-delivery configuration for system messages
spring.cloud.stream.bindings.your-topic-producer.destination=your-topic
spring.cloud.stream.rabbit.bindings.your-topic-producer.producer.delayed-exchange=true
spring.cloud.stream.bindings.your-topic-consumer.destination=your-topic
spring.cloud.stream.rabbit.bindings.your-topic-consumer.consumer.delayed-exchange=true
spring.cloud.stream.bindings.your-topic-consumer.group=your-topic-group
/**
 * Redis-based implementation.
 *
 * <p>Each message is added to the sorted set
 * {@code MQTopicConstant.SYS_MESSAGE_SEND_DELAY_TOPIC} with its push time
 * (epoch milliseconds) as the score.
 */
public class SysMessageRedisDelayProcessorImpl implements ISysMessageDelayProcessor {
    @Autowired
    private RedisTemplate redisTemplate;

    /**
     * Stores {@code msg} in the delayed sorted set, scored by its push time.
     *
     * @param msg      the message payload (stored as the sorted-set member)
     * @param pushDate the time at which the message should be pushed
     */
    @Override
    public void sendMessage(Object msg, LocalDateTime pushDate) {
        // The score must be an absolute timestamp, not a relative delay:
        // SysMessageRedisDelayedProcessorListener#ifExpire compares it against
        // "now" converted with the same fixed +8 offset, so use that conversion here too.
        long dueAtMillis = pushDate.toInstant(java.time.ZoneOffset.of("+8")).toEpochMilli();
        // Write to the SEND topic: that is the key the Redis listener polls.
        redisTemplate.opsForZSet().add(MQTopicConstant.SYS_MESSAGE_SEND_DELAY_TOPIC, msg, dueAtMillis);
    }

}
/**
 * Fallback implementation for deployments where neither Redis nor RabbitMQ is available.
 *
 * <p>It is purely in-memory: everything queued here is lost if the process
 * crashes or restarts. Use with caution!
 *
 * <p>The class plays two roles: the no-arg instance is the sender (and starts the
 * listener thread), while instances built with {@code (executeTime, data)} are the
 * elements stored in the {@link DelayQueue}.
 */
public class SysMessageDelayedQueueProcessorImpl implements ISysMessageDelayProcessor, Delayed {
    private LocalDateTime executeTime;
    private Object data;
    // send queue
    private static final DelayQueue<SysMessageDelayedQueueProcessorImpl> sendDelayQueue =
            new DelayQueue<>();

    /**
     * Creates the sender and starts a thread running the queue listener.
     */
    public SysMessageDelayedQueueProcessorImpl() {
        // The listener's constructor takes only the send queue.
        new Thread(new SysMessageDelayedQueueProcessorListener(sendDelayQueue)).start();
    }

    /**
     * Creates a queue element.
     *
     * @param executeTime the time at which the element becomes available
     * @param data        the message payload
     */
    public SysMessageDelayedQueueProcessorImpl(LocalDateTime executeTime, Object data) {
        this.executeTime = executeTime;
        this.data = data;
    }

    /**
     * @return the payload carried by this queue element (read by the listener)
     */
    public Object getData() {
        return data;
    }

    /**
     * Queues {@code msg} so that it becomes available at {@code pushDate}.
     */
    @Override
    public void sendMessage(Object msg, LocalDateTime pushDate) {
        sendDelayQueue.offer(new SysMessageDelayedQueueProcessorImpl(pushDate, msg));
    }


    /**
     * @return the time remaining until {@code executeTime}, in the requested unit
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(Duration.between(LocalDateTime.now(), executeTime).toMillis(),
                TimeUnit.MILLISECONDS);
    }

    /**
     * Orders elements by how soon they become available.
     */
    @Override
    public int compareTo(Delayed o) {
        if (o instanceof SysMessageDelayedQueueProcessorImpl) {
            LocalDateTime otherTime = ((SysMessageDelayedQueueProcessorImpl) o).executeTime;
            if (executeTime != null && otherTime != null) {
                // Compare the fixed timestamps directly; avoids two separate now() readings.
                return executeTime.compareTo(otherTime);
            }
        }
        // Long.compare instead of casting a long difference to int, which can overflow.
        return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
    }
}

接收端:

/**
 * Base class for delayed-message listeners.
 *
 * <p>{@link #run()} simply delegates to {@link #sendProcessor()}, which each
 * subclass implements with its own way of watching for due messages.
 */
public abstract class ISysMessageDelayedListener implements Runnable {

    /** Bounded queue (capacity 1000) of due messages, visible to subclasses. */
    protected static final LinkedBlockingQueue<SysMessageVO> SEND_QUEUE =
            new LinkedBlockingQueue<>(1000);

    @Override
    public void run() {
        sendProcessor();
    }

    /**
     * Watches for messages that are due to be sent.
     */
    public abstract void sendProcessor();
}
/**
 * Listens for packets pushed by the MQ delayed queue and forwards them to
 * {@code SEND_QUEUE}; it performs no other business processing.
 *
 * <p>NOTE(review): this class does not start a consumer for {@code SEND_QUEUE}
 * itself; confirm that something (e.g. a {@code SysMessagePushWork} thread, as the
 * sibling listeners start in their constructors) drains the queue in this deployment.
 */
@Component
@EnableBinding(SysMessageSink.class)
@Slf4j
public class SysMessageRabbitMQDelayedProcessorListener extends ISysMessageDelayedListener {
    @Autowired
    private SysMessageQueryProcessor sysMessageQueryProcessor;

    /**
     * Receives a delayed message and forwards it to the inherited {@code SEND_QUEUE}.
     *
     * <p>NOTE(review): the payload is expected to arrive as a {@code SysMessageVO};
     * if the binder delivers a raw form (bytes/JSON), convert it here — confirm
     * against the binding's content-type configuration.
     *
     * @param msg the received payload
     * @throws IllegalArgumentException if the payload is not a {@code SysMessageVO}
     */
    @StreamListener(MQTopicConstant.SYS_MESSAGE_SEND_DELAY_TOPIC_CONSUMER)
    public void onSendHandle(Object msg) {
        if (!(msg instanceof SysMessageVO)) {
            // Fail loudly instead of silently losing the message.
            throw new IllegalArgumentException("unexpected sys message payload type: "
                    + (msg == null ? "null" : msg.getClass().getName()));
        }
        try {
            // put: blocks while the queue is full
            SEND_QUEUE.put((SysMessageVO) msg);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the calling thread can still observe it.
            Thread.currentThread().interrupt();
            log.error("caught onSendHandle invoke fail,e:", e);
        }

    }

    /**
     * Nothing to poll here: messages arrive through {@link #onSendHandle(Object)}.
     */
    @Override
    public void sendProcessor() {

    }
}
/**
 * redis監聽處理
 */
@Slf4j
public class SysMessageRedisDelayedProcessorListener extends ISysMessageDelayedListener {
    private static final String setNX = "lock:sysmessage:delay";
    public static final int LOCK_EXPIRE = 300; // ms

    @Autowired
    private RedisTemplate redisTemplate;

    public SysMessageRedisDelayedProcessorListener() {
        new Thread(new SysMessagePushWork(SEND_QUEUE)).start();
    }

    /**
     * 監聽是否有到期的數據
     */
    private void monitorSendQueue() {
        while (true) {
            if (lock()) {
                Set<ZSetOperations.TypedTuple<Object>> set =
                        redisTemplate.opsForZSet().rangeWithScores(MQTopicConstant.SYS_MESSAGE_SEND_DELAY_TOPIC, 0, 0);
                Iterator<ZSetOperations.TypedTuple<Object>> iterator = set.iterator();
                while (iterator.hasNext()) {
                    ZSetOperations.TypedTuple<Object> next = iterator.next();
                    consumer(MQTopicConstant.SYS_MESSAGE_SEND_DELAY_TOPIC, next);
                }
            }
        }
    }

    /**
     * 獲取分佈式瑣
     *
     * @return
     */
    private boolean lock() {
        try {
            long expireAt = System.currentTimeMillis() + LOCK_EXPIRE + 1;
            return (Boolean) this.redisTemplate.execute((RedisCallback) connection -> {
                Boolean acquire = connection.setNX(setNX.getBytes(), String.valueOf(expireAt).getBytes());
                if (acquire) {
                    return true;
                }
                byte[] value = connection.get(setNX.getBytes());
                if (Objects.isNull(value) || value.length <= 0) {
                    return false;
                }
                long expireTime = Long.parseLong(new String(value));
                if (expireTime < System.currentTimeMillis()) {
                    // 若是鎖已通過期
                    byte[] oldValue = connection.getSet(setNX.getBytes(),
                            String.valueOf(System.currentTimeMillis() + LOCK_EXPIRE + 1).getBytes());
                    // 防止死鎖
                    return Long.parseLong(new String(oldValue)) < System.currentTimeMillis();
                }
                return true;
            });
        } catch (Exception e) {
            log.error("obtain lock option fail, caught exception:", e);
            return false;
        }
    }

    /**
     * 刪除過時的數據
     *
     * @param value
     */
    private void removeDataByExpireTime(String key, Object value) {
        redisTemplate.opsForZSet().remove(key, value);
    }

    /**
     * 消費
     *
     * @param next
     */
    private void consumer(String key, ZSetOperations.TypedTuple<Object> next) {
        // processor and remove
        if (!ifExpire(next.getScore())) {
            return;
        }
       if (MQTopicConstant.SYS_MESSAGE_SEND_DELAY_TOPIC.equals(key)) {
            // in queue
            SEND_QUEUE.offer(sysMessage);
        }
        // remove
        removeDataByExpireTime(key, next.getValue());


    }

    /**
     * 過時判斷
     *
     * @param expireTime
     * @return
     */
    private boolean ifExpire(Double expireTime) {
        return (expireTime.longValue() + 1000) <= LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli();
    }

    @Override
    public void sendProcessor() {
        // 監聽發送隊列的變化
        monitorSendQueue();
    }
}
/**
 *
 */
@Slf4j
public class SysMessageDelayedQueueProcessorListener extends ISysMessageDelayedListener {
    private DelayQueue<SysMessageDelayedQueueProcessorImpl> sendDelayQueue;

    public SysMessageDelayedQueueProcessorListener(DelayQueue<SysMessageDelayedQueueProcessorImpl> sendDelayQueue) {
        this.sendDelayQueue = sendDelayQueue;
        new Thread(new SysMessagePushWork(SEND_QUEUE)).start();
    }

  

    @Override
    public void sendProcessor() {
        CompletableFuture.runAsync(() -> {
            while (true) {
                try {
                    // processor
                    SysMessageDelayedQueueProcessorImpl queue = sendDelayQueue.take();
                    if (Objects.isNull(queue)) {
                        continue;
                    }
                   
                    // execute
                    SEND_QUEUE.offer(queue.getData());
                } catch (InterruptedException e) {
                    // 
                }
            }
        });

    }
}
/**
 * Spring configuration that registers the delayed-message processor bean.
 * Which bean is created depends on the classes present on the classpath
 * (see the conditions on each factory method).
 */
@Configuration
public class SysMessageConfiguration {

    /**
     * RabbitMQ-based delayed processing. Marked {@code @Primary} and created only
     * when {@code BinderAwareChannelResolver} is on the classpath.
     *
     * @return the RabbitMQ-based processor
     */
    @Primary
    @ConditionalOnClass(name = "org.springframework.cloud.stream.binding.BinderAwareChannelResolver")
    @Bean
    public SysMessageRabbitMQDelayProcessorImpl createSysMessageRabbitMQDelayProcessor() {

        return new SysMessageRabbitMQDelayProcessorImpl();
    }

    // Currently disabled: Redis-based processor bean.
//    /**
//     * Redis-based delayed processing
//     * @return
//     */
//    @ConditionalOnClass(RedisTemplate.class)
//    @ConditionalOnMissingClass("org.springframework.cloud.stream.binding.BinderAwareChannelResolver")
//    @Bean
//    public SysMessageRedisDelayProcessorImpl createSysMessageRedisDelayProcessor() {
//        return new SysMessageRedisDelayProcessorImpl();
//    }

    /**
     * In-memory delayed processing. Conditional on both
     * {@code BinderAwareChannelResolver} and {@code RedisTemplate} being absent
     * from the classpath.
     *
     * @return the in-memory (DelayQueue-based) processor
     */
    @ConditionalOnMissingClass({"org.springframework.cloud.stream.binding.BinderAwareChannelResolver",
            "org.springframework.data.redis.core.RedisTemplate"})
    @Bean
    public SysMessageDelayedQueueProcessorImpl createSysMessageDelayedQueueProcessor() {
        return new SysMessageDelayedQueueProcessorImpl();
    }
}
// Usage sketch: the consuming class receives an ISysMessageDelayProcessor through
// constructor injection and keeps it in a field.
// NOTE(review): "xxxx" looks like a placeholder for the consuming class's name — replace it.
private ISysMessageDelayProcessor sysMessageDelayProcessor;

    @Autowired
    public xxxx(ISysMessageDelayProcessor sysMessageDelayProcessor) {
        this.sysMessageDelayProcessor = sysMessageDelayProcessor;
    }

 其餘部分業務代碼按需處理即可

相關文章
相關標籤/搜索