Kafka1.0.X_生產者API詳解

Producer是Kafka三大組件中的一個,用於發送消息到kafka集羣中java

Producer提供了豐富的配置(見後面的配置項)用於控制它的行爲apache

在編碼以前先使用命令建立topicbootstrap

./kafka-topics.sh --create --zookeeper hadoop01,hadoop02,hadoop03:2181 --partitions 2 --replication-factor 3 --topic mytopic1

簡單模式

Producer採用默認分區方式將消息散列的發送到各個分區當中數組

package com.jv;
​
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
​
import java.util.Properties;
​
public class MyProducer {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        //設置kafka集羣的地址
        props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        //ack模式,all是最慢但最安全的
        props.put("acks", "-1");
        //失敗重試次數
        props.put("retries", 0);
        //每一個分區未發送消息總字節大小(單位:字節),超過設置的值就會提交數據到服務端
        props.put("batch.size", 10);
        //props.put("max.request.size",10);
        //消息在緩衝區保留的時間,超過設置的值就會被提交到服務端
        props.put("linger.ms", 10000);
        //整個Producer用到總內存的大小,若是緩衝區滿了會提交數據到服務端
        //buffer.memory要大於batch.size,不然會報申請內存不足的錯誤
        props.put("buffer.memory", 10240);
        //序列化器
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
​
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++)
            producer.send(new ProducerRecord<String, String>("mytopic1", Integer.toString(i), "dd:"+i));
        //Thread.sleep(1000000);
        producer.close();
    }
}

配置項緩存

名稱 說明 默認值 有效值 重要性
bootstrap.servers kafka集羣的broker-list,如:<br>hadoop01:9092,hadoop02:9092   必選
acks 確保生產者可靠性設置,有三個選項:<br>acks=0:不等待成功返回<br>acks=1:等Leader寫成功返回<br>acks=all:等Leader和全部ISR中的Follower寫成功返回,all也能夠用-1代替 -1 0,1,-1,all  
key.serializer key的序列化器   ByteArraySerializer<br>StringSerializer 必選
value.serializer value的序列化器   ByteArraySerializer<br>StringSerializer 必選
buffer.memory Producer整體內存大小 33554432 不要超過物理內存,根據實際狀況調整 建議必選
compression.type 壓縮類型<br>壓縮最好用於批量處理,批量處理消息越多,壓縮性能越好 none、gzip、snappy  
retries 發送失敗嘗試重發次數 0    
batch.size 每一個partition的未發送消息大小 16384 根據實際狀況調整 建議必選
client.id 附着在每一個請求的後面,用於標識請求是從什麼地方發送過來的      
connections.max<br>.idle.ms 鏈接空閒時間超過太久自動關閉(單位毫秒) 540000    
linger.ms 數據在緩衝區中保留的時長,0表示當即發送<br>爲了減小網絡耗時,須要設置這個值<br>太大可能容易致使緩衝區滿,阻塞消費者<br>過小容易頻繁請求服務端 0    
max.block.ms 最大阻塞時長 60000    
max.request.size 請求的最大字節數,該值要比batch.size大<br>不建議去更改這個值,若是設置很差會致使程序不報錯,但消息又沒有發送成功 1048576    
partitioner.class 分區類,能夠自定義分區類,實現partitioner接口 默認是哈希值%partitions    
receive.buffer.bytes socket的接收緩存空間大小,當閱讀數據時使用 32768    
request.timeout.ms 等待請求響應的最大時間,超時則重發請求,超太重試次數將拋異常 3000    
send.buffer.bytes 發送數據時的緩存空間大小 131072    
timeout.ms 控制server等待來自followers的確認的最大時間 30000    
max.in.flight.<br>requests.per.<br>connection kafka能夠在一個connection中發送多個請求,叫做一個flight,這樣能夠減小開銷,可是若是產生錯誤,可能會形成數據的發送順序改變。 5    
metadata.fetch<br>.timeout.ms 從ZK中獲取元數據超時時間<br>好比topic\host\partitions 60000    
metadata.max.age.ms 即便沒有任何partition leader 改變,強制更新metadata的時間間隔 300000    
metric.reporters 類的列表,用於衡量指標。實現MetricReporter接口,將容許增長一些類,這些類在新的衡量指標產生時就會改變。JmxReporter總會包含用於註冊JMX統計 none    
metrics.num.samples 用於維護metrics的樣本數 2    
metrics.sample.window.ms metrics系統維護可配置的樣本數量,在一個可修正的window size。這項配置配置了窗口大小,例如。咱們可能在30s的期間維護兩個樣本。當一個窗口推出後,咱們會擦除並重寫最老的窗口 30000    
reconnect.backoff.ms 鏈接失敗時,當咱們從新鏈接時的等待時間。這避免了客戶端反覆重連 10    
retry.backoff.ms 在試圖重試失敗的produce請求以前的等待時間。避免陷入發送-失敗的死循環中 100    
         

