Replica Synchronization

Explanation of a few concepts

  • LEO: the log end offset, i.e. the position of the last appended message + 1. This position does not indicate what consumers can see; it only marks the write position of the local log, because the write progress of the other replicas must also be taken into account. Both the leader and the followers maintain this value.
  • HW: short for high watermark, the externally visible position for consumers of non-transactional messages (i.e. read-uncommitted mode). The process of updating this value is fairly involved; its difference from LEO is captured in the quote and sketch below. Both the leader and the followers maintain this value.
  • LSO: the last stable offset, which concerns transactional messages. For transactional messages (i.e. read-committed mode), this is the largest position a consumer can see. See the "query the latest offset" part of the《offset range查詢》notes.
  • epoch: the leader epoch. Introduced in version 0.11 to solve the problems of message loss and divergence during broker crashes in version 0.8. For details, see huxi's discussion of Kafka水位(high watermark)與leader epoch.

The high watermark indicates the offset of messages that are fully replicated, while the end-of-log offset might be larger if there are newly appended records to the leader partition which are not replicated yet.
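
A minimal sketch (illustrative Scala, not Kafka's actual code) of the invariant implied above: the HW is the minimum LEO across the in-sync replicas, so HW <= LEO on the leader.

case class ReplicaState(brokerId: Int, logEndOffset: Long)

// HW is the smallest LEO among the in-sync replicas; consumers in
// read-uncommitted mode may therefore read offsets up to HW - 1.
def highWatermark(isr: Seq[ReplicaState]): Long =
  isr.map(_.logEndOffset).min

val isr = Seq(ReplicaState(0, 10L), ReplicaState(1, 8L), ReplicaState(2, 9L))
assert(highWatermark(isr) == 8L) // offsets 0..7 are visible to consumers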

Main structures in replica synchronization

The main classes involved are: AbstractFetcherThread, ReplicaFetcherThread, PartitionFetchState, ReplicaFetcherManager, ShutdownableThread, and ReplicaManager.

Overview of the follower-side logic

Thread creation

  1. handleLeaderAndIsrRequest is the entry point; handling it triggers ReplicaManager.becomeLeaderOrFollower and then makeFollowers.
  2. That in turn triggers ReplicaFetcherManager.addFetcherForPartitions.
  3. The ReplicaFetcherManager.createFetcherThread method creates the fetcher thread: new ReplicaFetcherThread(...).

Notes on how some fields are assigned:

  • Thread name: s"${prefix}ReplicaFetcherThread-$fetcherId-${sourceBroker.id}" (an interpolated example follows this list)
  • clientId: the same as the thread name
  • isInterruptible: fixed to false at creation time
  • includeLogTruncation: fixed to true at creation time, which means the truncatingLog field of a freshly built PartitionFetchState object starts out true
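
For illustration, a hypothetical interpolation of the thread-name template above (the prefix and ids are made-up values, not taken from a real broker):

// Hypothetical values, only to show what the template above interpolates to.
val prefix = ""            // assumption: empty prefix for the default case
val fetcherId = 0
val sourceBrokerId = 1
val threadName = s"${prefix}ReplicaFetcherThread-$fetcherId-$sourceBrokerId"
// threadName == "ReplicaFetcherThread-0-1"; clientId is the same string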

Follower-side execution logic

The inheritance chain of ReplicaFetcherThread is:
ShutdownableThread
|-- AbstractFetcherThread
|---|-- ReplicaFetcherThread

Since ShutdownableThread is a spinning thread, a subclass only has to implement the doWork method to plug in its business logic (see the sketch after the list below).
The doWork logic in AbstractFetcherThread is organized as follows:

  • maybeTruncate: handle logs that may need truncation; where LEO is greater than HW (short for high water mark, likewise below), truncate back to the HW position.
  • Build a FetchRequest and pull data.
  • Process the FetchRequest response in processFetchRequest. Note: the request does not necessarily bring back record data (Record), and it asks for more than messages; it also fetches values such as the leader's high watermark.
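
A minimal sketch of the spin-thread pattern just described, with simplified names (SpinThread and FetcherSketch are illustrative, not Kafka's classes):

abstract class SpinThread(name: String) extends Thread(name) {
  @volatile private var running = true
  def doWork(): Unit                      // one iteration of business logic
  override def run(): Unit = while (running) doWork()
  def shutdown(): Unit = { running = false; join() }
}

class FetcherSketch extends SpinThread("replica-fetcher-sketch") {
  override def doWork(): Unit = {
    // 1. maybeTruncate: reconcile the local log with the leader
    // 2. build a FetchRequest for the partitions this thread owns
    // 3. process the response: append records, update offset state
    Thread.sleep(100)                     // placeholder for the fetch round-trip
  }
}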

maybeTruncate logic:

  • Filter out the partitions whose PartitionFetchState has isTruncatingLog == true. When is the truncatingLog field true? Per the current analysis, it defaults to true when the ReplicaFetcherThread is first created, so truncation may happen right at the start. See ReplicaFetcherThread.buildLeaderEpochRequest.
  • Based on the partitions filtered above, build an OffsetsForLeaderEpochRequest that queries offsets by epoch. See ReplicaFetcherThread.fetchEpochsFromLeader.
  • Based on the returned results, set (i.e. correct) the right offset. My understanding is that merely marking the correct position suffices; later writes simply overwrite from there, with no real deletion. (To be verified.) See AbstractFetcherThread.markTruncationComplete. A sketch of this flow follows the list.
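
A hedged sketch of that three-step flow (EpochEndOffset, fetchEpochs, and truncateTo here are illustrative stand-ins, not Kafka's exact signatures):

case class EpochEndOffset(leaderEpoch: Int, endOffset: Long)

// states: partition -> isTruncatingLog flag
// fetchEpochs: stand-in for the OffsetsForLeaderEpochRequest round-trip
// truncateTo: stand-in for marking/applying the truncation point
def maybeTruncateSketch(
    states: Map[String, Boolean],
    fetchEpochs: Set[String] => Map[String, EpochEndOffset],
    truncateTo: (String, Long) => Unit): Unit = {
  val truncating = states.collect { case (tp, true) => tp }.toSet
  if (truncating.nonEmpty) {
    val leaderOffsets = fetchEpochs(truncating)
    leaderOffsets.foreach { case (tp, epochOffset) =>
      // mark the corrected position; later appends overwrite from here
      truncateTo(tp, epochOffset.endOffset)
    }
  }
}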

Logic for processing the FetchRequest response:

  • Check the result for error codes.
  • For normal results, call processPartitionData and update the partition state: partitionStates.updateAndMoveToEnd(topicPartition, new PartitionFetchState(newOffset)). See ReplicaFetcherThread.processPartitionData.
  • processPartitionData is implemented in ReplicaFetcherThread; its main logic (sketched after this list) is:
    • Append to the replica's log: replica.log.get.appendAsFollower(records)
    • Update the replica's HW. The highWatermark can be understood as the offset everyone has already confirmed; otherwise there would be no point in maintaining it, and LEO or LSO could be used directly (see the HW/LEO quote above). A useful test case would kill the follower right before this line.
    • Store it in the replica object's metadata. A single fetch request cannot finish updating the replica's highWatermark; the next fetch request completes what the previous one started: replica.highWatermark = new LogOffsetMetadata(followerHighWatermark)
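
A minimal sketch of the follower-side HW update described above, under the assumption (consistent with these notes) that the follower caps the leader HW carried in the fetch response by its own LEO:

// After appending the fetched records, the follower's new HW is the
// smaller of its own LEO and the leader HW from the fetch response.
def followerHighWatermark(followerLeo: Long, leaderHwFromResponse: Long): Long =
  math.min(followerLeo, leaderHwFromResponse)

// e.g. the follower has appended up to LEO 37 while the leader HW is 40:
assert(followerHighWatermark(37L, 40L) == 37L)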

HW/LEO update process on the leader and the follower

Leader-side execution logic

Call stack for handling a fetch request. Fetch requests from consumer clients and sync fetch requests from followers go through the same logic; the differences are handled based on whether the request comes from a follower (the read() notes below, and the sketch after them, illustrate this).

ReplicaManager.kafka$server$ReplicaManager$$read$1(TopicPartition, FetchRequest$PartitionData, int, boolean, int, boolean, boolean, boolean, ReplicaQuota, IsolationLevel) line: 856    
ReplicaManager$$anonfun$readFromLocalLog$1.apply(Tuple2<TopicPartition,PartitionData>) line: 962    
ReplicaManager$$anonfun$readFromLocalLog$1.apply(Object) line: 961  
ResizableArray$class.foreach(ResizableArray, Function1) line: 59    
ArrayBuffer<A>.foreach(Function1<A,U>) line: 48 
ReplicaManager.readFromLocalLog(int, boolean, boolean, int, boolean, Seq<Tuple2<TopicPartition,PartitionData>>, ReplicaQuota, IsolationLevel) line: 961 
ReplicaManager.readFromLog$1(int, int, boolean, Seq, ReplicaQuota, IsolationLevel, boolean, boolean, boolean) line: 790 
ReplicaManager.fetchMessages(long, int, int, int, boolean, Seq<Tuple2<TopicPartition,PartitionData>>, ReplicaQuota, Function1<Seq<Tuple2<TopicPartition,FetchPartitionData>>,BoxedUnit>, IsolationLevel) line: 803  
KafkaApis.handleFetchRequest(RequestChannel$Request) line: 597  
KafkaApis.handle(RequestChannel$Request) line: 101

Offset values involved and their update logic:
fetchOffset: the starting offset of the fetch, taken from the fetch request body.
highWatermark: its update logic is described below.

The logic of kafka.server.ReplicaManager.readFromLocalLog(...).read(...):
Record metric data.
Obtain the replica object localReplica, looked up by replica id (i.e. broker id); the fetch request body carries the replica id.
val initialHighWatermark = localReplica.highWatermark.messageOffset
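
A simplified sketch, with assumed names, of how read() differentiates callers as mentioned above: follower fetches carry a non-negative replicaId (the broker id), consumer fetches use -1, and the upper read bound differs accordingly.

// Follower fetches carry the broker id as replicaId; consumers use -1.
def isFromFollower(replicaId: Int): Boolean = replicaId >= 0

// Upper bound of readable offsets for one read() call.
def readUpperBound(replicaId: Int, leo: Long, hw: Long, lso: Long,
                   readCommitted: Boolean): Long =
  if (isFromFollower(replicaId)) leo   // followers may read beyond the HW
  else if (readCommitted) lso          // read_committed consumers stop at LSO
  else hw                              // ordinary consumers stop at HW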

Updating the remote replicas' LEO information maintained on the leader node
This is a bit convoluted: the leader node also maintains the remote replicas' LEOs, because the leader's high watermark has to be updated from them; the leader's high-watermark update logic is simply the minimum of all the remote replicas' LEOs.
The call stack is as follows:

Replica.logEndOffset_$eq(LogOffsetMetadata) line: 98    
Replica.updateLogReadResult(LogReadResult) line: 83 
Partition.updateReplicaLogReadResult(Replica, LogReadResult) line: 276  
ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(Tuple2<TopicPartition,LogReadResult>) line: 1314   
ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(Object) line: 1308 
TraversableLike$$anonfun$map$1.apply(A) line: 234   
TraversableLike$$anonfun$map$1.apply(Object) line: 234  
ResizableArray$class.foreach(ResizableArray, Function1) line: 59    
ArrayBuffer<A>.foreach(Function1<A,U>) line: 48 
TraversableLike$class.map(TraversableLike, Function1, CanBuildFrom) line: 234   
ArrayBuffer<A>(AbstractTraversable<A>).map(Function1<A,B>, CanBuildFrom<Traversable<A>,B,That>) line: 104   
ReplicaManager.updateFollowerLogReadResults(int, Seq<Tuple2<TopicPartition,LogReadResult>>) line: 1308  
ReplicaManager.readFromLog$1(int, int, boolean, Seq, ReplicaQuota, IsolationLevel, boolean, boolean, boolean) line: 799 
ReplicaManager.fetchMessages(long, int, int, int, boolean, Seq<Tuple2<TopicPartition,PartitionData>>, ReplicaQuota, Function1<Seq<Tuple2<TopicPartition,FetchPartitionData>>,BoxedUnit>, IsolationLevel) line: 803  
KafkaApis.handleFetchRequest(RequestChannel$Request) line: 597

As the stack shows, this logic runs while the leader handles the fetch request: the fetch offset carried up by the request is simply treated as that remote replica's LEO (a sketch of this bookkeeping follows).
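
A minimal sketch of that bookkeeping (LeaderPartitionSketch and onFollowerFetch are illustrative names; in Kafka this lives in Replica.updateLogReadResult and Partition.updateReplicaLogReadResult, per the stack above):

import scala.collection.mutable

class LeaderPartitionSketch {
  private val remoteLeo = mutable.Map.empty[Int, Long] // brokerId -> remote LEO
  var localLeo: Long = 0L
  var highWatermark: Long = 0L

  // Called while the leader serves a follower fetch: the fetch offset is
  // recorded as that follower's LEO, then the HW is recomputed as the
  // minimum LEO across the leader and the tracked remote replicas.
  def onFollowerFetch(followerId: Int, fetchOffset: Long): Unit = {
    remoteLeo(followerId) = fetchOffset
    highWatermark = (remoteLeo.values.toSeq :+ localLeo).min
  }
}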

TODO

Appendix

  1. Creating the sync fetch thread
    ReplicaFetcherManager.createFetcherThread(int, BrokerEndPoint) line: 30
    ReplicaFetcherManager(AbstractFetcherManager).kafka$server$AbstractFetcherManager$$addAndStartFetcherThread$1(BrokerAndFetcherId, BrokerIdAndFetcherId) line: 80
    AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(Tuple2<BrokerAndFetcherId,Map<TopicPartition,BrokerAndInitialOffset>>) line: 94
    AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(Object) line: 85
    TraversableLike$WithFilter$$anonfun$foreach$1.apply(A) line: 733
    Map$Map1<A,B>.foreach(Function1<Tuple2<A,B>,U>) line: 116
    TraversableLike$WithFilter.foreach(Function1<A,U>) line: 732
    ReplicaFetcherManager(AbstractFetcherManager).addFetcherForPartitions(Map<TopicPartition,BrokerAndInitialOffset>) line: 85
    ReplicaManager.makeFollowers(int, int, Map<Partition,PartitionState>, int, Map<TopicPartition,Errors>) line: 1272
    ReplicaManager.becomeLeaderOrFollower(int, LeaderAndIsrRequest, Function2<Iterable ,Iterable ,BoxedUnit>) line: 1065
    KafkaApis.handleLeaderAndIsrRequest(RequestChannel$Request) line: 173
    KafkaApis.handle(RequestChannel$Request) line: 103
    KafkaRequestHandler.run() line: 65
    KafkaThread(Thread).run() line: 748

  2. Building the fetch request
    ReplicaFetcherThread.buildFetchRequest(Seq<Tuple2<TopicPartition,PartitionFetchState>>) line: 234
    AbstractFetcherThread$$anonfun$2.apply() line: 104
    AbstractFetcherThread$$anonfun$2.apply() line: 103
    CoreUtils$.inLock(Lock, Function0 ) line: 217
    ReplicaFetcherThread(AbstractFetcherThread).doWork() line: 103
    ReplicaFetcherThread(ShutdownableThread).run() line: 64

-------------


Temporary notes taken while reading the code:

[2019-09-25 18:07:13,787] INFO Handling request:RequestHeader(apiKey=OFFSET_FOR_LEADER_EPOCH, apiVersion=0, clientId=broker-0-fetcher-0, correlationId=0) -- {topics=[{topic=test.vv19,partitions=[{partition=0,leader_epoch=25}]}]} from connection 172.16.113.38:9094-172.16.113.38:49385-0;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (com.code260.ss.KafkaTestUtils$)



[2019-09-25 22:13:02,501] INFO testEnter0002-001topicPartition:test.vv19:0hwm.messageOffset: [37]lso.messageOffset: [37] (com.code260.ss.KafkaTestUtils$)
[2019-09-25 22:13:02,502] INFO testEnter0002-002topicPartition:test.vv19:0hwm.messageOffset: [37]lso.messageOffset: [37] (com.code260.ss.KafkaTestUtils$)
[2019-09-25 22:13:03,006] INFO testEnter0005-Received response:apikey:FETCH correlationId 24; (com.code260.ss.KafkaTestUtils$)



LeaderEpochFileCache$$anonfun$assign$1.apply$mcV$sp() line: 62  
LeaderEpochFileCache$$anonfun$assign$1.apply() line: 61 
LeaderEpochFileCache$$anonfun$assign$1.apply() line: 61 
CoreUtils$.inLock(Lock, Function0<T>) line: 217 
CoreUtils$.inWriteLock(ReadWriteLock, Function0<T>) line: 225   
LeaderEpochFileCache.assign(int, long) line: 60 
Log$$anonfun$append$2$$anonfun$apply$9.apply(MutableRecordBatch) line: 689  
Log$$anonfun$append$2$$anonfun$apply$9.apply(Object) line: 687  
Iterator$class.foreach(Iterator, Function1) line: 891   
Wrappers$JIteratorWrapper<A>(AbstractIterator<A>).foreach(Function1<A,U>) line: 1334    
IterableLike$class.foreach(IterableLike, Function1) line: 72    
Wrappers$JIterableWrapper<A>(AbstractIterable<A>).foreach(Function1<A,U>) line: 54  
Log$$anonfun$append$2.apply() line: 687 
Log$$anonfun$append$2.apply() line: 624 
Log.maybeHandleIOException(Function0<String>, Function0<T>) line: 1669  
Log.append(MemoryRecords, boolean, boolean, int) line: 624  
Log.appendAsLeader(MemoryRecords, int, boolean) line: 597   
Partition$$anonfun$13.apply() line: 500 
Partition$$anonfun$13.apply() line: 488 
CoreUtils$.inLock(Lock, Function0<T>) line: 217 
CoreUtils$.inReadLock(ReadWriteLock, Function0<T>) line: 223    
Partition.appendRecordsToLeader(MemoryRecords, boolean, int) line: 487  
ReplicaManager$$anonfun$appendToLocalLog$2.apply(Tuple2<TopicPartition,MemoryRecords>) line: 724    
ReplicaManager$$anonfun$appendToLocalLog$2.apply(Object) line: 708  
TraversableLike$$anonfun$map$1.apply(A) line: 234   
TraversableLike$$anonfun$map$1.apply(Object) line: 234  
HashMap$$anonfun$foreach$1.apply(DefaultEntry<A,B>) line: 130   
HashMap$$anonfun$foreach$1.apply(Object) line: 130  
HashTable$class.foreachEntry(HashTable, Function1) line: 236    
HashMap<A,B>.foreachEntry(Function1<DefaultEntry<A,B>,U>) line: 40  
HashMap<A,B>.foreach(Function1<Tuple2<A,B>,U>) line: 130    
TraversableLike$class.map(TraversableLike, Function1, CanBuildFrom) line: 234   
HashMap<A,B>(AbstractTraversable<A>).map(Function1<A,B>, CanBuildFrom<Traversable<A>,B,That>) line: 104 
ReplicaManager.appendToLocalLog(boolean, boolean, Map<TopicPartition,MemoryRecords>, short) line: 708   
ReplicaManager.appendRecords(long, short, boolean, boolean, Map<TopicPartition,MemoryRecords>, Function1<Map<TopicPartition,PartitionResponse>,BoxedUnit>, Option<Lock>, Function1<Map<TopicPartition,RecordsProcessingStats>,BoxedUnit>) line: 458 
KafkaApis.handleProduceRequest(RequestChannel$Request) line: 460    
KafkaApis.handle(RequestChannel$Request) line: 100  
KafkaRequestHandler.run() line: 65  
KafkaThread(Thread).run() line: 748 





LeaderEpochFileCache$$anonfun$assign$1.apply$mcV$sp() line: 62  
LeaderEpochFileCache$$anonfun$assign$1.apply() line: 61 
LeaderEpochFileCache$$anonfun$assign$1.apply() line: 61 
CoreUtils$.inLock(Lock, Function0<T>) line: 217 
CoreUtils$.inWriteLock(ReadWriteLock, Function0<T>) line: 225   
LeaderEpochFileCache.assign(int, long) line: 60 
Log$$anonfun$append$2$$anonfun$apply$9.apply(MutableRecordBatch) line: 689  
Log$$anonfun$append$2$$anonfun$apply$9.apply(Object) line: 687  
Iterator$class.foreach(Iterator, Function1) line: 891   
Wrappers$JIteratorWrapper<A>(AbstractIterator<A>).foreach(Function1<A,U>) line: 1334    
IterableLike$class.foreach(IterableLike, Function1) line: 72    
Wrappers$JIterableWrapper<A>(AbstractIterable<A>).foreach(Function1<A,U>) line: 54  
Log$$anonfun$append$2.apply() line: 687 
Log$$anonfun$append$2.apply() line: 624 
Log.maybeHandleIOException(Function0<String>, Function0<T>) line: 1669  
Log.append(MemoryRecords, boolean, boolean, int) line: 624  
Log.appendAsFollower(MemoryRecords) line: 607   
ReplicaFetcherThread.processPartitionData(TopicPartition, long, ReplicaFetcherThread$PartitionData) line: 123   
ReplicaFetcherThread.processPartitionData(TopicPartition, long, AbstractFetcherThread$PartitionData) line: 62   
AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(PartitionFetchState) line: 184  
AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(Object) line: 172   
Some<A>(Option<A>).foreach(Function1<A,U>) line: 257    
AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(Tuple2<TopicPartition,PartitionData>) line: 172  
AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(Object) line: 169    
ResizableArray$class.foreach(ResizableArray, Function1) line: 59    
ArrayBuffer<A>.foreach(Function1<A,U>) line: 48 
AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp() line: 169   
AbstractFetcherThread$$anonfun$processFetchRequest$2.apply() line: 169  
AbstractFetcherThread$$anonfun$processFetchRequest$2.apply() line: 169  
CoreUtils$.inLock(Lock, Function0<T>) line: 217 
ReplicaFetcherThread(AbstractFetcherThread).processFetchRequest(AbstractFetcherThread$FetchRequest) line: 167   
ReplicaFetcherThread(AbstractFetcherThread).doWork() line: 113  
ReplicaFetcherThread(ShutdownableThread).run() line: 64 




ReplicaFetcherThread.fetchEpochsFromLeader(Map<TopicPartition,Object>) line: 332    
ReplicaFetcherThread(AbstractFetcherThread).maybeTruncate() line: 130   
ReplicaFetcherThread(AbstractFetcherThread).doWork() line: 102  
ReplicaFetcherThread(ShutdownableThread).run() line: 64  

  

kafka.server.ReplicaManager.lastOffsetForLeaderEpoch  
  

kafka.server.ReplicaFetcherThread.maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset])  
  


kafka.server.ReplicaFetcherThread.handleOffsetOutOfRange(topicPartition: TopicPartition)  
  