本文主要研究一下rocketmq的ListenerContainerConfigurationjava
rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.javagit
@Configuration public class ListenerContainerConfiguration implements ApplicationContextAware, SmartInitializingSingleton { private final static Logger log = LoggerFactory.getLogger(ListenerContainerConfiguration.class); private ConfigurableApplicationContext applicationContext; private AtomicLong counter = new AtomicLong(0); private StandardEnvironment environment; private RocketMQProperties rocketMQProperties; private ObjectMapper objectMapper; public ListenerContainerConfiguration(ObjectMapper rocketMQMessageObjectMapper, StandardEnvironment environment, RocketMQProperties rocketMQProperties) { this.objectMapper = rocketMQMessageObjectMapper; this.environment = environment; this.rocketMQProperties = rocketMQProperties; } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = (ConfigurableApplicationContext) applicationContext; } @Override public void afterSingletonsInstantiated() { Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class); if (Objects.nonNull(beans)) { beans.forEach(this::registerContainer); } } private void registerContainer(String beanName, Object bean) { Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean); if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) { throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName()); } RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class); validate(annotation); String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(), counter.incrementAndGet()); GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext; genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class, () -> createRocketMQListenerContainer(containerBeanName, bean, annotation)); DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName, DefaultRocketMQListenerContainer.class); if (!container.isRunning()) { try { container.start(); } catch (Exception e) { log.error("Started container failed. {}", container, e); throw new RuntimeException(e); } } log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName); } private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean, RocketMQMessageListener annotation) { DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer(); String nameServer = environment.resolvePlaceholders(annotation.nameServer()); nameServer = StringUtils.isEmpty(nameServer) ? rocketMQProperties.getNameServer() : nameServer; String accessChannel = environment.resolvePlaceholders(annotation.accessChannel()); container.setNameServer(nameServer); if (!StringUtils.isEmpty(accessChannel)) { container.setAccessChannel(AccessChannel.valueOf(accessChannel)); } container.setTopic(environment.resolvePlaceholders(annotation.topic())); container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup())); container.setRocketMQMessageListener(annotation); container.setRocketMQListener((RocketMQListener) bean); container.setObjectMapper(objectMapper); container.setName(name); // REVIEW ME, use the same clientId or multiple? return container; } private void validate(RocketMQMessageListener annotation) { if (annotation.consumeMode() == ConsumeMode.ORDERLY && annotation.messageModel() == MessageModel.BROADCASTING) { throw new BeanDefinitionValidationException( "Bad annotation definition in @RocketMQMessageListener, messageModel BROADCASTING does not support ORDERLY message!"); } } }
rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.javagithub
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface RocketMQMessageListener { String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}"; String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}"; String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}"; String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}"; String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}"; /** * Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve * load balance. It's required and needs to be globally unique. * * * See <a href="http://rocketmq.apache.org/docs/core-concept/">here</a> for further discussion. */ String consumerGroup(); /** * Topic name. */ String topic(); /** * Control how to selector message. * * @see SelectorType */ SelectorType selectorType() default SelectorType.TAG; /** * Control which message can be select. Grammar please see {@link SelectorType#TAG} and {@link SelectorType#SQL92} */ String selectorExpression() default "*"; /** * Control consume mode, you can choice receive message concurrently or orderly. */ ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY; /** * Control message mode, if you want all subscribers receive message all message, broadcasting is a good choice. */ MessageModel messageModel() default MessageModel.CLUSTERING; /** * Max consumer thread number. */ int consumeThreadMax() default 64; /** * Max consumer timeout, default 30s. */ long consumeTimeout() default 30000L; /** * The property of "access-key". */ String accessKey() default ACCESS_KEY_PLACEHOLDER; /** * The property of "secret-key". */ String secretKey() default SECRET_KEY_PLACEHOLDER; /** * Switch flag instance for message trace. */ boolean enableMsgTrace() default true; /** * The name value of message trace topic.If you don't config,you can use the default trace topic name. */ String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER; /** * The property of "name-server". */ String nameServer() default NAME_SERVER_PLACEHOLDER; /** * The property of "access-channel". */ String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER; }