Producer是Kafka三大組件中的一個,用於發送消息到kafka集羣中java
Producer提供了豐富的配置(見後面的配置項)用於控制它的行爲apache
在編碼以前先使用命令建立topicbootstrap
./kafka-topics.sh --create --zookeeper hadoop01,hadoop02,hadoop03:2181 --partitions 2 --replication-factor 3 --topic mytopic1
Producer採用默認分區方式將消息散列的發送到各個分區當中數組
package com.jv; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class MyProducer { public static void main(String[] args) throws InterruptedException { Properties props = new Properties(); //設置kafka集羣的地址 props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092"); //ack模式,all是最慢但最安全的 props.put("acks", "-1"); //失敗重試次數 props.put("retries", 0); //每一個分區未發送消息總字節大小(單位:字節),超過設置的值就會提交數據到服務端 props.put("batch.size", 10); //props.put("max.request.size",10); //消息在緩衝區保留的時間,超過設置的值就會被提交到服務端 props.put("linger.ms", 10000); //整個Producer用到總內存的大小,若是緩衝區滿了會提交數據到服務端 //buffer.memory要大於batch.size,不然會報申請內存不足的錯誤 props.put("buffer.memory", 10240); //序列化器 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 100; i++) producer.send(new ProducerRecord<String, String>("mytopic1", Integer.toString(i), "dd:"+i)); //Thread.sleep(1000000); producer.close(); } }
配置項緩存
名稱 | 說明 | 默認值 | 有效值 | 重要性 |
---|---|---|---|---|
bootstrap.servers | kafka集羣的broker-list,如:<br>hadoop01:9092,hadoop02:9092 | 無 | 必選 | |
acks | 確保生產者可靠性設置,有三個選項:<br>acks=0:不等待成功返回<br>acks=1:等Leader寫成功返回<br>acks=all:等Leader和全部ISR中的Follower寫成功返回,all也能夠用-1代替 | -1 | 0,1,-1,all | |
key.serializer | key的序列化器 | ByteArraySerializer<br>StringSerializer | 必選 | |
value.serializer | value的序列化器 | ByteArraySerializer<br>StringSerializer | 必選 | |
buffer.memory | Producer整體內存大小 | 33554432 | 不要超過物理內存,根據實際狀況調整 | 建議必選 |
compression.type | 壓縮類型<br>壓縮最好用於批量處理,批量處理消息越多,壓縮性能越好 | 無 | none、gzip、snappy | |
retries | 發送失敗嘗試重發次數 | 0 | ||
batch.size | 每一個partition的未發送消息大小 | 16384 | 根據實際狀況調整 | 建議必選 |
client.id | 附着在每一個請求的後面,用於標識請求是從什麼地方發送過來的 | |||
connections.max<br>.idle.ms | 鏈接空閒時間超過太久自動關閉(單位毫秒) | 540000 | ||
linger.ms | 數據在緩衝區中保留的時長,0表示當即發送<br>爲了減小網絡耗時,須要設置這個值<br>太大可能容易致使緩衝區滿,阻塞消費者<br>過小容易頻繁請求服務端 | 0 | ||
max.block.ms | 最大阻塞時長 | 60000 | ||
max.request.size | 請求的最大字節數,該值要比batch.size大<br>不建議去更改這個值,若是設置很差會致使程序不報錯,但消息又沒有發送成功 | 1048576 | ||
partitioner.class | 分區類,能夠自定義分區類,實現partitioner接口 | 默認是哈希值%partitions | ||
receive.buffer.bytes | socket的接收緩存空間大小,當閱讀數據時使用 | 32768 | ||
request.timeout.ms | 等待請求響應的最大時間,超時則重發請求,超太重試次數將拋異常 | 3000 | ||
send.buffer.bytes | 發送數據時的緩存空間大小 | 131072 | ||
timeout.ms | 控制server等待來自followers的確認的最大時間 | 30000 | ||
max.in.flight.<br>requests.per.<br>connection | kafka能夠在一個connection中發送多個請求,叫做一個flight,這樣能夠減小開銷,可是若是產生錯誤,可能會形成數據的發送順序改變。 | 5 | ||
metadata.fetch<br>.timeout.ms | 從ZK中獲取元數據超時時間<br>好比topic\host\partitions | 60000 | ||
metadata.max.age.ms | 即便沒有任何partition leader 改變,強制更新metadata的時間間隔 | 300000 | ||
metric.reporters | 類的列表,用於衡量指標。實現MetricReporter接口,將容許增長一些類,這些類在新的衡量指標產生時就會改變。JmxReporter總會包含用於註冊JMX統計 | none | ||
metrics.num.samples | 用於維護metrics的樣本數 | 2 | ||
metrics.sample.window.ms | metrics系統維護可配置的樣本數量,在一個可修正的window size。這項配置配置了窗口大小,例如。咱們可能在30s的期間維護兩個樣本。當一個窗口推出後,咱們會擦除並重寫最老的窗口 | 30000 | ||
reconnect.backoff.ms | 鏈接失敗時,當咱們從新鏈接時的等待時間。這避免了客戶端反覆重連 | 10 | ||
retry.backoff.ms | 在試圖重試失敗的produce請求以前的等待時間。避免陷入發送-失敗的死循環中 | 100 | ||
更全的配置參考官方文檔:http://kafka.apache.org/documentation/#producerconfigs安全
冪等性:客戶端一次或屢次操做,最終數據是一致的,好比購買火車票支付時可能顯示網絡異常,但其實已經扣款成功,用戶再次發起扣款不會再觸發真正的扣款Kafka只能保證在一個會話中的冪等性網絡
冪等模式只須要將enable.idempotence設置爲true,一旦設置了該屬性,那麼retries默認是Integer.MAX_VALUE ,acks默認是all。代碼的寫法和前面例子沒什麼區別app
事務模式要求數據發送必須包含在事務中,在事務中能夠向多個topic發送數據,消費者端最好也使用事務模式讀,保證一次能將整個事務的數據所有讀取過來。固然消費者也能夠不設置爲事務讀的模式。dom
@Test public void transactional(){ Properties props = new Properties(); props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092"); props.put("transactional.id", "my_transactional_id"); Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer()); producer.initTransactions(); try { //數據發送必須在beginTransaction()和commitTransaction()中間,不然會報狀態不對的異常 producer.beginTransaction(); for (int i = 0; i < 100; i++) producer.send(new ProducerRecord<>("mytopic1", Integer.toString(i), Integer.toString(i))); producer.commitTransaction(); } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) { // 這些異常不能被恢復,所以必需要關閉並退出Producer producer.close(); } catch (KafkaException e) { // 出現其它異常,終止事務 producer.abortTransaction(); } producer.close(); }
沒什麼特殊分區邏輯,把kafka默認的分區類拿出來讀一下socket
package org.apache.kafka.clients.producer.internals; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.utils.Utils; public class DefaultPartitioner implements Partitioner { private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap(); //須要覆蓋的方法 public DefaultPartitioner() { } //須要覆蓋的方法,能夠在這裏添加配置信息 public void configure(Map<String, ?> configs) { } //須要覆蓋的方法,最重要的 /* topic:主題 key:動態綁定的,傳的什麼類型就是什麼類型 keyBytes:Ascii碼數組 value:動態綁定的,傳的什麼類型就是什麼類型 valueBytes:Ascii碼數組 cluster:kafka集羣 */ public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { //拿到全部分區 List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); //分區數量 int numPartitions = partitions.size(); //若是key爲空,則取消息做爲分區依據 if (keyBytes == null) { int nextValue = this.nextValue(topic); //可用分區,我在想應該是 List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); //可用分區數 if (availablePartitions.size() > 0) { //計算分區索引 int part = Utils.toPositive(nextValue) % availablePartitions.size(); //返回分區 return ((PartitionInfo)availablePartitions.get(part)).partition(); } else { //若是可用分區=0,則直接返回全部分區中的一個 return Utils.toPositive(nextValue) % numPartitions; } } else { //key有值,則返回全部分區中的一個 return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } } //若是沒有key,則調用該方法那消息來作分區依據 private int nextValue(String topic) { AtomicInteger counter = (AtomicInteger)this.topicCounterMap.get(topic); if (null == counter) { counter = new AtomicInteger(ThreadLocalRandom.current().nextInt()); AtomicInteger currentCounter = (AtomicInteger)this.topicCounterMap.putIfAbsent(topic, counter); if (currentCounter != null) { counter = currentCounter; } } return counter.getAndIncrement(); } //須要覆蓋的方法 public void close() { } }