SpringBoot @JmsListener(destination = ) 運行時動態修改

背景

最近在寫一個Mqtt消息轉發的中間件,可經過ActvieMq接收消息。須要對外提供配置接口,經過配置接口,動態配置Queue,可接收Queue的消息。spring

整個項目依賴於SpringBoot ,經過SpringBoot實現隊列消費,只須要經過@JmsListener(destination = "queueName") 註解,就能夠實現對特定隊列的消費。segmentfault

遇到問題

正式這種註解的方式,使得destination只能在代碼中寫死,無法動態修改。 而我想實現的效果是可以動態的修改destination,動態的建立Consumer。ide

解決方案

  1. 繼續使用@JmsListener,找到某種方式,可以動態修改destination
  2. 不使用@JmsListener,經過ActiveMq提供的Jar,手動實現鏈接、消費等。以前經過這種方式實現過RabbitMq的處理,所以該方案不會有難度,只是工做量多一些。

因爲SpringBoot的過度簡單,所以開始嘗試經過第一種方式。測試

解決過程

一開始想着經過反射修改註解destination可是嘗試失敗。後來想到@JmsListener(destination = "${xxx}")這種方式,根據配置文件,能夠修改消費的隊列,可是須要重新啓動。 所以能不能動態修改配置文件對應的變量,而後消費者動態注入Spring。ui

  1. 自定義實現一個MapPropertySource,而後加入到Environment中

@Configuration
public class DynamicTestSetting {.net

public static final String DYNAMIC_CONFIG = "dynamic_settting";
@Autowired
AbstractEnvironment environment;

@PostConstruct
public void init() {
    environment.getPropertySources().addFirst(new DynamicLoadPropertySource(DYNAMIC_CONFIG, null));
}

}code

@Slf4j
public class DynamicLoadPropertySource extends MapPropertySource {中間件

private static Map<String, Object> map = new ConcurrentHashMap<String, Object>(64);

private static ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
static {
    scheduled.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            //測試:定時修改value
            //真正使用可能須要傳遞一個參數或者定時讀取配置文件
            map.put("test", String.valueOf(System.currentTimeMillis()));
        }
    }, 1, 1, TimeUnit.SECONDS);
}

public DynamicLoadPropertySource(String name, Map<String, Object> source) {
    super(name, map);
}


@Override
public Object getProperty(String name) {
    return map.get(name);
}

} blog

2.消費者須要動態加入
@Slf4j
public class TestConsumer {接口

@JmsListener(destination = "${test}")
public void receiveQueue(BytesMessage msg) {
    //接收消息後的處理邏輯
}

}

/*動態注入bean到spring,須要用到ApplicationContext/
DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) context.getAutowireCapableBeanFactory();

BeanDefinitionBuilder definitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(TestConsumer.class);
        defaultListableBeanFactory.registerBeanDefinition("testConsumer",definitionBuilder.getBeanDefinition());

//調用getBeaan時Spring會建立一個TestConsumer實例,這個時候ActiveMq中會建立一個destination = "${test}"隊列,TestConsummer和其綁定消費

TestConsumer consumer = context.getBean("testConsumer", TestConsumer.class);
        System.out.println(consumer);

動態關閉Queue的消費者

參見另外一篇:https://segmentfault.com/n/13...

參考文章:https://blog.csdn.net/qq_3491...

相關文章
相關標籤/搜索