Message writes


RPC request logs while producing a message

Leader node

[2019-09-25 19:40:22,266] INFO Handling request:RequestHeader(apiKey=FETCH, apiVersion=5, clientId=broker-0-fetcher-0, correlationId=8855) -- {replica_id=0,max_wait_time=500,min_bytes=1,max_bytes=10485760,isolation_level=0,topics=[{topic=test.vv19,partitions=[{partition=0,fetch_offset=35,log_start_offset=0,max_bytes=1048576}]}]} from connection 172.16.113.38:9094-172.16.113.38:49385-0;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:22,266] INFO testEnter0006-replica:0 newLogEndOffset:35 oldLogEndOffsetMetadata 35; (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:22,267] INFO testEnter0007-maybeIncrementLeaderHW:Set(35 [0 : 2765]) newHighWatermark:35 oldHighWatermark 35; (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:22,610] INFO Handling request:RequestHeader(apiKey=METADATA, apiVersion=5, clientId=producer-1, correlationId=34) -- {topics=[test.vv19],allow_auto_topic_creation=true} from connection 172.16.113.38:9094-172.16.113.38:60308-1;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (com.code260.ss.KafkaTestUtils$)


********************************
[2019-09-25 19:40:22,769] INFO Handling request:RequestHeader(apiKey=FETCH, apiVersion=5, clientId=broker-0-fetcher-0, correlationId=8856) -- {replica_id=0,max_wait_time=500,min_bytes=1,max_bytes=10485760,isolation_level=0,topics=[{topic=test.vv19,partitions=[{partition=0,fetch_offset=35,log_start_offset=0,max_bytes=1048576}]}]} from connection 172.16.113.38:9094-172.16.113.38:49385-0;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (com.code260.ss.KafkaTestUtils$)
********************************

[2019-09-25 19:40:22,769] INFO testEnter0006-replica:0 newLogEndOffset:35 oldLogEndOffsetMetadata 35; (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:22,769] INFO testEnter0007-maybeIncrementLeaderHW:Set(35 [0 : 2765]) newHighWatermark:35 oldHighWatermark 35; (com.code260.ss.KafkaTestUtils$)

********************************
[2019-09-25 19:40:22,773] INFO Handling request:RequestHeader(apiKey=PRODUCE, apiVersion=5, clientId=producer-1, correlationId=35) -- {acks=-1,timeout=30000,partitionSizes=[test.vv19-0=79]} from connection 172.16.113.38:9094-172.16.113.38:60308-1;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:22,834] INFO Updated PartitionLeaderEpoch. New: {epoch:27, offset:35}, Current: {epoch:25, offset34} for Partition: test.vv19-0. Cache now contains 11 entries. (kafka.server.epoch.LeaderEpochFileCache)
********************************



********************************
[2019-09-25 19:40:23,313] INFO testEnter0007-maybeIncrementLeaderHW:Set(35 [0 : 2765], 36 [0 : 2844]) newHighWatermark:35 oldHighWatermark 35; (com.code260.ss.KafkaTestUtils$)
********************************


[2019-09-25 19:40:23,400] INFO Handling request:RequestHeader(apiKey=FETCH, apiVersion=5, clientId=broker-0-fetcher-0, correlationId=8857) -- {replica_id=0,max_wait_time=500,min_bytes=1,max_bytes=10485760,isolation_level=0,topics=[{topic=test.vv19,partitions=[{partition=0,fetch_offset=36,log_start_offset=0,max_bytes=1048576}]}]} from connection 172.16.113.38:9094-172.16.113.38:49385-0;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:23,400] INFO testEnter0006-replica:0 newLogEndOffset:36 oldLogEndOffsetMetadata 35; (com.code260.ss.KafkaTestUtils$)


********************************
[2019-09-25 19:40:23,401] INFO testEnter0007-maybeIncrementLeaderHW:Set(36 [0 : 2844]) newHighWatermark:36 oldHighWatermark 35; (com.code260.ss.KafkaTestUtils$)
********************************


[2019-09-25 19:40:23,922] INFO Handling request:RequestHeader(apiKey=FETCH, apiVersion=5, clientId=broker-0-fetcher-0, correlationId=8858) -- {replica_id=0,max_wait_time=500,min_bytes=1,max_bytes=10485760,isolation_level=0,topics=[{topic=test.vv19,partitions=[{partition=0,fetch_offset=36,log_start_offset=0,max_bytes=1048576}]}]} from connection 172.16.113.38:9094-172.16.113.38:49385-0;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:23,923] INFO testEnter0006-replica:0 newLogEndOffset:36 oldLogEndOffsetMetadata 36; (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:23,923] INFO testEnter0007-maybeIncrementLeaderHW:Set(36 [0 : 2844]) newHighWatermark:36 oldHighWatermark 36; (com.code260.ss.KafkaTestUtils$)

Call stack when the leader updates the follower LEO it tracks (testEnter0006).
This update also happens while the leader handles a fetch request sent by the follower.

Replica.logEndOffset_$eq(LogOffsetMetadata) line: 98    
Replica.updateLogReadResult(LogReadResult) line: 83 
Partition.updateReplicaLogReadResult(Replica, LogReadResult) line: 276  
ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(Tuple2<TopicPartition,LogReadResult>) line: 1314   
ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(Object) line: 1308 
TraversableLike$$anonfun$map$1.apply(A) line: 234   
TraversableLike$$anonfun$map$1.apply(Object) line: 234  
ResizableArray$class.foreach(ResizableArray, Function1) line: 59    
ArrayBuffer<A>.foreach(Function1<A,U>) line: 48 
TraversableLike$class.map(TraversableLike, Function1, CanBuildFrom) line: 234   
ArrayBuffer<A>(AbstractTraversable<A>).map(Function1<A,B>, CanBuildFrom<Traversable<A>,B,That>) line: 104   
ReplicaManager.updateFollowerLogReadResults(int, Seq<Tuple2<TopicPartition,LogReadResult>>) line: 1308  
ReplicaManager.readFromLog$1(int, int, boolean, Seq, ReplicaQuota, IsolationLevel, boolean, boolean, boolean) line: 799 
ReplicaManager.fetchMessages(long, int, int, int, boolean, Seq<Tuple2<TopicPartition,PartitionData>>, ReplicaQuota, Function1<Seq<Tuple2<TopicPartition,FetchPartitionData>>,BoxedUnit>, IsolationLevel) line: 803  
KafkaApis.handleFetchRequest(RequestChannel$Request) line: 597

Call stack when the leader's HW is updated (testEnter0007-maybeIncrementLeaderHW).
The update happens while the leader handles a fetch request sent by the follower.

Partition.kafka$cluster$Partition$$maybeIncrementLeaderHW(Replica, long) line: 396  
Partition$$anonfun$maybeExpandIsr$1.apply$mcZ$sp() line: 325    
Partition$$anonfun$maybeExpandIsr$1.apply() line: 309   
Partition$$anonfun$maybeExpandIsr$1.apply() line: 309   
CoreUtils$.inLock(Lock, Function0<T>) line: 217 
CoreUtils$.inWriteLock(ReadWriteLock, Function0<T>) line: 225   
Partition.maybeExpandIsr(int, LogReadResult) line: 307  
Partition.updateReplicaLogReadResult(Replica, LogReadResult) line: 283  
ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(Tuple2<TopicPartition,LogReadResult>) line: 1314   
ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(Object) line: 1308 
TraversableLike$$anonfun$map$1.apply(A) line: 234   
TraversableLike$$anonfun$map$1.apply(Object) line: 234  
ResizableArray$class.foreach(ResizableArray, Function1) line: 59    
ArrayBuffer<A>.foreach(Function1<A,U>) line: 48 
TraversableLike$class.map(TraversableLike, Function1, CanBuildFrom) line: 234   
ArrayBuffer<A>(AbstractTraversable<A>).map(Function1<A,B>, CanBuildFrom<Traversable<A>,B,That>) line: 104   
ReplicaManager.updateFollowerLogReadResults(int, Seq<Tuple2<TopicPartition,LogReadResult>>) line: 1308  
ReplicaManager.readFromLog$1(int, int, boolean, Seq, ReplicaQuota, IsolationLevel, boolean, boolean, boolean) line: 799 
ReplicaManager.fetchMessages(long, int, int, int, boolean, Seq<Tuple2<TopicPartition,PartitionData>>, ReplicaQuota, Function1<Seq<Tuple2<TopicPartition,FetchPartitionData>>,BoxedUnit>, IsolationLevel) line: 803  
KafkaApis.handleFetchRequest(RequestChannel$Request) line: 597

Follower node

[2019-09-25 19:40:22,767] INFO testEnter0005-Received response:apikey:FETCH correlationId 8855; (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:22,767] INFO testEnter0002-001topicPartition:test.vv19:0hwm.messageOffset: [35]lso.messageOffset: [35] (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:22,768] INFO testEnter0002-002topicPartition:test.vv19:0hwm.messageOffset: [35]lso.messageOffset: [35] (com.code260.ss.KafkaTestUtils$)


********************************
[2019-09-25 19:40:23,316] INFO testEnter0005-Received response:apikey:FETCH correlationId 8856; (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:23,369] INFO Updated PartitionLeaderEpoch. New: {epoch:27, offset:35}, Current: {epoch:25, offset34} for Partition: test.vv19-0. Cache now contains 11 entries. (kafka.server.epoch.LeaderEpochFileCache)
[2019-09-25 19:40:23,396] INFO testEnter0002-001topicPartition:test.vv19:0hwm.messageOffset: [35]lso.messageOffset: [35] (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:23,397] INFO testEnter0002-002topicPartition:test.vv19:0hwm.messageOffset: [35]lso.messageOffset: [35] (com.code260.ss.KafkaTestUtils$)
********************************



********************************
[2019-09-25 19:40:23,920] INFO testEnter0005-Received response:apikey:FETCH correlationId 8857; (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:23,921] INFO testEnter0002-001topicPartition:test.vv19:0hwm.messageOffset: [35]lso.messageOffset: [35] (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:23,922] INFO testEnter0002-002topicPartition:test.vv19:0hwm.messageOffset: [36]lso.messageOffset: [36] (com.code260.ss.KafkaTestUtils$)
********************************



[2019-09-25 19:40:24,426] INFO testEnter0005-Received response:apikey:FETCH correlationId 8858; (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:24,427] INFO testEnter0002-001topicPartition:test.vv19:0hwm.messageOffset: [36]lso.messageOffset: [36] (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:24,428] INFO testEnter0002-002topicPartition:test.vv19:0hwm.messageOffset: [36]lso.messageOffset: [36] (com.code260.ss.KafkaTestUtils$)
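The follower log above suggests that the follower's HW trails the leader's by one fetch round. Below is a minimal sketch of that behaviour, assuming the follower sets its HW to the smaller of its own LEO and the HW carried in the fetch response, and assuming the testEnter0002-001 and testEnter0002-002 lines are printed before and after that update. ToyFollower and its methods are hypothetical names for illustration, not Kafka classes.

```java
/** Toy follower replica: tracks only its log end offset (LEO) and high watermark (HW). */
final class ToyFollower {
    private long logEndOffset;
    private long highWatermark;

    ToyFollower(long logEndOffset, long highWatermark) {
        this.logEndOffset = logEndOffset;
        this.highWatermark = highWatermark;
    }

    /** Applies one fetch response: append the fetched records, then move the HW. */
    void onFetchResponse(int fetchedRecords, long leaderHighWatermark) {
        logEndOffset += fetchedRecords;
        // Assumption: the follower never advertises a HW beyond what it has locally.
        highWatermark = Math.min(logEndOffset, leaderHighWatermark);
    }

    /** The fetch_offset the follower would put in its next fetch request. */
    long nextFetchOffset() {
        return logEndOffset;
    }

    long highWatermark() {
        return highWatermark;
    }
}
```

Replaying correlation IDs 8855 to 8857 as onFetchResponse(0, 35), onFetchResponse(1, 35) and onFetchResponse(0, 36) leaves the HW at 35, 35 and 36, which matches the logged values.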

Request handling

handleProduceRequest ProduceRequest
ReplicaManager.appendRecords parameters:

  • timeout: Long // from the request body
  • requiredAcks: Short // from the request body
  • internalTopicsAllowed: Boolean // whether request.header.clientId is __admin_client
  • isFromClient: Boolean // always passed as true
  • entriesPerPartition: Map[TopicPartition, MemoryRecords]
  • responseCallback: Map[TopicPartition, ProduceResponse.PartitionResponse] => Unit
  • delayedProduceLock: Option[Lock] // not passed
  • processingStatsCallback: Map[TopicPartition, RecordsProcessingStats] => Unit

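Main log-write logic:
kafka.log.Log.append(records: MemoryRecords, isFromClient: Boolean, assignOffsets: Boolean, leaderEpoch: Int), line 623; worth reading closely

During a message write: how Log, LogSegment, FileRecords, MemoryRecords, File and LogOffsetMetadata relate to one another

How is the offset of a new message produced? After every append, the offset for the next message is updated: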

// increment the log end offset
updateLogEndOffset(appendInfo.lastOffset + 1)
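
The rule above can be illustrated with a toy log that is not Kafka code. ToyLog and its methods are hypothetical names; the sketch only assumes that a batch takes consecutive offsets starting at the current log end offset, and that the log end offset then becomes lastOffset + 1.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy log that hands out offsets the way the snippet above describes. */
final class ToyLog {
    private final List<String> records = new ArrayList<>();
    private long logEndOffset; // offset the next appended record will receive

    ToyLog(long startOffset) {
        this.logEndOffset = startOffset;
    }

    /** Appends a non-empty batch and returns {firstOffset, lastOffset}. */
    long[] append(List<String> batch) {
        if (batch.isEmpty()) {
            throw new IllegalArgumentException("empty batch");
        }
        long firstOffset = logEndOffset;
        long lastOffset = firstOffset + batch.size() - 1;
        records.addAll(batch);
        // Mirrors updateLogEndOffset(appendInfo.lastOffset + 1).
        logEndOffset = lastOffset + 1;
        return new long[] {firstOffset, lastOffset};
    }

    long logEndOffset() {
        return logEndOffset;
    }
}
```

Starting at offset 35 as in the logs, appending a single record assigns it offset 35 and moves the log end offset to 36, which is the fetch_offset the follower sends next.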

How the leader node handles the relevant offsets while processing a produce request

KafkaApis handleProduceRequest
ReplicaManager appendRecords
ReplicaManager appendToLocalLog
processingStatsCallback // KafkaApis def processingStatsCallback(processingStats: Map[TopicPartition, RecordsProcessingStats]): Unit
responseCallback // KafkaApis def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse])
ProduceRequest clearPartitionRecords

ReplicaManager appendToLocalLog
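Collect produce metrics (in preparation for the write rate), both global and per topic.
Check whether the message is being sent to an internal topic.
Write the message to the leader: partition.appendRecordsToLeader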
val info = log.appendAsLeader
replicaManager.tryCompleteDelayedFetch(TopicPartitionOperationKey(this.topic, this.partitionId))
(info, maybeIncrementLeaderHW(leaderReplica))
Update firstOffset, lastOffset and numAppendedMessages.
Collect produce metrics (message count and message size), both global and per topic.

Log.append
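Assign the message offset: val offset = new LongRef(nextOffsetMetadata.messageOffset)
Update firstOffset: appendInfo.firstOffset = offset.value
Update lastOffset: appendInfo.lastOffset = offset.value - 1
If any message fails validation, collect metrics for the rejected messages.
Update the offset recorded for the leader epoch: leaderEpochCache.assign(batch.partitionLeaderEpoch, batch.baseOffset), then flush the LeaderEpochCheckpoint. The entry is updated only when the epoch changes, and the update is flushed straight to disk and forced with a sync on the file descriptor, fileOutputStream.getFD().sync().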
segment.append
producerAppendInfo.maybeCacheTxnFirstOffsetMetadata
producerStateManager.update(producerAppendInfo)
Handle some indexes for transactional messages
producerStateManager.updateMapEndOffset
updateLogEndOffset
updateFirstUnstableOffset
flush() as needed
Return appendInfo
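
The leader epoch step in the list above can be sketched as follows. This is a simplified illustration rather than Kafka's LeaderEpochFileCache: ToyLeaderEpochCache, its checkpoint file layout and its method names are all hypothetical, and the real conditions for accepting an entry may be stricter. It only assumes what the notes state: an entry is recorded when the epoch changes, and the checkpoint is then written and forced to disk with getFD().sync().

```java
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Toy epoch cache: remembers the first offset written under each leader epoch. */
final class ToyLeaderEpochCache {
    private final List<long[]> entries = new ArrayList<>(); // each entry: {epoch, startOffset}
    private final Path checkpoint;

    ToyLeaderEpochCache(Path checkpoint) {
        this.checkpoint = checkpoint;
    }

    /** Records (epoch, startOffset) only if the epoch is newer; returns true if recorded. */
    boolean assign(int epoch, long startOffset) {
        long latestEpoch = entries.isEmpty() ? -1L : entries.get(entries.size() - 1)[0];
        if (epoch <= latestEpoch) {
            return false; // epoch unchanged: no new entry and no disk write
        }
        entries.add(new long[] {epoch, startOffset});
        flush();
        return true;
    }

    int size() {
        return entries.size();
    }

    /** Rewrites the whole checkpoint file and forces it to disk. */
    private void flush() {
        StringBuilder sb = new StringBuilder();
        for (long[] e : entries) {
            sb.append(e[0]).append(' ').append(e[1]).append('\n');
        }
        try (FileOutputStream out = new FileOutputStream(checkpoint.toFile())) {
            out.write(sb.toString().getBytes(StandardCharsets.UTF_8));
            out.flush();
            out.getFD().sync(); // same call the notes mention: fsync through the file descriptor
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With the values from the log line "New: {epoch:27, offset:35}", assign(27, 35) records an entry once; later batches written under epoch 27 return false and skip the disk write.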

processingStatsCallback
Update the produceMessageConversionsRate metric with conversionCount, both per topic and globally. conversionCount counts conversions of messages from a higher-version format to a lower-version format. The conversion logic is in org.apache.kafka.common.record.AbstractRecords.downConvert(Iterable<? extends RecordBatch>, byte, long, Time), and the call originates in org.apache.kafka.clients.producer.internals.Sender.sendProduceRequest(long, int, short, int, List )

responseCallback
Check for errors and log them if there are any; handle throttling.

Analysis of Partition.maybeIncrementLeaderHW
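
The testEnter0007 lines in the leader log are consistent with a simple rule: the candidate HW is the smallest LEO among the replicas considered, and the HW only ever moves forward. The sketch below assumes exactly that and nothing more. It ignores the lag-time and log-segment conditions the real method may apply, and LeaderHighWatermark is a hypothetical class, not part of Kafka.

```java
import java.util.Collection;
import java.util.Collections;

/** Toy model of the leader-side high watermark. */
final class LeaderHighWatermark {
    private long highWatermark;

    LeaderHighWatermark(long initial) {
        this.highWatermark = initial;
    }

    /**
     * Recomputes the HW from the LEOs of the in-sync replicas (leader included).
     * Returns true only if the HW moved forward. The collection must not be empty.
     */
    boolean maybeIncrement(Collection<Long> inSyncLogEndOffsets) {
        long candidate = Collections.min(inSyncLogEndOffsets);
        if (candidate > highWatermark) {
            highWatermark = candidate;
            return true;
        }
        return false;
    }

    long value() {
        return highWatermark;
    }
}
```

Replaying the log: with the HW at 35, LEOs of 35 and 36 (right after the produce request) leave the HW at 35; once the follower's fetch at offset 36 brings both LEOs to 36, the HW advances to 36.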
