Kafka生產者APi

kafka客戶端發佈record(消息)到kafka集羣。

新的生產者是線程安全的,在線程之間共享單個生產者實例,一般單例比多個實例要快。html

一個簡單的例子,使用producer發送一個有序的key/value(鍵值對),放到java的main方法裏就能直接運行,java

Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); for(int i = 0; i < 100; i++) producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i))); producer.close(); 

生產者的緩衝空間池保留還沒有發送到服務器的消息,後臺I/O線程負責將這些消息轉換成請求發送到集羣。若是使用後不關閉生產者,則會泄露這些資源。算法

send()方法是異步的,添加消息到緩衝區等待發送,並當即返回。生產者將單個的消息批量在一塊兒發送來提升效率。apache

ack是判別請求是否爲完整的條件(就是是判斷是否是成功發送了)。咱們指定了「all」將會阻塞消息,這種設置性能最低,可是是最可靠的。bootstrap

retries,若是請求失敗,生產者會自動重試,咱們指定是0次,若是啓用重試,則會有重複消息的可能性。api

producer(生產者)緩存每一個分區未發送消息。緩存的大小是經過 batch.size 配置指定的。值較大的話將會產生更大的批。並須要更多的內存(由於每一個「活躍」的分區都有1個緩衝區)。緩存

默認緩衝可當即發送,即使緩衝空間尚未滿,可是,若是你想減小請求的數量,能夠設置linger.ms大於0。這將指示生產者發送請求以前等待一段時間,但願更多的消息填補到未滿的批中。這相似於TCP的算法,例如上面的代碼段,可能100條消息在一個請求發送,由於咱們設置了linger(逗留)時間爲1毫秒,而後,若是咱們沒有填滿緩衝區,這個設置將增長1毫秒的延遲請求以等待更多的消息。須要注意的是,在高負載下,相近的時間通常也會組成批,即便是 linger.ms=0。在不處於高負載的狀況下,若是設置比0大,以少許的延遲代價換取更少的,更有效的請求。安全

buffer.memory 控制生產者可用的緩存總量,若是消息發送速度比其傳輸到服務器的快,將會耗盡這個緩存空間。當緩存空間耗盡,其餘發送調用將被阻塞,阻塞時間的閾值經過max.block.ms設定,以後它將拋出一個TimeoutException。服務器

key.serializervalue.serializer示例,將用戶提供的key和value對象ProducerRecord轉換成字節,你可使用附帶的ByteArraySerializaerStringSerializer處理簡單的string或byte類型。oracle

send()

public Future<RecordMetadata> send(ProducerRecord<K,V> record,Callback callback) 

異步發送一條消息到topic,並調用callback(當發送已確認)。

send是異步的,而且一旦消息被保存在等待發送的消息緩存中,此方法就當即返回。這樣並行發送多條消息而不阻塞去等待每一條消息的響應。

發送的結果是一個RecordMetadata,它指定了消息發送的分區,分配的offset和消息的時間戳。若是topic使用的是CreateTime,則使用用戶提供的時間戳或發送的時間(若是用戶沒有指定指定消息的時間戳)若是topic使用的是LogAppendTime,則追加消息時,時間戳是broker的本地時間。

因爲send調用是異步的,它將爲分配消息的此消息的RecordMetadata返回一個Future。若是future調用get(),則將阻塞,直到相關請求完成並返回該消息的metadata,或拋出發送異常。

若是要模擬一個簡單的阻塞調用,你能夠調用get()方法。

byte[] key = "key".getBytes(); byte[] value = "value".getBytes(); ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("my-topic", key, value) producer.send(record).get(); 

徹底無阻塞的話,能夠利用回調參數提供的請求完成時將調用的回調通知。

ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value); producer.send(myRecord, new Callback() { public void onCompletion(RecordMetadata metadata, Exception e) { if(e != null) e.printStackTrace(); System.out.println("The offset of the record we just sent is: " + metadata.offset()); } }); 

發送到同一個分區的消息回調保證按必定的順序執行,也就是說,在下面的例子中 callback1 保證執行 callback2 以前:

producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback1); producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key2, value2), callback2); 

注意:callback通常在生產者的I/O線程中執行,因此是至關的快的,不然將延遲其餘的線程的消息發送。若是你須要執行阻塞或計算昂貴(消耗)的回調,建議在callback主體中使用本身的Executor來並行處理。

pecified by:
send in interface Producer<K,V> 
Parameters:

record - 發送的記錄(消息)
callback - 用戶提供的callback,服務器來調用這個callback來應答結果(null表示沒有callback)。

Throws:

InterruptException - 若是線程在阻塞中斷。SerializationException - 若是key或value不是給定有效配置的serializers。TimeoutException - 若是獲取元數據或消息分配內存話費的時間超過max.block.ms。KafkaException - Kafka有關的錯誤(不屬於公共API的異常)。

做者:無名 連接:http://orchome.com/303 來源:OrcHome 著做權歸做者全部。商業轉載請聯繫做者得到受權,非商業轉載請註明出處。
相關文章
相關標籤/搜索