設計與實現
定義主題隊列註解java
@Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented @Component public @interface RMessage { /** * 消息隊列 * @return */ String queue(); /** * 主題 * @return */ String topic() default "system"; }
springboot啓動監聽初始化任務隊列與消息主題,消費者訂閱主題redis
@Slf4j @Component public class RMessageListener implements ApplicationListener<ApplicationStartedEvent> { /** * consumer monitoringMethod monitorMessage */ private final static String METHOD_MONITOR_MESSAGE = "monitorMessage"; /** * redisson topic name */ private final static String ATTRIBUTE_NAME_TOPIC = "topic"; /** * redisson messageQueue name */ private final static String ATTRIBUTE_NAME_QUEUE = "queue"; /** * redisson queue map */ public static Map<String, RBlockingDeque<? super Serializable>> messageQueue = new ConcurrentHashMap<>(); /** * redisson offQueue map */ public static Map<String, RDelayedQueue<? super Serializable>> offQueue = new ConcurrentHashMap<>(); /** * redisson topic map */ public static Map<String, RTopic> topicMap = new ConcurrentHashMap<>(); @Override public void onApplicationEvent(ApplicationStartedEvent applicationStartedEvent) { ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(false); provider.addIncludeFilter(new AnnotationTypeFilter(RMessage.class)); String basePackage = applicationStartedEvent.getSpringApplication().getMainApplicationClass().getPackage().getName(); Set<BeanDefinition> beanDefinitions = provider.findCandidateComponents(basePackage); ConfigurableListableBeanFactory beanFactory = applicationStartedEvent.getApplicationContext().getBeanFactory(); mqInit(beanDefinitions, beanFactory); provider.clearCache(); provider.resetFilters(false); provider.addIncludeFilter(new AssignableTypeFilter(RMessageConsumer.class)); Set<BeanDefinition> consumers = provider.findCandidateComponents(basePackage); consumerSubscribe(beanFactory, consumers); } /** * consumer subscription news * * @param beanFactory * @param consumers */ private void consumerSubscribe(ConfigurableListableBeanFactory beanFactory, Set<BeanDefinition> consumers) { consumers.forEach(beanDefinition -> { log.info("rMessage init consumer {}",beanDefinition.getBeanClassName()); try { Object bean = beanFactory.getBean(Class.forName(beanDefinition.getBeanClassName())); Method method = bean.getClass().getMethod(METHOD_MONITOR_MESSAGE); ReflectionUtils.invokeMethod(method,bean); } catch (ClassNotFoundException | NoSuchMethodException e) { e.printStackTrace(); } }); } /** * Parameter initialization * * @param beanDefinitions * @param beanFactory */ private void mqInit(Set<BeanDefinition> beanDefinitions,final ConfigurableListableBeanFactory beanFactory) { RedissonClient redissonClient = beanFactory.getBean(RedissonClient.class); beanDefinitions.stream().filter(beanDefinition -> beanDefinition instanceof AnnotatedBeanDefinition).forEach(beanDefinition->{ AnnotatedBeanDefinition annotatedBeanDefinition = (AnnotatedBeanDefinition)beanDefinition; AnnotationMetadata annotationMetadata = annotatedBeanDefinition.getMetadata(); MergedAnnotation<RMessage> mergedAnnotation = annotationMetadata.getAnnotations().get(RMessage.class); String queryName = mergedAnnotation.getString(ATTRIBUTE_NAME_QUEUE); String topicName = mergedAnnotation.getString(ATTRIBUTE_NAME_TOPIC); String shortName = topicName+"."+queryName; RBlockingDeque<? super Serializable> blockingDeque = redissonClient.getBlockingDeque(shortName); messageQueue.put(shortName,blockingDeque); RDelayedQueue<? super Serializable> delayedQueue = redissonClient.getDelayedQueue(blockingDeque); offQueue.put(shortName,delayedQueue); RTopic topic = redissonClient.getTopic(topicName); topicMap.put(shortName,topic); }); } }
抽象隊列主題列表spring
/** * @author Ming * @date 2020/9/15 11:22 */ public abstract class AbstractQueue { Map<String, RDelayedQueue<? super Serializable>> offQueue = RMessageListener.offQueue; Map<String, RBlockingDeque<? super Serializable>> messageQueue = RMessageListener.messageQueue; Map<String, RTopic> topicMap = RMessageListener.topicMap; protected RDelayedQueue<? super Serializable> getRDelayedQueue() { return offQueue.get(shortName()); } protected RBlockingDeque<? super Serializable> getMessageQueue() { return messageQueue.get(shortName()); } private String shortName() { Annotation[] annotations = this.getClass().getAnnotations(); RMessage rMessage = Arrays.stream(annotations).filter(annotation -> annotation instanceof RMessage) .map(annotation -> (RMessage)annotation).findAny().get(); String queryName = rMessage.queue(); String topicName = rMessage.topic(); return topicName+"."+queryName; } protected RTopic getTopic() { return topicMap.get(shortName()); } }
抽象生產者模板springboot
@Slf4j public abstract class RMessageProducer<T extends Serializable> extends AbstractQueue { /** * 發送延時消息 * @param message * @param delay * @param timeUnit */ public void sendMessage(T message, long delay, TimeUnit timeUnit) { log.info("rMessage sendMessage: {}, delayTime {}",message.toString(),delay+timeUnit.name()); super.getRDelayedQueue().offer(message,delay,timeUnit); super.getTopic().publish(this.hashCode()); } /** * 發送異步消息 * @param message */ public void sendMessage(T message) { this.sendMessage(message,0,TimeUnit.MILLISECONDS); } }
抽象消費者模板app
@Slf4j public abstract class RMessageConsumer<T extends Serializable> extends AbstractQueue { public void monitorMessage() { CompletableFuture.runAsync(this::pastConsumption); super.getTopic().addListener(Object.class,(c,m)-> { try { Object take = super.getMessageQueue().take(); log.info("rMessage receiveMessage: {}, receiving time {}",take.toString(), LocalDateTime.now()); this.useMessage((T)take); } catch (InterruptedException e) { e.printStackTrace(); } }); } protected abstract void useMessage(T message); public void pastConsumption() { while (super.getRDelayedQueue().size() > 0 || super.getMessageQueue().size() > 0) { try { Object take = super.getMessageQueue().take(); log.info("rMessage receiveMessage: {}, receiving time {}",take.toString(), LocalDateTime.now()); this.useMessage((T)take); } catch (InterruptedException e) { e.printStackTrace(); } } } }
具體使用
生產者異步
@RMessage(queue = "redisQuery",topic = "order") public class RedissonProducer extends RMessageProducer<HashMap> { } @RestController @RequestMapping("producer") @AllArgsConstructor public class ProducerController { private RedissonProducer redissonProducer; @PostMapping public String send() { HashMap<String,Object> map = new HashMap<>(); map.put("name","張三"); map.put("time", "測試順序第二條"+LocalDateTime.now()); redissonProducer.sendMessage(map,5, TimeUnit.MINUTES); return "send msg"; } }
消費者ide
@RMessage(queue = "redisQuery",topic = "order") public class RedissonConsumer extends RMessageConsumer<HashMap> { @Override protected void useMessage(HashMap message) { System.out.println("接收到消息:"+message); } }