pomjava
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.2.6.RELEASE</version> </dependency>
生產/發送消息spring
@Autowired private KafkaTemplate<String, String> kafkaTemplate; public ListenableFuture<SendResult<String, String>> send(String topic, String key, String message) { return kafkaTemplate.send(topic, key, message); }
消費/接收消息debug
@Component @Slf4j public class KafkaReceiver { @KafkaListener(topics = {"iom"}) public void listen(ConsumerRecord<?, ?> record) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); log.info("record:" + record); log.debug("message:" + message); } } }
打印出record:code
record = ConsumerRecord(topic = iom, partition = 0, offset = 16, CreateTime = 1558926238803, serialized key size = 11, serialized value size = 47, headers = RecordHeaders(headers = [], isReadOnly = false), key = 18612341234, value = {key:18612341234, time:2019-05-27T11:03:58.786})