KafkaProducer 發送消息主要有如下 3 種方式:html
Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> producer = new KafkaProducer<>(properties); ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value"); // 發送並忘記(fire-and-forget) producer.send(record); // 同步發送 Future<RecordMetadata> future = producer.send(record); RecordMetadata metadata = future.get(); // 異步發送 producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { } }); producer.close();
具體的發送流程能夠參考 KafkaProducer發送流程簡析。java
KafkaProducer 是線程安全的,多個線程能夠共享同一個 KafkaProducer 對象。算法
該參數能夠是任意的字符串,broker 會用它來識別消息的來源,會在日誌和監控指標裏展現。apache
該屬性指定 broker 的地址列表。
清單裏不須要包含全部的 broker 地址,生產者會從給定的 broker 裏查找到其餘 broker 的信息。
不過建議至少要提供兩個 broker 的信息,一旦其中一個宕機,生產者仍然可以鏈接到集羣上。bootstrap
這兩個屬性必須被設置爲一個實現了org.apache.kafka.common.serialization.Serializer
接口的類。
生產者會使用這個類把鍵值對象序列化成字節數組。數組
設置 socket 讀寫數據時用到的 TCP 緩衝區大小。若是它們被設爲 -1,就使用操做系統的默認值。
當生產者或消費者與 broker 處於不一樣的機房時,能夠適當增大這些值。安全
設置生產者內存緩衝區的大小,生產者用它緩衝要發送到服務器的消息。
若是應用程序發送消息的速度超過發送到服務器的速度,會致使生產者空間不足。
此時KafkaProducer.send()
會阻塞等待內存釋放,等待時間超過 max.block.ms 後會拋出超時異常。服務器
該參數指定了消息被髮送給 broker 以前,使用哪種壓縮算法(snappy
,gzip
或lz4
)進行壓縮。
使用壓縮能夠下降網絡傳輸開銷和存儲開銷,而這每每是向 Kafka 發送消息的瓶頸所在。網絡
該參數指定了一個批次可使用的內存字節數(而不是消息個數)。
消息批次ProducerBatch
包含了一組將要發送至同個分區的消息,當批次被填滿,批次裏的全部消息會被當即發送出去。app
不過生產者並不必定都會等到批次被填滿才發送,半滿甚至只包含一個消息的批次也可能被髮送。
因此就算把批次大小設置得很大,也不會形成延遲,只是會佔用更多的內存而已。
但若是設置得過小,生產者會頻繁地發送消息,會增長一些額外的網絡開銷。
該參數指定了生產者在發送批次以前等待的時間。
生產者會在批次填滿或等待時間達到 linger.ms 時把批次發送出去。
設置linger.ms>0
會增長延遲,但也會提高吞吐量(一次性發送更多的消息,每一個消息的開銷就變小了)。
參數指定了必需要有多少個分區副本收到消息,生產者纔會認爲消息寫入是成功的。
這個參數決定令消息丟失的可能性:
acks=0
生產者發出消息後不等待來自服務器的響應
若是當中出現了問題,致使服務器沒有收到消息,那麼生產者就無從得知,消息也就丟失了。
不過,由於生產者不須要等待服務器的響應,因此它能夠以網絡可以支持的最大速度發送消息,從而達到很高的吞吐量。
acks=1
只要集羣的 leader 節點收到消息,生產者就會收到一個來自服務器的成功響應
若是消息沒法到達 leader 節點(好比:leader節點崩潰,新的 leader 尚未被選舉出來),生產者會收到一個錯誤響應。
爲了不數據丟失,生產者會重發消息。不過,若是一個沒有收到消息的節點成爲新 leader,消息仍是會丟失。
這個時候的吞吐量取決於使用的是同步發送仍是異步發送:
Future.get()
方法),顯然會增長延遲(在網絡上傳輸一個來回的延遲)acks=all
只有當全部參與複製的節點所有收到消息時,生產者纔會收到一個來自服務器的成功響應
這種模式是最安全的,就算有服務器發生崩潰,數據也不會丟失。
不過,它的延遲比 acks=1
時更高,由於咱們要等待不僅一個服務器節點接收消息。
該參數決定了生產者能夠重發消息的次數(每次重試之間等待 retry.backoff.ms
)。
服務器返回臨時性的錯誤(好比:分區找不到 leader)時,生產者會自動重試,不必在代碼邏輯裏處理可重試的錯誤。
做爲開發者,只須要處理那些不可重試的錯誤(好比:消息字節數超過單個發送批次上限)或重試次數超出上限的狀況便可。
該參數指定生產者,最多能夠發送未響應在途消息批次數量。
在途消息批次越多,會佔用更多的內存,不過也會提高吞吐量。
當retries > 0
且max.in.flight.requests.per.connection > 1
時,可能出現消息亂序。
若是第一個批次消息寫入失敗,而第二個批次寫入成功,broker 會重試寫入第一個批次。
若是此時第一個批次也寫入成功,那麼兩個批次的順序就反過來了。
通常不建議設置retries=0
,而是令max.in.flight.requests.per.connection = 1
來保證消息順序。
在生產者嘗試發送第一批消息時,就不會有其餘的消息發送給 broker,即便發生重試消息也不會亂序。
不過這樣會嚴重影響生產者的吞吐量,因此只有在對消息的順序有嚴格要求的狀況下才能這麼作。
當 broker 失效時生產者可能會自動重試,致使一條消息被重複寫入屢次。
爲了不這種狀況,Kafka 在生產者端提供來冪等保證:同一條消息被生產者發送屢次,但在 broker端這條消息只會被寫入日誌一次。
在發送端設置 enable.idempotence = true
能夠開啓冪等性,此時配置同時知足如下條件:
max.in.flight.requests.per.connection ≤ 5
retries > 0
acks = all
其工做機制以下:
producer id
該過程對用戶來講是徹底透明的)sequence number
用於消息去重(每一個分區都有獨立的序列號)(PID, SN)
信息一同持久化到對應的分區日誌中(保證 leader 切換後去重仍然生效)若重試致使 broker 接收到小於或等於已知最大序列號的消息,broker 會拒絕寫入這些消息,從而保證每條消息也只會被保存在日誌中一次。
因爲每一個 producer 實例都會被分配不一樣的 PID,該機制只能保證單個 producer 實例的冪等性,沒法實現協同多個 producer 實現冪等。
Kafka 事務能夠實現 producer 對多個主題和分區的原子寫入,而且保證 consumer 不會讀取到未提交的數據。
Kafka 要求應用程序必須提供一個全局惟一的 TIDtransactional id
:
若是某個 producer 實例失效,該機制可以保證下一個擁有相同 TID 的實例首先完成以前未完成的事務。
此外,broker 還會爲自動每一個 producer 分配一個epoch
用於隔離fencing out
失效但仍存活的 producer:
若是存在,則認爲當前 producer 是一個殭屍實例zombie instance
並拒絕爲其提供服務,防止其破壞事務的完整性。
下面是兩個常見的應用場景:
Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.setProperty("enable.idempotence", "true"); // 開啓冪等 properties.setProperty("transactional.id", "my-transaction-id"); // 設置事務ID KafkaProducer<String, String> producer = new KafkaProducer<>(properties); ProducerRecord<String, String> record1 = new ProducerRecord<>("topic1", "key1", "value1"); ProducerRecord<String, String> record2 = new ProducerRecord<>("topic2", "key2", "value2"); ProducerRecord<String, String> record3 = new ProducerRecord<>("topic3", "key3", "value3"); producer.initTransactions(); // 初始化事務(只需執行一次) try { producer.beginTransaction(); // 開始事務 // 向多個不一樣的 topic 寫入消息 producer.send(record1); producer.send(record2); producer.send(record3); producer.commitTransaction(); // 提交事務 } catch (ProducerFencedException e) { producer.close(); // 事務ID 已被佔用 } catch (KafkaException e) { producer.abortTransaction(); }
final String groupID = "my-group-id"; Properties producerProps = new Properties(); producerProps.setProperty("bootstrap.servers", "localhost:9092"); producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producerProps.setProperty("enable.idempotence", "true"); // 開啓冪等 producerProps.setProperty("transactional.id", "my-transaction-id"); // 設置事務ID Properties consumerProps = new Properties(); consumerProps.setProperty("bootstrap.servers", "localhost:9092"); consumerProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); consumerProps.setProperty("isolation.level","read_committed"); // 設置隔離級別 consumerProps.setProperty("group.id", groupID); // 設置消費者組羣ID KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps); producer.initTransactions(); consumer.subscribe(Collections.singletonList("ping")); while (true) { ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE); // 讀取消息 try { producer.beginTransaction(); // 開啓事務 // 處理消息(能夠是任意業務場景) Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); for(ConsumerRecord<String, String> record : records){ offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset())); // 記錄消費偏移量 producer.send(new ProducerRecord<>("pong", record.value())); // 發送消息 } producer.sendOffsetsToTransaction(offsets, groupID); // 提交消費偏移量 producer.commitTransaction(); // 事務提交 } catch (ProducerFencedException e) { producer.close(); // 事務ID 已被佔用 } catch (Exception e){ producer.abortTransaction(); // 回滾事務 } }