這段時間由於工做關係一直在忙於消息中間件的發開,如今趁着項目收尾階段分享下對kafka的一些使用心得。html
kafka的原理我這裏就不作介紹了,可參考http://orchome.com/kafka/index 這裏我重點給你們介紹下kafka生產者的使用java
kafka可分爲新舊版本,舊版本(0.8Scala版本)咱們不去研究,新版本(0.9和0.10)增長了異步發送的API算法
示例代碼以下apache
pom.xml增長依賴bootstrap
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.10.1.1</version> </dependency>
JAVA發送方法:api
Properties props = new Properties(); props.put("bootstrap.servers", bootstrap.servers); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 0); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String,String> producer = new KafkaProducer<String,String>(props); ProducerRecord<String,String> record = new ProducerRecord<String,String>(message.getTopic().getName(), message.getMessageId(), JSONObject.toJSONString(message));
RecordMetadata recordMetadata = producer.send(record).get();
ack
是判別請求是否爲完整的條件(就是是判斷是否是成功發送了)。咱們指定了「all」將會阻塞消息,這種設置性能最低,可是是最可靠的。緩存
retries
,若是請求失敗,生產者會自動重試,咱們指定是0次,若是啓用重試,則會有重複消息的可能性。服務器
producer
(生產者)緩存每一個分區未發送消息。緩存的大小是經過 batch.size
配置指定的。值較大的話將會產生更大的批。並須要更多的內存(由於每一個「活躍」的分區都有1個緩衝區)。oracle
默認緩衝可當即發送,即遍緩衝空間尚未滿,可是,若是你想減小請求的數量,能夠設置linger.ms
大於0。這將指示生產者發送請求以前等待一段時間,但願更多的消息填補到未滿的批中。這相似於TCP的算法,例如上面的代碼段,可能100條消息在一個請求發送,由於咱們設置了linger(逗留)時間爲1毫秒,而後,若是咱們沒有填滿緩衝區,這個設置將增長1毫秒的延遲請求以等待更多的消息。須要注意的是,在高負載下,相近的時間通常也會組成批,即便是 linger.ms=0
。在不處於高負載的狀況下,若是設置比0大,以少許的延遲代價換取更少的,更有效的請求。app
buffer.memory
控制生產者可用的緩存總量,若是消息發送速度比其傳輸到服務器的快,將會耗盡這個緩存空間。當緩存空間耗盡,其餘發送調用將被阻塞,阻塞時間的閾值經過max.block.ms
設定,以後它將拋出一個TimeoutException。
key.serializer
和value.serializer
示例,將用戶提供的key和value對象ProducerRecord轉換成字節,你可使用附帶的ByteArraySerializaer或StringSerializer處理簡單的string或byte類型。
ProducerRecord介紹:topic【消息主題】 key【消息的key值,一般用於消息的分區】 value【消息體】
/** * Create a record to be sent to Kafka * * @param topic The topic the record will be appended to * @param key The key that will be included in the record * @param value The record contents */ public ProducerRecord(String topic, K key, V value) { this(topic, null, null, key, value); }
producer.send發送方式爲異步發送,添加消息到緩衝區等待發送,並當即返回。生產者將單個的消息批量在一塊兒發送來提升效率。 因爲send調用是異步的,它將爲分配消息的此消息的返回一個Future。若是future調用get(),則將阻塞,直到相關請求完成並返回該消息的metadata,或拋出發送異常RecordMetadata
徹底無阻塞的話,能夠利用回調參數提供的請求完成時將調用的回調通知。
ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value); producer.send(myRecord, new Callback() { public void onCompletion(RecordMetadata metadata, Exception e) { if(e != null) System.out.println("獲取消息發送結果"); } });
整體來講,新版API生產者發送方式比較簡單,這裏我也很少作描述。重點在與消費者的實現,我將會在下一篇給你們詳細介紹