上篇文章說到,消息是怎麼被生產出來,簡單來講,就是消息被追加到了RecordAccumulator中,以ByteBuffer的形式存了下來。java
那麼在消息添加到ByteBuffer中後,後續的步驟又是怎麼樣的呢?node
首先上篇文章提到了Kafka追加消息時加鎖的粒度,是一個Deque<RecordBatch> dq,咱們根據topic(tp)來獲取到要發往這個Topic的隊列。這個隊列中的元素 RecordBatch ,即是即將發往Broker的消息的載體,一個RecordBatch能夠包含多條消息,咱們的消息即是追加到了最底層指向的一個ByteBuffer中。json
Deque<RecordBatch> dq = getOrCreateDeque(tp); synchronized (dq) { if (closed) { throw new IllegalStateException("Cannot send after the producer is closed."); } RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq); if (appendResult != null) { return appendResult; } }
當咱們的RecordBatch可能滿了後(只要新的消息放不下,就會被認爲是滿了)數據結構
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, Deque<RecordBatch> deque) { // 獲取deque中最後一個 RecordBatch last = deque.peekLast(); if (last != null) { FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds()); if (future == null) { last.records.close(); } else { return new RecordAppendResult(future, deque.size() > 1 || last.records.isFull(), false); } } return null; }
滿了之後,在最底層的Buffer的引用會被賦給 RecordBatch 中 MemoryRecords 的 ByteBuffer,而後flip一下,變成讀模式。app
public void close() { if (writable) { // close the compressor to fill-in wrapper message metadata if necessary compressor.close(); // flip the underlying buffer to be ready for reads // flip 基礎buffer來供讀 buffer = compressor.buffer(); buffer.flip(); // reset the writable flag writable = false; } }
那麼何時會發送呢?在咱們的RecordBatch滿了,或者建立了新的RecordBatch時,會喚醒sender => NetworkClient => Selector => nioSelectoride
// batch滿了或者建立了新的batch,就去喚醒sender if (result.batchIsFull || result.newBatchCreated) { log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition); this.sender.wakeup(); }
喚醒nioSelector有什麼用呢?若是對java的Selector有所瞭解,那麼就應該知道Selector在select()或select(long time)的時候,是會阻塞的。oop
在建立kafkaProducer時,變回初始化一個Sender線程,這個Sender線程會循環地拉取數據發往broker。其中就有一個步驟,進行了select() 的操做。fetch
當咱們裝填好了一個ByteBuffer,就會去喚醒Selector,不要阻塞了,繼續執行,進行下一個循環,咱們來看看這個循環。這個循環實際上就是咱們的消息運輸和銷燬的流水線。this
/** * Run a single iteration of sending * * 發送消息的核心方法 * * @param now The current POSIX time in milliseconds */ void run(long now) { /** 一、從metadata獲取元數據 */ Cluster cluster = metadata.fetch(); /** 二、從Accumulator選出能夠發送的node節點 */ // get the list of partitions with data ready to send // 獲取待發送的帶數據的分區列表 // 復符合發送消息條件的節點會被返回 RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now); /** 三、若是ReadyCheckResult 中有 unknownLeader 的node,更新一下元數據 */ // if there are any partitions whose leaders are not known yet, force metadata update // 若是有任何分區還沒選舉出leader,強制metadata進行更新 if (result.unknownLeadersExist) { this.metadata.requestUpdate(); } /** 四、循環調用client(NetworkClient)中的ready方法,從io層面檢查消息是否符合發送條件 */ /* * 移除尚未準備好要發送的節點 */ // remove any nodes we aren't ready to send to Iterator<Node> iter = result.readyNodes.iterator(); long notReadyTimeout = Long.MAX_VALUE; while (iter.hasNext()) { Node node = iter.next(); if (!this.client.ready(node, now)) { iter.remove(); notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now)); } } // create produce requests // 建立produce請求 Map<Integer/* nodeId */, List<RecordBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now); if (guaranteeMessageOrder) { // Mute all the partitions drained for (List<RecordBatch> batchList : batches.values()) { for (RecordBatch batch : batchList) this.accumulator.mutePartition(batch.topicPartition); } } /** 六、處理RecordAccumulator中超時的消息*/ List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now); // update sensors for (RecordBatch expiredBatch : expiredBatches) this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount); sensors.updateProduceRequestMetrics(batches); /** 七、將待發送的消息封裝成ClientRequest */ List<ClientRequest> requests = createProduceRequests(batches, now); // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes // with sendable data that aren't ready to send since they would cause busy looping. long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout); if (result.readyNodes.size() > 0) { log.trace("Nodes with data ready to send: {}", result.readyNodes); log.trace("Created {} produce requests: {}", requests.size(), requests); pollTimeout = 0; } /** 八、將ClientRequest寫入KafkaChannel中的send字段 */ for (ClientRequest request : requests) client.send(request, now); /** 九、真正的把消息發送出去,並處理客戶端的ack,處理超時請求,調用用戶自定義的Callback等。*/ // if some partitions are already ready to be sent, the select time would be 0; // otherwise if some partition already has some data accumulated but not ready yet, // the select time will be the time difference between now and its linger expiry time; // otherwise the select time will be the time difference between now and the metadata expiry time; this.client.poll(pollTimeout, now); }
獲取以前先進行前置判斷:.net
簡單地獲取元數據、判斷每一個tp的leader選出來了沒之類的,若是存在未選出leader的tp,告訴元數據,等下要更新一下。
檢驗經過的節點(就是leader所在的那個節點),將會準備翻牌,將RecordBatch(封裝了ByteBuffer)從RecordAccumulator中提取出來。
這裏會循環全部的node(leader節點),根據這個節點所屬的Topic分區(TopicPartition)信息,去獲取Deque,咱們知道Deque是能夠根據 Deque<RecordBatch> deque = getDeque(tp)獲取的。
講道理是要獲取全部可發送的消息的,但kafka限定了一次request的大小,若是一次request的大小過大,則一次io須要的時間就越長(固然從宏觀上來看,一次發的越多,效率可能越高),但總不可能爲了提高整體效率,致使一條消息要發幾秒吧?
the maximum request size to attempt to send to the server
因此當RecordBatch累加的大小超過必定限制後,循環會break,最終返回給sender一個 Map<Integer/* nodeId */, List<RecordBatch>> batches
for (Node node : nodes){ //.... do{ // Only drain the batch if it is not during backoff period. if (!backoff) { if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) { // there is a rare case that a single batch size is larger than the request size due // to compression; in this case we will still eventually send this batch in a single // request // 數據量已經滿了,須要結束循環 break; } else { // 數據量沒滿,那麼取出每一個deque的第一個元素,關閉memoryRecord(關閉Compressor,並將MemoryRecords設置爲只讀) RecordBatch batch = deque.pollFirst(); batch.records.close(); size += batch.records.sizeInBytes(); ready.add(batch); batch.drainedMs = now; } } }while xxxx }
從RecordAccumulator中獲取出來的這個Map,將會被封裝成可發送對象 List<ClientRequest>。
咱們能夠看到 List<ClientRequest> 被循環分紅了兩個Map:
Map<TopicPartition, ByteBuffer> produceRecordsByPartition Map<TopicPartition, RecordBatch> recordsByPartition
produceRecordsByPartition 的構成十分簡單,即是key爲tp,val爲ByteBuffer的Map。 這個Map會被封裝成Struct,Struct能夠理解爲相似於json的一種數據結構。在Kafka中,數據的傳輸都是用的Struct這種數據結構。
private ClientRequest produceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) { // 將batches從新整理成兩個map Map<TopicPartition, ByteBuffer> produceRecordsByPartition = new HashMap<TopicPartition, ByteBuffer>(batches.size()); final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<TopicPartition, RecordBatch>(batches.size()); for (RecordBatch batch : batches) { TopicPartition tp = batch.topicPartition; produceRecordsByPartition.put(tp, batch.records.buffer()); recordsByPartition.put(tp, batch); } // 組裝 request ProduceRequest request = new ProduceRequest(acks, timeout, produceRecordsByPartition); // TODO:建立request,這個requestSend就是真正的發送對象 RequestSend send = new RequestSend(Integer.toString(destination), this.client.nextRequestHeader(ApiKeys.PRODUCE), request.toStruct()); // 封裝回調 RequestCompletionHandler callback = new RequestCompletionHandler() { public void onComplete(ClientResponse response) { handleProduceResponse(response, recordsByPartition, time.milliseconds()); } }; return new ClientRequest(now, acks != 0, send, callback); }
recordsByPartition 則用於封裝回調,好比說失敗了後的重試、ByteBuffer的釋放等等。並調用發送消息時的那個回調。
@Override public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) { // intercept the record, which can be potentially modified; this method does not throw exceptions ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record); return doSend(interceptedRecord, callback); }
數據的發送前面的文章已經說過,這個封裝好的ClientRequest會被丟到KafkaSelector,最終被nioSelector發送出去,抵達Broker。淺析KafkaChannel、NetworkReceive、Send,以及底層優秀的實現:KafkaSelector的實現。
參考書籍: 《Kafka技術內幕》 鄭奇煌著 《Apache Kafka源碼剖析》 徐郡明著