Kafka 0.10.0

2.1 Producer API

We encourage all new development to use the new Java producer. This client is production tested and generally both faster and more fully featured than the previous Scala client.
You can use this client by adding a dependency on the client jar using the following example maven co-ordinates (you can change the version numbers with new releases):html

	<dependency>
	    <groupId>org.apache.kafka</groupId>
	    <artifactId>kafka-clients</artifactId>
	    <version>0.10.0.0</version>
	</dependency>

Examples showing how to use the producer are given in the javadocs.java

推薦你們都用新的Java client來代替舊的scala的client,react

A Kafka client that publishes records to the Kafka cluster.ios

The producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances.apache

Here is a simple example of using the producer to send records with strings containing sequential numbers as the key/value pairs.bootstrap

 

Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("acks", "all"); //ack方式,all,會等全部的commit最慢的方式
 props.put("retries", 0); //失敗是否重試,設置會有可能產生重複數據
 props.put("batch.size", 16384); //對於每一個partition的batch buffer大小
 props.put("linger.ms", 1);  //等多久,若是buffer沒滿,好比設爲1,即消息發送會多1ms的延遲,若是buffer沒滿
 props.put("buffer.memory", 33554432); //整個producer能夠用於buffer的內存大小
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

 Producer<String, String> producer = new KafkaProducer<>(props);
 for(int i = 0; i < 100; i++)
     producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));

 producer.close();

 

對於buffer.memorywindows

The buffer.memory controls the total amount of memory available to the producer for buffering. If records are sent faster than they can be transmitted to the server then this buffer space will be exhausted.
When the buffer space is exhausted additional send calls will block. The threshold for time to block is determined by max.block.ms after which it throws a TimeoutException.api

producer所能buffer數據的大小,若是數據產生的比發送的快,那麼這個buffer會耗盡,由於producer的send的異步的,會先放到buffer,可是若是buffer滿了,那麼send就會被block,而且當達到max.block.ms時會觸發TimeoutException安全

注意和batch.size的區別,這個是batch的大小session

 

這裏send接口,還能夠加callback的,

ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
 producer.send(myRecord,
               new Callback() {
                   public void onCompletion(RecordMetadata metadata, Exception e) {
                       if(e != null)
                           e.printStackTrace();
                       System.out.println("The offset of the record we just sent is: " + metadata.offset());
                   }
               });

 

 

2.2 Consumer API

As of the 0.9.0 release we have added a new Java consumer to replace our existing high-level ZooKeeper-based consumer and low-level consumer APIs.

雖然在0.9中已經提供新的consumer,可是當前仍然可使用老的接口

 

2.2.3 New Consumer API

This new unified consumer API removes the distinction between the 0.8 high-level and low-level consumer APIs. You can use this client by adding a dependency on the client jar using the following example maven co-ordinates (you can change the version numbers with new releases):

	<dependency>
	    <groupId>org.apache.kafka</groupId>
	    <artifactId>kafka-clients</artifactId>
	    <version>0.10.0.0</version>
	</dependency>

Examples showing how to use the consumer are given in the javadocs.

 

Detecting Consumer Failures

The poll API is designed to ensure consumer liveness. As long as you continue to call poll, the consumer will stay in the group and continue to receive messages from the partitions it was assigned.

新版的consumer,經過poll來表示consumer的活性,即若是consumer是不斷的在調用poll的,那麼咱們就認爲這個consumer是正常的

這樣有個問題,若是你的操做時間比較長,或是取得的records數目太多,會致使poll的間隔比較長致使超時;固然也能夠設置上配置

  1. session.timeout.ms: By increasing the session timeout, you can give the consumer more time to handle a batch of records returned from poll(long). The only drawback is that it will take longer for the server to detect hard consumer failures, which can cause a delay before a rebalance can be completed. However, clean shutdown with close() is not impacted since the consumer will send an explicit message to the server to leave the group and cause an immediate rebalance.
  2. max.poll.records: Processing time in the poll loop is typically proportional to the number of records processed, so it's natural to want to set a limit on the number of records handled at once. This setting provides that. By default, there is essentially no limit.

能夠配置session.timeout.ms,讓timeout的時候長些
也能夠經過max.poll.records,限制一次poll的條目數

 

