Kafka Java Client: Basic Usage and Spring Boot Integration

All the code in this article is available on GitHub:

https://github.com/kevinwang0...java

https://github.com/kevinwang0...git


kafka-clients

Add the dependency

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.5.0</version>
</dependency>

Consumer

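The code breaks down into three parts:

  1. Consumer configuration

    1. Every consumer configuration option is listed in the org.apache.kafka.clients.consumer.ConsumerConfig class, together with documentation for each option
  2. Create the consumer instance and subscribe to a topic
  3. Consume messages

The code is as follows: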

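// Imports needed by the statements below (they belong at the top of the class file)
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

// 1. Configuration
Properties properties = new Properties();
// bootstrap.servers: Kafka cluster addresses, host1:port1,host2:port2 ....
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
// key.deserializer: how the message key is deserialized
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// value.deserializer: how the message body is deserialized
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// group.id: consumer group id
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
// enable.auto.commit: commit offsets automatically
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// auto.offset.reset: where to start when the group has no committed offset
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

// 2. Create the consumer instance and subscribe to the topic
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
String[] topics = new String[]{"demo-topic"};
consumer.subscribe(Arrays.asList(topics));

// 3. Consume messages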
while (true) {
  ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
  for (ConsumerRecord<String, String> record : records) {
    System.out.println(record);
  }
}
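
The ConsumerConfig constants used in the configuration step resolve to plain string keys (bootstrap.servers, group.id and so on), so the same settings can also be kept outside the code. The following is a minimal sketch using only the JDK: the class name ConsumerPropsLoader and the inline EXAMPLE text (standing in for a hypothetical consumer.properties file) are assumptions made for illustration and are not part of the original project.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper, not part of the original code base.
class ConsumerPropsLoader {

    // Stand-in for the contents of a consumer.properties file.
    // The keys are the string values behind the ConsumerConfig constants.
    static final String EXAMPLE = String.join("\n",
            "bootstrap.servers=127.0.0.1:9092",
            "key.deserializer=org.apache.kafka.common.serialization.StringDeserializer",
            "value.deserializer=org.apache.kafka.common.serialization.StringDeserializer",
            "group.id=demo-group",
            "enable.auto.commit=true",
            "auto.offset.reset=earliest");

    // Parses key=value text into a Properties object.
    static Properties parse(String text) {
        Properties properties = new Properties();
        try (StringReader reader = new StringReader(text)) {
            properties.load(reader);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = parse(EXAMPLE);
        // The result could be handed to new KafkaConsumer<>(properties).
        properties.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Loading the text from a real file would only change where the Reader comes from; the keys and values stay the same.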

Producer

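The producer code has essentially the same structure as the consumer code; what differs is the part where the producer sends messages:

  1. Producer configuration

    1. The options are likewise listed in the org.apache.kafka.clients.producer.ProducerConfig class
  2. Create the producer instance
  3. Send messages to a topic

    1. Asynchronous send: producer.send(new ProducerRecord<>("demo-topic", data))
    2. Synchronous send: block on Future.get() to receive the result
    3. Asynchronous send with a callback

The complete code is as follows: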

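// Imports needed by the statements below (they belong at the top of the class file)
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

// 1. Configuration
Properties properties = new Properties();
// bootstrap.servers: Kafka cluster addresses, host1:port1,host2:port2 ....
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
// key.serializer: how the message key is serialized
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// value.serializer: how the message body is serialized
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());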


// 2. Create the producer instance
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

// 3. Send messages

// 0 Asynchronous send: the returned Future is ignored
for (int i = 0; i < 10; i++) {
  String data = "async :" + i;
  // Send the message
  producer.send(new ProducerRecord<>("demo-topic", data));
}

// 1 Synchronous send: call get() to block until the result is returned
for (int i = 0; i < 10; i++) {
  String data = "sync : " + i;
  try {
    // Send the message
    Future<RecordMetadata> send = producer.send(new ProducerRecord<>("demo-topic", data));
    RecordMetadata recordMetadata = send.get();
    System.out.println(recordMetadata);
  } catch (Exception e) {
    e.printStackTrace();
  }
}


// 2 Asynchronous send with a callback
for (int i = 0; i < 10; i++) {
  String data = "callback : " + i;
  // Send the message
  producer.send(new ProducerRecord<>("demo-topic", data), new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
      // Invoked when the send completes; exception is non-null if it failed
      if (exception != null) {
        exception.printStackTrace();
      } else {
        System.out.println(metadata);
      }
    }
  });
}

producer.close();
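
The difference between blocking on Future.get() and registering a callback can be tried out without a broker. The sketch below is not the Kafka API: it uses the JDK's CompletableFuture as a stand-in for the Future returned by producer.send, and fakeSend, sendSync and sendWithCallback are hypothetical helpers invented for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;

// Broker-free stand-in, NOT the Kafka API.
class SendStyles {

    // Hypothetical replacement for producer.send(...): "acknowledges" on another thread.
    static CompletableFuture<String> fakeSend(String data) {
        return CompletableFuture.supplyAsync(() -> "acked: " + data);
    }

    // Synchronous style: block on Future.get() until the result is available.
    static String sendSync(String data) {
        Future<String> future = fakeSend(data);
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    // Callback style: return at once; the callback runs when the send completes.
    static CompletableFuture<String> sendWithCallback(
            String data, BiConsumer<String, Throwable> callback) {
        return fakeSend(data).whenComplete(callback);
    }

    public static void main(String[] args) {
        System.out.println(sendSync("sync : 0"));

        sendWithCallback("callback : 0", (result, error) -> {
            if (error != null) {
                error.printStackTrace();
            } else {
                System.out.println(result);
            }
        }).join(); // join only so the demo does not exit before the callback runs
    }
}
```

The synchronous style gives a result per message at the cost of waiting for each one, while the callback style keeps the sending thread free and still reports every outcome.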

Spring Boot Integration

Add the dependencies

<parent>
  <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-parent -->
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>2.3.2.RELEASE</version>
  <relativePath/> <!-- lookup parent from repository -->
</parent>

....
....


<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter</artifactId>
</dependency>
<!--kafka starter-->
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>

<!-- for testing -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-test</artifactId>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka-test</artifactId>
  <scope>test</scope>
</dependency>

Code

# application.yml
spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: test-group

// Application entry point
@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}

// Consumer
@Component
public class Consumer {

    @KafkaListener(topics = { "test-topic" })
    public void receiveMessage(String message) {
        System.out.println(message);
    }
}

// Producer
@Component
public class Producer {

    @Resource
    KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}

// Test
@RunWith(SpringRunner.class)
@SpringBootTest
public class DemoApplicationTests {

    @Autowired
    private Producer producer;

    @Test
    public void send() {
        producer.sendMessage("test-topic", "test-message");
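        try {
            // Give the listener time to receive the message before the test exits
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing the interruption
            Thread.currentThread().interrupt();
        }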
    }
}
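
The test above waits a fixed ten seconds so that the listener has a chance to receive the message. One possible alternative is to let the listener release a CountDownLatch, so the test can continue as soon as the message arrives. The following sketch uses only the JDK and simulates delivery with a plain thread; LatchedReceiver and LatchDemo are hypothetical names, and in a real project the receiveMessage body would sit inside the @KafkaListener method.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the listener class: remembers the last message
// and releases a latch so that a waiting test can stop waiting.
class LatchedReceiver {
    private final CountDownLatch latch = new CountDownLatch(1);
    private final AtomicReference<String> lastMessage = new AtomicReference<>();

    // In a real consumer this logic would live in the @KafkaListener method.
    void receiveMessage(String message) {
        lastMessage.set(message);
        latch.countDown();
    }

    // Returns true if a message arrived within the timeout, false otherwise.
    boolean awaitMessage(long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    String lastMessage() {
        return lastMessage.get();
    }
}

class LatchDemo {
    public static void main(String[] args) {
        LatchedReceiver receiver = new LatchedReceiver();
        // Simulate the listener container delivering a message on another thread.
        new Thread(() -> receiver.receiveMessage("test-message")).start();

        boolean received = receiver.awaitMessage(10, TimeUnit.SECONDS);
        System.out.println("received=" + received + ", message=" + receiver.lastMessage());
    }
}
```

The timeout still bounds the worst case, but a passing run no longer has to wait for the full duration.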

Summary

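Once Spring Boot is integrated the code becomes very concise, but it is still worth getting familiar with the native API so that problems in real projects can be handled with ease.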