更全的配置參考官方文檔:http://kafka.apache.org/documentation/#producerconfigs安全

冪等模式

​ 冪等性:客戶端一次或屢次操做,最終數據是一致的,好比購買火車票支付時可能顯示網絡異常,但其實已經扣款成功,用戶再次發起扣款不會再觸發真正的扣款Kafka只能保證在一個會話中的冪等性網絡

​ 冪等模式只須要將enable.idempotence設置爲true,一旦設置了該屬性,那麼retries默認是Integer.MAX_VALUE ,acks默認是all。代碼的寫法和前面例子沒什麼區別app

事務模式

​事務模式要求數據發送必須包含在事務中,在事務中能夠向多個topic發送數據,消費者端最好也使用事務模式讀,保證一次能將整個事務的數據所有讀取過來。固然消費者也能夠不設置爲事務讀的模式。dom

   @Test
    public void transactional(){
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        props.put("transactional.id", "my_transactional_id");
        Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
​
        producer.initTransactions();
​
        try {
            //數據發送必須在beginTransaction()和commitTransaction()中間,不然會報狀態不對的異常
            producer.beginTransaction();
            for (int i = 0; i < 100; i++)
                producer.send(new ProducerRecord<>("mytopic1", Integer.toString(i), Integer.toString(i)));
            producer.commitTransaction();
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // 這些異常不能被恢復,所以必需要關閉並退出Producer
            producer.close();
        } catch (KafkaException e) {
            // 出現其它異常,終止事務
            producer.abortTransaction();
        }
        producer.close();
    }

自定義分區類(Partitioner)

沒什麼特殊分區邏輯,把kafka默認的分區類拿出來讀一下socket

package org.apache.kafka.clients.producer.internals;
​
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;
​
public class DefaultPartitioner implements Partitioner {
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap();
    //須要覆蓋的方法
    public DefaultPartitioner() {
    }
    //須要覆蓋的方法,能夠在這裏添加配置信息
    public void configure(Map<String, ?> configs) {
    }
    //須要覆蓋的方法,最重要的
    /*
    topic:主題
    key:動態綁定的,傳的什麼類型就是什麼類型
    keyBytes:Ascii碼數組
    value:動態綁定的,傳的什麼類型就是什麼類型
    valueBytes:Ascii碼數組
    cluster:kafka集羣
    */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //拿到全部分區
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        //分區數量
        int numPartitions = partitions.size();
        //若是key爲空,則取消息做爲分區依據
        if (keyBytes == null) {
            int nextValue = this.nextValue(topic);
            //可用分區,我在想應該是
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            //可用分區數
            if (availablePartitions.size() > 0) {
                //計算分區索引
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                //返回分區
                return ((PartitionInfo)availablePartitions.get(part)).partition();
            } else {
                //若是可用分區=0,則直接返回全部分區中的一個
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            //key有值,則返回全部分區中的一個
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }
    //若是沒有key,則調用該方法那消息來作分區依據
    private int nextValue(String topic) {
        AtomicInteger counter = (AtomicInteger)this.topicCounterMap.get(topic);
        if (null == counter) {
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            AtomicInteger currentCounter = (AtomicInteger)this.topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }
​
        return counter.getAndIncrement();
    }
    //須要覆蓋的方法
    public void close() {
    }
}
相關文章
相關標籤/搜索