Leader node
[2019-09-25 19:40:22,266] INFO Handling request:RequestHeader(apiKey=FETCH, apiVersion=5, clientId=broker-0-fetcher-0, correlationId=8855) -- {replica_id=0,max_wait_time=500,min_bytes=1,max_bytes=10485760,isolation_level=0,topics=[{topic=test.vv19,partitions=[{partition=0,fetch_offset=35,log_start_offset=0,max_bytes=1048576}]}]} from connection 172.16.113.38:9094-172.16.113.38:49385-0;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:22,266] INFO testEnter0006-replica:0 newLogEndOffset:35 oldLogEndOffsetMetadata 35; (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:22,267] INFO testEnter0007-maybeIncrementLeaderHW:Set(35 [0 : 2765]) newHighWatermark:35 oldHighWatermark 35; (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:22,610] INFO Handling request:RequestHeader(apiKey=METADATA, apiVersion=5, clientId=producer-1, correlationId=34) -- {topics=[test.vv19],allow_auto_topic_creation=true} from connection 172.16.113.38:9094-172.16.113.38:60308-1;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (com.code260.ss.KafkaTestUtils$)
********************************
[2019-09-25 19:40:22,769] INFO Handling request:RequestHeader(apiKey=FETCH, apiVersion=5, clientId=broker-0-fetcher-0, correlationId=8856) -- {replica_id=0,max_wait_time=500,min_bytes=1,max_bytes=10485760,isolation_level=0,topics=[{topic=test.vv19,partitions=[{partition=0,fetch_offset=35,log_start_offset=0,max_bytes=1048576}]}]} from connection 172.16.113.38:9094-172.16.113.38:49385-0;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:22,769] INFO testEnter0006-replica:0 newLogEndOffset:35 oldLogEndOffsetMetadata 35; (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:22,769] INFO testEnter0007-maybeIncrementLeaderHW:Set(35 [0 : 2765]) newHighWatermark:35 oldHighWatermark 35; (com.code260.ss.KafkaTestUtils$)
********************************
[2019-09-25 19:40:22,773] INFO Handling request:RequestHeader(apiKey=PRODUCE, apiVersion=5, clientId=producer-1, correlationId=35) -- {acks=-1,timeout=30000,partitionSizes=[test.vv19-0=79]} from connection 172.16.113.38:9094-172.16.113.38:60308-1;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:22,834] INFO Updated PartitionLeaderEpoch. New: {epoch:27, offset:35}, Current: {epoch:25, offset34} for Partition: test.vv19-0. Cache now contains 11 entries. (kafka.server.epoch.LeaderEpochFileCache)
********************************
[2019-09-25 19:40:23,313] INFO testEnter0007-maybeIncrementLeaderHW:Set(35 [0 : 2765], 36 [0 : 2844]) newHighWatermark:35 oldHighWatermark 35; (com.code260.ss.KafkaTestUtils$)
********************************
[2019-09-25 19:40:23,400] INFO Handling request:RequestHeader(apiKey=FETCH, apiVersion=5, clientId=broker-0-fetcher-0, correlationId=8857) -- {replica_id=0,max_wait_time=500,min_bytes=1,max_bytes=10485760,isolation_level=0,topics=[{topic=test.vv19,partitions=[{partition=0,fetch_offset=36,log_start_offset=0,max_bytes=1048576}]}]} from connection 172.16.113.38:9094-172.16.113.38:49385-0;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:23,400] INFO testEnter0006-replica:0 newLogEndOffset:36 oldLogEndOffsetMetadata 35; (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:23,401] INFO testEnter0007-maybeIncrementLeaderHW:Set(36 [0 : 2844]) newHighWatermark:36 oldHighWatermark 35; (com.code260.ss.KafkaTestUtils$)
********************************
[2019-09-25 19:40:23,922] INFO Handling request:RequestHeader(apiKey=FETCH, apiVersion=5, clientId=broker-0-fetcher-0, correlationId=8858) -- {replica_id=0,max_wait_time=500,min_bytes=1,max_bytes=10485760,isolation_level=0,topics=[{topic=test.vv19,partitions=[{partition=0,fetch_offset=36,log_start_offset=0,max_bytes=1048576}]}]} from connection 172.16.113.38:9094-172.16.113.38:49385-0;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:23,923] INFO testEnter0006-replica:0 newLogEndOffset:36 oldLogEndOffsetMetadata 36; (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:23,923] INFO testEnter0007-maybeIncrementLeaderHW:Set(36 [0 : 2844]) newHighWatermark:36 oldHighWatermark 36; (com.code260.ss.KafkaTestUtils$)
Call stack for testEnter0006: updating the leader's record of the follower's LEO.
This, too, is updated while the leader handles a fetch request sent by the follower:
Replica.logEndOffset_$eq(LogOffsetMetadata) line: 98
Replica.updateLogReadResult(LogReadResult) line: 83
Partition.updateReplicaLogReadResult(Replica, LogReadResult) line: 276
ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(Tuple2<TopicPartition,LogReadResult>) line: 1314
ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(Object) line: 1308
TraversableLike$$anonfun$map$1.apply(A) line: 234
TraversableLike$$anonfun$map$1.apply(Object) line: 234
ResizableArray$class.foreach(ResizableArray, Function1) line: 59
ArrayBuffer<A>.foreach(Function1<A,U>) line: 48
TraversableLike$class.map(TraversableLike, Function1, CanBuildFrom) line: 234
ArrayBuffer<A>(AbstractTraversable<A>).map(Function1<A,B>, CanBuildFrom<Traversable<A>,B,That>) line: 104
ReplicaManager.updateFollowerLogReadResults(int, Seq<Tuple2<TopicPartition,LogReadResult>>) line: 1308
ReplicaManager.readFromLog$1(int, int, boolean, Seq, ReplicaQuota, IsolationLevel, boolean, boolean, boolean) line: 799
ReplicaManager.fetchMessages(long, int, int, int, boolean, Seq<Tuple2<TopicPartition,PartitionData>>, ReplicaQuota, Function1<Seq<Tuple2<TopicPartition,FetchPartitionData>>,BoxedUnit>, IsolationLevel) line: 803
KafkaApis.handleFetchRequest(RequestChannel$Request) line: 597
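The key observation in this path is that the fetch request itself carries the follower's progress: the fetch_offset the follower asks for becomes its new LEO in the leader's per-replica bookkeeping. A minimal sketch of that bookkeeping, with invented names (ReplicaState, onFollowerFetch), not Kafka's real classes:

// Toy model of the leader's per-replica bookkeeping. In Kafka this state
// lives in Replica/Partition; the names here are for illustration only.
case class ReplicaState(replicaId: Int, var logEndOffset: Long)

class LeaderBookkeeping(replicas: Map[Int, ReplicaState]) {
  // Called while handling a follower's FETCH: everything below fetchOffset
  // is already in the follower's log, so record fetchOffset as its LEO
  // (cf. Replica.updateLogReadResult in the stack above).
  def onFollowerFetch(replicaId: Int, fetchOffset: Long): Unit =
    replicas.get(replicaId).foreach { r =>
      val old = r.logEndOffset
      r.logEndOffset = fetchOffset
      println(s"replica:$replicaId newLogEndOffset:$fetchOffset oldLogEndOffsetMetadata $old")
    }
}

object FollowerLeoDemo extends App {
  val leader = new LeaderBookkeeping(Map(0 -> ReplicaState(0, 35L)))
  leader.onFollowerFetch(replicaId = 0, fetchOffset = 36L) // mirrors the testEnter0006 log line
}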
Call stack for testEnter0007: maybeIncrementLeaderHW, updating the leader's HW.
Also updated while the leader handles a fetch request sent by the follower:
Partition.kafka$cluster$Partition$$maybeIncrementLeaderHW(Replica, long) line: 396
Partition$$anonfun$maybeExpandIsr$1.apply$mcZ$sp() line: 325
Partition$$anonfun$maybeExpandIsr$1.apply() line: 309
Partition$$anonfun$maybeExpandIsr$1.apply() line: 309
CoreUtils$.inLock(Lock, Function0<T>) line: 217
CoreUtils$.inWriteLock(ReadWriteLock, Function0<T>) line: 225
Partition.maybeExpandIsr(int, LogReadResult) line: 307
Partition.updateReplicaLogReadResult(Replica, LogReadResult) line: 283
ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(Tuple2<TopicPartition,LogReadResult>) line: 1314
ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(Object) line: 1308
TraversableLike$$anonfun$map$1.apply(A) line: 234
TraversableLike$$anonfun$map$1.apply(Object) line: 234
ResizableArray$class.foreach(ResizableArray, Function1) line: 59
ArrayBuffer<A>.foreach(Function1<A,U>) line: 48
TraversableLike$class.map(TraversableLike, Function1, CanBuildFrom) line: 234
ArrayBuffer<A>(AbstractTraversable<A>).map(Function1<A,B>, CanBuildFrom<Traversable<A>,B,That>) line: 104
ReplicaManager.updateFollowerLogReadResults(int, Seq<Tuple2<TopicPartition,LogReadResult>>) line: 1308
ReplicaManager.readFromLog$1(int, int, boolean, Seq, ReplicaQuota, IsolationLevel, boolean, boolean, boolean) line: 799
ReplicaManager.fetchMessages(long, int, int, int, boolean, Seq<Tuple2<TopicPartition,PartitionData>>, ReplicaQuota, Function1<Seq<Tuple2<TopicPartition,FetchPartitionData>>,BoxedUnit>, IsolationLevel) line: 803
KafkaApis.handleFetchRequest(RequestChannel$Request) line: 597
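Reduced to its core, maybeIncrementLeaderHW takes the minimum LEO across the in-sync replicas (the Set(...) printed by testEnter0007) and advances the HW only when that minimum has moved forward. A simplified sketch of that rule, not the actual Partition code:

// Simplified high-watermark rule: HW = min(LEO over the ISR), and it only
// ever moves forward. Offsets below the HW are replicated everywhere and
// therefore safe to expose to consumers.
object LeaderHwDemo extends App {
  def maybeIncrementLeaderHW(isrLeos: Set[Long], oldHighWatermark: Long): Long = {
    val candidate = isrLeos.min
    if (candidate > oldHighWatermark) {
      println(s"newHighWatermark:$candidate oldHighWatermark $oldHighWatermark")
      candidate
    } else oldHighWatermark
  }

  // Mirrors the 19:40:23,313 log line: the leader's LEO is 36 but the
  // follower is still at 35, so the HW stays at 35 ...
  val hw1 = maybeIncrementLeaderHW(Set(35L, 36L), oldHighWatermark = 35L) // stays 35
  // ... until the follower's next fetch (fetch_offset=36) raises its LEO to 36.
  val hw2 = maybeIncrementLeaderHW(Set(36L), oldHighWatermark = hw1)      // advances to 36
}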
Follower node
[2019-09-25 19:40:22,767] INFO testEnter0005-Received response:apikey:FETCH correlationId 8855; (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:22,767] INFO testEnter0002-001topicPartition:test.vv19:0hwm.messageOffset: [35]lso.messageOffset: [35] (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:22,768] INFO testEnter0002-002topicPartition:test.vv19:0hwm.messageOffset: [35]lso.messageOffset: [35] (com.code260.ss.KafkaTestUtils$)
********************************
[2019-09-25 19:40:23,316] INFO testEnter0005-Received response:apikey:FETCH correlationId 8856; (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:23,369] INFO Updated PartitionLeaderEpoch. New: {epoch:27, offset:35}, Current: {epoch:25, offset34} for Partition: test.vv19-0. Cache now contains 11 entries. (kafka.server.epoch.LeaderEpochFileCache)
[2019-09-25 19:40:23,396] INFO testEnter0002-001topicPartition:test.vv19:0hwm.messageOffset: [35]lso.messageOffset: [35] (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:23,397] INFO testEnter0002-002topicPartition:test.vv19:0hwm.messageOffset: [35]lso.messageOffset: [35] (com.code260.ss.KafkaTestUtils$)
********************************
[2019-09-25 19:40:23,920] INFO testEnter0005-Received response:apikey:FETCH correlationId 8857; (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:23,921] INFO testEnter0002-001topicPartition:test.vv19:0hwm.messageOffset: [35]lso.messageOffset: [35] (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:23,922] INFO testEnter0002-002topicPartition:test.vv19:0hwm.messageOffset: [36]lso.messageOffset: [36] (com.code260.ss.KafkaTestUtils$)
********************************
[2019-09-25 19:40:24,426] INFO testEnter0005-Received response:apikey:FETCH correlationId 8858; (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:24,427] INFO testEnter0002-001topicPartition:test.vv19:0hwm.messageOffset: [36]lso.messageOffset: [36] (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:24,428] INFO testEnter0002-002topicPartition:test.vv19:0hwm.messageOffset: [36]lso.messageOffset: [36] (com.code260.ss.KafkaTestUtils$)
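What the follower log shows is the propagation delay: the HW travels from leader to follower piggybacked on fetch responses, and the follower clamps it to its own LEO, so the follower's HW trails the leader's by up to one fetch round. A sketch of that update, under the simplifying assumption that a fetch response is just records plus the leader's HW:

object FollowerHwDemo extends App {
  var leo = 35L // follower's log end offset
  var hw  = 35L // follower's high watermark

  // Handling one fetch response: append the records it carries, then adopt
  // the leader's HW, clamped to our own log end.
  def onFetchResponse(records: Seq[String], leaderHw: Long): Unit = {
    leo += records.size
    hw = math.min(leaderHw, leo)
    println(s"hwm.messageOffset: [$hw] leo: [$leo]")
  }

  // cf. response 8856: the new record arrives (LEO 35 -> 36), but the
  // leader's HW was still 35 when the response was built, so HW stays 35.
  onFetchResponse(Seq("record-35"), leaderHw = 35L)
  // cf. response 8857: no new data, but the response now carries HW 36.
  onFetchResponse(Seq.empty, leaderHw = 36L)
}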
handleProduceRequest
ProduceRequest
ReplicaManager.appendRecords
The main log-write logic:
kafka.log.Log.append(records: MemoryRecords, isFromClient: Boolean, assignOffsets: Boolean, leaderEpoch: Int), line 623; worth a close read.
On a message write: how Log, LogSegment, FileRecords, MemoryRecords, File, and LogOffsetMetadata relate to one another.
How is a new message's offset generated? After each append, the offset for the next append is updated:
// increment the log end offset
updateLogEndOffset(appendInfo.lastOffset + 1)
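As a tiny worked example of that counter (Kafka keeps it in a LongRef; a plain var shows the same arithmetic):

object OffsetAssignmentDemo extends App {
  var logEndOffset = 35L // offset the next appended message will receive

  // Assign offsets to a batch of n records and advance the counter, mirroring
  // appendInfo.firstOffset / appendInfo.lastOffset and
  // updateLogEndOffset(appendInfo.lastOffset + 1).
  def append(n: Int): (Long, Long) = {
    val firstOffset = logEndOffset
    val lastOffset  = logEndOffset + n - 1
    logEndOffset = lastOffset + 1
    (firstOffset, lastOffset)
  }

  println(append(1))    // (35,35): the single-record produce seen in the logs
  println(logEndOffset) // 36: the offset the next message will get
}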
KafkaApis handleProduceRequest
ReplicaManager appendRecords
ReplicaManager appendToLocalLog
processingStatsCallback // KafkaApis def processingStatsCallback(processingStats: Map[TopicPartition, RecordsProcessingStats]): Unit
responseCallback // KafkaApis def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse])
ProduceRequest clearPartitionRecords (the callback wiring is sketched below)
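The shape of this sequence in plain Scala: appendRecords is handed two callbacks by KafkaApis and fires them once the append completes, after which KafkaApis releases the request's record buffers via ProduceRequest.clearPartitionRecords. A structural sketch only; the field types here are stand-ins, not Kafka's real ones:

object ProducePathSketch extends App {
  // Stand-ins for Kafka's per-partition result types (fields invented).
  case class PartitionResponse(errorCode: Short, baseOffset: Long)
  case class RecordsProcessingStats(conversionCount: Int)

  // cf. ReplicaManager.appendRecords: append locally, then report back to
  // KafkaApis through the two callbacks it supplied.
  def appendRecords(partitionSizes: Map[String, Int],
                    processingStatsCallback: Map[String, RecordsProcessingStats] => Unit,
                    responseCallback: Map[String, PartitionResponse] => Unit): Unit = {
    val stats     = partitionSizes.map { case (tp, _) => tp -> RecordsProcessingStats(0) }
    val responses = partitionSizes.map { case (tp, _) => tp -> PartitionResponse(0, 35L) }
    processingStatsCallback(stats)
    responseCallback(responses)
    // KafkaApis then calls ProduceRequest.clearPartitionRecords so the
    // request's record payloads can be garbage-collected early.
  }

  appendRecords(
    Map("test.vv19-0" -> 79), // cf. partitionSizes=[test.vv19-0=79] in the PRODUCE log
    stats => println(s"processingStats: $stats"),
    resp  => println(s"produce response: $resp"))
}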
ReplicaManager appendToLocalLog
Collect metrics on produced messages (groundwork for the write-rate numbers), both global and per-topic.
Check whether the messages are being sent to an internal topic.
Messages are written to the leader: partition.appendRecordsToLeader (the ordering of these steps is sketched in code after this list)
val info = log.appendAsLeader
replicaManager.tryCompleteDelayedFetch(TopicPartitionOperationKey(this.topic, this.partitionId))
(info, maybeIncrementLeaderHW(leaderReplica))
Update firstOffset, lastOffset, and numAppendedMessages.
Collect metrics on produced messages (record count and byte size), both global and per-topic.
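A condensed sketch of the appendToLocalLog / appendRecordsToLeader steps above: refuse internal topics (unless the client is allowed to write to them), append to the leader's log, try to complete delayed fetches parked waiting for new data, then attempt to raise the HW. Toy types throughout; only the ordering of the steps is the point:

object AppendToLocalLogSketch extends App {
  val internalTopics = Set("__consumer_offsets", "__transaction_state")

  var leaderLeo     = 35L
  var followerLeo   = 35L
  var highWatermark = 35L

  def appendToLocalLog(topic: String, records: Seq[String]): Unit = {
    // 1. Internal-topic check (rejected unless explicitly permitted).
    require(!internalTopics.contains(topic), s"$topic is an internal topic")
    // 2. val info = log.appendAsLeader(...): the actual write.
    leaderLeo += records.size
    // 3. tryCompleteDelayedFetch: wake up fetch requests that were blocked
    //    waiting for data past the old log end.
    println(s"completing delayed fetches up to $leaderLeo")
    // 4. maybeIncrementLeaderHW(leaderReplica): min LEO over the ISR,
    //    never moving backwards.
    highWatermark = math.max(highWatermark, math.min(leaderLeo, followerLeo))
  }

  appendToLocalLog("test.vv19", Seq("one-record"))
  println(s"leaderLeo=$leaderLeo hw=$highWatermark") // hw still 35: follower not caught up yet
}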
Log.append
Assign offsets to the messages: val offset = new LongRef(nextOffsetMetadata.messageOffset)
Update firstOffset: appendInfo.firstOffset = offset.value
Update lastOffset: appendInfo.lastOffset = offset.value - 1
If any messages fail validation, collect metrics on the rejected messages.
Update the offset recorded for the leader epoch (only when the epoch changes; the update is flushed straight to disk and forced onto the platter with the file descriptor's sync, fileOutputStream.getFD().sync()): leaderEpochCache.assign(batch.partitionLeaderEpoch, batch.baseOffset), followed by a flush of the LeaderEpochCheckpoint. See the sketch after this list.
segment.append
producerAppendInfo.maybeCacheTxnFirstOffsetMetadata
producerStateManager.update(producerAppendInfo)
Some index (idx) handling for transactional messages
producerStateManager.updateMapEndOffset
updateLogEndOffset
updateFirstUnstableOffset
flush() as needed
Return appendInfo
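Pulling the steps above together, with emphasis on the point flagged in the epoch step: the leader-epoch checkpoint is the one write on this path forced to disk immediately (flush plus FileDescriptor.sync), whereas the segment data itself is flushed lazily "as needed". A condensed sketch; the checkpoint format and file handling are simplified, not Kafka's actual LeaderEpochCheckpoint code:

import java.io.{File, FileOutputStream}

object LogAppendFlowSketch extends App {
  var logEndOffset = 35L
  var cachedEpoch  = 25
  val checkpoint   = File.createTempFile("leader-epoch-checkpoint", ".txt")

  // Only when the batch carries a new epoch: record {epoch, startOffset} and
  // force it to disk via the FD's sync (cf. leaderEpochCache.assign followed
  // by the LeaderEpochCheckpoint flush).
  def maybeCheckpointEpoch(batchEpoch: Int, baseOffset: Long): Unit =
    if (batchEpoch != cachedEpoch) {
      val out = new FileOutputStream(checkpoint)
      try {
        out.write(s"$batchEpoch $baseOffset\n".getBytes("UTF-8"))
        out.flush()
        out.getFD.sync() // fsync: must survive a crash, unlike the lazy segment flush
      } finally out.close()
      cachedEpoch = batchEpoch
    }

  def append(records: Seq[String], batchEpoch: Int): (Long, Long) = {
    val firstOffset = logEndOffset                   // appendInfo.firstOffset
    val lastOffset  = firstOffset + records.size - 1 // appendInfo.lastOffset
    maybeCheckpointEpoch(batchEpoch, firstOffset)
    // segment.append(...) would write the records into the active segment here;
    // producer-state and index updates follow in the real code.
    logEndOffset = lastOffset + 1                    // updateLogEndOffset(lastOffset + 1)
    (firstOffset, lastOffset)
  }

  println(append(Seq("m"), batchEpoch = 27)) // (35,35); epoch 25 -> 27 is checkpointed
}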
processingStatsCallback
Update the produceMessageConversionsRate metric's conversionCount, per-topic and global. conversionCount counts messages converted from a newer message-format version down to an older one; the conversion logic lives in org.apache.kafka.common.record.AbstractRecords.downConvert(Iterable<? extends RecordBatch>, byte, long, Time), and the call originates from org.apache.kafka.clients.producer.internals.Sender.sendProduceRequest(long, int, short, int, List
responseCallback
Check whether any partition had errors, and log them if so; then apply throttle handling.
Analysis of Partition.maybeIncrementLeaderHW