聊聊rocketmq的ExtProducerResetConfiguration

本文主要研究一下rocketmq的ExtProducerResetConfiguration

ExtProducerResetConfiguration

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java

/**
 * Equips every bean annotated with {@link ExtRocketMQTemplateConfiguration} with its own
 * started {@link DefaultMQProducer} after all singletons have been instantiated.
 *
 * <p>Each annotated bean must be a {@link RocketMQTemplate}; its producer is built from the
 * annotation attributes, falling back to the global {@link RocketMQProperties.Producer}
 * settings where an attribute is left empty or at {@code -1}.
 */
@Configuration
public class ExtProducerResetConfiguration implements ApplicationContextAware, SmartInitializingSingleton {
    private static final Logger log = LoggerFactory.getLogger(ExtProducerResetConfiguration.class);

    /** Sentinel used by the numeric annotation attributes to mean "not set". */
    private static final int UNSET = -1;

    private ConfigurableApplicationContext applicationContext;

    private final StandardEnvironment environment;

    private final RocketMQProperties rocketMQProperties;

    private final ObjectMapper objectMapper;

    /**
     * @param rocketMQMessageObjectMapper mapper handed to every extra template
     * @param environment                 used to resolve {@code ${...}} placeholders in annotation attributes
     * @param rocketMQProperties          global RocketMQ settings used as fallback values
     */
    public ExtProducerResetConfiguration(ObjectMapper rocketMQMessageObjectMapper,
                                             StandardEnvironment environment,
                                             RocketMQProperties rocketMQProperties) {
        this.objectMapper = rocketMQMessageObjectMapper;
        this.environment = environment;
        this.rocketMQProperties = rocketMQProperties;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = (ConfigurableApplicationContext) applicationContext;
    }

    /**
     * Looks up all beans carrying {@link ExtRocketMQTemplateConfiguration} and registers a
     * producer on each of them.
     */
    @Override
    public void afterSingletonsInstantiated() {
        Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(ExtRocketMQTemplateConfiguration.class);

        if (Objects.nonNull(beans)) {
            beans.forEach(this::registerTemplate);
        }
    }

    /**
     * Validates the annotated bean, creates and starts its producer, and wires the producer
     * and the object mapper into the template.
     *
     * @param beanName name of the annotated bean; also used as the producer instance name
     * @param bean     the annotated bean, expected to be a {@link RocketMQTemplate}
     * @throws IllegalStateException             if the bean is not a {@link RocketMQTemplate}
     * @throws BeanDefinitionValidationException if validation fails or the producer cannot be started
     */
    private void registerTemplate(String beanName, Object bean) {
        // Look through AOP proxies so the annotation is read from the user's class.
        Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);

        if (!(bean instanceof RocketMQTemplate)) {
            throw new IllegalStateException(clazz + " is not instance of " + RocketMQTemplate.class.getName());
        }

        ExtRocketMQTemplateConfiguration annotation = clazz.getAnnotation(ExtRocketMQTemplateConfiguration.class);
        GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
        validate(annotation, genericApplicationContext);

        DefaultMQProducer mqProducer = createProducer(annotation);
        // Set instanceName same as the beanName
        mqProducer.setInstanceName(beanName);
        try {
            mqProducer.start();
        } catch (MQClientException e) {
            // String.format needs %s; the former "{}" placeholder was never substituted.
            throw new BeanDefinitionValidationException(
                    String.format("Failed to startup MQProducer for RocketMQTemplate %s", beanName), e);
        }
        RocketMQTemplate rocketMQTemplate = (RocketMQTemplate) bean;
        rocketMQTemplate.setProducer(mqProducer);
        rocketMQTemplate.setObjectMapper(objectMapper);