你也能夠把真正的邏輯,放在其餘線程去作,而後儘可能快點去poll;但這裏注意,在處理完後須要commit;
若是要保證數據不丟,每每不會依賴auto commit,而是當邏輯處理完後,再手動的commit;若是處理延遲太長,該consumer已經超時,此時去作commit,會報CommitFailedException 異常

 

Automatic Offset Committing

自動offset commit的例子,

    Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "test");
     props.put("enable.auto.commit", "true");  //自動commit
     props.put("auto.commit.interval.ms", "1000"); //定時commit的週期
     props.put("session.timeout.ms", "30000"); //consumer活性超時時間
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar")); //subscribe,foo,bar,兩個topic
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);  //100是超時等待時間
         for (ConsumerRecord<String, String> record : records)
             System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
     }

 

Manual Offset Control

手工commit offset,

     Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "test");
     props.put("enable.auto.commit", "false"); //關閉自動commit
     props.put("session.timeout.ms", "30000");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     final int minBatchSize = 200;
     List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records) {
             buffer.add(record);
         }
         if (buffer.size() >= minBatchSize) {
             insertIntoDb(buffer); 
             consumer.commitSync(); //批量完成寫入後,手工sync offset
             buffer.clear();
         }
     }

上面的方式,批量的sync offset

The above example uses commitSync to mark all received messages as committed. In some cases you may wish to have even finer control over which messages have been committed by specifying an offset explicitly. In the example below we commit offset after we finish handling the messages in each partition.

更細粒度的commitSync

     try {
         while(running) {
             ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
             for (TopicPartition partition : records.partitions()) { //按partition處理
                 List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); //取出partition對應的Records
                 for (ConsumerRecord<String, String> record : partitionRecords) { //處理每條record
                     System.out.println(record.offset() + ": " + record.value());
                 }
                 long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); //取出last offset
                 consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); //獨立的sync每一個partition的offset
             }
         }
     } finally {
       consumer.close();
     }

這裏爲什麼lastOffset要加1,由於你要commit的是,你下一條要讀的log的offset,因此必定是當前的offset+1

Note: The committed offset should always be the offset of the next message that your application will read. Thus, when calling commitSync(offsets) you should add one to the offset of the last message processed.

 

Manual Partition Assignment

In the previous examples, we subscribed to the topics we were interested in and let Kafka dynamically assign a fair share of the partitions for those topics based on the active consumers in the group.
However, in some cases you may need finer control over the specific partitions that are assigned.

For example:

  • If the process is maintaining some kind of local state associated with that partition (like a local on-disk key-value store), then it should only get records for the partition it is maintaining on disk.
  • If the process itself is highly available and will be restarted if it fails (perhaps using a cluster management framework like YARN, Mesos, or AWS facilities, or as part of a stream processing framework). In this case there is no need for Kafka to detect the failure and reassign the partition since the consuming process will be restarted on another machine.

更細粒度,讀取某個topic的某個partition,這樣就不依賴kafka的動態assign,

To use this mode, instead of subscribing to the topic using subscribe, you just call assign(Collection) with the full list of partitions that you want to consume.

String topic = "foo";
     TopicPartition partition0 = new TopicPartition(topic, 0);
     TopicPartition partition1 = new TopicPartition(topic, 1);
     consumer.assign(Arrays.asList(partition0, partition1));

調用consumer.assign,指定該consumer讀topic 「foo」中的0,1兩個partition

Once assigned, you can call poll in a loop, just as in the preceding examples to consume records.
The group that the consumer specifies is still used for committing offsets, but now the set of partitions will only change with another call to assign.

而後仍然用poll讀取records,仍然用定義的consumer group來committing offsets;可是這個對應關係,除非再次調用assign,不然不會改變

 

Controlling The Consumer's Position

In most use cases the consumer will simply consume records from beginning to end, periodically committing its position (either automatically or manually). However Kafka allows the consumer to manually control its position, moving forward or backwards in a partition at will. This means a consumer can re-consume older records, or skip to the most recent records without actually consuming the intermediate records.

重置offset,

Kafka allows specifying the position using seek(TopicPartition, long) to specify the new position. 重置某個partition的offset

Special methods for seeking to the earliest and latest offset the server maintains are also available ( seekToBeginning(Collection) and seekToEnd(Collection) respectively).

 

