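Kafka initializes the producer client's configuration through the KafkaProducer constructor.
For the full list of commonly used configuration options, see the official Kafka documentation.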
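import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

// Basic configuration
Map<String, Object> configs = new HashMap<>();
// Kafka broker cluster
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
// Key serializer
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// Value serializer
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);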
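The same three settings can also be expressed with their plain string keys, which is how they usually appear in a properties file. The following is a minimal sketch using only the JDK; the class name ProducerConfigSketch and the helper baseConfig are hypothetical names introduced for illustration. The resulting Properties object could then be handed to the KafkaProducer constructor that accepts Properties.

```java
import java.util.Properties;

// Hypothetical helper class, not part of the Kafka client library.
final class ProducerConfigSketch {

    // Builds the same three settings as the map above, keyed by their string names.
    static Properties baseConfig(String bootstrapServers) {
        Properties props = new Properties();
        // Same key as ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Same key as ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; the value is the serializer's class name
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Same key as ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = baseConfig("127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
        // Print each setting so the configuration can be inspected before use
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Using the ProducerConfig constants, as in the original snippet, has the advantage that a mistyped key fails at compile time instead of being ignored at runtime.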
Kafka provides six ProducerRecord constructors for building a message.
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers);
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value);
public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers);
public ProducerRecord(String topic, Integer partition, K key, V value);
public ProducerRecord(String topic, K key, V value);
public ProducerRecord(String topic, V value);
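The shorter constructors are conveniences: each one passes null for the fields it omits to the most complete constructor. The sketch below illustrates that delegation pattern with a simplified stand-in class. RecordSketch is a hypothetical name, it is not the real ProducerRecord, and headers are left out to keep it short.

```java
// Simplified, hypothetical stand-in for ProducerRecord (headers omitted).
final class RecordSketch<K, V> {
    final String topic;
    final Integer partition; // null: the producer's partitioner picks a partition
    final Long timestamp;    // null: the producer stamps the record with its current time
    final K key;             // null: the record has no key
    final V value;

    // Most complete constructor; all shorter ones delegate here.
    RecordSketch(String topic, Integer partition, Long timestamp, K key, V value) {
        if (topic == null) {
            throw new IllegalArgumentException("Topic cannot be null.");
        }
        this.topic = topic;
        this.partition = partition;
        this.timestamp = timestamp;
        this.key = key;
        this.value = value;
    }

    // Topic, key and value only: partition and timestamp are left unset.
    RecordSketch(String topic, K key, V value) {
        this(topic, null, null, key, value);
    }

    // Topic and value only: the record is also keyless.
    RecordSketch(String topic, V value) {
        this(topic, null, null, null, value);
    }

    public static void main(String[] args) {
        RecordSketch<String, String> record = new RecordSketch<>("demo-topic", "hello");
        System.out.println("partition=" + record.partition + ", key=" + record.key + ", value=" + record.value);
    }
}
```

In practice, the two-argument and three-argument forms cover most use cases; an explicit partition or timestamp is only needed when the application must control placement or event time itself.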
The producer supports both synchronous and asynchronous sends.
Synchronous send
producer.send(record).get();
Asynchronous send
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        // Callback handling logic; exception is null when the send succeeded
    }
});
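The difference between the two styles is where the caller waits. The sketch below models it with the JDK's CompletableFuture in place of a real producer, so it runs without a broker. fakeSend is a hypothetical stand-in for producer.send(record), and the "offset" it returns is just the length of the value, an assumption made purely for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of the two send styles; no Kafka classes are used.
final class SendModesSketch {

    // Stand-in for producer.send(record): completes on another thread with a fake offset.
    static CompletableFuture<Long> fakeSend(String value) {
        return CompletableFuture.supplyAsync(() -> (long) value.length());
    }

    // Synchronous style: get() blocks the calling thread until the result arrives.
    static long sendSync(String value) {
        try {
            return fakeSend(value).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException("Interrupted while waiting for the send", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Send failed", e.getCause());
        }
    }

    // Asynchronous style: return at once; the callback records the outcome later.
    static CompletableFuture<Long> sendAsync(String value, AtomicReference<String> outcome) {
        return fakeSend(value).whenComplete((offset, error) ->
                outcome.set(error == null ? "ok:" + offset : "failed:" + error));
    }

    public static void main(String[] args) {
        System.out.println("sync result: " + sendSync("hello"));

        AtomicReference<String> outcome = new AtomicReference<>();
        // join() is used here only so the demo does not exit before the callback runs
        sendAsync("hello", outcome).join();
        System.out.println("async result: " + outcome.get());
    }
}
```

Synchronous sends are simpler to reason about but limit throughput to one in-flight record per calling thread; asynchronous sends keep the caller free, at the cost of handling failures inside the callback.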