While developing a gateway service recently, I needed Kafka to forward messages and persist logs. During load testing, multiple threads were concurrently calling the Kafka producer's asynchronous send, and I found that a send call would sometimes block for tens of milliseconds, which significantly hurt concurrent throughput. In later tests, single-threaded sending actually turned out to be several times faster than multi-threaded sending. So I traced through the source code of the Kafka send API to find out why, and this post is a summary of that analysis.
First, let's look at how a Kafka producer is used under Spring Boot.
In the configuration class, register an instance of the producer factory DefaultKafkaProducerFactory with the IoC container:
@Bean
public ProducerFactory<Object, Object> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}
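The producerConfigs() method referenced above is not shown in the original post; it is typically just a map of client properties defined in the same configuration class. A minimal sketch, where the broker address and the serializer choices are assumptions for illustration:

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    // assumed broker address, adjust to your environment
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // assumed serializers; any Serializer implementations work here
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return props;
}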
Create the producer:
this.producer = producerFactory.createProducer();
As we all know, beans managed by the Spring Boot IoC container are singletons by default, and DefaultKafkaProducerFactory is itself a singleton factory:
@Override
public Producer<K, V> createProducer() {
    if (this.transactionIdPrefix != null) {
        return createTransactionalProducer();
    }
    if (this.producer == null) {
        synchronized (this) {
            if (this.producer == null) {
                this.producer = new CloseSafeProducer<K, V>(createKafkaProducer());
            }
        }
    }
    return this.producer;
}
So the producer we create is also a singleton.
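To make the load-test scenario concrete: many business threads all share this one singleton producer and call send on it concurrently. A minimal sketch of that setup, where the thread count, message count, and topic name are assumptions for illustration:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

// 'producer' is the singleton created above
void pressureTest(Producer<Object, Object> producer) {
    ExecutorService pool = Executors.newFixedThreadPool(50); // assumed thread count
    for (int i = 0; i < 1_000_000; i++) {                    // assumed message count
        // every worker thread funnels into the same producer instance
        pool.submit(() -> producer.send(new ProducerRecord<>("test-topic", "payload")));
    }
    pool.shutdown();
}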
Next comes the actual sending. Anyone who has used Kafka knows that producer.send is an asynchronous operation that returns a Future<RecordMetadata>. So why is there such a large efficiency gap between single-threaded and multi-threaded send? Let's step into KafkaProducer and look at the implementation behind producer.send to find the answer.
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        // make sure the metadata for the topic is available
        ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
        long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
        Cluster cluster = clusterAndWaitTime.cluster;
        byte[] serializedKey;
        try {
            // serialize the key
            serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in key.serializer", cce);
        }
        byte[] serializedValue;
        try {
            // serialize the value
            serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in value.serializer", cce);
        }
        // compute the target partition
        int partition = partition(record, serializedKey, serializedValue, cluster);
        tp = new TopicPartition(record.topic(), partition);

        setReadOnly(record.headers());
        Header[] headers = record.headers().toArray();

        int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                compressionType, serializedKey, serializedValue, headers);
        ensureValidRecordSize(serializedSize);
        long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
        log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
        // producer callback will make sure to call both 'callback' and interceptor callback
        Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

        if (transactionManager != null && transactionManager.isTransactional())
            transactionManager.maybeAddPartitionToTransaction(tp);

        // append the record to the queue container
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                serializedValue, headers, interceptCallback, remainingWaitMs);
        if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            this.sender.wakeup();
        }
        return result.future;
        // handling exceptions and record the errors;
        // for API exceptions return them in the future,
        // for other exceptions throw directly
    } catch (ApiException e) {
        log.debug("Exception occurred during message send:", e);
        if (callback != null)
            callback.onCompletion(null, e);
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        return new FutureFailure(e);
    } catch (InterruptedException e) {
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        throw new InterruptException(e);
    } catch (BufferExhaustedException e) {
        this.errors.record();
        this.metrics.sensor("buffer-exhausted-records").record();
        this.interceptors.onSendError(record, tp, e);
        throw e;
    } catch (KafkaException e) {
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        throw e;
    } catch (Exception e) {
        // we notify interceptor about all exceptions, since onSend is called before anything else in this method
        this.interceptors.onSendError(record, tp, e);
        throw e;
    }
}
Apart from the serialization and validation work at the start, the key step here is appending the record to the queue container:
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
        serializedValue, headers, interceptCallback, remainingWaitMs);
accumulator is an instance of RecordAccumulator, a queue-container class. Internally it maintains a ConcurrentMap in which every TopicPartition has its own dedicated message queue.
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
Let's step into accumulator.append and look at the implementation:
public RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Header[] headers,
                                 Callback callback,
                                 long maxTimeToBlock) throws InterruptedException {
    // We keep track of the number of appending thread to make sure we do not miss batches in
    // abortIncompleteBatches().
    appendsInProgress.incrementAndGet();
    ByteBuffer buffer = null;
    if (headers == null) headers = Record.EMPTY_HEADERS;
    try {
        // look up the batch deque for this TopicPartition
        Deque<ProducerBatch> dq = getOrCreateDeque(tp);
        // synchronize on the deque to keep the append thread-safe
        synchronized (dq) {
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");
            // append the serialized record to the deque and return the result
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null)
                return appendResult;
        }

        // we don't have an in-progress record batch try to allocate a new batch
        byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
        int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
        log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
        buffer = free.allocate(size, maxTimeToBlock);
        synchronized (dq) {
            // Need to check if producer is closed again after grabbing the dequeue lock.
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");

            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null) {
                // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                return appendResult;
            }

            MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
            ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));

            dq.addLast(batch);
            incomplete.add(batch);

            // Don't deallocate this buffer in the finally block as it's being used in the record batch
            buffer = null;

            return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
        }
    } finally {
        if (buffer != null)
            free.deallocate(buffer);
        appendsInProgress.decrementAndGet();
    }
}
In getOrCreateDeque we look up the queue for the TopicPartition in the ConcurrentMap, initializing one if it does not exist yet.
private Deque<ProducerBatch> getOrCreateDeque(TopicPartition tp) {
    Deque<ProducerBatch> d = this.batches.get(tp);
    if (d != null)
        return d;
    d = new ArrayDeque<>();
    Deque<ProducerBatch> previous = this.batches.putIfAbsent(tp, d);
    if (previous == null)
        return d;
    else
        return previous;
}
More importantly, to remain thread-safe under concurrency, the call RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq) must synchronize on the Deque<ProducerBatch>:
synchronized (dq) {
    if (closed)
        throw new IllegalStateException("Cannot send after the producer is closed.");
    RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
    if (appendResult != null)
        return appendResult;
}
Here we can see that under heavy multi-threaded load there is significant contention for dq. Although this is a purely in-memory operation and each thread holds the lock only briefly, high concurrency means many more threads than the single-threaded case, so lock contention and CPU context switches become frequent. That produces a measurable performance loss and the intermittent blocking latency observed earlier.
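This effect is easy to reproduce outside Kafka. Below is a minimal micro-benchmark sketch, not from the Kafka code base, that contrasts one thread against many threads appending to a single synchronized deque; the thread counts, payload size, and bound are all assumptions for illustration:

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class DequeContention {
    static final Deque<byte[]> dq = new ArrayDeque<>();

    static long run(int threads, int totalAppends) throws InterruptedException {
        dq.clear();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        long start = System.nanoTime();
        for (int i = 0; i < totalAppends; i++) {
            pool.submit(() -> {
                synchronized (dq) {                     // same pattern as accumulator.append
                    dq.addLast(new byte[64]);           // assumed payload size
                    if (dq.size() > 10_000) dq.clear(); // keep memory bounded
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("1 thread:   " + run(1, 1_000_000) + " ms");
        System.out.println("50 threads: " + run(50, 1_000_000) + " ms");
    }
}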
At this point you will notice that KafkaProducer's asynchronous sending is actually built on the producer-consumer pattern: send does not really transmit anything directly, it just places the record into an intermediate in-memory queue. And since producers are putting data into that queue, there must be a dedicated thread responsible for actually sending it. By inspecting the JVM's thread information we can confirm that creating a KafkaProducer does start a daemon thread for sending messages.
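One quick way to check this from inside the application; the name filter below assumes the kafka-producer-network-thread prefix used by the client:

// list live JVM threads and look for the producer's I/O daemon thread
Thread.getAllStackTraces().keySet().stream()
        .filter(t -> t.getName().contains("kafka-producer-network-thread"))
        .forEach(t -> System.out.println(t.getName() + " daemon=" + t.isDaemon()));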
OK, back in KafkaProducer you will find these two fields; Sender is the background thread that actually sends the data:
private final Sender sender;
private final Thread ioThread;
The KafkaProducer constructor starts the Sender thread:
this.sender = new Sender(logContext,
        client,
        this.metadata,
        this.accumulator,
        maxInflightRequests == 1,
        config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
        acks,
        retries,
        metricsRegistry.senderMetrics,
        Time.SYSTEM,
        this.requestTimeoutMs,
        config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
        this.transactionManager,
        apiVersions);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
Inside Sender we can see that this thread's whole job is to loop continuously, polling and sending data.
public void run() {
    log.debug("Starting Kafka producer I/O thread.");

    // main loop, runs until close is called
    while (running) {
        try {
            run(time.milliseconds());
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }

    log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

    // okay we stopped accepting requests but there may still be
    // requests in the accumulator or waiting for acknowledgment,
    // wait until these are completed.
    while (!forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0)) {
        try {
            run(time.milliseconds());
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }
    if (forceClose) {
        // We need to fail all the incomplete batches and wake up the threads waiting on
        // the futures.
        log.debug("Aborting incomplete batches due to forced shutdown");
        this.accumulator.abortIncompleteBatches();
    }
    try {
        this.client.close();
    } catch (Exception e) {
        log.error("Failed to close network client", e);
    }

    log.debug("Shutdown of Kafka producer I/O thread has completed.");
}

/**
 * Run a single iteration of sending
 *
 * @param now The current POSIX time in milliseconds
 */
void run(long now) {
    if (transactionManager != null) {
        try {
            if (transactionManager.shouldResetProducerStateAfterResolvingSequences())
                // Check if the previous run expired batches which requires a reset of the producer state.
                transactionManager.resetProducerId();
            if (!transactionManager.isTransactional()) {
                // this is an idempotent producer, so make sure we have a producer id
                maybeWaitForProducerId();
            } else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) {
                transactionManager.transitionToFatalError(new KafkaException("The client hasn't received acknowledgment for " +
                        "some previously sent messages and can no longer retry them. It isn't safe to continue."));
            } else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest(now)) {
                // as long as there are outstanding transactional requests, we simply wait for them to return
                client.poll(retryBackoffMs, now);
                return;
            }

            // do not continue sending if the transaction manager is in a failed state or if there
            // is no producer id (for the idempotent case).
            if (transactionManager.hasFatalError() || !transactionManager.hasProducerId()) {
                RuntimeException lastError = transactionManager.lastError();
                if (lastError != null)
                    maybeAbortBatches(lastError);
                client.poll(retryBackoffMs, now);
                return;
            } else if (transactionManager.hasAbortableError()) {
                accumulator.abortUndrainedBatches(transactionManager.lastError());
            }
        } catch (AuthenticationException e) {
            // This is already logged as error, but propagated here to perform any clean ups.
            log.trace("Authentication exception while processing transactional request: {}", e);
            transactionManager.authenticationFailed(e);
        }
    }

    long pollTimeout = sendProducerData(now);
    client.poll(pollTimeout, now);
}
From the analysis above we can see that producer.send itself is essentially an in-memory store operation whose cost is almost negligible. Under high concurrency, however, thread synchronization does carry some overhead. In ordinary application scenarios that overhead can be ignored, but with large data volumes and high concurrency it becomes noticeable.
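If you want to observe the blocking yourself, timing the send call directly is enough. A minimal sketch; the topic name and the 10 ms reporting threshold are assumptions:

long t0 = System.nanoTime();
producer.send(new ProducerRecord<>("test-topic", "payload"));
long elapsedMs = (System.nanoTime() - t0) / 1_000_000;
if (elapsedMs > 10) {
    // under contention this intermittently reaches tens of milliseconds
    System.out.println("slow send: " + elapsedMs + " ms");
}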
Based on this analysis, here are a few personal takeaways:
1. First, avoid having multiple threads call producer.send. You can use the producer-consumer pattern to decouple producer.send from your multi-threaded code: maintain your own queue of outgoing messages and dedicate a single thread to the sending (see the sketch after this list).
2. Some readers may ask: what about creating several producer instances, or maintaining a producer pool? That was my original idea too, but in testing the results were not great either. My guess is that too many producer instances means the thread count grows with them: the business threads plus Kafka's own threads lead to frequent context switching and heavy CPU pressure, so it is still less efficient than the single-threaded approach.
3. This issue really only concerns the API call itself: send does not actually transmit the data, the daemon thread does. Following Kafka's own design philosophy, if this operation itself becomes your bottleneck, what you should be considering is cluster deployment and load balancing.
4. Lock-free is the real path to high performance.
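For point 1, here is a minimal sketch of that decoupling, not from the original post: business threads only enqueue, and one dedicated dispatcher thread is the sole caller of producer.send. The class name, queue capacity, and thread name are assumptions for illustration:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SingleThreadDispatcher {
    private final BlockingQueue<ProducerRecord<Object, Object>> queue =
            new LinkedBlockingQueue<>(100_000); // assumed capacity
    private final Producer<Object, Object> producer;

    public SingleThreadDispatcher(Producer<Object, Object> producer) {
        this.producer = producer;
        Thread t = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    // only this thread ever touches producer.send,
                    // so there is no contention on the accumulator's deques
                    producer.send(queue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "kafka-dispatcher");
        t.setDaemon(true);
        t.start();
    }

    // called from any business thread; never blocks on the producer's locks
    public boolean enqueue(ProducerRecord<Object, Object> record) {
        return queue.offer(record);
    }
}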