Consumption Flow Control

If a consumer is assigned multiple partitions to fetch data from, it will try to consume from all of them at the same time, effectively giving these partitions the same priority for consumption. However in some cases consumers may want to first focus on fetching from some subset of the assigned partitions at full speed, and only start fetching other partitions when these partitions have few or no data to consume.

One of such cases is stream processing, where processor fetches from two topics and performs the join on these two streams. When one of the topics is long lagging behind the other, the processor would like to pause fetching from the ahead topic in order to get the lagging stream to catch up. Another example is bootstraping upon consumer starting up where there are a lot of history data to catch up, the applications usually want to get the latest data on some of the topics before consider fetching other topics.

Kafka supports dynamic controlling of consumption flows by using pause(Collection) and resume(Collection) to pause the consumption on the specified assigned partitions and resume the consumption on the specified paused partitions respectively in the futurepoll(long) calls.

若是consumer被assign多個partititions,那麼他們是被同時,相同優先級讀取的;
因爲某些緣由,你可能想優先讀取其中部分partitions,

因此這裏提供,pause或resume接口

注意,這裏pause,只是consumer在poll這些partitions的時候,不真正返回records,但仍是會去作poll,不會形成rebalance

 

Managing Consumer Groups

With the ConsumerGroupCommand tool, we can list, delete, or describe consumer groups. For example, to list all consumer groups across all topics:

 > bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list

test-consumer-group

To view offsets as in the previous example with the ConsumerOffsetChecker, we "describe" the consumer group like this:

 > bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --describe --group test-consumer-group

GROUP                          TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             OWNER
test-consumer-group            test-foo                       0          1               3               2               test-consumer-group_postamac.local-1456198719410-29ccd54f-0

When you're using the new consumer API where the broker handles coordination of partition handling and rebalance, you can manage the groups with the "--new-consumer" flags:

 > bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server broker1:9092 --list

 

 

 

 

 

2.3 Streams API

As of the 0.10.0 release we have added a new client library named Kafka Streams to let users implement their stream processing applications with data stored in Kafka topics.

alpha版本

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>0.10.0.0</version>
    </dependency>

 

Kafka Streams allows for performing continuous computation on input coming from one or more input topics and sends output to zero or more output topics.

 

Map<String, Object> props = new HashMap<>();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    StreamsConfig config = new StreamsConfig(props);

    KStreamBuilder builder = new KStreamBuilder();
    builder.from("my-input-topic").mapValue(value -> value.length().toString()).to("my-output-topic");

    KafkaStreams streams = new KafkaStreams(builder, config);
    streams.start();

 

個人理解是,把Samza給integretion進來了,由於單獨的samza沒人用

 

4.6 Message Delivery Semantics

 

So effectively Kafka guarantees at-least-once delivery by default and allows the user to implement at most once delivery by disabling retries on the producer and committing its offset prior to processing a batch of messages.

Exactly-once delivery requires co-operation with the destination storage system but Kafka provides the offset which makes implementing this straight-forward.

 

kafka是默認提供at-least-once語義的,
只須要關閉producer的重發機制而且在收到message後就先直接commit,就能夠達到at most once語義

理論上,單靠kafka是沒法實現 Exactly-once 的,須要配合其餘如存儲系統進行去重
單純經過offset,是沒法解,producer致使的duplication的,由於kafka裏面的數據自己就是重複的

 

Availability and Durability Guarantees

When writing to Kafka, producers can choose whether they wait for the message to be acknowledged by 0,1 or all (-1) replicas.
Note that "acknowledgement by all replicas" does not guarantee that the full set of assigned replicas have received the message. By default, when acks=all, acknowledgement happens as soon as all the current in-sync replicas have received the message. For example, if a topic is configured with only two replicas and one fails (i.e., only one in sync replica remains), then writes that specify acks=all will succeed. However, these writes could be lost if the remaining replica also fails.

