kafka(三)—Kafka的Java代碼示例和配置說明

本示以同步至我的博客 liaosi's blog-Kafka的Java代碼示例和配置說明,代碼已上傳至 個人GitHub

配置類:Constants.java

消費者和生產者使用的幾個常量。html

package com.lzumetal.mq.kafka.demo;

/**
 * <p>Description: </p>
 *
 * @author: liaosi
 * @date: 2018-01-30
 */
public class Constants {

    final static String GROUP_ID = "test_group";
    final static String MY_TOPIC = "myTest";
    final static String KAFKA_SERVER_ADRESS = "192.168.128.1";
    final static int KAFKA_SERVER_PORT = 9092;


}

消息生產者:MyKafkaProducer.java

KafkaProducer(org.apache.kafka.clients.producer.KafkaProducer)是一個用於向kafka集羣發送數據的客戶端。producer是線程安全的,多個線程能夠共享同一個 producer實例,並且這一般比在多個線程中每一個線程建立一個實例速度要快些。參考KafkaProducer官方文檔java

package com.lzumetal.mq.kafka.demo;


import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Before;
import org.junit.Test;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

/**
 * <p>Description: </p>
 * Producer由一個持有未發送消息記錄的資源池和一個用來向Kafka集羣發送消息記錄的後臺IO線程組成。
 * 使用後未關閉producer將致使這些資源泄露。
 *
 * @author: liaosi
 * @date: 2018-01-30
 */
public class MyKafkaProducer {

    private Producer<String, String> producer;


    @Before
    public void init() {
        Properties props = new Properties();
        props.put("bootstrap.servers", Constants.KAFKA_SERVER_ADRESS + ":" + Constants.KAFKA_SERVER_PORT);

        /*ack 配置項用來控制producer要求leader確認多少消息後返回調用成功。
        當值爲0時producer不須要等待任何確認消息。
        當值爲1時只須要等待leader確認。
        當值爲-1或all時須要所有ISR集合返回確認才能夠返回成功。
        */
        //props.put("acks", "all");

        //當 retries > 0 時,若是發送失敗,會自動嘗試從新發送數據。發送次數爲retries設置的值。
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);

        //key.serializer 和 value.serializer 指定使用什麼序列化方式將用戶提供的key和value進行序列化。
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        producer = new KafkaProducer<>(props);
    }

    @Test
    public void produceMsg() {

        for (int i = 0; i < 10; i++) {
            String msg = "Message_test_" + i;
            System.out.println("produce : " + msg);
            //send方法是異步的。當它被調用時,它會將消息記錄添加到待發送緩衝區並當即返回。
            //使用這種方式可使生產者彙集一批消息記錄後一塊兒發送,從而提升效率。
            producer.send(new ProducerRecord<>(Constants.MY_TOPIC, Integer.toString(i), msg));
            sleep(1);
        }
        producer.close();

    }

    private void sleep(int seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }


}

producer的配置說明

bootstrap.servers
A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form host1:port1,host2:port2,.... Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).

Kafka集羣鏈接的host/port組,格式:host1:port1,host2:port2,…
這些server僅僅是用於初始化的鏈接,以發現集羣全部成員(成員可能會動態的變化),這個列表不須要包含全部的servers(數量儘可能不止一個,以防其中一個down機了)。git

acks
The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are allowed:


