首先推薦一個關於Kafka的中文網站:http://orchome.com/kafka/indexhtml
部分翻譯直接參考此網站內容,可是網站目前的API版本爲0.10.0.1,因此在學習過程當中,自行翻譯了一下0.11.0的API文檔。在翻譯過程當中有些地方也不是太理解,感受翻譯的不太準確,有問題的地方望讀者指出。 java
public class KafkaProducer<K,V> extends Object implements Producer<K,V>
Kafka客戶端發佈消息至kafka集羣。apache
生產者是線程安全的,在線程之間共享單個生產者實例一般比持有多個實例更快。bootstrap
下面是一個簡單的例子,它使用生產者將包含有序數字的字符串消息做爲鍵/值對發送。api
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 100; i++) producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i))); producer.close();
生產者的緩衝空間池保留還沒有發送到服務器的消息,後臺I/O線程負責將這些消息轉換成請求發送到集羣。若是使用後不關閉生產者,則會泄露這些資源。
send()方法是異步的,添加消息到緩衝區等待發送,並當即返回。這容許生產者將單個的消息批量在一塊兒發送來提升效率。緩存
ack是判別請求是否爲完整的條件。咱們指定了「all」將會阻塞消息,這種設置使性能最低,可是是最可靠的。安全
retries,若是請求失敗,生產者會自動重試,咱們指定是0次即不啓動重試,若是啓用重試,則會有重複消息的可能性。服務器
batch.size ,producer爲每一個分區未發送的消息保持一個緩衝區。緩存的大小是經過 batch.size 配置指定的。值較大的話將會產生更大的批處理。並須要更多的內存(由於一般咱們會爲每一個「活躍」的分區都設置1個緩衝區)。oracle
linger.ms默認狀況,即使緩衝空間尚未滿,緩衝也可當即發送,可是,若是想減小請求的數量,能夠設置linger.ms大於0。這將指示生產者發送請求以前等待一段時間,但願更多的消息填補到未滿的批中。這相似於TCP的算法,例如上面的代碼段,可能100條消息在一個請求發送,由於咱們設置了linger(逗留)時間爲1毫秒,而後,若是咱們沒有填滿緩衝區,這個設置將增長1毫秒的延遲請求以等待更多的消息。須要注意的是,在高負載下,即便是 linger.ms=0,相近的時間通常也會組成批。在不處於高負載的狀況下,若是設置比0大,將以少許的延遲代價換取更少的,更有效的請求。
buffer.memory 控制生產者可用的緩存總量,若是消息發送速度比它們可以傳輸到服務器的速度快,將會耗盡這個緩存空間。當緩存空間耗盡,其餘發送調用將被阻塞,阻塞時間的閾值經過max.block.ms設定,以後它將拋出一個TimeoutException。
key.serializer和value.serializer示例,將用戶提供的key和value對象ProducerRecord轉換成字節,你可使用附帶的ByteArraySerializaer或StringSerializer處理簡單的string或byte類型。
從kafka 0.11開始,KafkaProducer 支持兩種額外的模式:idempotent producer和transactional producer。
idempotent producer(冪等性生產者)加強Kafka的投遞語義,從至少一次投遞變爲徹底的一次投遞,特別是消息重試將再也不重複提出。
transactional producer(事務性生產者)容許應用程序將消息發送給多個原子分區(和topics)。
使用idempotent,enable.idempotence配置項必須設置爲true,若是設置爲true,重試配置項(retries )將默認設置爲 Integer.MAX_VALUE,max.inflight.requests.per.connection 將默認設置爲 1,asks config(確認配置)將默認設置爲all 。idempotent producer的API並無改變,因此現有的應用程序應用此特性時不須要作修改。
爲了充分利用idempotent producer,必須避免應用級別的重試發送,由於這樣不能de-duplicated(去耦合/去重複:此處不是太理解,不知道如何翻譯)。所以,若是應用程序啓用了idempotence,建議取消retries (重試)配置,由於它將被默認爲Integer.MAX_VALUE。此外,若是send(ProducerRecord)即便有無限重試仍是返回了一個錯誤(例如,若是消息在發送以前在緩衝區中過時),則建議關閉producer 並檢查最後生成的消息的內容,以確保它不是重複的。最後,producer 只能保證在單個會話中發送消息的idempotent 特性。
使用transactional producer和與它相關的API,則必須設置transactional.id配置屬性,若是transactional.id被設置,idempotence 會隨着其所依賴的producer的配置被自動啓用,此外,transactions 中包含的topics應該配置爲持久性的。特別是,replication.factor(複製因子)至少應該爲3,這些topics的 min.insync.replicas應該設爲2。最後,爲了保證transactional 端到端的實現,consumers必須配置爲只讀取提交的信息。
transactional.id的目的是在單個生產者實例的多個會話中啓用事務恢復(transaction recovery )。它一般由分區、狀態和應用程序中的shard標識符派生而來。所以,對於在分區應用程序中運行的每一個生產者實例來講,它應該是唯一的。
全部新的transactional api都是阻塞的,而且在故障時拋出異常。下面的示例演示瞭如何使用新的api。它與上面的示例相似,只是全部100個消息都是單個事務的一部分。
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("transactional.id", "my-transactional-id"); Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer()); producer.initTransactions(); try { producer.beginTransaction(); for (int i = 0; i < 100; i++) producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i))); producer.commitTransaction(); } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) { // We can't recover from these exceptions, so our only option is to close the producer and exit. producer.close(); } catch (KafkaException e) { // For all other exceptions, just abort the transaction and try again. producer.abortTransaction(); } producer.close();
正如在示例中所暗示的那樣,每一個生產者只能有一個打開的事務。beginTransaction()和commitTransaction()之間發送的全部消息都將是單個事務的一部分。當transactional.id被指定,producer發送的全部消息必須是事務的一部分。
transactional producer使用異常來傳達錯誤狀態。具體地說,不須要爲producer . send()或調用. get()指定回調函數:若是任何一個producer.send()或事務性調用在事務中遇到不可恢復的錯誤,則將拋出KafkaException。查看send(ProducerRecord)文檔,瞭解從事務發送中探知錯誤的更多細節。
在接收一個KafkaException時,經過調用producer.abortTransaction()咱們能夠確保任何成功的寫操做標記爲失敗(停止),所以保持事務保證。
這個客戶端能夠與0.10.0版本或更新的brokers進行通訊。舊的或新的brokers可能不支持某些客戶端特性。例如,事務api須要broker 0.11.0版本或更高。當調用的API在運行broker版本中不可用,你將收到一個UnsupportedVersionException。
public KafkaProducer(Map<String,Object> configs)
producer經過提供一組鍵值對做爲配置來實例化。有效配置字符串都記錄在這裏。值能夠是字符串或適當類型的對象(例如,數字配置能夠接受字符串「42」或整數42)。
public KafkaProducer(Map<String,Object> configs,Serializer<K> keySerializer,Serializer<V> valueSerializer)
producer經過提供一組鍵值對做爲配置、一個鍵和一個值序列化器來實例化。有效配置字符串都記錄在這裏。值能夠是字符串或適當類型的對象(例如,數字配置能夠接受字符串「42」或整數42)。
public KafkaProducer(Properties properties) public KafkaProducer(Properties properties,Serializer<K> keySerializer,Serializer<V> valueSerializer)
同上。
public void initTransactions()
當在配置中設置了transactional.id,該方法須要在任何其餘方法以前被調用,該方法執行如下步驟:一、確保由producer先前實例發起的任何事物都已經完成。若是前一個實例的事務在進程中失敗,它將被終止。若是最後一個事務已經開始完成,但尚未完成,該方法將等待它完成。二、獲取producer的內部ID和epoch(紀元?此處不清楚準確的翻譯),用於 producer.發佈的全部將來的事務消息。
Specified by:
initTransactions in interface Producer<K,V>
Throws:
IllegalStateException - 若是配置中沒有設置producer的transactional.id則拋出此異常。
public void beginTransaction()
須要在每一個新事務開始以前調用。
Specified by:
beginTransaction in interface Producer<K,V>
Throws:
ProducerFencedException - 若是另外一個活躍的producer有相同的transactional.id則拋出此異常。
public void sendOffsetsToTransaction(Map<TopicPartition,OffsetAndMetadata> offsets,String consumerGroupId)
向consumer組協調器發送被消耗的偏移量列表,並將這些偏移量標記爲當前事務的一部分。只有當事務成功提交時,這些偏移量纔會被視爲消費掉的。當您須要將消費和生成的消息一塊兒批量處理時,應該使用此方法,一般在consume-transform-produce 模式中使用。
Specified by:
sendOffsetsToTransaction in interface Producer<K,V>
Throws:
ProducerFencedException - 若是另外一個活躍的producer有相同的transactional.id則拋出此異常。
public void commitTransaction()
提交正在進行中的事務。此方法將在實際提交事務以前flush任何未發送的消息。此外,如何事務包含部分的任何send(ProducerRecord) 調用觸發不可恢復的錯誤,那麼該方法將當即拋出最後一個接收到的異常,而事務將不會被提交。所以,在事務中對send(ProducerRecord)的調用必須成功,以便此方法成功。
Specified by:
commitTransaction in interface Producer<K,V>
Throws:
ProducerFencedException - 若是另外一個活躍的producer有相同的transactional.id則拋出此異常。
public void abortTransaction()
停止正在進行中的事務。當此調用完成時,任何未flush的生成消息將停止。若是任何先前的send(ProducerRecord)調用有ProducerFencedException或ProducerFencedException 致使的調用失敗,此調用將當即拋出異常。
Specified by:
abortTransaction in interface Producer<K,V>
Throws:
ProducerFencedException - 若是另外一個活躍的producer有相同的transactional.id則拋出此異常。
public Future<RecordMetadata> send(ProducerRecord<K,V> record,Callback callback)
異步發送消息到topic,並在發送的消息被確認的時候調用所提供的回調。
send是異步的,而且一旦消息被保存在等待發送的消息緩衝區中,此方法就當即返回。這樣並行發送多條消息而不阻塞去等待每一條消息的響應。
發送的結果是一個RecordMetadata,它指定了消息發送的分區,分配的offset和消息的時間戳。若是topic使用的是CreateTime,則使用用戶提供的時間戳或發送的時間(若是用戶沒有指定消息的時間戳時使用發送的時間)若是topic使用的是LogAppendTime,則追加消息時,時間戳是broker的本地時間。
由於send 調用是異步的,它將爲分配給消息的RecordMetadata返回一個Future。若是future調用get(),則將阻塞,直到相關請求完成並返回該消息的metadata,或拋出發送異常。
若是你想模擬一個簡單的阻塞調用,您能夠當即調用get()方法:
byte[] key = "key".getBytes(); byte[] value = "value".getBytes(); ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("my-topic", key, value); producer.send(record).get();
徹底非阻塞的使用能夠利用回調參數提供一個回調,當請求完成時將被調用。
producer.send(myRecord, new Callback() { public void onCompletion(RecordMetadata metadata, Exception e) { if(e != null) e.printStackTrace(); System.out.println("The offset of the record we just sent is: " + metadata.offset()); } });
發送到同一個分區的消息回調保證按必定的順序執行,也就是說,在下面的例子中 callback1 保證執行 callback2 以前:
producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback1); producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key2, value2), callback2);
當send做爲事務的一部分使用時,不須要定義回調或者檢查future的結果來檢查send的錯誤。若是任何send由於一個不可恢復的錯誤而調用失敗,則最終的commitTransaction() 調用將失敗,而且在最後的發送失敗時拋出異常。當發生這種狀況的時候,應用程序應該調用abortTransaction()來重置狀態並繼續發送數據。
有些事務發送錯誤沒法經過調用abortTransaction()來解決。特別是,若是一個事務發送完成時伴隨着一個ProducerFencedException,OutOfOrderSequenceException,
UnsupportedVersionException,或一個AuthorizationException等異常,那麼惟一的選擇就是調用close()。重大的錯誤致使生產者進入無效狀態,在這種狀態下,future的API調用將一樣的底層錯誤包裝在新的KafkaException中拋出。
當啓用idempotence(冪等性),但沒有配置transactional.id 的時候,是一個相似的場景。在這種狀況下,UnsupportedVersionException和AuthorizationException被視爲重大錯誤。可是,ProducerFencedException不須要被處理。此外,它有可能在收到一個OutOfOrderSequenceException以後繼續發送信息,可是這樣作可能致使等待中的消息的無序投遞,爲了確保正確的順序,你應該關閉生產者並建立一個新的實例。
若是目標topic的消息格式不升級到0.11.0.0,idempotent(冪等性)和transactional(事務性)的生產請求將失敗並伴隨一個UnsupportedForMessageFormatException的錯誤。若是在事務中遇到這種狀況,則能夠停止並繼續執行。可是須要注意的是,以後發送到同一topic將會繼續收到相同的異常,直到topic被更新。
注意:callback通常在生產者的I/O線程中執行,因此是至關的快的,不然將延遲其餘的線程的消息發送。若是你想要執行阻塞或計算代價高昂的回調,建議在callback主體中使用本身的Executor來並行處理。
Specified by:
send in interface Producer<K,V>
Parameters:
record - 發送的消息
callback - 當消息被服務器確認的時候執行用戶提供的回調 (null 表示沒有回調)
Throws:
IllegalStateException -若是配置了transactional.id 但沒有事務啓動。
InterruptException - 若是線程在阻塞時被中斷
SerializationException - 若是序列化配置了無效的鍵值對象
TimeoutException - 若是獲取metadata或爲消息分配內存所消耗的時間超過了 max.block.ms設定的值。
KafkaException -若是出現了不屬於公共API異常的Kafka相關的錯誤。
public void flush()
調用此方法可使全部的緩衝消息當即能夠發送(即便linger.ms配置的值大於0)而且阻塞與這些信息相關聯的請求的完成。flush()後置條件是任何先前發送的記錄已經完成(舉例來講就是,Future.isDone() == true)。一個請求根據你指定的確認配置被成功確認以後則被認爲是完成的,不然會致使錯誤。
當一個線程被阻塞等待一個flush()調用完成時,其它線程能夠繼續發送消息,可是不能保證關於flush調用開始以後發送的消息的完成。
這個方法能夠用於從一些輸入系統消費消息並生產至kafka中。flush()調用提供了一種方便的方法來確保全部之前發送的消息實際上已經完成。
這個示例展現瞭如何從一個Kafka topic中消費,並生成至另外一個Kafka topic:
for(ConsumerRecord<String, String> record: consumer.poll(100)) producer.send(new ProducerRecord("my-topic", record.key(), record.value()); producer.flush(); consumer.commit();
須要注意的是,上述示例可能在生產(produce)請求失敗的時候刪除消息。若是要確保這種狀況不會發生,須要在配置中設置retries=<large_number>。
應用程序不須要爲事務性生產者調用此方法,由於commitTransaction()將在執行提交以前flush全部緩衝消息。這將確保在提交以前先前的beginTransaction()以後的全部send(ProducerRecord)調用都已經完成。
Specified by:
flush in interface Producer<K,V>
Throws:
InterruptException - 若是線程在阻塞時被中斷
public List<PartitionInfo> partitionsFor(String topic)
獲取給定topic的分區metadata ,這能夠用於自定義分區。
Specified by:
partitionsFor in interface Producer<K,V>
Throws:
InterruptException - 若是線程在阻塞時被中斷
public Map<MetricName,? extends Metric> metrics()
得到生產者維護的完整的內部度量集。
Specified by:
metrics in interface Producer<K,V>
public void close()
關閉這個生產者,此方法阻塞直到全部之前的發送請求完成。該方法等效於close(Long.MAX_VALUE, TimeUnit.MILLISECONDS)。
若是從回調中調用close(),日誌將記錄一條警告消息並以close(0, TimeUnit.MILLISECONDS)調用替代。咱們這樣作是由於發送方線程將嘗試鏈接本身並永遠阻塞。
Specified by:
close in interface Closeable
Specified by:
close in interface AutoCloseable
Specified by:
close in interface Producer<K,V>
Throws:
InterruptException - 若是線程在阻塞時被中斷
public void close(long timeout,TimeUnit timeUnit)
此方法等待生產者完成全部未完成請求的發送直到超時。若是超時以前生產者不能完成全部請求,此方法將馬上丟棄任何的未發送和未確認的消息。
若是從一個Callback中調用此方法,此方法將不會阻塞,等同於close(0, TimeUnit.MILLISECONDS).這樣作是由於在阻塞生產者的I/O線程時不會發生進一步的發送。
Specified by:
close in interface Producer<K,V>
Parameters:
timeout - 等待生產者完成任何正要發生的請求的最大時間。這個值應該是非負的。指定超時爲0意味着不等待正要發生的請求的完成。
timeUnit - 超時時間的單位
Throws:
InterruptException - 若是線程在阻塞時被中斷
IllegalArgumentException - 若是超時時間的值是負的