Although this ensures maximum availability of the partition, this behavior may be undesirable to some users who prefer durability over availability.
Therefore, we provide two topic-level configurations that can be used to prefer message durability over availability:

  1. Disable unclean leader election - if all replicas become unavailable, then the partition will remain unavailable until the most recent leader becomes available again. This effectively prefers unavailability over the risk of message loss. See the previous section on Unclean Leader Election for clarification.
  2. Specify a minimum ISR size - the partition will only accept writes if the size of the ISR is above a certain minimum, in order to prevent the loss of messages that were written to just a single replica, which subsequently becomes unavailable. This setting only takes effect if the producer uses acks=all and guarantees that the message will be acknowledged by at least this many in-sync replicas. This setting offers a trade-off between consistency and availability. A higher setting for minimum ISR size guarantees better consistency since the message is guaranteed to be written to more replicas which reduces the probability that it will be lost. However, it reduces availability since the partition will be unavailable for writes if the number of in-sync replicas drops below the minimum threshold.

當producer的設置爲,acknowledgement by all replicas,這裏的all replicas不是指AR,而是指ISR,因此雖然是3 replicas,但若是隻有一個replica alive,那麼只要這個replica ack就算是all ack

若是你更傾向於durability,而非availability,有兩個設置,

1. 關閉unclean leader election, 這樣leader必需要是在isr中的replica,若是沒有replica available,那麼該partition會offline,這樣能夠在犧牲可用性的狀況下,下降丟數據的可能性

2. Specify a minimum ISR size,若是ISR的數目小於這個值,那麼這個partition就會直接offline
這個配置僅僅在acks=all時纔有意義,這樣只要有minimum個isr,並完成ack,咱們就能夠認爲該all ack
明顯這個作法也會大大下降可用性

 

4.8 Log Compaction

Log compaction ensures that Kafka will always retain at least the last known value for each message key within the log of data for a single topic partition. It addresses use cases and scenarios such as restoring state after application crashes or system failure, or reloading caches after application restarts during operational maintenance.

當前管理consumer的offset應該就是用的這個方案

細節用到再看吧

 

4.9 Quotas

Starting in 0.9, the Kafka cluster has the ability to enforce quotas on produce and fetch requests. Quotas are basically byte-rate thresholds defined per client-id. A client-id logically identifies an application making a request. Hence a single client-id can span multiple producer and consumer instances and the quota will apply for all of them as a single entity i.e. if client-id="test-client" has a produce quota of 10MB/sec, this is shared across all instances with that same id.

By default, each unique client-id receives a fixed quota in bytes/sec as configured by the cluster (quota.producer.default, quota.consumer.default). This quota is defined on a per-broker basis. Each client can publish/fetch a maximum of X bytes/sec per broker before it gets throttled. We decided that defining these quotas per broker is much better than having a fixed cluster wide bandwidth per client because that would require a mechanism to share client quota usage among all the brokers. This can be harder to get right than the quota implementation itself!

0.9加入,限流的功能
經過quota.producer.default, quota.consumer.default來限定某個client-id 的fixed quota in bytes/sec ;注意這個限流是per-broker的,而不是per-cluster的

 

How does a broker react when it detects a quota violation? In our solution, the broker does not return an error rather it attempts to slow down a client exceeding its quota. It computes the amount of delay needed to bring a guilty client under it's quota and delays the response for that time. This approach keeps the quota violation transparent to clients (outside of client-side metrics). This also keeps them from having to implement any special backoff and retry behavior which can get tricky. In fact, bad client behavior (retry without backoff) can exacerbate the very problem quotas are trying to solve.

Client byte rate is measured over multiple small windows (e.g. 30 windows of 1 second each) in order to detect and correct quota violations quickly.
Typically, having large measurement windows (for e.g. 10 windows of 30 seconds each) leads to large bursts of traffic followed by long delays which is not great in terms of user experience.

broker當發現quota violation時,不會直接拒絕響應,而是去delay response;這樣對client的影響會比較小
而且咱們在作流量統計的時候,是基於多個小時間窗口,這樣更準確一些

It is possible to override the default quota for client-ids that need a higher (or even lower) quota. The mechanism is similar to the per-topic log config overrides. Client-id overrides are written to ZooKeeper under/config/clients. These overrides are read by all brokers and are effective immediately. This lets us change quotas without having to do a rolling restart of the entire cluster. See here for details.

固然你能夠經過動態配置去修改某個client的quotas配置,Client-id overrides are written to ZooKeeper under/config/clients.

The following sets the default quota per producer and consumer client-id to 10MB/sec.

  quota.producer.default=10485760
  quota.consumer.default=10485760