acks=0 If set to zero then the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the retries configuration will not take effect (as the client won't generally know of any failures). The offset given back for each record will always be set to -1.


acks=1 This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost.


acks=all This means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee. This is equivalent to the acks=-1 setting.

producer須要server接收到數據以後發出的確認接收的信號,此項配置就是指procuder須要多少個這樣的確認信號。此配置實際上表明瞭數據備份的可用性。如下設置爲經常使用選項:
(1)acks=0: 設置爲0表示producer不須要等待任何確認收到的信息。副本將當即加到socket buffer並認爲已經發送。沒有任何保障能夠保證此種狀況下server已經成功接收數據,同時重試配置不會發生做用(由於客戶端不知道是否失敗)回饋的offset會老是設置爲-1;
(2)acks=1: 這意味着至少要等待leader已經成功將數據寫入本地log,可是並無等待全部follower是否成功寫入。這種狀況下,若是follower沒有成功備份數據,而此時leader又掛掉,則消息會丟失。
(3)acks=all: 這意味着leader須要等待全部備份都成功寫入日誌,這種策略會保證只要有一個備份存活就不會丟失數據。這是最強的保證。github

key.serializer/key.serializer

Message record 的key, value的序列化類。算法

buffer.memory
The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server the producer will block for max.block.ms after which it will throw an exception.


This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since not all memory the producer uses is used for buffering. Some additional memory will be used for compression (if compression is enabled) as well as for maintaining in-flight requests.

producer能夠用來緩存數據的內存大小。該值實際爲RecordAccumulator類中的BufferPool,即Producer所管理的最大內存。但不是全部producer管理的內存都用做緩存,一些額外的內存會用於壓縮(若是引入壓縮機制),一樣還有一些用於維護請求。
若是數據產生速度大於向broker發送的速度,producer會阻塞配置項max.block.ms所設定的值,超出這個時間則拋出異常。apache

retries
Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries without setting max.in.flight.requests.per.connection to 1 will potentially change the ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second succeeds, then the records in the second batch may appear first.

當設置 retries > 0 時,若是發送失敗,會自動嘗試從新發送數據,發送次數爲retries設置的值。若是設定了retries但沒有把max.in.flight.requests.per.connection 設置成 1則可能會改變數據的順序,由於若是這兩個batch都是發送到同一個partition,而且第一個batch發送失敗而第二個發送成功,則第二個batch中的消息記錄會比第一個的達到得早。
retries 的默認值是0。bootstrap

batch.size
The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes.
No attempt will be made to batch records larger than this size.


Requests sent to brokers will contain multiple batches, one for each partition with data available to be sent.
A small batch size will make batching less common and may reduce throughput (a batch size of zero will disable batching entirely). A very large batch size may use memory a bit more wastefully as we will always allocate a buffer of the specified batch size in anticipation of additional records.

爲了改善客戶端和Kafka集羣的性能,減小請求次數,producer會把要發送到同一個partition的批量消息做爲batch發送,batch.size 是用來設置batch的字節大小。若是 batch.size 過小則可能會下降吞吐量,設置太大則可能會致使浪費,由於咱們預先就須要騰出一個batch.size 大小的緩衝區來存貯將要發送達到該緩衝區的消息。
若將該值設爲0,則不會進行批處理。緩存

linger.ms
The producer groups together any records that arrive in between request transmissions into a single batched request. Normally this occurs only under load when records arrive faster than they can be sent out. However in some circumstances the client may want to reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount of artificial delay—that is, rather than immediately sending out a record the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together. This can be thought of as analogous to Nagle's algorithm in TCP. This setting gives the upper bound on the delay for batching: once we get batch.size worth of records for a partition it will be sent immediately regardless of this setting, however if we have fewer than this many bytes accumulated for this partition we will 'linger' for the specified time waiting for more records to show up. This setting defaults to 0 (i.e. no delay). Setting linger.ms=5, for example, would have the effect of reducing the number of requests sent but would add up to 5ms of latency to records sent in the absence of load.

producer會將request傳輸之間到達的全部records聚合到一個批請求。一般這個值發生在欠負載狀況下,record到達速度快於發送。可是在某些場景下,client即便在正常負載下也指望減小請求數量。這個設置就是如此,經過人工添加少許時延,而不是立馬發送一個record,producer會等待所給的時延,以讓其餘records發送出去,這樣就會被聚合在一塊兒。這個相似於TCP的Nagle算法。該設置給了batch的時延上限:當咱們得到一個partition的batch.size大小的records,就會當即發送出去,而無論該設置;可是若是對於這個partition沒有累積到足夠的record,會linger指定的時間等待更多的records出現。該設置的默認值爲0(無時延)。例如,設置linger.ms=5,會減小request發送的數量,可是在無負載下會增長5ms的發送時延。安全

消息消費者:MyKafkaConsumer.java

package com.lzumetal.mq.kafka.demo;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.Test;

import java.util.*;
import java.util.concurrent.TimeUnit;

/**
 * <p>Description: </p>
 *
 * @author: liaosi
 * @date: 2018-01-30
 */
public class MyKafkaConsumer {


       /**
     * 自動提交offset
     */
    @Test
    public void comsumeMsgAutoCommit() {

        Properties props = new Properties();
        props.put("bootstrap.servers", Constants.KAFKA_SERVER_ADRESS + ":" + Constants.KAFKA_SERVER_PORT);
        props.put("group.id", Constants.GROUP_ID);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(Constants.MY_TOPIC));
        
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
            sleep(1);
        }
    }

    /**
     * 手動提交offset
     */
    @Test
    public void consumerMsgManualCommit() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "false");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(Constants.MY_TOPIC));
        final int minBatchSize = 200;
        List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                buffer.add(record);
            }
            if (buffer.size() >= minBatchSize) {
                insertIntoDb(buffer);
                consumer.commitSync();
                buffer.clear();
            }
        }
    }

    private void insertIntoDb(List<ConsumerRecord<String, String>> buffer) {
        for (ConsumerRecord<String, String> record : buffer) {
            System.out.printf("insertIntoDb:offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }


    private void sleep(int seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

poll方法

從上一次消費完畢後提交的offset處拉取數據,消費後提交offset有兩種方式,手動和自動。網絡

Consumer讀取partition中的數據是經過調用發起一個fetch請求來執行的。而從Kafka Consumer來看,它有一個poll方法。可是這個poll方法只是可能會發起fetch請求。緣由是:Consumer每次發起fetch請求時,讀取到的數據是有限制的,經過配置項max.partition.fetch.bytes來限制。而在執行poll方法時,會根據配置項max.poll.records來限制一次最多pool多少個record。

那麼就可能出現這樣的狀況: 在知足max.partition.fetch.bytes限制的狀況下,假如fetch到了100個record,放到本地緩存後,因爲max.poll.records限制每次只能poll出15個record。那麼KafkaConsumer就須要執行7次poll方法才能將這一次經過網絡發起的fetch請求所fetch到的這100個record消費完畢。其中前6次是每次poll中15個record,最後一次是poll出10個record。

在consumer中,還有一個配置項:max.poll.interval.ms,它表示最大的poll數據間隔,默認值是3秒。若是超過這個間隔沒有發起pool請求,但heartbeat仍舊在發,就認爲該consumer處於 livelock狀態。就會將該consumer移出consumer group。因此爲了避免使 Consumer 本身被移出,Consumer 應該不停的發起poll(timeout)操做。而這個動做 KafkaConsumer Client是不會幫咱們作的,這就須要本身在程序中不停的調用poll方法了。

commit offset

當一個consumer因某種緣由退出Group時,進行從新分配partition後,同一group中的另外一個consumer在讀取該partition時,怎麼可以知道上一個consumer該從哪一個offset的message讀取呢?也是是如何保證同一個group內的consumer不重複消費消息呢?上面說了一次走網絡的fetch請求會拉取到必定量的數據,可是這些數據尚未被消息完畢,Consumer就掛掉了,下一次進行數據fetch時,是否會從上次讀到的數據開始讀取,而致使Consumer消費的數據丟失嗎?

爲了作到這一點,當使用完poll從本地緩存拉取到數據以後,須要client調用commitSync方法(或者commitAsync方法)去commit 下一次該去讀取 哪個offset的message。

而這個commit方法會經過走網絡的commit請求將offset在coordinator中保留,這樣就可以保證下一次讀取(不論進行了rebalance)時,既不會重複消費消息,也不會遺漏消息。

對於offset的commit,Kafka Consumer Java Client支持兩種模式:由Kafka Consumer自動提交,或者是用戶經過調用commitSync、commitAsync方法的方式手動完成offset的提交。

Consumer配置說明

bootstrap.servers

參考生產者中的bootstrap.servers配置說明

key.descrializer、value.descrializer

Message record 的key, value的反序列化類。

group.id
A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using subscribe(topic) or the Kafka-based offset management strategy.

用於表示該consumer想要加入到哪一個group中。默認值是 「」。

heartbeat.interval.ms
The expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than session.timeout.ms, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.

心跳間隔。心跳是在consumer與coordinator之間進行的。心跳用來保持consumer的會話,而且在有consumer加入或者離開group時幫助進行rebalance。
這個值必須設置的小於session.timeout.ms,由於:當Consumer因爲某種緣由不能發Heartbeat到coordinator時,而且時間超過session.timeout.ms時,就會認爲該consumer已退出,它所訂閱的partition會分配到同一group 內的其它的consumer上。

一般設置的值要低於session.timeout.ms的1/3。默認值是:3000 (3s)

session.timeout.ms
The timeout used to detect consumer failures when using Kafka's group management facility. The consumer sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this consumer from the group and initiate a rebalance. Note that the value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms.

Consumer session 過時時間。consumer會發送週期性的心跳代表該consumer是活着的。若是超過session.timeout.ms設定的值仍然沒有收到心跳,zebroker會把這個consumer從group中移除,而且從新rebalance。
這個值必須設置在broker configuration中的group.min.session.timeout.ms 與 group.max.session.timeout.ms之間。
其默認值是:10000 (10 s)

enable.auto.commit
If true the consumer's offset will be periodically committed in the background.

設置Consumer 在 commit 方式是不是自動調焦。默認值是true。

auto.commit.interval.ms
The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if enable.auto.commit is set to true.

自動提交間隔。範圍:[0,Integer.MAX],默認值是 5000 (5 s)

auto.offset.reset
What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):
  • earliest: automatically reset the offset to the earliest offset
  • latest: automatically reset the offset to the latest offset
  • none: throw exception to the consumer if no previous offset is found for the consumer's group
  • anything else: throw exception to the consumer.

這個配置項,是告訴Kafka Broker在發現kafka在沒有初始offset,或者當前的offset是一個不存在的值(若是一個record被刪除,就確定不存在了)時,該如何處理。它有4種處理方式:

  • earliest:自動重置到最先的offset。
  • latest:看上去重置到最晚的offset。
  • none:若是邊更早的offset也沒有的話,就拋出異常給consumer,告訴consumer在整個consumer group中都沒有發現有這樣的offset。
  • 若是不是上述3種,只拋出異常給consumer。

默認值是latest。

fetch.max.wait.ms
The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by fetch.min.bytes.

Fetch請求發給broker後,在broker中可能會被阻塞的(當topic中records的總size小於fetch.min.bytes時),此時這個fetch請求耗時就會比較長。這個配置就是來配置consumer最多等待response多久。

fetch.min.bytes
The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive. Setting this to something greater than 1 will cause the server to wait for larger amounts of data to accumulate which can improve server throughput a bit at the cost of some additional latency.

當consumer向一個broker發起fetch請求時,broker返回的records的大小最小值。若是broker中數據量不夠的話會wait,直到數據大小知足這個條件。

取值範圍是:[0, Integer.Max],默認值是1。默認值設置爲1的目的是:使得consumer的請求可以儘快的返回。

fetch.max.bytes
The maximum amount of data the server should return for a fetch request. Records are fetched in batches by the consumer, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that the consumer can make progress. As such, this is not a absolute maximum. The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (topic config). Note that the consumer performs multiple fetches in parallel.

一次fetch請求,從一個broker中取得的records最大大小。若是在從topic中第一個非空的partition取消息時,若是取到的第一個record的大小就超過這個配置時,仍然會讀取這個record,也就是說在這片狀況下,只會返回這一條record。broker、topic都會對producer發給它的message size作限制。因此在配置這值時,能夠參考broker的message.max.bytestopic的max.message.bytes的配置。

取值範圍是:[0, Integer.Max],默認值是:52428800 (5 MB)

max.partition.fetch.bytes

一次fetch請求,從一個partition中取得的records最大大小。若是在從topic中第一個非空的partition取消息時,若是取到的第一個record的大小就超過這個配置時,仍然會讀取這個record,也就是說在這片狀況下,只會返回這一條record。 broker、topic都會對producer發給它的message size作限制。因此在配置這值時,能夠參考broker 的message.max.bytes和 topic 的max.message.bytes的配置。

max.poll.interval.ms
The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member.

前面說過要求程序中不間斷的調用poll()。若是長時間沒有調用poll,且間隔超過這個值時,就會認爲這個consumer失敗了。

max.poll.records
The maximum number of records returned in a single call to poll().

Consumer每次調用poll()時取到的records的最大數。

相關文章
相關標籤/搜索