We encourage all new development to use the new Java producer. This client is production tested and generally both faster and more fully featured than the previous Scala client.
You can use this client by adding a dependency on the client jar using the following example maven co-ordinates (you can change the version numbers with new releases):

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.0.0</version>
</dependency>

Examples showing how to use the producer are given in the javadocs.
We recommend using the new Java client in place of the old Scala client.

A Kafka client that publishes records to the Kafka cluster.

The producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances.

Here is a simple example of using the producer to send records with strings containing sequential numbers as the key/value pairs.
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all"); //ack方式,all,會等全部的commit最慢的方式 props.put("retries", 0); //失敗是否重試,設置會有可能產生重複數據 props.put("batch.size", 16384); //對於每一個partition的batch buffer大小 props.put("linger.ms", 1); //等多久,若是buffer沒滿,好比設爲1,即消息發送會多1ms的延遲,若是buffer沒滿 props.put("buffer.memory", 33554432); //整個producer能夠用於buffer的內存大小 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); for(int i = 0; i < 100; i++) producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i))); producer.close();
Regarding buffer.memory:

The buffer.memory setting controls the total amount of memory available to the producer for buffering. If records are sent faster than they can be transmitted to the server then this buffer space will be exhausted. When the buffer space is exhausted additional send calls will block. The threshold for time to block is determined by max.block.ms, after which it throws a TimeoutException.

In other words, buffer.memory caps how much data the producer can buffer. Since send is asynchronous, records go into this buffer first; if records are produced faster than they can be sent, the buffer fills up, send blocks, and once max.block.ms is reached a TimeoutException is thrown.

Note the difference from batch.size, which is the size of a single batch.
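A minimal sketch of how buffer.memory and max.block.ms interact (the topic name and sizes are made up; the buffer is shrunk on purpose so the blocking is easy to hit):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("buffer.memory", 1048576);   // deliberately small 1 MB buffer, purely for illustration
props.put("max.block.ms", 500);        // send() blocks at most 500 ms waiting for buffer space
props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

Producer<byte[], byte[]> producer = new KafkaProducer<>(props);
try {
    // If records are produced faster than they can be shipped, the buffer fills up,
    // send() blocks, and after max.block.ms a TimeoutException is thrown.
    for (int i = 0; i < 1000; i++)
        producer.send(new ProducerRecord<byte[], byte[]>("my-topic", new byte[64 * 1024]));
} catch (org.apache.kafka.common.errors.TimeoutException e) {
    e.printStackTrace();   // back off, shed load, or surface the error upstream
} finally {
    producer.close();
}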
The send API can also take a callback:
ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>("the-topic", key, value);
producer.send(record, new Callback() {
    public void onCompletion(RecordMetadata metadata, Exception e) {
        if (e != null)
            e.printStackTrace();
        System.out.println("The offset of the record we just sent is: " + metadata.offset());
    }
});
As of the 0.9.0 release we have added a new Java consumer to replace our existing high-level ZooKeeper-based consumer and low-level consumer APIs.
Although 0.9 already ships the new consumer, the old interfaces can still be used.
2.2.3 New Consumer API
This new unified consumer API removes the distinction between the 0.8 high-level and low-level consumer APIs. You can use this client by adding a dependency on the client jar using the following example maven co-ordinates (you can change the version numbers with new releases):
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.0.0</version>
</dependency>
Examples showing how to use the consumer are given in the javadocs.
Detecting Consumer Failures
The poll API is designed to ensure consumer liveness. As long as you continue to call poll, the consumer will stay in the group and continue to receive messages from the partitions it was assigned.
In the new consumer, liveness is signalled through poll: as long as the consumer keeps calling poll, it is considered healthy and stays in the group.
This raises a problem: if processing takes a long time, or too many records are returned at once, the gap between poll calls can grow long enough to cause a timeout. Two configurations help here:
session.timeout.ms: By increasing the session timeout, you can give the consumer more time to handle a batch of records returned from poll(long). The only drawback is that it will take longer for the server to detect hard consumer failures, which can cause a delay before a rebalance can be completed. However, clean shutdown with close() is not impacted since the consumer will send an explicit message to the server to leave the group and cause an immediate rebalance.

max.poll.records: Processing time in the poll loop is typically proportional to the number of records processed, so it's natural to want to set a limit on the number of records handled at once. This setting provides that. By default, there is essentially no limit.

In short, you can raise session.timeout.ms to allow a longer timeout, or use max.poll.records to cap the number of records returned by a single poll.
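For example, a sketch of the relevant consumer properties (the values are arbitrary):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("session.timeout.ms", "60000"); // more headroom between polls; the broker's group.max.session.timeout.ms must allow this
props.put("max.poll.records", "100");     // cap how many records a single poll() may return
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);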
You can also move the real processing to another thread and keep the poll loop itself as fast as possible; just remember that you still have to commit after the records have been processed.
If you want to make sure no data is lost, you usually do not rely on auto commit but commit manually once processing is done. If processing takes so long that the consumer has already timed out of the group, the commit will throw a CommitFailedException.
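A sketch of that pattern (the topic name and the process() helper are hypothetical); the commit happens only after processing, and CommitFailedException signals that the group already rebalanced and the batch may be redelivered:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");   // commit manually, after processing
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
        process(record);                    // hypothetical application logic
    try {
        consumer.commitSync();              // only mark the batch consumed once it is fully processed
    } catch (CommitFailedException e) {
        // processing took longer than session.timeout.ms and the group rebalanced;
        // the uncommitted records will be redelivered to the partitions' new owners
        e.printStackTrace();
    }
}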
Automatic Offset Committing
An example with automatic offset committing:
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); //自動commit props.put("auto.commit.interval.ms", "1000"); //定時commit的週期 props.put("session.timeout.ms", "30000"); //consumer活性超時時間 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); //subscribe,foo,bar,兩個topic while (true) { ConsumerRecords<String, String> records = consumer.poll(100); //100是超時等待時間 for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()); }
Manual Offset Control
Committing offsets manually:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");  // disable automatic commits
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
    }
    if (buffer.size() >= minBatchSize) {
        insertIntoDb(buffer);
        consumer.commitSync();  // commit offsets manually once the batch has been written
        buffer.clear();
    }
}
The example above commits offsets for the whole batch at once.
The above example uses commitSync to mark all received messages as committed. In some cases you may wish to have even finer control over which messages have been committed by specifying an offset explicitly. In the example below we commit the offset after we finish handling the messages in each partition.

Finer-grained commitSync:
try {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
        for (TopicPartition partition : records.partitions()) {  // handle each partition separately
            List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);  // records for this partition
            for (ConsumerRecord<String, String> record : partitionRecords) {  // process each record
                System.out.println(record.offset() + ": " + record.value());
            }
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();  // last processed offset
            consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));  // commit per partition
        }
    }
} finally {
    consumer.close();
}
Why add 1 to lastOffset? Because the offset you commit is the offset of the next record you are going to read, so it must be the current offset + 1.
Note: The committed offset should always be the offset of the next message that your application will read. Thus, when calling commitSync(offsets)
you should add one to the offset of the last message processed.
Manual Partition Assignment
In the previous examples, we subscribed to the topics we were interested in and let Kafka dynamically assign a fair share of the partitions for those topics based on the active consumers in the group.
However, in some cases you may need finer control over the specific partitions that are assigned.
For example, you may want to read specific partitions of a topic directly, rather than relying on Kafka's dynamic assignment.
To use this mode, instead of subscribing to the topic using subscribe
, you just call assign(Collection)
with the full list of partitions that you want to consume.
String topic = "foo"; TopicPartition partition0 = new TopicPartition(topic, 0); TopicPartition partition1 = new TopicPartition(topic, 1); consumer.assign(Arrays.asList(partition0, partition1));
The call to consumer.assign tells this consumer to read partitions 0 and 1 of topic "foo".
Once assigned, you can call poll
in a loop, just as in the preceding examples to consume records.
The group that the consumer specifies is still used for committing offsets, but now the set of partitions will only change with another call to assign
.
You then still read records with poll and still commit offsets under the configured consumer group; the assignment itself, however, only changes if assign is called again.
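Putting assign and the poll loop together (the group and topic names are illustrative):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "partition-reader");   // still used for committing offsets
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

consumer.assign(Arrays.asList(new TopicPartition("foo", 0), new TopicPartition("foo", 1)));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("partition = %d, offset = %d, value = %s%n",
                record.partition(), record.offset(), record.value());
    consumer.commitSync();   // offsets go to the group, but no rebalance is ever triggered with assign()
}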
Controlling The Consumer's Position
In most use cases the consumer will simply consume records from beginning to end, periodically committing its position (either automatically or manually). However Kafka allows the consumer to manually control its position, moving forward or backwards in a partition at will. This means a consumer can re-consume older records, or skip to the most recent records without actually consuming the intermediate records.
Resetting the offset:
Kafka lets you specify a new position with seek(TopicPartition, long), i.e. reset the offset of a particular partition.
Special methods for seeking to the earliest and latest offset the server maintains are also available ( seekToBeginning(Collection)
and seekToEnd(Collection)
respectively).
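A short sketch of the three calls (the partition and offset are arbitrary):

TopicPartition partition = new TopicPartition("foo", 0);
consumer.assign(Arrays.asList(partition));

consumer.seek(partition, 42L);                       // jump to an absolute offset
consumer.seekToBeginning(Arrays.asList(partition));  // or rewind to the earliest offset the server keeps
consumer.seekToEnd(Arrays.asList(partition));        // or skip ahead to the latest offset

ConsumerRecords<String, String> records = consumer.poll(100);  // the next poll starts from the new position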
Consumption Flow Control
If a consumer is assigned multiple partitions to fetch data from, it will try to consume from all of them at the same time, effectively giving these partitions the same priority for consumption. However in some cases consumers may want to first focus on fetching from some subset of the assigned partitions at full speed, and only start fetching other partitions when these partitions have few or no data to consume.
One such case is stream processing, where the processor fetches from two topics and performs a join on the two streams. When one of the topics lags far behind the other, the processor would like to pause fetching from the topic that is ahead so the lagging stream can catch up. Another example is bootstrapping on consumer start-up, when there is a lot of historical data to catch up on: the application usually wants to get up to date on some of the topics before it even considers fetching the others.
Kafka supports dynamic controlling of consumption flows by using pause(Collection)
and resume(Collection)
to pause the consumption on the specified assigned partitions and resume the consumption on the specified paused partitions respectively in future poll(long) calls.
If a consumer is assigned multiple partitions, they are all fetched at the same time with equal priority.
For various reasons you may want to read some of the partitions first,
which is what the pause and resume APIs are for.
Note that pause only means poll stops returning records for those partitions; the consumer keeps polling them, so no rebalance is triggered.
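A sketch of the flow-control calls (the topics and the lag check are hypothetical):

TopicPartition ahead  = new TopicPartition("fast-topic", 0);
TopicPartition behind = new TopicPartition("slow-topic", 0);
consumer.assign(Arrays.asList(ahead, behind));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    // ... process records from both partitions ...

    if (isLagging(behind)) {                    // hypothetical check of how far "behind" lags
        consumer.pause(Arrays.asList(ahead));   // poll() stops returning records from the partition that is ahead
    } else {
        consumer.resume(Arrays.asList(ahead));  // resume it once the lagging partition has caught up
    }
}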
Managing Consumer Groups
With the ConsumerGroupCommand tool, we can list, delete, or describe consumer groups. For example, to list all consumer groups across all topics:
> bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list
test-consumer-group
To view offsets as in the previous example with the ConsumerOffsetChecker, we "describe" the consumer group like this:
> bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --describe --group test-consumer-group
GROUP                 TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  OWNER
test-consumer-group   test-foo  0          1               3               2    test-consumer-group_postamac.local-1456198719410-29ccd54f-0
When you're using the new consumer API where the broker handles coordination of partition handling and rebalance, you can manage the groups with the "--new-consumer" flags:
> bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server broker1:9092 --list
As of the 0.10.0 release we have added a new client library named Kafka Streams to let users implement their stream processing applications with data stored in Kafka topics.
This is still an alpha release.
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>0.10.0.0</version>
</dependency>
Kafka Streams allows for performing continuous computation on input coming from one or more input topics and sends output to zero or more output topics.
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
StreamsConfig config = new StreamsConfig(props);

KStreamBuilder builder = new KStreamBuilder();
builder.<String, String>stream("my-input-topic")
       .mapValues(value -> Integer.toString(value.length()))
       .to("my-output-topic");

KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
My take: this effectively integrates Samza-style stream processing into Kafka itself, since standalone Samza never saw much adoption.
So effectively Kafka guarantees at-least-once delivery by default and allows the user to implement at most once delivery by disabling retries on the producer and committing its offset prior to processing a batch of messages.
Exactly-once delivery requires co-operation with the destination storage system but Kafka provides the offset which makes implementing this straight-forward.
Kafka provides at-least-once semantics by default.
To get at-most-once, simply disable retries on the producer and commit the offset before processing the messages.
In theory Kafka alone cannot give you exactly-once; it needs the cooperation of, for example, the destination storage system to deduplicate.
Offsets by themselves also cannot fix duplication caused by the producer, because the data stored in Kafka may already be duplicated.
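On the consumer side the difference is just the ordering of commit and processing; a sketch, where process() stands in for application logic:

// at-most-once: commit before processing; a crash after the commit loses the batch
ConsumerRecords<String, String> batch = consumer.poll(100);
consumer.commitSync();
for (ConsumerRecord<String, String> record : batch)
    process(record);

// at-least-once: process before committing; a crash before the commit redelivers the batch
ConsumerRecords<String, String> nextBatch = consumer.poll(100);
for (ConsumerRecord<String, String> record : nextBatch)
    process(record);
consumer.commitSync();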
Availability and Durability Guarantees
When writing to Kafka, producers can choose whether they wait for the message to be acknowledged by 0,1 or all (-1) replicas.
Note that "acknowledgement by all replicas" does not guarantee that the full set of assigned replicas have received the message. By default, when acks=all, acknowledgement happens as soon as all the current in-sync replicas have received the message. For example, if a topic is configured with only two replicas and one fails (i.e., only one in sync replica remains), then writes that specify acks=all will succeed. However, these writes could be lost if the remaining replica also fails.
Although this ensures maximum availability of the partition, this behavior may be undesirable to some users who prefer durability over availability.
Therefore, we provide two topic-level configurations that can be used to prefer message durability over availability:
When the producer is configured for acknowledgement by all replicas, "all replicas" here means the ISR, not the full assigned replica set (AR); so even with 3 replicas, if only one replica is alive, an ack from that single replica counts as "all".
If you prefer durability over availability, there are two settings:
1. Disable unclean leader election: the leader must then be a replica from the ISR, and if no such replica is available the partition goes offline, trading availability for a lower chance of data loss.
2. Specify a minimum ISR size: if the ISR shrinks below this value, the partition stops accepting writes.
The second setting only matters when acks=all: as long as at least the minimum number of in-sync replicas have acknowledged, the write counts as fully acknowledged.
Clearly this, too, trades away a good deal of availability.
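For illustration (the topic name is made up), both knobs can be set as topic-level overrides in the same kafka-configs.sh style used for the quota commands further below, assuming the 0.10 tool accepts topic entities:

> bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name my-durable-topic --add-config unclean.leader.election.enable=false,min.insync.replicas=2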
Log compaction ensures that Kafka will always retain at least the last known value for each message key within the log of data for a single topic partition. It addresses use cases and scenarios such as restoring state after application crashes or system failure, or reloading caches after application restarts during operational maintenance.
The current management of consumer offsets presumably uses exactly this mechanism.
The details can wait until they are needed.
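For reference, a compacted topic is simply one created with cleanup.policy=compact (the topic name and sizing here are arbitrary):

> bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-compacted-topic --partitions 1 --replication-factor 3 --config cleanup.policy=compact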
Starting in 0.9, the Kafka cluster has the ability to enforce quotas on produce and fetch requests. Quotas are basically byte-rate thresholds defined per client-id. A client-id logically identifies an application making a request. Hence a single client-id can span multiple producer and consumer instances and the quota will apply for all of them as a single entity i.e. if client-id="test-client" has a produce quota of 10MB/sec, this is shared across all instances with that same id.
By default, each unique client-id receives a fixed quota in bytes/sec as configured by the cluster (quota.producer.default, quota.consumer.default). This quota is defined on a per-broker basis. Each client can publish/fetch a maximum of X bytes/sec per broker before it gets throttled. We decided that defining these quotas per broker is much better than having a fixed cluster wide bandwidth per client because that would require a mechanism to share client quota usage among all the brokers. This can be harder to get right than the quota implementation itself!
Quotas (rate limiting) were added in 0.9.
quota.producer.default and quota.consumer.default set a fixed per-client-id quota in bytes/sec; note that the quota is enforced per broker, not per cluster.
How does a broker react when it detects a quota violation? In our solution, the broker does not return an error rather it attempts to slow down a client exceeding its quota. It computes the amount of delay needed to bring a guilty client under it's quota and delays the response for that time. This approach keeps the quota violation transparent to clients (outside of client-side metrics). This also keeps them from having to implement any special backoff and retry behavior which can get tricky. In fact, bad client behavior (retry without backoff) can exacerbate the very problem quotas are trying to solve.
Client byte rate is measured over multiple small windows (e.g. 30 windows of 1 second each) in order to detect and correct quota violations quickly.
Typically, having large measurement windows (for e.g. 10 windows of 30 seconds each) leads to large bursts of traffic followed by long delays which is not great in terms of user experience.
When a broker detects a quota violation it does not reject the request; it delays the response instead, which is gentler on the client.
Byte rates are measured over many small time windows so that violations are detected and corrected quickly.
It is possible to override the default quota for client-ids that need a higher (or even lower) quota. The mechanism is similar to the per-topic log config overrides. Client-id overrides are written to ZooKeeper under /config/clients. These overrides are read by all brokers and are effective immediately. This lets us change quotas without having to do a rolling restart of the entire cluster. See here for details.
Of course, you can change a particular client's quotas through dynamic configuration; client-id overrides are written to ZooKeeper under /config/clients.
The following sets the default quota per producer and consumer client-id to 10MB/sec.
quota.producer.default=10485760
quota.consumer.default=10485760
It is also possible to set custom quotas for each client.
> bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-name clientA --entity-type clients
Updated config for clientId: "clientA".
Here's how to describe the quota for a given client.
> ./kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name clientA --entity-type clients
Configs for clients:clientA are producer_byte_rate=1024,consumer_byte_rate=2048
In release 0.9.0.0, the Kafka community added a number of features that, used either separately or together, increases security in a Kafka cluster.
These features are considered to be of beta quality. The following security measures are currently supported:
1. Authentication of connections between brokers and clients, between brokers, and between brokers and tools, using SSL or SASL
How digital certificates work (a detailed write-up)
On SSL latency: http://www.ruanyifeng.com/blog/2014/09/ssl-latency.html
2. Authentication of connections from brokers to ZooKeeper
3. Encryption of data in transit using SSL (note that this costs some performance)
4. Authorization of read/write operations by clients
5. Authorization is pluggable and external authorization services can be used
Actual usage is still fairly involved; the details can wait until they are needed.
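As a rough sketch (paths and passwords are placeholders), pointing a client at an SSL listener mostly comes down to a handful of properties:

security.protocol=SSL
ssl.truststore.location=/var/private/ssl/client.truststore.jks
ssl.truststore.password=changeit
# only needed when the brokers require client authentication (ssl.client.auth=required)
ssl.keystore.location=/var/private/ssl/client.keystore.jks
ssl.keystore.password=changeit
ssl.key.password=changeit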
Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems. It makes it simple to quickly define connectors that move large collections of data into and out of Kafka. Kafka Connect can ingest entire databases or collect metrics from all your application servers into Kafka topics, making the data available for stream processing with low latency. An export job can deliver data from Kafka topics into secondary storage and query systems or into batch systems for offline analysis.
提供kafka的導入和導出工具
Kafka Connect currently supports two modes of execution: standalone (single process) and distributed.
In standalone mode all work is performed in a single process. You can start a standalone process with the following command:
> bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]
Distributed mode:
> bin/connect-distributed.sh config/connect-distributed.properties
In particular, the following configuration parameters are critical to set before starting your cluster:
group.id (default connect-cluster) - unique name for the cluster, used in forming the Connect cluster group; note that this must not conflict with consumer group IDs
config.storage.topic (default connect-configs) - topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated topic. You may need to manually create the topic to ensure single partition for the config topic as auto created topics may have multiple partitions.
offset.storage.topic (default connect-offsets) - topic to use for storing offsets; this topic should have many partitions and be replicated
status.storage.topic (default connect-status) - topic to use for storing statuses; this topic can have multiple partitions and should be replicated

The four settings above must be specified in the connect-*.properties files.
For standalone mode, the connector configurations must also be given on the command line; for distributed mode, you instead use the REST API described below to create, modify, and destroy connectors.
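For illustration, a standalone connector file passed on the command line might look like this; the class and topic are assumptions based on the file sink connector that ships with Kafka, and the common options it uses are listed just below:

name=local-file-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
file=test.sink.txt
topics=connect-test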
name - Unique name for the connector. Attempting to register again with the same name will fail.
connector.class - The Java class for the connector
tasks.max - The maximum number of tasks that should be created for this connector. The connector may create fewer tasks if it cannot achieve this level of parallelism.
topics - A list of topics to use as input for this connector

REST API
Since Kafka Connect is intended to be run as a service, it also provides a REST API for managing connectors. By default this service runs on port 8083. The following are the currently supported endpoints:
GET /connectors - return a list of active connectors
POST /connectors - create a new connector; the request body should be a JSON object containing a string name field and an object config field with the connector configuration parameters
GET /connectors/{name} - get information about a specific connector
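A hypothetical call against the default port, creating the same file sink connector through the REST API:

> curl -X POST -H "Content-Type: application/json" \
    --data '{"name": "local-file-sink", "config": {"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max": "1", "file": "test.sink.txt", "topics": "connect-test"}}' \
    http://localhost:8083/connectors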
To sum up:
The most valuable change is the new Producer and Consumer clients; the Consumer in particular removes much of the complexity of the old low-level consumer.
Security features were added, with SSL and SASL supported for authentication.
The Streams API and Connect still feel somewhat underwhelming.