        log.info("Set real producer to :{} {}", beanName, annotation.value());
    }

    /**
     * Builds (but does not start) a producer from the annotation, using the global producer
     * properties for every attribute that is empty or {@code -1}.
     *
     * @param annotation configuration declared on the template class
     * @return a configured, not yet started producer
     */
    private DefaultMQProducer createProducer(ExtRocketMQTemplateConfiguration annotation) {
        RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer();
        if (producerConfig == null) {
            producerConfig = new RocketMQProperties.Producer();
        }

        String nameServer = environment.resolvePlaceholders(annotation.nameServer());
        String groupName = resolveOrDefault(annotation.group(), producerConfig.getGroup());
        // Use the resolved credentials; the raw attribute may still be an unresolved "${...}" placeholder.
        String ak = resolveOrDefault(annotation.accessKey(), producerConfig.getAccessKey());
        String sk = resolveOrDefault(annotation.secretKey(), producerConfig.getSecretKey());
        String customizedTraceTopic =
                resolveOrDefault(annotation.customizedTraceTopic(), producerConfig.getCustomizedTraceTopic());

        DefaultMQProducer producer;
        if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
            producer = new DefaultMQProducer(groupName, new AclClientRPCHook(new SessionCredentials(ak, sk)),
                    annotation.enableMsgTrace(), customizedTraceTopic);
            producer.setVipChannelEnabled(false);
        } else {
            producer = new DefaultMQProducer(groupName, annotation.enableMsgTrace(), customizedTraceTopic);
        }

        producer.setNamesrvAddr(nameServer);
        producer.setSendMsgTimeout(
                valueOrDefault(annotation.sendMessageTimeout(), producerConfig.getSendMessageTimeout()));
        // The synchronous retry count comes from retryTimesWhenSendFailed, not the async attribute.
        producer.setRetryTimesWhenSendFailed(
                valueOrDefault(annotation.retryTimesWhenSendFailed(), producerConfig.getRetryTimesWhenSendFailed()));
        producer.setRetryTimesWhenSendAsyncFailed(
                valueOrDefault(annotation.retryTimesWhenSendAsyncFailed(), producerConfig.getRetryTimesWhenSendAsyncFailed()));
        producer.setMaxMessageSize(
                valueOrDefault(annotation.maxMessageSize(), producerConfig.getMaxMessageSize()));
        producer.setCompressMsgBodyOverHowmuch(
                valueOrDefault(annotation.compressMessageBodyThreshold(), producerConfig.getCompressMessageBodyThreshold()));
        producer.setRetryAnotherBrokerWhenNotStoreOK(annotation.retryNextServer());

        return producer;
    }

    /** Resolves placeholders in {@code raw}; returns {@code fallback} when the result is empty. */
    private String resolveOrDefault(String raw, String fallback) {
        String resolved = environment.resolvePlaceholders(raw);
        return StringUtils.isEmpty(resolved) ? fallback : resolved;
    }

    /** Returns {@code fallback} when the annotation attribute was left at {@code -1}. */
    private static int valueOrDefault(int annotationValue, int fallback) {
        return annotationValue == UNSET ? fallback : annotationValue;
    }

    /**
     * Rejects annotations whose {@code value} is already a bean name in the context or whose
     * name server does not differ from the global one.
     *
     * @param annotation                configuration declared on the template class
     * @param genericApplicationContext context queried for bean names already in use
     * @throws BeanDefinitionValidationException if either check fails
     */
    private void validate(ExtRocketMQTemplateConfiguration annotation, GenericApplicationContext genericApplicationContext) {
        if (genericApplicationContext.isBeanNameInUse(annotation.value())) {
            throw new BeanDefinitionValidationException(String.format("Bean %s has been used in Spring Application Context, " +
                            "please check the @ExtRocketMQTemplateConfiguration",
                    annotation.value()));
        }

        // NOTE(review): a missing global name server is rejected with the same "same with global
        // property" message as an identical one; confirm this is intended.
        if (rocketMQProperties.getNameServer() == null ||
                rocketMQProperties.getNameServer().equals(environment.resolvePlaceholders(annotation.nameServer()))) {
            throw new BeanDefinitionValidationException(
                    "Bad annotation definition in @ExtRocketMQTemplateConfiguration, nameServer property is same with " +
                            "global property, please use the default RocketMQTemplate!");
        }
    }
}
複製代碼
  • ExtProducerResetConfiguration實現了ApplicationContextAware, SmartInitializingSingleton接口;其afterSingletonsInstantiated方法會取出標記有ExtRocketMQTemplateConfiguration的beans,而後挨個執行registerTemplate方法
  • registerTemplate首先判斷RocketMQTemplate.class是不是該bean的超類,是的話則讀取ExtRocketMQTemplateConfiguration註解,校驗註解指定的NameServer地址是否與全局的相同,相同則拋出BeanDefinitionValidationException異常
  • 最後通過createProducer方法創建DefaultMQProducer,然後執行其start方法,再將其賦值給該bean(setProducer),並設置objectMapper

