前言
最近業務開發部門給咱們部門提了一個需求,由於他們開發環境和測試環境共用一套kafka,他們但願咱們部門能幫他們實現自動給kafka的topic加上環境前綴,好比開發環境,則topic爲dev_topic,測試環境,則topic爲test_topic,他們kafka客戶端是使用spring-kafka。一開始接到這個需求的時候,我內心是拒絕的,爲啥開發環境和測試環境不分別部署一套kafka,還要那麼麻煩。但老大都答應接這個需求了,做爲小羅羅也只能接了java
實現思路
一、生產者端
能夠經過生產者攔截器,來給topic加前綴git
二、實現步驟
a、編寫一個生產者攔截器github
@Slf4j public class KafkaProducerInterceptor implements ProducerInterceptor<String, MessageDTO> { /** * 運行在用戶主線程中,在消息被序列化以前調用 * @param record * @return */ @Override public ProducerRecord<String, MessageDTO> onSend(ProducerRecord<String, MessageDTO> record) { log.info("原始topic:{}",record.topic()); return new ProducerRecord<String, MessageDTO>(TOPIC_KEY_PREFIX + record.topic(), record.partition(),record.timestamp(),record.key(), record.value()); } /** * 在消息被應答以前或者消息發送失敗時調用,一般在producer回調邏輯觸發以前,運行在produer的io線程中 * @param metadata * @param exception */ @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { log.info("實際topic:{}",metadata.topic()); } /** * 清理工做 */ @Override public void close() { } /** * 初始化工做 * @param configs */ @Override public void configure(Map<String, ?> configs) { }
b、配置攔截器spring
kafka: producer: # 生產者攔截器配置 properties: interceptor.classes: com.github.lybgeek.kafka.producer.interceptor.KafkaProducerInterceptor
c、測試springboot
二、消費者端
這個就稍微有點難搞了,由於業務開發部門他們是直接用@KafkaListener的註解,形以下ide
@KafkaListener(id = "msgId",topics = {Constant.TOPIC})
像這種也沒啥好的辦法,就只能經過源碼了,經過源碼能夠發如今以下地方工具
KafkaListenerAnnotationBeanPostProcessor#postProcessAfterInitialization
會把@KafkaListener的值賦值給消費者,若是對spring有了解的朋友,可能會知道postProcessAfterInitialization是spring後置處理器的方法,主要用來bean初始化後的一些操做,既然咱們知道@KafkaListener會在bean初始化後再進行賦值,那咱們就能夠在bean初始化前,修改掉@KafkaListener的值。具體實現以下post
@Component public class KafkaListenerFactoryBeanPostProcesser implements BeanFactoryPostProcessor { @SneakyThrows @Override public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException { List<String> packageNames = AutoConfigurationPackages.get(beanFactory); for (String packageName : packageNames) { Reflections reflections = new Reflections(new ConfigurationBuilder() .forPackages(packageName) // 指定路徑URL .addScanners(new SubTypesScanner()) // 添加子類掃描工具 .addScanners(new FieldAnnotationsScanner()) // 添加 屬性註解掃描工具 .addScanners(new MethodAnnotationsScanner() ) // 添加 方法註解掃描工具 .addScanners(new MethodParameterScanner() ) // 添加方法參數掃描工具 ); Set<Method> methodSet = reflections.getMethodsAnnotatedWith(KafkaListener.class); if(!CollectionUtils.isEmpty(methodSet)){ for (Method method : methodSet) { KafkaListener kafkaListener = method.getAnnotation(KafkaListener.class); changeTopics(kafkaListener); } } } } private void changeTopics(KafkaListener kafkaListener) throws Exception{ InvocationHandler invocationHandler = Proxy.getInvocationHandler(kafkaListener); Field memberValuesField = invocationHandler.getClass().getDeclaredField("memberValues"); memberValuesField.setAccessible(true); Map<String,Object> memberValues = (Map<String,Object>)memberValuesField.get(invocationHandler); String[] topics = (String[])memberValues.get("topics"); System.out.println("修改前topics:" + Lists.newArrayList(topics)); for (int i = 0; i < topics.length; i++) { topics[i] = Constant.TOPIC_KEY_PREFIX + topics[i]; } memberValues.put("topics", topics); System.out.println("修改後topics:" + Lists.newArrayList(kafkaListener.topics())); } }
測試測試
總結
雖然實現了動態修改topic,但我仍是以爲topic不要隨便改變,有條件的話,kafka仍是得基於物理環境隔離,其次真的客觀條件不容許,要動態變動topic,則需作好topic動態變動宣導以及相關wiki的編寫,否則很容易掉坑ui
demo連接
https://github.com/lyb-geek/springboot-learning/tree/master/springboot-mq-idempotent-consume