
This article is also posted on my blog (liaosi's blog) as "Kafka Java code examples and configuration notes". The code has been uploaded to my personal GitHub.



package com.lzumetal.mq.kafka.demo;

/**
 * <p>Description: </p>
 * @author: liaosi
 * @date: 2018-01-30
 */
public class Constants {

    final static String GROUP_ID = "test_group";
    final static String MY_TOPIC = "myTest";
    // Broker host; left blank in the original, fill in your own Kafka broker address
    final static String KAFKA_SERVER_ADRESS = "";
    final static int KAFKA_SERVER_PORT = 9092;
}



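KafkaProducer (org.apache.kafka.clients.producer.KafkaProducer) is a client that sends data to a Kafka cluster. The producer is thread-safe: multiple threads can share one producer instance, and this is generally faster than giving each thread its own instance. See the official KafkaProducer documentation.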

package com.lzumetal.mq.kafka.demo;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

/**
 * <p>Description: </p>
 * A producer consists of a buffer pool holding records that have not been sent yet
 * and a background I/O thread that sends those records to the Kafka cluster.
 * Failing to close the producer after use leaks these resources.
 * @author: liaosi
 * @date: 2018-01-30
 */
public class MyKafkaProducer {

    private Producer<String, String> producer;

    @Before
    public void init() {
        Properties props = new Properties();
        props.put("bootstrap.servers", Constants.KAFKA_SERVER_ADRESS + ":" + Constants.KAFKA_SERVER_PORT);

        /* acks controls how many acknowledgments the producer requires from the leader
           before a send is considered successful. */
        //props.put("acks", "all");

        // When retries > 0, a failed send is retried automatically, up to `retries` times.
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);

        // key.serializer and value.serializer specify how the user-supplied key and value are serialized.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        producer = new KafkaProducer<>(props);
    }

    @Test
    public void produceMsg() {
        for (int i = 0; i < 10; i++) {
            String msg = "Message_test_" + i;
            System.out.println("produce : " + msg);
            // send() is asynchronous: the record is only appended to the producer's buffer here
            producer.send(new ProducerRecord<>(Constants.MY_TOPIC, Integer.toString(i), msg));
        }
    }

    @After
    public void close() {
        // Flushes buffered records, then releases the buffer pool and the I/O thread
        producer.close();
    }

    private void sleep(int seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}


bootstrap.servers
A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping; this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form host1:port1,host2:port2,.... Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).
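The expected format (comma-separated host:port pairs) can be illustrated with a small parser. This is a hypothetical helper written for this article, not part of kafka-clients, which parses the list itself; the host names kafka1 and kafka2 are placeholders. Note that with the blank KAFKA_SERVER_ADRESS in Constants, the demo's value would be ":9092", which this sketch rejects.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits "host1:port1,host2:port2" into host/port entries.
class BootstrapServers {

    /** One host/port entry from the list. */
    static final class HostPort {
        final String host;
        final int port;

        HostPort(String host, int port) {
            this.host = host;
            this.port = port;
        }

        @Override
        public String toString() {
            return host + ":" + port;
        }
    }

    static List<HostPort> parse(String servers) {
        List<HostPort> result = new ArrayList<>();
        for (String entry : servers.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate a trailing comma
            }
            // Split on the last ':' so the port is always the final component
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            result.add(new HostPort(host, port));
        }
        return result;
    }

    public static void main(String[] args) {
        // Two brokers, so bootstrapping still works if one of them is down
        System.out.println(parse("kafka1:9092,kafka2:9092")); // [kafka1:9092, kafka2:9092]
    }
}
```

Listing more than one broker only matters for the first connection; after that the client learns the full cluster membership from the brokers.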


acks
The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are allowed:

acks=0 If set to zero then the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the retries configuration will not take effect (as the client won't generally know of any failures). The offset given back for each record will always be set to -1.

acks=1 This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost.

acks=all This means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee. This is equivalent to the acks=-1 setting.

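In short:
(1) acks=0: the producer does not wait for any acknowledgment. The record is added to the socket buffer immediately and treated as sent. There is no guarantee that the server received it, the retries setting has no effect (the client does not know whether the send failed), and the returned offset is always -1.
(2) acks=1: the producer waits only until the leader has written the record to its local log, not for the followers. If the leader fails before the followers have replicated the record, the record is lost.
(3) acks=all: the leader waits until all in-sync replicas have written the record, so no data is lost as long as at least one of them stays alive. This is the strongest guarantee.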
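A minimal model of the three acks levels, counting how many replicas are guaranteed to hold a record at the moment the producer treats the send as successful. The class and method names are made up for illustration, and the model ignores everything except the ack level and the in-sync replica (ISR) count.

```java
// Simplified model, not Kafka code.
class AcksModel {

    enum Acks { ZERO, ONE, ALL }

    /** Parses an acks setting; "-1" is equivalent to "all". */
    static Acks parse(String value) {
        switch (value) {
            case "0":
                return Acks.ZERO;
            case "1":
                return Acks.ONE;
            case "all":
            case "-1":
                return Acks.ALL;
            default:
                throw new IllegalArgumentException("Unknown acks value: " + value);
        }
    }

    /** @param isrSize number of in-sync replicas, leader included */
    static int guaranteedCopies(Acks acks, int isrSize) {
        switch (acks) {
            case ZERO:
                return 0;        // only handed to the socket buffer
            case ONE:
                return 1;        // written to the leader's local log
            default:
                return isrSize;  // written by every in-sync replica
        }
    }

    /** True if the record may be lost when the leader fails right after the acknowledgment. */
    static boolean mayLoseOnLeaderFailure(Acks acks, int isrSize) {
        // A copy must exist on at least one replica other than the failed leader
        return guaranteedCopies(acks, isrSize) < 2;
    }

    public static void main(String[] args) {
        for (String v : new String[] {"0", "1", "all"}) {
            Acks acks = parse(v);
            System.out.println("acks=" + v + ": guaranteed copies=" + guaranteedCopies(acks, 3)
                    + ", may lose on leader failure=" + mayLoseOnLeaderFailure(acks, 3));
        }
    }
}
```

With isrSize = 1, mayLoseOnLeaderFailure returns true even for acks=all: the guarantee only holds while at least one in-sync replica besides the leader is alive.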


key.serializer, value.serializer
The serializer classes for the key and value of a message record.
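What the configured StringSerializer/StringDeserializer pair does can be sketched with the JDK alone: a String key or value becomes bytes (UTF-8 by default) on the producer side and is decoded back on the consumer side. Kafka's real interfaces are org.apache.kafka.common.serialization.Serializer<T> and Deserializer<T>; the Utf8StringCodec class below is hypothetical and only mirrors their serialize(topic, data) / deserialize(topic, data) shape so it runs without kafka-clients.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a String serializer/deserializer pair.
class Utf8StringCodec {

    /** Producer side: key or value to bytes; null stays null. */
    static byte[] serialize(String topic, String data) {
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }

    /** Consumer side: bytes back to the original String. */
    static String deserialize(String topic, byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }
}
```

Producer and consumer must agree on the format: bytes written with one serializer are only meaningful to a matching deserializer.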

buffer.memory
The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server the producer will block for max.block.ms after which it will throw an exception.

This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since not all memory the producer uses is used for buffering. Some additional memory will be used for compression (if compression is enabled) as well as for maintaining in-flight requests.
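The blocking behavior can be modeled as a byte budget guarded by a semaphore: each send reserves bytes, waits up to max.block.ms when the budget is exhausted, and fails after that. This is a simplified sketch with hypothetical names, not Kafka's internal buffer pool, and the real producer throws its own exception type rather than java.util.concurrent.TimeoutException.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Simplified model of buffer.memory + max.block.ms (hypothetical class).
class BoundedSendBuffer {

    // One permit per byte of buffer.memory that is still free
    private final Semaphore freeBytes;

    BoundedSendBuffer(int bufferMemory) {
        this.freeBytes = new Semaphore(bufferMemory);
    }

    /** Reserves space for a record, waiting at most maxBlockMs for memory to be released. */
    void reserve(int recordBytes, long maxBlockMs) throws InterruptedException, TimeoutException {
        if (!freeBytes.tryAcquire(recordBytes, maxBlockMs, TimeUnit.MILLISECONDS)) {
            throw new TimeoutException("Could not reserve " + recordBytes
                    + " bytes within " + maxBlockMs + " ms");
        }
    }

    /** Returns space once the records have been delivered to the broker. */
    void release(int recordBytes) {
        freeBytes.release(recordBytes);
    }

    int availableBytes() {
        return freeBytes.availablePermits();
    }
}
```

With a 100-byte budget, a first 60-byte reservation succeeds immediately; a second one waits maxBlockMs and then throws unless release() is called in the meantime.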


retries
Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries without setting max.in.flight.requests.per.connection to 1 will potentially change the ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second succeeds, then the records in the second batch may appear first.

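When retries > 0, a failed send is retried automatically, up to the number of times set by retries. If retries is set but max.in.flight.requests.per.connection is not set to 1, record ordering may change: if two batches are sent to the same partition, and the first fails and is retried while the second succeeds, the records in the second batch end up ahead of those in the first.
The default value of retries is 0.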
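The reordering scenario can be reproduced with a toy simulation: batches A, B and C go to one partition, A fails once with a transient error, and up to maxInFlight requests are outstanding at a time. The simulation and its names are illustrative assumptions; it models ordering only, not timing or the broker.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy simulation, not Kafka code.
class RetryOrdering {

    /**
     * @param batches     batches in the order the application sent them
     * @param failOnce    batches whose first attempt fails with a transient error
     * @param maxInFlight models max.in.flight.requests.per.connection
     * @return the order in which batches are appended to the partition log
     */
    static List<String> simulate(List<String> batches, Set<String> failOnce, int maxInFlight) {
        Deque<String> pending = new ArrayDeque<>(batches);
        Set<String> alreadyFailed = new HashSet<>();
        List<String> log = new ArrayList<>();
        while (!pending.isEmpty()) {
            // Send up to maxInFlight batches at once, in queue order
            List<String> inFlight = new ArrayList<>();
            for (int i = 0; i < maxInFlight && !pending.isEmpty(); i++) {
                inFlight.add(pending.pollFirst());
            }
            List<String> retry = new ArrayList<>();
            for (String batch : inFlight) {
                if (failOnce.contains(batch) && alreadyFailed.add(batch)) {
                    retry.add(batch); // transient failure: resend later
                } else {
                    log.add(batch);   // broker appended this batch
                }
            }
            // Retried batches go back to the front of the queue, oldest first
            for (int i = retry.size() - 1; i >= 0; i--) {
                pending.addFirst(retry.get(i));
            }
        }
        return log;
    }
}
```

With maxInFlight = 2 the log order comes out as B, A, C; with maxInFlight = 1 it stays A, B, C. Newer clients can also keep ordering with more requests in flight when enable.idempotence is set; this toy model does not cover that.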

batch.size
The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes. No attempt will be made to batch records larger than this size.

Requests sent to brokers will contain multiple batches, one for each partition with data available to be sent.
A small batch size will make batching less common and may reduce throughput (a batch size of zero will disable batching entirely). A very large batch size may use memory a bit more wastefully as we will always allocate a buffer of the specified batch size in anticipation of additional records.

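To improve the performance of both the client and the Kafka cluster by reducing the number of requests, the producer sends multiple records bound for the same partition together as one batch; batch.size sets the size of a batch in bytes. A batch.size that is too small may reduce throughput, while one that is too large may waste memory, because a buffer of batch.size bytes is allocated up front to hold the records that will go into that batch.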
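A simplified sketch of how records for a single partition could be grouped into batches of at most batch.size bytes. Record sizes are plain integers here; the real producer also accounts for compression, headers and per-batch overhead, so treat this as an illustration of the rule rather than of Kafka's code.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified batching sketch for ONE partition (not Kafka's RecordAccumulator).
class BatchSketch {

    static List<List<Integer>> group(List<Integer> recordSizes, int batchSize) {
        List<List<Integer>> batches = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        int currentBytes = 0;
        for (int size : recordSizes) {
            if (!current.isEmpty() && currentBytes + size > batchSize) {
                // Record does not fit: close the open batch and start a new one
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            // A record larger than batchSize is still sent, alone in its own batch
            current.add(size);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
```

For example, three 40-byte records with batchSize = 100 form [[40, 40], [40]], and a 150-byte record between two small ones ends up alone in its own batch.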

linger.ms
The producer groups together any records that arrive in between request transmissions into a single batched request. Normally this occurs only under load when records arrive faster than they can be sent out. However in some circumstances the client may want to reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount of artificial delay; that is, rather than immediately sending out a record the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together. This can be thought of as analogous to Nagle's algorithm in TCP. This setting gives the upper bound on the delay for batching: once we get batch.size worth of records for a partition it will be sent immediately regardless of this setting, however if we have fewer than this many bytes accumulated for this partition we will 'linger' for the specified time waiting for more records to show up. This setting defaults to 0 (i.e. no delay). Setting linger.ms=5, for example, would have the effect of reducing the number of requests sent but would add up to 5ms of latency to records sent in the absence of load.
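The two triggers described above (a full batch, or linger.ms elapsed since the first record arrived) can be written as one predicate. This is a simplified assumption: the real producer also sends early in other situations, for example on flush() or close(), which the sketch leaves out.

```java
// Simplified send decision for one partition's open batch (hypothetical helper).
class LingerSketch {

    static boolean readyToSend(int accumulatedBytes, long firstAppendMs, long nowMs,
                               int batchSize, long lingerMs) {
        boolean full = accumulatedBytes >= batchSize;               // batch.size reached
        boolean lingeredLongEnough = nowMs - firstAppendMs >= lingerMs; // linger.ms elapsed
        return full || lingeredLongEnough;
    }
}
```

With lingerMs = 0 every non-empty batch is ready at once; with lingerMs = 5 a small batch waits up to 5 ms for more records unless it fills up first.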



package com.lzumetal.mq.kafka.demo;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.Test;

import java.util.*;
import java.util.concurrent.TimeUnit;

/**
 * <p>Description: </p>
 * @author: liaosi
 * @date: 2018-01-30
 */
public class MyKafkaConsumer {

    /**
     * Commit offsets automatically
     */
    @Test
    public void comsumeMsgAutoCommit() {

        Properties props = new Properties();
        props.put("bootstrap.servers", Constants.KAFKA_SERVER_ADRESS + ":" + Constants.KAFKA_SERVER_PORT);
        props.put("group.id", Constants.GROUP_ID);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // poll() throws IllegalStateException unless the consumer has subscribed first
        consumer.subscribe(Collections.singletonList(Constants.MY_TOPIC));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }

    /**
     * Commit offsets manually
     */
    @Test
    public void consumerMsgManualCommit() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "false");
        // Has no effect while enable.auto.commit is false
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(Constants.MY_TOPIC));
        final int minBatchSize = 200;
        List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                buffer.add(record);
            }
            if (buffer.size() >= minBatchSize) {
                insertIntoDb(buffer);
                // Commit only after the batch is stored: a crash before this line re-delivers it instead of losing it
                consumer.commitSync();
                buffer.clear();
            }
        }
    }

    private void insertIntoDb(List<ConsumerRecord<String, String>> buffer) {
        for (ConsumerRecord<String, String> record : buffer) {
            System.out.printf("insertIntoDb:offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }

    private void sleep(int seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}



A consumer reads data from a partition by issuing fetch requests. KafkaConsumer exposes a poll method, but a call to poll only might trigger a fetch request. The reason is that the data returned by each fetch request is limited by the max.partition.fetch.bytes setting, while each poll call returns at most max.poll.records records.

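So the following can happen: within the max.partition.fetch.bytes limit, one fetch returns 100 records, which are placed in a local cache. Because max.poll.records allows at most 15 records per poll, KafkaConsumer has to call poll 7 times to consume the 100 records brought back by that single network fetch: the first 6 polls return 15 records each and the last one returns 10.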
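The arithmetic in this example, as a tiny calculation with no Kafka client involved: drain one fetch of fetchedRecords records when each poll returns at most maxPollRecords. The class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Pure arithmetic: sizes of successive poll() results for one fetch.
class PollDrain {

    static List<Integer> pollSizes(int fetchedRecords, int maxPollRecords) {
        List<Integer> sizes = new ArrayList<>();
        int remaining = fetchedRecords;
        while (remaining > 0) {
            int n = Math.min(remaining, maxPollRecords); // capped by max.poll.records
            sizes.add(n);
            remaining -= n;
        }
        return sizes;
    }

    public static void main(String[] args) {
        // 100 fetched records, max.poll.records = 15
        System.out.println(pollSizes(100, 15)); // [15, 15, 15, 15, 15, 15, 10]
    }
}
```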

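The consumer also has a setting called max.poll.interval.ms, the maximum allowed interval between poll calls; its default is 300000 (5 minutes). If no poll happens within this interval even though heartbeats are still being sent, the consumer is considered to be in a livelock and is removed from the consumer group. To avoid being removed, a consumer must keep calling poll(timeout). The KafkaConsumer client does not do this on our behalf, so the application itself has to keep calling poll.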
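A back-of-the-envelope check, assuming every record takes the same processing time: the worst-case gap between two polls is roughly max.poll.records times the per-record time, and it has to stay within max.poll.interval.ms. The class and the constant-cost assumption are hypothetical.

```java
// Rough budget check for max.poll.interval.ms (hypothetical helper).
class PollIntervalBudget {

    /** Worst-case time between two polls if every record takes perRecordMs. */
    static long worstCaseGapMs(int maxPollRecords, long perRecordMs) {
        return maxPollRecords * perRecordMs;
    }

    /** True if processing one full poll could exceed max.poll.interval.ms. */
    static boolean risksEviction(int maxPollRecords, long perRecordMs, long maxPollIntervalMs) {
        return worstCaseGapMs(maxPollRecords, perRecordMs) > maxPollIntervalMs;
    }

    /** Largest max.poll.records that keeps the worst-case gap within the interval. */
    static int largestSafeMaxPollRecords(long perRecordMs, long maxPollIntervalMs) {
        if (perRecordMs <= 0) {
            throw new IllegalArgumentException("perRecordMs must be positive");
        }
        return (int) Math.min(Integer.MAX_VALUE, maxPollIntervalMs / perRecordMs);
    }
}
```

For example, at 1000 ms per record and a 300000 ms interval, 500 records per poll would get the consumer evicted, while up to 300 stays within budget. Lowering max.poll.records or speeding up processing are the two levers.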

commit offset


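So that the group knows where to resume reading (for example after a restart or a rebalance), after poll has returned records from the local cache the client needs to call commitSync (or commitAsync) to commit the offset of the next message it should read.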
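The bookkeeping behind a manual commit, sketched without kafka-clients: for each partition, the offset to commit is the offset of the last processed record plus 1, i.e. the next record to read. With the real client, each value would be wrapped as new OffsetAndMetadata(offset) under its TopicPartition and passed to commitSync(Map); here partitions are plain ints and the class name is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Computes "next offset to read" per partition from processed records.
class NextOffsets {

    /** processed[i] = {partition, offset} of a record the application has finished with. */
    static Map<Integer, Long> offsetsToCommit(long[][] processed) {
        Map<Integer, Long> next = new HashMap<>();
        for (long[] rec : processed) {
            int partition = (int) rec[0];
            long candidate = rec[1] + 1; // commit the NEXT offset, not the last one read
            next.merge(partition, candidate, Long::max);
        }
        return next;
    }
}
```

Records at offsets 5, 6 and 7 on partition 0 and offset 3 on partition 1 yield {0=8, 1=4}.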


For committing offsets, the Kafka Consumer Java client supports two modes: automatic commits performed by the KafkaConsumer, or manual commits made by the user calling commitSync or commitAsync.





key.deserializer, value.deserializer
The deserializer classes for the key and value of a message record.

group.id
A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using subscribe(topic) or the Kafka-based offset management strategy.

Specifies which group this consumer wants to join. The default value is "" (an empty string).

heartbeat.interval.ms
The expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than session.timeout.ms, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.

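This value must be lower than session.timeout.ms because, when a consumer cannot send heartbeats to the coordinator for longer than session.timeout.ms, it is considered to have left the group, and the partitions it was consuming are reassigned to other consumers in the same group.

It is usually set no higher than 1/3 of session.timeout.ms. Default: 3000 (3 s).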

session.timeout.ms
The timeout used to detect consumer failures when using Kafka's group management facility. The consumer sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this consumer from the group and initiate a rebalance. Note that the value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms.

The consumer session timeout. The consumer sends periodic heartbeats to show that it is alive. If no heartbeat arrives within session.timeout.ms, the broker removes the consumer from the group and triggers a rebalance.
The value must lie between group.min.session.timeout.ms and group.max.session.timeout.ms in the broker configuration.
Default: 10000 (10 s).
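The constraints on heartbeat.interval.ms and session.timeout.ms can be collected into one hypothetical check (not a Kafka API). The broker's allowed range is passed in as parameters rather than assumed.

```java
// Hypothetical sanity check for the heartbeat/session rules described above.
class SessionTimeoutCheck {

    static String check(long heartbeatIntervalMs, long sessionTimeoutMs,
                        long groupMinSessionTimeoutMs, long groupMaxSessionTimeoutMs) {
        // Hard rule: heartbeats must be more frequent than the session timeout
        if (heartbeatIntervalMs >= sessionTimeoutMs) {
            return "heartbeat.interval.ms must be lower than session.timeout.ms";
        }
        // Hard rule: the broker only accepts session timeouts within its configured range
        if (sessionTimeoutMs < groupMinSessionTimeoutMs || sessionTimeoutMs > groupMaxSessionTimeoutMs) {
            return "session.timeout.ms is outside the broker's allowed range";
        }
        // Recommendation: heartbeat no higher than 1/3 of the session timeout
        if (heartbeatIntervalMs * 3 > sessionTimeoutMs) {
            return "OK, but heartbeat.interval.ms is above 1/3 of session.timeout.ms";
        }
        return "OK";
    }
}
```

With the defaults quoted above (3000 ms heartbeat, 10000 ms session) the check passes, since 3 x 3000 = 9000 does not exceed 10000.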

enable.auto.commit
If true the consumer's offset will be periodically committed in the background.

Whether the consumer commits offsets automatically. Default: true.

auto.commit.interval.ms
The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if enable.auto.commit is set to true.

The auto-commit interval. Range: [0, Integer.MAX_VALUE]. Default: 5000 (5 s).
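A simplified timing model, under two assumptions: the auto-commit deadline is checked inside poll(), and the timer starts at time 0 and restarts after each commit. Under those assumptions a commit happens at the first poll at or after each auto.commit.interval.ms deadline, which shows why a consumer that polls rarely also commits rarely. The class is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified auto-commit timing model (assumption, not Kafka source).
class AutoCommitTiming {

    static List<Long> commitTimes(long[] pollTimesMs, long autoCommitIntervalMs) {
        List<Long> commits = new ArrayList<>();
        long nextCommitAt = autoCommitIntervalMs;
        for (long t : pollTimesMs) {
            if (t >= nextCommitAt) {
                commits.add(t);                          // commit piggybacks on this poll
                nextCommitAt = t + autoCommitIntervalMs; // timer restarts after each commit
            }
        }
        return commits;
    }
}
```

Polls at 0, 400, 1100, 1500 and 2200 ms with a 1000 ms interval produce commits at 1100 and 2200 ms.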

auto.offset.reset
What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):
  • earliest: automatically reset the offset to the earliest offset
  • latest: automatically reset the offset to the latest offset
  • none: throw exception to the consumer if no previous offset is found for the consumer's group
  • anything else: throw exception to the consumer.

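This setting determines what the consumer does when Kafka has no initial offset for it, or when the current offset no longer exists on the server (for example, because the record it pointed to has been deleted). There are four options:

  • earliest: automatically reset to the earliest offset.
  • latest: automatically reset to the latest offset.
  • none: if no previous offset is found for the consumer's group, throw an exception to the consumer, telling it that no such offset exists anywhere in the consumer group.
  • anything else: throw an exception to the consumer.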
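The decision above can be sketched as a function that picks the starting offset for one partition. This is a hypothetical helper, not the consumer's internal code; logStart is the earliest offset still stored and logEnd is the offset the next produced record will get.

```java
// Sketch of the auto.offset.reset decision (hypothetical helper).
class OffsetResetSketch {

    /** @param committed the group's committed offset, or null if there is none */
    static long startingOffset(Long committed, long logStart, long logEnd, String policy) {
        // A committed offset that still points into the log is used as-is
        boolean valid = committed != null && committed >= logStart && committed <= logEnd;
        if (valid) {
            return committed;
        }
        switch (policy) {
            case "earliest":
                return logStart;
            case "latest":
                return logEnd;
            case "none":
                throw new IllegalStateException("No valid committed offset for this group");
            default:
                throw new IllegalArgumentException("Unsupported auto.offset.reset: " + policy);
        }
    }
}
```

A committed offset of 5 when the log now starts at 10 counts as "no longer exists", so earliest jumps to 10 and latest to the end of the log.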


fetch.max.wait.ms
The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by fetch.min.bytes.


fetch.min.bytes
The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive. Setting this to something greater than 1 will cause the server to wait for larger amounts of data to accumulate which can improve server throughput a bit at the cost of some additional latency.


Range: [0, Integer.MAX_VALUE]. Default: 1. The default of 1 lets the consumer's fetch requests return as soon as possible.
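The interplay of fetch.min.bytes and fetch.max.wait.ms can be modeled as one broker-side predicate: answer once enough data is available or once the wait limit has passed, whichever comes first. This is a model with a hypothetical name, not broker code.

```java
// Simplified broker-side rule for answering a fetch (model only).
class FetchWaitSketch {

    static boolean respondNow(long availableBytes, long waitedMs,
                              long fetchMinBytes, long fetchMaxWaitMs) {
        return availableBytes >= fetchMinBytes   // enough data accumulated
                || waitedMs >= fetchMaxWaitMs;   // or waited long enough
    }
}
```

With fetchMinBytes = 1 a single available byte answers the request immediately; raising fetchMinBytes trades latency (up to fetchMaxWaitMs) for larger responses.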

fetch.max.bytes
The maximum amount of data the server should return for a fetch request. Records are fetched in batches by the consumer, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that the consumer can make progress. As such, this is not an absolute maximum. The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (topic config). Note that the consumer performs multiple fetches in parallel.

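The maximum size of the records a broker returns for one fetch request. If the first record batch in the first non-empty partition being fetched is already larger than this limit, it is still returned; in that case only that one batch comes back. Brokers and topics also limit the size of the messages producers send to them, so when choosing this value, refer to the broker's message.max.bytes and the topic's max.message.bytes.

Range: [0, Integer.MAX_VALUE]. Default: 52428800 (50 MB).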
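A simplified model of how one fetch response could be filled: record batches are added while they fit within fetch.max.bytes, except that the very first batch is always returned, even if it alone exceeds the limit, so the consumer can make progress. Hypothetical class, not broker code.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified fetch-response filling under fetch.max.bytes (model only).
class FetchResponseSketch {

    static List<Integer> fill(List<Integer> batchSizes, int fetchMaxBytes) {
        List<Integer> response = new ArrayList<>();
        int total = 0;
        for (int size : batchSizes) {
            if (!response.isEmpty() && total + size > fetchMaxBytes) {
                break; // limit reached; remaining batches wait for the next fetch
            }
            // The first batch is always included, even if it alone exceeds the limit
            response.add(size);
            total += size;
        }
        return response;
    }
}
```

With a 100-byte limit, batches of 150 and 10 bytes yield only [150], while three 40-byte batches yield [40, 40].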


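max.partition.fetch.bytes
The maximum size of the records returned for a single partition in one fetch request. If the first record batch in the first non-empty partition of the fetch is already larger than this limit, it is still returned, so in that case only that one batch comes back. Brokers and topics also limit the size of the messages producers send to them, so when choosing this value, refer to the broker's message.max.bytes and the topic's max.message.bytes.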

max.poll.interval.ms
The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member.


max.poll.records
The maximum number of records returned in a single call to poll().