ExtRocketMQTemplateConfiguration

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/annotation/ExtRocketMQTemplateConfiguration.java

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface ExtRocketMQTemplateConfiguration {
    /**
     * The component name of the Producer configuration.
     */
    String value() default "";

    /**
     * The property of "name-server".
     */
    String nameServer();

    /**
     * Name of producer.
     */
    String group() default "${rocketmq.producer.group:}";
    /**
     * Millis of send message timeout.
     */
    int sendMessageTimeout() default -1;
    /**
     * Compress message body threshold, namely, message body larger than 4k will be compressed on default.
     */
    int compressMessageBodyThreshold() default -1;
    /**
     * Maximum number of retry to perform internally before claiming sending failure in synchronous mode.
     * This may potentially cause message duplication which is up to application developers to resolve.
     */
    int retryTimesWhenSendFailed() default -1;
    /**
     * <p> Maximum number of retry to perform internally before claiming sending failure in asynchronous mode. </p>
     * This may potentially cause message duplication which is up to application developers to resolve.
     */
    int retryTimesWhenSendAsyncFailed() default -1;
    /**
     * Indicate whether to retry another broker on sending failure internally.
     */
    boolean retryNextServer() default false;
    /**
     * Maximum allowed message size in bytes.
     */
    int maxMessageSize() default -1;
    /**
     * The property of "access-key".
     */
    String accessKey() default "${rocketmq.producer.accessKey:}";
    /**
     * The property of "secret-key".
     */
    String secretKey() default "${rocketmq.producer.secretKey:}";
    /**
     * Switch flag instance for message trace.
     */
    boolean enableMsgTrace() default true;
    /**
     * The name value of message trace topic.If you don't config,you can use the default trace topic name. */ String customizedTraceTopic() default "${rocketmq.producer.customized-trace-topic:}"; } 複製代碼
  • ExtRocketMQTemplateConfiguration註解定義了value、nameServer、group、sendMessageTimeout、compressMessageBodyThreshold、retryTimesWhenSendFailed、retryTimesWhenSendAsyncFailed、retryNextServer、maxMessageSize、accessKey、secretKey、enableMsgTrace、customizedTraceTopic屬性

ExtRocketMQTemplate

import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;
import org.apache.rocketmq.spring.core.RocketMQTemplate;

/**
 * An additional {@link RocketMQTemplate} whose name server is taken from the
 * {@code demo.rocketmq.extNameServer} placeholder declared on the annotation below.
 * The class adds no members of its own.
 */
@ExtRocketMQTemplateConfiguration(nameServer = "${demo.rocketmq.extNameServer}")
public class ExtRocketMQTemplate extends RocketMQTemplate {
}
複製代碼
  • 這裏定義了ExtRocketMQTemplate類,它繼承了RocketMQTemplate,同時使用了ExtRocketMQTemplateConfiguration註解

小結

  • ExtProducerResetConfiguration實現了ApplicationContextAware, SmartInitializingSingleton接口;其afterSingletonsInstantiated方法會取出標記有ExtRocketMQTemplateConfiguration的beans,而後挨個執行registerTemplate方法
  • registerTemplate首先判斷RocketMQTemplate.class是不是該bean的超類,是的話則讀取ExtRocketMQTemplateConfiguration註解,校驗註解指定的NameServer地址是否與全局的相同,相同則拋出BeanDefinitionValidationException異常
  • 最後通過createProducer方法創建DefaultMQProducer,然後執行其start方法,再將其賦值給該bean(setProducer),並設置objectMapper

doc

相關文章
相關標籤/搜索