KafkaProducer 發送消息(send API)的核心源碼如下:
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) { // 執行攔截器 ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record); return doSend(interceptedRecord, callback); } private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) { TopicPartition tp = null; // 獲取元數據 ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs); Cluster cluster = clusterAndWaitTime.cluster; // 序列化 key、value byte[] serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key()); byte[] serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value()); // 獲取分區。 // 若是爲空,會計算 key 的 hash 值,再和該主題的分區總數取餘獲得分區號; // 若是 key 也爲空,客戶端會生成遞增的隨機整數,再和該主題的分區總數區域獲得分區號。 int partition = partition(record, serializedKey, serializedValue, cluster); tp = new TopicPartition(record.topic(), partition); // 校驗序列化後的記錄是否超過限制 int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(), compressionType, serializedKey, serializedValue, headers); ensureValidRecordSize(serializedSize); // 時間戳,默認是 KafkaProducer 初始化時間 long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp(); // 初始化回調和響應的攔截器對象 Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp); // 把消息添加到記錄累加器中 RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs); if (result.batchIsFull || result.newBatchCreated) { // 當 batch 滿了,或者建立了新的 batch 後,喚醒 Sender 線程 this.sender.wakeup(); } return result.future; }
獲取元數據(優先使用本地緩存)的核心源碼如下:
private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) { // 獲取緩存的集羣信息 Cluster cluster = metadata.fetch(); Integer partitionsCount = cluster.partitionCountForTopic(topic); // Return cached metadata if we have it, and if the record's partition is either undefined // or within the known partition range // 若是緩存中的數據知足條件,直接返回緩存中的元數據。 if (partitionsCount != null && (partition == null || partition < partitionsCount)) return new ClusterAndWaitTime(cluster, 0); long begin = time.milliseconds(); long remainingWaitMs = maxWaitMs; long elapsed; do { // 更新元數據的標記 needUpdate=true,並獲取當前的 version。 int version = metadata.requestUpdate(); sender.wakeup(); // 喚醒 Sender 線程 try { metadata.awaitUpdate(version, remainingWaitMs); // 等待更新 } catch (TimeoutException ex) { } cluster = metadata.fetch(); // 從新獲取元數據 elapsed = time.milliseconds() - begin; if (elapsed >= maxWaitMs) // 超出最大等待時間,拋出異常 throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms."); remainingWaitMs = maxWaitMs - elapsed; partitionsCount = cluster.partitionCountForTopic(topic); } while (partitionsCount == null); // 分區數量是 0,繼續上述循環 if (partition != null && partition >= partitionsCount) { // 當指定的分區號大於等於分數總數時,異常 throw new KafkaException(String.format("Invalid partition given with record: %d is not in the range [0...%d).", partition, partitionsCount)); } return new ClusterAndWaitTime(cluster, elapsed); } // 等待更新 public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException { long begin = System.currentTimeMillis(); long remainingWaitMs = maxWaitMs; // 版本號<=當前版本號,說明未更新,須要繼續循環等待更新 while ((this.version <= lastVersion) && !isClosed()) { if (remainingWaitMs != 0) wait(remainingWaitMs); // 等待一會再判斷 long elapsed = System.currentTimeMillis() - begin; if (elapsed >= maxWaitMs) // 超過了最大等待時間 throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms."); remainingWaitMs = maxWaitMs - elapsed; } }
Kafka 使用緩衝池 BufferPool 給消息分配堆內字節緩衝區(HeapByteBuffer)。緩衝池的空閒隊列 free 存放了可複用的空閒 ByteBuffer(大小均為 batch.size);當申請的大小恰好等於 batch.size 時,優先直接從中取出第一個進行分配。如果緩衝池內存不夠,則利用 ReentrantLock + Condition 構造等待隊列,阻塞等待(最長 maxTimeToBlockMs),直到可用內存足夠分配。
Kafka 在處理生產者請求的響應時釋放分配的內存:大小等於 batch.size 的緩衝區會被清空後放回空閒隊列 free,其他大小的緩衝區則只把其字節數歸還到未分配內存 nonPooledAvailableMemory 中;之後喚醒等待隊列中的第一個線程。
// 緩衝池 public class BufferPool { // 可用總內存 buffer.memory private final long totalMemory; // 一批消息的大小 batch.size private final int poolableSize; private final ReentrantLock lock; // 空閒緩存隊列 private final Deque<ByteBuffer> free; // 等待隊列 private final Deque<Condition> waiters; // 可用未分配的內存總量是nonPooledAvailableMemory和free * poolableSize中字節緩衝區的總和。 private long nonPooledAvailableMemory; } // 字節緩衝分配 public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException { if (size > this.totalMemory) throw new IllegalArgumentException("消息大小超過總內存"); ByteBuffer buffer = null; this.lock.lock(); try { // 直接在空閒隊列分配 if (size == poolableSize && !this.free.isEmpty()) return this.free.pollFirst(); // 計算空閒隊列總大小 int freeListSize = this.free.size() * this.poolableSize; if (this.nonPooledAvailableMemory + freeListSize >= size) { // 可用的總內存(未分配的+空閒隊列)>消息大小 // we have enough unallocated or pooled memory to immediately // satisfy the request, but need to allocate the buffer freeUp(size); this.nonPooledAvailableMemory -= size; // 未分配內存總數-消息大小 } else { // 內存不夠分配 int accumulated = 0; Condition moreMemory = this.lock.newCondition(); try { long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs); this.waiters.addLast(moreMemory); // 加入等待隊列 // loop over and over until we have a buffer or have reserved // enough memory to allocate one while (accumulated < size) { // 輪詢,直到足夠分配內存 long startWaitNs = time.nanoseconds(); long timeNs; boolean waitingTimeElapsed; try { // 等待一段時間 waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS); } remainingTimeToBlockNs -= timeNs; // 直接在空閒隊列分配 if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) { buffer = this.free.pollFirst(); accumulated = size; } else { // 內存不夠,accumulated累加計數 freeUp(size - accumulated); int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory); this.nonPooledAvailableMemory -= got; accumulated += got; } } accumulated = 0; // 清空 } } } if 
(buffer == null) // 沒有在空閒隊列分配到內存,須要在堆上分配內存 return new HeapByteBuffer(size, size); else return buffer; } private void freeUp(int size) { while (!this.free.isEmpty() && this.nonPooledAvailableMemory < size) this.nonPooledAvailableMemory += this.free.pollLast().capacity(); // 釋放空閒隊列的內存 } // 處理生產者響應消息時,釋放分配的內存 public void deallocate(ByteBuffer buffer, int size) { lock.lock(); try { if (size == this.poolableSize && size == buffer.capacity()) { buffer.clear(); this.free.add(buffer); // 加到空閒隊列 } else { this.nonPooledAvailableMemory += size; // 增長未分配內存數量 } Condition moreMem = this.waiters.peekFirst(); if (moreMem != null) moreMem.signal(); } finally { lock.unlock(); } }
累加器(RecordAccumulator)使用 CopyOnWriteMap 來緩存消息:key 是主題分區信息 TopicPartition,value 是一個雙端隊列,隊列中的元素是(按配置壓縮的)批量消息 ProducerBatch。
// 累加器緩存 ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches = new CopyOnWriteMap<>();
CopyOnWriteMap 是由 Kafka 實現的線程安全的寫時複製 Map:內部持有一個 volatile 的 Map 引用,讀取時無需加鎖,直接讀取;寫入時需要加鎖,先拷貝一份 Map 副本並在副本上寫入,寫入完成後再把 volatile 引用指向修改後的副本。由於只有某個分區第一次出現時(getOrCreateDeque)才需要寫入,而每次發送消息都要讀取,這種讀多寫少的場景正適合寫時複製。
雙端隊列 Deque 的實現實際上就是 ArrayDeque,它不是線程安全的,需要手動同步(見下文 append 中的 synchronized (dq))。使用雙端隊列可以在批次發送失敗時,把批次直接放回隊列頭部進行重試。
// 累加消息到緩存 public RecordAppendResult append(TopicPartition tp, long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long maxTimeToBlock) throws InterruptedException { ByteBuffer buffer = null; try { Deque<ProducerBatch> dq = getOrCreateDeque(tp); // 檢查 batches 是否有該分區的映射,若是沒有,則建立一個 synchronized (dq) { // 加鎖後分配 RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq); if (appendResult != null) return appendResult; } byte maxUsableMagic = apiVersions.maxUsableProduceMagic(); // 計算消息大小 int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers)); buffer = free.allocate(size, maxTimeToBlock); // 利用 BufferPool 分配字節緩存 synchronized (dq) { // 加鎖後分配 RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq); // 構造出壓縮後的批量消息對象 ProducerBatch MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic); ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds()); FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds())); dq.addLast(batch); // 加入雙端隊列 return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true); } } }