It is also possible to set custom quotas for each client.

> bin/kafka-configs.sh  --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-name clientA --entity-type clients
Updated config for clientId: "clientA".

Here's how to describe the quota for a given client.

> ./kafka-configs.sh  --zookeeper localhost:2181 --describe --entity-name clientA --entity-type clients
Configs for clients:clientA are producer_byte_rate=1024,consumer_byte_rate=2048

 

7. Security

7.1 Security Overview

In release 0.9.0.0, the Kafka community added a number of features that, used either separately or together, increases security in a Kafka cluster.
These features are considered to be of beta quality. The following security measures are currently supported:

  1. Authentication of connections to brokers from clients (producers and consumers), other brokers and tools, using either SSL or SASL (Kerberos). SASL/PLAIN can also be used from release 0.10.0.0 onwards.
  2. Authentication of connections from brokers to ZooKeeper
  3. Encryption of data transferred between brokers and clients, between brokers, or between brokers and tools using SSL (Note that there is a performance degradation when SSL is enabled, the magnitude of which depends on the CPU type and the JVM implementation.)
  4. Authorization of read / write operations by clients
  5. Authorization is pluggable and integration with external authorization services is supported

1. 能夠對brokers和clients,brokers,tools之間的connection進行Authentication,使用SSL,SASL

數字證書原理,很詳細

數字證書, 數字簽名, SSL(TLS) , SASL

SSL的延遲,http://www.ruanyifeng.com/blog/2014/09/ssl-latency.html

2. 對brokers和zookeeper之間的鏈接進行Authentication

3. 數據傳輸用SSL加密,性能會降低

4. 對clients的讀寫操做進行Authorization

5. Authorization 是pluggable,並可使用外部的authorization services

 

實際使用的方式仍是比較複雜的,後面用到再看吧

 

8. Kafka Connect

8.1 Overview

Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems. It makes it simple to quickly define connectors that move large collections of data into and out of Kafka. Kafka Connect can ingest entire databases or collect metrics from all your application servers into Kafka topics, making the data available for stream processing with low latency. An export job can deliver data from Kafka topics into secondary storage and query systems or into batch systems for offline analysis.

提供kafka的導入和導出工具

Kafka Connect currently supports two modes of execution: standalone (single process) and distributed.

In standalone mode all work is performed in a single process. You can start a standalone process with the following command:

> bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]

distribute,

> bin/connect-distributed.sh config/connect-distributed.properties

In particular, the following configuration parameters are critical to set before starting your cluster:

  • group.id (default connect-cluster) - unique name for the cluster, used in forming the Connect cluster group; note that this must not conflict with consumer group IDs
  • config.storage.topic (default connect-configs) - topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated topic. You may need to manually create the topic to ensure single partition for the config topic as auto created topics may have multiple partitions.
  • offset.storage.topic (default connect-offsets) - topic to use for storing offsets; this topic should have many partitions and be replicated
  • status.storage.topic (default connect-status) - topic to use for storing statuses; this topic can have multiple partitions and should be replicated

上面4個配置是須要在connect-*.properties裏面指定的

對於standalone,在命令行中還須要指明connector的配置,

可是對於distributed的方式,use the REST API described below to create, modify, and destroy connectors.

  • name - Unique name for the connector. Attempting to register again with the same name will fail.
  • connector.class - The Java class for the connector
  • tasks.max - The maximum number of tasks that should be created for this connector. The connector may create fewer tasks if it cannot achieve this level of parallelism.
  • topics - A list of topics to use as input for this connector

REST API

Since Kafka Connect is intended to be run as a service, it also provides a REST API for managing connectors. By default this service runs on port 8083. The following are the currently supported endpoints:

  • GET /connectors - return a list of active connectors
  • POST /connectors - create a new connector; the request body should be a JSON object containing a string name field and a object config field with the connector configuration parameters
  • GET /connectors/{name} - get information about a specific connector
  • 。。。。。。

 

總結,

比較有價值的改動是,使用新的Producer和Consumer client,尤爲Consumer會大大下降以前使用low-level consumer的複雜度

增長安全組件,支持SSL和SASL來作安全驗證

至於stream API和connect,有些雞肋

相關文章
相關標籤/搜索