本文主要講解一下 Kafka 生產者的幾個配置參數。
// Registers every producer configuration key with its type, default value, validator, importance and doc string.
// Keys declared as Type.LONG consistently use long literals for their defaults and atLeast(...) bounds so the
// declared type and the literal agree (ConfigDef would otherwise have to widen an Integer default to Long).
static {
    config = new ConfigDef()
            .define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, BOOSTRAP_SERVERS_DOC)
            .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
            .define(RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), Importance.HIGH, RETRIES_DOC)
            .define(ACKS_CONFIG, Type.STRING, "1", in(Arrays.asList("all", "-1", "0", "1")), Importance.HIGH, ACKS_DOC)
            .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC)
            .define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC)
            .define(TIMEOUT_CONFIG, Type.INT, 30 * 1000, atLeast(0), Importance.MEDIUM, TIMEOUT_DOC)
            .define(LINGER_MS_CONFIG, Type.LONG, 0L, atLeast(0L), Importance.MEDIUM, LINGER_MS_DOC)
            .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CLIENT_ID_DOC)
            .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), Importance.MEDIUM, SEND_BUFFER_DOC)
            .define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(0), Importance.MEDIUM, RECEIVE_BUFFER_DOC)
            .define(MAX_REQUEST_SIZE_CONFIG, Type.INT, 1 * 1024 * 1024, atLeast(0), Importance.MEDIUM, MAX_REQUEST_SIZE_DOC)
            .define(BLOCK_ON_BUFFER_FULL_CONFIG, Type.BOOLEAN, true, Importance.LOW, BLOCK_ON_BUFFER_FULL_DOC)
            .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 10L, atLeast(0L), Importance.LOW, RECONNECT_BACKOFF_MS_DOC)
            .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC)
            .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), Importance.LOW, RETRY_BACKOFF_MS_DOC)
            .define(METADATA_FETCH_TIMEOUT_CONFIG, Type.LONG, 60 * 1000L, atLeast(0L), Importance.LOW, METADATA_FETCH_TIMEOUT_DOC)
            .define(METADATA_MAX_AGE_CONFIG, Type.LONG, 5 * 60 * 1000L, atLeast(0L), Importance.LOW, METADATA_MAX_AGE_DOC)
            .define(METRICS_SAMPLE_WINDOW_MS_CONFIG, Type.LONG, 30000L, atLeast(0L), Importance.LOW, METRICS_SAMPLE_WINDOW_MS_DOC)
            .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, METRICS_NUM_SAMPLES_DOC)
            .define(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, Type.INT, 5, atLeast(1), Importance.LOW, MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC)
            .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, KEY_SERIALIZER_CLASS_DOC)
            .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC);
}
/**
 * Config key {@code block.on.buffer.full}: when the producer's memory buffer is exhausted, {@code true} makes
 * the producer block, while {@code false} makes it throw a {@code BufferExhaustedException} instead.
 */
public static final String BLOCK_ON_BUFFER_FULL_CONFIG = "block.on.buffer.full";
private static final String BLOCK_ON_BUFFER_FULL_DOC = "When our memory buffer is exhausted we must either stop accepting new records (block) or throw errors. By default "
        + "this setting is true and we block, however in some scenarios blocking is not desirable and it is better to "
        + "immediately give an error. Setting this to <code>false</code> will accomplish that: the producer will throw a BufferExhaustedException if a record is sent and the buffer space is full.";
