1 segment的產生html
當索引一個文檔時,若是存在空閒的segment(未被其餘線程鎖定),則取出空閒segment list中的最後一個segment(LIFO),並鎖定,將文檔索引至該segment,api
找達到flush條件的segment,而後解鎖,歸還至空閒segment list,若是有達到flush條件的segment,flush該segment(同步執行)。less
若是不存在,則建立新的segment,重複上述步驟。異步
總結1:若是並行的執行向一個索引,索引文檔,則須要不一樣的segment。elasticsearch
相關代碼:post
//索引一個文檔。 IndexWriter.updateDocument //索引一個文檔。 DocumentsWriter.updateDocument //一個線程索引時鎖定一個ThreadState對象,索引後歸還至free list。 ThreadState //ThreadState的屬性,一個DocumentsWriterPerThread對應一個segment,flush後,該ThreadState的dwpt爲null, //下次使用該ThreadState,建立新的dwpt,新的segment。 DocumentsWriterPerThread
2 flush條件spa
索引一個文檔後,找出是否有達到flush條件的segment。線程
1:若是maxBufferedDocs(默認-1,es未設置)不等於-1,且當前segment在內存中的doc數量大於等於maxBufferedDocs,則標記該segment的flushPending。code
2:若是不知足1,且ramBufferSizeMB(默認16.0,es設置爲es.index.memory.max_index_buffer_size)不等於-1,當內存中當前IndexWriter全部segment之和(包括deleted docs)大於ramBufferSizeMB時,找出內存中最大的且未標記flushPending的segment,標記該segment的flushPending。orm
3:若是當前1,2以後,當前segment還未標記flushPending,則當前segment大於perThreadHardLimitMB(默認1945,es未設置),標記該segment的flushPending。
123以後,若是當前segment被標記,則flush當前segment。不然從flushQueue中poll一個segment,若是flushQueue(調用flush時,將全部segment加入queue)爲空,則遍歷segment取第一個標記flushPending的segment進行flush。
相關代碼:
//查找符合flush的segment。 DocumentsWriterFlushControl.doAfterDocument //flush當前segment前,reset當前dwpt,下次使用當前ThreadState須要新的dwpt,新的segment。 DocumentsWriterFlushControl.internalTryCheckOutForFlush //flush當前segment,或者其餘segment。 DocumentsWriter.postUpdate
注意:除了達到flush條件的自動flush,還能夠經過調用api flush,如:
1:es refresh
2:es flush
3:es syncedFlush
3 flush
flush將內存中的segment寫到文件(在調用線程中同步執行),但不執行fileChannel.force(nio,bio則fileOutputStream.flush),一部分數據可能在buffer中。
相關代碼:
//flush一個segment。 DocumentsWriter.doFlush DocumentsWriterPerThread.flush DefaultIndexingChain.flush //寫nvd, nvm文件。 writeNorms //寫dvd, dvm文件。 writeDocValues //寫dii, dim文件。 writePoints //寫fdt, fdx文件(該文件在首次indexing時建立,flush時寫入值)。 storedFieldsConsumer.flush //寫doc, pos, tim, tip文件。 termsHash.flush //寫fnm文件。 docWriter.codec.fieldInfosFormat().write //寫cfs, cfe, si, liv(若是有刪除)文件。 DocumentsWriterPerThread.sealFlushedSegment //刪除cfs, cfe, si, liv(若是有刪除)以外的文件。 IndexWriter.doAfterFlush
4 commit
commit執行fileChannel.force,將buffer中的數據寫到磁盤。具體步驟爲:
1:flush all segments 將內存中全部的segments寫到文件。
2:依次sync pending_segments_n,segment files(fileChannel.force)將這寫文件同步到磁盤。
3:將pending_segments_n重命名爲segments_n,刪除舊的segments_n-1。
4:若是步驟1 flush了segment,執行maybeMerge,若是達到merge條件,將會merge。
相關代碼:
//commit。 IndexWriter.commit IndexWriter.commitInternal IndexWriter.prepareCommitInternal //flush segments。 DocumentsWriter.flushAllThreads //sync file。 IndexWriter.startCommit Directory.sync IOUtils.fsync FileChannel.force FileChannelImpl.force //更新commit信息segments_n,刪除舊的segments_n-1。 IndexWriter.finishCommit //若是達到merge條件,將會merge。 IndexWriter.maybeMerge
5 maybeMerge
flush或者commit後,若是flush了segment,執行maybeMerge,若是達到merge條件,將執行merge(異步執行)。具體步驟爲:
1:將segments按size降序排列。
2:計算total segments size 和 minimum segment size。
3:total segments size過濾掉tooBigSegment(大於max_merged_segment/2.0)的segment,並記錄tooBigCount;minSegmentBytes若是小於floor_segment(默認2mb),取2mb。
4:計算allowedSegCountInt,當segments(不包含tooBigSegment)數量大於此數,將觸發merge。
5:從大到小(以前的降序排列),貪心找出不大於maxMergeAtOnce個, 且size總和不大於maxMergedSegmentBytes個segments進行merge。
相關代碼:
//maybeMerge。 IndexWriter.maybeMerge IndexWriter.updatePendingMerges //查找可merge的segments。 TieredMergePolicy.findMerges //執行merge。 ConcurrentMergeScheduler.merge
//控制merge線程數量 ConcurrentMergeScheduler.maybeStall
//用來異步執行merge的線程。
MergeThread
6 es refresh
主要執行lucene的flushAllThreads和maybeMerge。refresh的兩個條件:
1:達到refresh_interval設置的時間間隔。
2:節點全部shard的segments佔用內存(調用lucene api獲取)之和達到indices.memory.index_buffer_size,找出佔用最大的shard執行refresh。
相關代碼:
//refresh_interval refresh。
IndexService.AsyncRefreshTask
//indices.memory.index_buffer_size refresh。
IndexingMemoryController.runUnlocked
IndexingMemoryController.writeIndexingBufferAsync
//es refresh。 InternalEngine.refresh //lucene refresh。 ReferenceManager.maybeRefreshBlocking DirectoryReader.openIfChanged StandardDirectoryReader.doOpenIfChanged IndexWriter.getReader //flush segments。 DocumentsWriter.flushAllThreads //若是flush了segment,則執行maybeMerge。 IndexWriter.maybeMerge
7 es flush
主要執行步驟爲:
1:prepareCommit translog:
1.1 備份 translog.ckp到translog-1.ckp。
1.2 fsync translog-1.ckp以及translog 文件夾。
1.3 建立新的translog數據文件translog-n.tlog,更新translog.ckp(寫入checkPoint)。
2:commit indexWriter(見4 commit)。
3:refresh(見6 es refresh)。
4:commit translog:刪除備份的translog-1.ckp以及舊的translog數據文件translog-n-1.tlog。
相關代碼:
//es flush。 InternalEngine.flush //prepareCommit translog。 Translog.prepareCommit //es commit index writer。 InternalEngine.commitIndexWriter //lucene commit。 IndexWriter.commit //es refresh。 InternalEngine.refresh //commit translog。 Translog.commit
總結2:lucene的flush是指將內存中的segment,寫到磁盤但不執行fileChannel.force,一部分數據會在buffer中;commit會調用force,將buffer中的數據寫到磁盤。
es的refresh調用lucene的flush;flush調用lucene的commit。
參考:
elasticsearch5.6.12,lucene6.6.1 源碼
https://www.outcoldman.com/en/archive/2017/07/13/elasticsearch-explaining-merge-settings/
http://blog.mikemccandless.com/2011/02/visualizing-lucenes-segment-merges.html