一、代碼實現git
kafkaListenergithub
須要指定id,例如這裏是:full-part-id。spring
@KafkaListener(topics = "part-full-topic", id = "full-part-id", containerGroup = "full-part-group") public void listenFullPart(ConsumerRecord<String, String> record) { Optional<String> recordOptional = Optional.fromNullable(record.value()); if (recordOptional.isPresent()) { List<PartStockInfoVo> partStockInfoVos = JSONObject.parseArray(recordOptional.get(), PartStockInfoVo.class); esPartInfoClient.updateFullIndex(partStockInfoVos); } }
消費開關app
@RestController public class KafkaManageController { @Autowired private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry; @RequestMapping("/stop") public void stop() { MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer("full-part-id"); listenerContainer.stop(); } @RequestMapping("/start") public void start() { MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer("full-part-id"); listenerContainer.start(); } }
參考:spa