A few words about the present, 2020. At the end of 2019, everyone was full of hope for the new year, especially with the palindromic date 20200202 on the calendar. But the months-long bushfires in Australia, the discovery of the novel coronavirus nCoV, the death of Kobe Bryant, and more have left us feeling rather helpless: life is so fragile, and tomorrow is so uncertain. Still, we should live in the present, face the epidemic bravely, and together with the government win this war without gunsmoke!
As a programmer, I certainly cannot stop working or stop learning. I am working from home now, fully cooperating with the isolation measures currently advocated, being responsible to myself and to society. Below I will share a note I wrote earlier and discuss a Kafka question with you: why does sending a Kafka message fail?
雖然在使用資源後關閉資源是很是正常的操做,可是確實咱們也是常常會缺乏調用 close()
關閉資源的的代碼,特別是在本身寫 demo 的時候。並且,之前寫關於文件 IO 的例子時,不寫 close()
方法確實也不會報錯或者出現問題,可是那爲啥到了 Kafka 這裏,不寫就會出現問題呢?node
Properties properties = new Properties();
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);

KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "hello, Kafka!");
try {
    producer.send(record);
} catch (Exception e) {
    e.printStackTrace();
}
// Omitting this line causes the message send to fail:
// producer.close();
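The fix is simply to close the producer before the program exits; a minimal sketch, reusing the producer and record from the demo above:

try {
    producer.send(record);
} catch (Exception e) {
    e.printStackTrace();
} finally {
    // Blocks until the buffered record has actually been sent, then releases the I/O thread and buffers.
    producer.close();
}

Calling producer.flush() would also force the buffered records out, but close() is what the Javadoc asks for, and it releases the underlying resources as well.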
The Javadoc comment on close() explains the method's behavior: it blocks until all previously issued send requests have completed.
/**
 * Close this producer. This method blocks until all previously sent requests complete.
 * This method is equivalent to <code>close(Long.MAX_VALUE, TimeUnit.MILLISECONDS)</code>.
 * <p>
 * <strong>If close() is called from {@link Callback}, a warning message will be logged and close(0, TimeUnit.MILLISECONDS)
 * will be called instead. We do this because the sender thread would otherwise try to join itself and
 * block forever.</strong>
 * <p>
 *
 * @throws InterruptException If the thread is interrupted while blocked
 */
@Override
public void close() {
    close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
}
The class-level Javadoc of KafkaProducer also warns us: the producer consists of a pool of buffer space holding records that have not yet been transmitted to the server, plus a background I/O thread responsible for turning those records into requests and transmitting them to the cluster. Failure to close the producer after use will leak these resources.
/*
 * <p>
 * The producer consists of a pool of buffer space that holds records that haven't yet been transmitted to the server
 * as well as a background I/O thread that is responsible for turning these records into requests and transmitting them
 * to the cluster. Failure to close the producer after use will leak these resources.
 * <p>
 */
At this point we can draw a preliminary conclusion: KafkaProducer does not send a message to Kafka immediately; it first stores the message in a buffer, and a background thread continuously reads from that buffer and sends the records on to Kafka. We can also see why omitting close() matters: main() returns as soon as send() does, and at that moment the message may have only just reached the buffer, before the background thread has had a chance to read and transmit it.
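Another way to observe this buffering is to make the send effectively synchronous: send() returns a Future<RecordMetadata>, and blocking on it forces the record all the way to the broker before main() can return. A small sketch, again assuming the producer and record from the demo (ExecutionException comes from java.util.concurrent):

try {
    // Block until the broker acknowledges the record (or the request fails).
    RecordMetadata metadata = producer.send(record).get();
    System.out.println("sent to " + metadata.topic() + "-" + metadata.partition()
            + " at offset " + metadata.offset());
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}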
Next, let's read the KafkaProducer source code to verify this summary.
The most important points in the constructor: it creates the queue that holds the messages, and it starts a background thread whose job is to take messages from that queue and send them to Kafka.
KafkaProducer(ProducerConfig config,
              Serializer<K> keySerializer,
              Serializer<V> valueSerializer,
              Metadata metadata,
              KafkaClient kafkaClient) {
    try {
        // ... many lines omitted: producer configuration such as clientId, serializers, interceptors, etc.

        // This is the queue that holds the messages.
        this.accumulator = new RecordAccumulator(logContext,
                config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                this.totalMemorySize,
                this.compressionType,
                config.getLong(ProducerConfig.LINGER_MS_CONFIG),
                retryBackoffMs,
                metrics,
                time,
                apiVersions,
                transactionManager);

        // ... more unrelated configuration omitted

        // Sender implements Runnable; it is the background thread that handles produce requests to the Kafka cluster.
        this.sender = new Sender(logContext,
                client,
                this.metadata,
                this.accumulator,
                maxInflightRequests == 1,
                config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                acks,
                retries,
                metricsRegistry.senderMetrics,
                Time.SYSTEM,
                this.requestTimeoutMs,
                config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
                this.transactionManager,
                apiVersions);
        String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
        this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
        // Start the I/O thread
        this.ioThread.start();
        this.errors = this.metrics.sensor("errors");
        config.logUnused();
        AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics);
        log.debug("Kafka producer started");
    } catch (Throwable t) {
        // call close methods if internal objects are already constructed; this is to prevent resource leak. see KAFKA-2121
        close(0, TimeUnit.MILLISECONDS, true);
        // now propagate the exception
        throw new KafkaException("Failed to construct kafka producer", t);
    }
}
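Note the third argument to the KafkaThread constructor: true marks the sender as a daemon thread, which is exactly why an abrupt return from main() loses buffered records. The toy program below is not Kafka code, just a hypothetical analogy: a daemon thread draining a queue dies with the JVM, so anything still queued is lost unless the main thread waits.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class DaemonBufferDemo {
    public static void main(String[] args) {
        BlockingQueue<String> buffer = new LinkedBlockingQueue<>();
        Thread ioThread = new Thread(() -> {
            try {
                while (true) {
                    String msg = buffer.take();
                    Thread.sleep(100); // simulate slow network I/O
                    System.out.println("sent: " + msg);
                }
            } catch (InterruptedException ignored) {
            }
        });
        ioThread.setDaemon(true); // the same choice KafkaThread makes
        ioThread.start();

        buffer.offer("hello, Kafka!");
        // main() returns immediately; the JVM exits and kills the daemon thread
        // before it finishes "sending", so nothing is printed.
    }
}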
Sending a message does not transmit it to Kafka directly; instead, the record is appended to the accumulator queue we saw above.
/**
 * Asynchronously send a record to a topic. Equivalent to <code>send(record, null)</code>.
 * See {@link #send(ProducerRecord, Callback)} for details.
 */
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
    return send(record, null);
}

@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // intercept the record, which can be potentially modified; this method does not throw exceptions
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}

/**
 * Implementation of asynchronously send a record to a topic.
 */
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        // ... lines omitted: serialization, partitioning, transactions, callback setup, etc.

        // Append the record to the accumulator created in the KafkaProducer constructor.
        // Inside RecordAccumulator, the records are stored in:
        // private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                serializedValue, headers, interceptCallback, remainingWaitMs);
        if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            this.sender.wakeup();
        }
        return result.future;
        // Exception handling below
    } catch (ApiException e) {
        log.debug("Exception occurred during message send:", e);
        if (callback != null)
            callback.onCompletion(null, e);
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        return new FutureFailure(e);
    }
    // ... a pile of other catch blocks omitted
}
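Since doSend() only appends the record to the accumulator and returns a future, a send that later fails in the sender thread will not throw in the caller. Passing a Callback to send() is the idiomatic way to observe the eventual outcome; a sketch with illustrative error handling:

producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            // Invoked on the sender's I/O thread once the request has finally failed.
            exception.printStackTrace();
        } else {
            System.out.println("acked at offset " + metadata.offset());
        }
    }
});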
Next, let's look at how the background Sender thread takes records out of the accumulator queue and sends them to Kafka.
/**
 * The main run loop for the sender thread
 */
public void run() {
    log.debug("Starting Kafka producer I/O thread.");

    // main loop, runs until close is called
    while (running) {
        try {
            run(time.milliseconds());
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }

    log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

    // okay we stopped accepting requests but there may still be
    // requests in the accumulator or waiting for acknowledgment,
    // wait until these are completed.
    // In other words: close() has been called, but the accumulator may still hold records or have
    // requests awaiting acknowledgment, so keep looping until they complete.
    while (!forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0)) {
        try {
            run(time.milliseconds());
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }

    // When the wait time is exhausted and a forced close is requested, fail all incomplete batches.
    if (forceClose) {
        // We need to fail all the incomplete batches and wake up the threads waiting on
        // the futures.
        log.debug("Aborting incomplete batches due to forced shutdown");
        this.accumulator.abortIncompleteBatches();
    }
    try {
        // Close the network client
        this.client.close();
    } catch (Exception e) {
        log.error("Failed to close network client", e);
    }

    log.debug("Shutdown of Kafka producer I/O thread has completed.");
}
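This run loop also explains the two flavors of close() we saw earlier: the no-argument close() waits indefinitely for the accumulator to drain, while a timed close gives the sender a bounded window and then takes the forceClose path, aborting whatever batches remain. If blocking forever at shutdown is unacceptable, the bounded call looks like this (for the client version quoted in this article; newer clients take a java.time.Duration instead):

// Wait at most 5 seconds for in-flight records, then abort the rest.
producer.close(5, TimeUnit.SECONDS);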
Preparation before the data is sent:
private long sendProducerData(long now) {
    Cluster cluster = metadata.fetch();

    // get the list of partitions with data ready to send
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

    // if there are any partitions whose leaders are not known yet, force metadata update
    if (!result.unknownLeaderTopics.isEmpty()) {
        // The set of topics with unknown leader contains topics with leader election pending as well as
        // topics which may have expired. Add the topic again to metadata to ensure it is included
        // and request metadata update, since there are messages to send to the topic.
        for (String topic : result.unknownLeaderTopics)
            this.metadata.add(topic);
        log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}", result.unknownLeaderTopics);
        this.metadata.requestUpdate();
    }

    // remove any nodes we aren't ready to send to
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        if (!this.client.ready(node, now)) {
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
        }
    }

    // create the requests
    Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
    if (guaranteeMessageOrder) {
        // Mute all the partitions drained
        for (List<ProducerBatch> batchList : batches.values()) {
            for (ProducerBatch batch : batchList)
                this.accumulator.mutePartition(batch.topicPartition);
        }
    }

    // ... other processing omitted

    // the method that actually sends the requests
    sendProduceRequests(batches, now);
    return pollTimeout;
}
Finally, the data is sent to Kafka through the NetworkClient.
/**
 * Create a produce request from the given record batches
 */
private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
    if (batches.isEmpty())
        return;

    Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
    final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());

    // find the minimum magic version used when creating the record sets
    byte minUsedMagic = apiVersions.maxUsableProduceMagic();
    for (ProducerBatch batch : batches) {
        if (batch.magic() < minUsedMagic)
            minUsedMagic = batch.magic();
    }

    for (ProducerBatch batch : batches) {
        TopicPartition tp = batch.topicPartition;
        MemoryRecords records = batch.records();

        // down convert if necessary to the minimum magic used. In general, there can be a delay between the time
        // that the producer starts building the batch and the time that we send the request, and we may have
        // chosen the message format based on out-dated metadata. In the worst case, we optimistically chose to use
        // the new message format, but found that the broker didn't support it, so we need to down-convert on the
        // client before sending. This is intended to handle edge cases around cluster upgrades where brokers may
        // not all support the same message format version. For example, if a partition migrates from a broker
        // which is supporting the new magic version to one which doesn't, then we will need to convert.
        if (!records.hasMatchingMagic(minUsedMagic))
            records = batch.records().downConvert(minUsedMagic, 0, time).records();
        produceRecordsByPartition.put(tp, records);
        recordsByPartition.put(tp, batch);
    }

    String transactionalId = null;
    if (transactionManager != null && transactionManager.isTransactional()) {
        transactionalId = transactionManager.transactionalId();
    }
    ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
            produceRecordsByPartition, transactionalId);
    RequestCompletionHandler callback = new RequestCompletionHandler() {
        public void onComplete(ClientResponse response) {
            handleProduceResponse(response, recordsByPartition, time.milliseconds());
        }
    };

    String nodeId = Integer.toString(destination);
    ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0, requestTimeoutMs, callback);
    // In the end the request goes out through the NetworkClient.
    client.send(clientRequest, now);
    log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
}
To conclude: omitting Producer.close() can indeed cause the message send to fail, and the Javadoc explicitly reminds us to close the producer to avoid leaking resources. The root cause lies in how KafkaProducer implements asynchronous sending. It first puts the record into the RecordAccumulator queue, and the background KafkaThread continuously reads records that are ready to send from the RecordAccumulator and transmits them to Kafka. If our code never calls Producer.close(), there is no timed wait for outstanding requests: as soon as main() finishes, the KafkaThread is destroyed along with the JVM before it gets a chance to pull the message out of the RecordAccumulator, so the message is never actually sent.
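To wrap up: since KafkaProducer implements Closeable, try-with-resources keeps the close() from ever being forgotten. A final sketch of the opening demo in that style, with the same assumed properties and topic:

try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
    producer.send(new ProducerRecord<>(topic, "hello, Kafka!"));
} // close() runs here, blocking until the buffered record has actually been sent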