默認是true,也就是當memory buffer耗盡的時候阻塞等待;如果爲false則拋出BufferExhaustedException異常。
主要是作用在下面這個類的 allocate 方法中:
kafka-clients-0.8.2.2-sources.jar!/org/apache/kafka/clients/producer/internals/BufferPool.java
/** * Allocate a buffer of the given size. This method blocks if there is not enough memory and the buffer pool * is configured with blocking mode. * * @param size The buffer size to allocate in bytes * @return The buffer * @throws InterruptedException If the thread is interrupted while blocked * @throws IllegalArgumentException if size is larger than the total memory controlled by the pool (and hence we would block * forever) * @throws BufferExhaustedException if the pool is in non-blocking mode and size exceeds the free memory in the pool */ public ByteBuffer allocate(int size) throws InterruptedException { if (size > this.totalMemory) throw new IllegalArgumentException("Attempt to allocate " + size + " bytes, but there is a hard limit of " + this.totalMemory + " on memory allocations."); this.lock.lock(); try { // check if we have a free buffer of the right size pooled if (size == poolableSize && !this.free.isEmpty()) return this.free.pollFirst(); // now check if the request is immediately satisfiable with the // memory on hand or if we need to block int freeListSize = this.free.size() * this.poolableSize; if (this.availableMemory + freeListSize >= size) { // we have enough unallocated or pooled memory to immediately // satisfy the request freeUp(size); this.availableMemory -= size; lock.unlock(); return ByteBuffer.allocate(size); } else if (!blockOnExhaustion) { throw new BufferExhaustedException("You have exhausted the " + this.totalMemory + " bytes of memory you configured for the client and the client is configured to error" + " rather than block when memory is exhausted."); } else { // we are out of memory and will have to block int accumulated = 0; ByteBuffer buffer = null; Condition moreMemory = this.lock.newCondition(); this.waiters.addLast(moreMemory); // loop over and over until we have a buffer or have reserved // enough memory to allocate one while (accumulated < size) { long startWait = time.nanoseconds(); moreMemory.await(); long endWait = 
time.nanoseconds(); this.waitTime.record(endWait - startWait, time.milliseconds()); // check if we can satisfy this request from the free list, // otherwise allocate memory if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) { // just grab a buffer from the free list buffer = this.free.pollFirst(); accumulated = size; } else { // we'll need to allocate memory, but we may only get // part of what we need on this iteration freeUp(size - accumulated); int got = (int) Math.min(size - accumulated, this.availableMemory); this.availableMemory -= got; accumulated += got; } } // remove the condition for this thread to let the next thread // in line start getting memory Condition removed = this.waiters.removeFirst(); if (removed != moreMemory) throw new IllegalStateException("Wrong condition: this shouldn't happen."); // signal any additional waiters if there is more memory left // over for them if (this.availableMemory > 0 || !this.free.isEmpty()) { if (!this.waiters.isEmpty()) this.waiters.peekFirst().signal(); } // unlock and return the buffer lock.unlock(); if (buffer == null) return ByteBuffer.allocate(size); else return buffer; } } finally { if (lock.isHeldByCurrentThread()) lock.unlock(); } }
/**
 * Config key {@code acks}: how many acknowledgments the leader must have received before the producer
 * considers a request complete. Accepted values are {@code all}, {@code -1}, {@code 0} and {@code 1}.
 */
public static final String ACKS_CONFIG = "acks";
// The list markup is closed explicitly so the generated HTML documentation is well-formed.
private static final String ACKS_DOC = "The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the "
        + " durability of records that are sent. The following settings are common: "
        + " <ul>"
        + " <li><code>acks=0</code> If set to zero then the producer will not wait for any acknowledgment from the"
        + " server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be"
        + " made that the server has received the record in this case, and the <code>retries</code> configuration will not"
        + " take effect (as the client won't generally know of any failures). The offset given back for each record will"
        + " always be set to -1."
        + " <li><code>acks=1</code> This will mean the leader will write the record to its local log but will respond"
        + " without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after"
        + " acknowledging the record but before the followers have replicated it then the record will be lost."
        + " <li><code>acks=all</code> This means the leader will wait for the full set of in-sync replicas to"
        + " acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica"
        + " remains alive. This is the strongest available guarantee. This is equivalent to the <code>acks=-1</code> setting."
        + " </ul>";
acks=0:msg 只要被 producer 發送出去就認爲已經發送完成了;acks=1(默認):如果 leader 接收到消息並發送 ack(不會等待該 msg 是否同步到其他副本)就認爲 msg 發送成功了;acks=all或者-1:leader 接收到 msg 並從所有 isr 接收到 ack 後再向 producer 發送 ack,這樣才認爲 msg 發送成功了,這是最高級別的可靠性保證。
/**
 * Config key {@code retries}: number of times a record whose send failed with a potentially transient
 * error is resent. Enabling retries can change the order of records within a partition.
 */
public static final String RETRIES_CONFIG = "retries";
private static final String RETRIES_DOC =
        "Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error."
        + " Note that this retry is no different than if the client resent the record upon receiving the "
        + "error. Allowing retries will potentially change the ordering of records because if two records are "
        + "sent to a single partition, and the first fails and is retried but the second succeeds, then the second record "
        + "may appear first.";
默認是0,即發送失敗後不重試。
/**
 * Config key {@code retry.backoff.ms}: pause before retrying a failed produce request to a topic partition,
 * so that a persistent failure does not spin in a tight send/fail loop.
 */
public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
private static final String RETRY_BACKOFF_MS_DOC =
        "The amount of time to wait before attempting to retry a failed produce request to a given topic partition."
        + " This avoids repeated sending-and-failing in a tight loop.";
默認爲100L,即發送重試的間隔(毫秒)。
/**
 * Config key {@code max.in.flight.requests.per.connection}: upper bound on unacknowledged requests the
 * client keeps outstanding on one connection before it blocks.
 */
public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection";
private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC =
        "The maximum number of unacknowledged requests the client will send on a single connection before blocking.";
限制客戶端在單個連接上能夠發送的未響應請求的個數。設置此值爲1表示kafka broker在響應請求之前client不能再向同一個broker發送請求。注意:在開啟重試(retries>0)時,將此值設爲1可以避免消息亂序。默認爲5。
/**
 * Config key {@code buffer.memory}: total bytes the producer may use to buffer records awaiting
 * transmission. When it is exhausted, {@code block.on.buffer.full} decides between blocking and throwing.
 */
public static final String BUFFER_MEMORY_CONFIG = "buffer.memory";
private static final String BUFFER_MEMORY_DOC =
        "The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are "
        + "sent faster than they can be delivered to the server the producer will either block or throw an exception based "
        + "on the preference specified by <code>block.on.buffer.full</code>. "
        + "<p>"
        + "This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since "
        + "not all memory the producer uses is used for buffering. Some additional memory will be used for compression (if "
        + "compression is enabled) as well as for maintaining in-flight requests.";
producer 能夠使用的最大內存(默認32 * 1024 * 1024L,即32MB),如果超過這個值,取決於block.on.buffer.full的配置,看是阻塞還是拋出異常。
/**
 * Config key {@code batch.size}: default size in bytes of the per-partition batches the producer groups
 * records into. A value of zero disables batching.
 */
public static final String BATCH_SIZE_CONFIG = "batch.size";
private static final String BATCH_SIZE_DOC =
        "The producer will attempt to batch records together into fewer requests whenever multiple records are being sent"
        + " to the same partition. This helps performance on both the client and the server. This configuration controls the "
        + "default batch size in bytes. "
        + "<p>"
        + "No attempt will be made to batch records larger than this size. "
        + "<p>"
        + "Requests sent to brokers will contain multiple batches, one for each partition with data available to be sent. "
        + "<p>"
        + "A small batch size will make batching less common and may reduce throughput (a batch size of zero will disable "
        + "batching entirely). A very large batch size may use memory a bit more wastefully as we will always allocate a "
        + "buffer of the specified batch size in anticipation of additional records.";
默認16384(16KB),當 batch 的大小達到 batch.size 或者等待時間達到 linger.ms 時就會發送該 batch。根據經驗,設置爲1MB 吞吐會更高;設置得太小會降低吞吐,太大則會導致內存浪費,進而影響吞吐量。
/**
 * Config key {@code linger.ms}: upper bound on the artificial delay the producer adds so that more records
 * can join a batch. A partition's batch is sent immediately once it reaches {@code batch.size}, regardless
 * of this setting.
 */
public static final String LINGER_MS_CONFIG = "linger.ms";
private static final String LINGER_MS_DOC = "The producer groups together any records that arrive in between request transmissions into a single batched request. "
        + "Normally this occurs only under load when records arrive faster than they can be sent out. However in some circumstances the client may want to "
        + "reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount "
        + "of artificial delay—that is, rather than immediately sending out a record the producer will wait for up to "
        + "the given delay to allow other records to be sent so that the sends can be batched together. This can be thought "
        + "of as analogous to Nagle's algorithm in TCP. This setting gives the upper bound on the delay for batching: once "
        + "we get <code>batch.size</code> worth of records for a partition it will be sent immediately regardless of this "
        + "setting, however if we have fewer than this many bytes accumulated for this partition we will 'linger' for the "
        + "specified time waiting for more records to show up. This setting defaults to 0 (i.e. no delay). Setting <code>linger.ms=5</code>, "
        + "for example, would have the effect of reducing the number of requests sent but would add up to 5ms of latency to records sent in the absence of load.";
在一個 batch 達不到 batch.size 時,這個 batch 最多將會等待 linger.ms 時間,超過這個時間這個 batch 就會被發送,但也會帶來相應的延遲,可以根據具體的場景進行設置。默認爲0(no delay),也就是不會爲了湊批而額外等待;不過在負載較高、消息產生速度快於發送速度時,消息仍然會自然地被合併成 batch 發送。
/**
 * Config key {@code metadata.fetch.timeout.ms}: longest time the producer blocks waiting for topic metadata
 * (needed on the first send to a topic) before reporting an error to the caller.
 */
public static final String METADATA_FETCH_TIMEOUT_CONFIG = "metadata.fetch.timeout.ms";
private static final String METADATA_FETCH_TIMEOUT_DOC =
        "The first time data is sent to a topic we must fetch metadata about that topic to know which servers host the "
        + "topic's partitions. This configuration controls the maximum amount of time we will block waiting for the metadata "
        + "fetch to succeed before throwing an exception back to the client.";
獲取 metadata 的超時時間,默認60 * 1000(即60秒),主要在下面兩個方法中使用:
/**
 * Block until the cached cluster metadata knows the partitions of {@code topic}. Each round registers the
 * topic, requests a metadata update and wakes the sender.
 *
 * @param topic the topic whose partition metadata is needed
 * @param maxWaitMs total time budget, in milliseconds, for all waiting done by this call
 * @throws TimeoutException if the partitions are still unknown once {@code maxWaitMs} has elapsed
 */
private void waitOnMetadata(String topic, long maxWaitMs) {
    // Nothing to wait for when the partitions are already cached.
    if (metadata.fetch().partitionsForTopic(topic) != null)
        return;

    final long startMs = time.milliseconds();
    long budgetLeftMs = maxWaitMs;
    while (metadata.fetch().partitionsForTopic(topic) == null) {
        log.trace("Requesting metadata update for topic {}.", topic);
        int awaitedVersion = metadata.requestUpdate();
        metadata.add(topic);
        sender.wakeup();
        metadata.awaitUpdate(awaitedVersion, budgetLeftMs);
        long waitedMs = time.milliseconds() - startMs;
        if (waitedMs >= maxWaitMs)
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
        budgetLeftMs = maxWaitMs - waitedMs;
    }
}

/**
 * Return the partitions of {@code topic}. First waits up to {@code metadataFetchTimeoutMs} for their
 * metadata to become available.
 */
@Override
public List<PartitionInfo> partitionsFor(String topic) {
    waitOnMetadata(topic, this.metadataFetchTimeoutMs);
    return this.metadata.fetch().partitionsForTopic(topic);
}
在0.10版本中,partitionsFor 等待 metadata 的超時時間改由 max.block.ms 參數控制(取代 metadata.fetch.timeout.ms)。
/**
 * Config key {@code max.request.size}: largest request the producer will send. This also caps the size of a
 * single record and limits how many record batches go into one request.
 */
public static final String MAX_REQUEST_SIZE_CONFIG = "max.request.size";
private static final String MAX_REQUEST_SIZE_DOC =
        "The maximum size of a request. This is also effectively a cap on the maximum record size. Note that the server "
        + "has its own cap on record size which may be different from this. This setting will limit the number of record "
        + "batches the producer will send in a single request to avoid sending huge requests.";
一個請求的最大長度,默認爲1 * 1024 * 1024(1MB),這實際上也限制了單條消息的最大長度。
單條消息(record)長度的計算方式如下:
/**
 * Compute the on-wire size of a record with the given key and value lengths. The result is the fixed header
 * fields (CRC, magic byte, attributes), plus the key and the value, each preceded by its size field.
 *
 * @param keySize number of key bytes
 * @param valueSize number of value bytes
 * @return total record size in bytes
 */
public static int recordSize(int keySize, int valueSize) {
    int headerBytes = CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH;
    int keyBytes = KEY_SIZE_LENGTH + keySize;
    int valueBytes = VALUE_SIZE_LENGTH + valueSize;
    return headerBytes + keyBytes + valueBytes;
}
/**
 * Config key {@code timeout.ms}: how long the broker waits for follower acknowledgments to satisfy the
 * {@code acks} requirement before returning an error. Measured server side, excluding network latency.
 */
public static final String TIMEOUT_CONFIG = "timeout.ms";
private static final String TIMEOUT_DOC =
        "The configuration controls the maximum amount of time the server will wait for acknowledgments from followers to "
        + "meet the acknowledgment requirements the producer has specified with the <code>acks</code> configuration. If the "
        + "requested number of acknowledgments are not met when the timeout elapses an error will be returned. This timeout "
        + "is measured on the server side and does not include the network latency of the request.";
默認是30 * 1000(即30秒),新版中對應的是request.timeout.ms參數。