All of the code in this article is on GitHub:
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.5.0</version>
    </dependency>
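The consumer code falls into three parts: configuration, creating the consumer instance and subscribing to topics, and consuming messages.
Consumer configuration: every option is listed in the org.apache.kafka.clients.consumer.ConsumerConfig class, along with documentation for each one.
The code is as follows: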
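    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    // 1. Configuration
    Properties properties = new Properties();
    // bootstrap.servers: Kafka cluster addresses, host1:port1,host2:port2 ....
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    // key.deserializer: how the message key is deserialized
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    // value.deserializer: how the message body is deserialized
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    // group.id: consumer group id
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
    // enable.auto.commit: commit offsets automatically
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    // auto.offset.reset: where to start when the group has no committed offset
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    // 2. Create the consumer instance and subscribe to the topic
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    String[] topics = new String[]{"demo-topic"};
    consumer.subscribe(Arrays.asList(topics));

    // 3. Consume messages
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record);
        }
    }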
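The ConsumerConfig constants in the consumer configuration are just names for plain string keys, the same ones shown in the comments (bootstrap.servers, group.id and so on). The following is a minimal sketch of that idea using only the JDK, so it runs without the Kafka client on the classpath. The class ConsumerProps and its build method are hypothetical helpers, not part of kafka-clients, and the two checks only reflect the assumption that a subscribing consumer needs both a broker address and a group id.

```java
import java.util.Properties;

// Hypothetical helper (not part of kafka-clients): builds the consumer settings
// from the plain string keys that the ConsumerConfig constants stand for.
class ConsumerProps {

    static Properties build(String bootstrapServers, String groupId) {
        // Fail early on the two values this sketch treats as mandatory.
        if (bootstrapServers == null || bootstrapServers.isEmpty()) {
            throw new IllegalArgumentException("bootstrap.servers must not be empty");
        }
        if (groupId == null || groupId.isEmpty()) {
            throw new IllegalArgumentException("group.id must not be empty");
        }
        String stringDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.deserializer", stringDeserializer);
        props.setProperty("value.deserializer", stringDeserializer);
        props.setProperty("group.id", groupId);
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        // Same values as the consumer example in this article.
        Properties props = build("127.0.0.1:9092", "demo-group");
        props.list(System.out);
    }
}
```

The resulting Properties object could be passed to the KafkaConsumer constructor in place of the one built with the ConsumerConfig constants; the constants remain the safer choice because the compiler catches a misspelled name.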
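The producer code has essentially the same structure as the consumer code; the difference is the part that sends messages:
Producer configuration: the options are likewise all listed in the org.apache.kafka.clients.producer.ProducerConfig class.
Sending a message to a topic: call producer.send(new ProducerRecord<>("demo-topic", data)). The call is asynchronous and returns a Future; call Future.get() to block until the result comes back.
The complete code is as follows: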
    import java.util.Properties;
    import java.util.concurrent.Future;

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.serialization.StringSerializer;

    // 1. Configuration
    Properties properties = new Properties();
    // bootstrap.servers: Kafka cluster addresses, host1:port1,host2:port2 ....
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    // key.serializer: how the message key is serialized
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // value.serializer: how the message body is serialized
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    // 2. Create the producer instance
    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

    // 3. Send messages
    // 0 Send asynchronously
    for (int i = 0; i < 10; i++) {
        String data = "async :" + i;
        // Send the message
        producer.send(new ProducerRecord<>("demo-topic", data));
    }

    // 1 Send synchronously: call get() to block until the result is returned
    for (int i = 0; i < 10; i++) {
        String data = "sync : " + i;
        try {
            // Send the message
            Future<RecordMetadata> send = producer.send(new ProducerRecord<>("demo-topic", data));
            RecordMetadata recordMetadata = send.get();
            System.out.println(recordMetadata);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // 2 Send asynchronously with a callback
    for (int i = 0; i < 10; i++) {
        String data = "callback : " + i;
        // Send the message
        producer.send(new ProducerRecord<>("demo-topic", data), new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                // Callback invoked once the send has completed
                if (exception != null) {
                    exception.printStackTrace();
                } else {
                    System.out.println(metadata);
                }
            }
        });
    }

    producer.close();
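The difference between blocking on Future.get() and passing a callback can be tried out without a broker. The sketch below is a stand-in built only on the JDK: fakeSend is a hypothetical method that mimics the shape of producer.send() by returning immediately and completing later, and the "acked:" string is an invented placeholder for RecordMetadata. It is not the Kafka API, only an illustration of the two calling styles.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;

// JDK-only stand-in for a producer, so the two styles can be run without Kafka.
class SendStylesDemo {

    // Hypothetical send: returns at once, the result is produced on another thread.
    static CompletableFuture<String> fakeSend(String data) {
        return CompletableFuture.supplyAsync(() -> "acked:" + data);
    }

    // Synchronous style: the caller blocks on get() until the result is available.
    static String sendSync(String data) {
        Future<String> future = fakeSend(data);
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the send", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("send failed", e.getCause());
        }
    }

    // Callback style: register a handler and return without waiting for the result.
    static CompletableFuture<String> sendWithCallback(String data,
                                                      BiConsumer<String, Throwable> callback) {
        return fakeSend(data).whenComplete(callback);
    }

    public static void main(String[] args) {
        System.out.println(sendSync("sync : 0"));

        CompletableFuture<String> pending = sendWithCallback("callback : 0", (ack, error) -> {
            if (error != null) {
                error.printStackTrace();
            } else {
                System.out.println(ack);
            }
        });
        // Wait here only so the demo does not exit before the callback has run.
        pending.join();
    }
}
```

The blocking style gives each message a definite outcome before the next one is sent, at the cost of waiting for every send; the callback style keeps the sending loop moving and handles success or failure whenever the result arrives.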
    <parent>
        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-parent -->
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    ....
    ....
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!-- kafka starter -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <!-- for testing -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
    # application.yml
    spring:
      kafka:
        bootstrap-servers: 127.0.0.1:9092
        producer:
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
        consumer:
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          group-id: test-group
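    // Startup class
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;

    @SpringBootApplication
    public class DemoApplication {
        public static void main(String[] args) {
            SpringApplication.run(DemoApplication.class, args);
        }
    }

    // Consumer
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;

    @Component
    public class Consumer {
        @KafkaListener(topics = { "test-topic" })
        public void receiveMessage(String message) {
            System.out.println(message);
        }
    }

    // Producer
    import javax.annotation.Resource;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;

    @Component
    public class Producer {
        @Resource
        KafkaTemplate<String, String> kafkaTemplate;

        public void sendMessage(String topic, String message) {
            kafkaTemplate.send(topic, message);
        }
    }

    // Test
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class DemoApplicationTests {
        @Autowired
        private Producer producer;

        @Test
        public void send() {
            producer.sendMessage("test-topic", "test-message");
            try {
                // Keep the test alive long enough for the listener to receive the message
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }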
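With Spring Boot integrated, the code is very concise. It is still worth knowing the native API well, though, so that you can handle problems with ease when they come up in real projects.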