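Where are the start and stop methods of the monitor manager object snapshotRepositoryMonitorManager called, and where is the resume method of the CheckpointAndChangeQueue object then called to obtain the List<CheckpointAndChange> guaranteedChanges collection?
Tracing this leads to the DiffingConnectorTraversalManager class. The methods this class implements call into the monitor manager object snapshotRepositoryMonitorManager to operate on it.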
  private final DocumentSnapshotRepositoryMonitorManager
      snapshotRepositoryMonitorManager;
  private final TraversalContextManager traversalContextManager;

  /**
   * Boolean to mark TraversalManager as invalid.
   * It's possible for Connector Manager to keep a reference to
   * an outdated TraversalManager (after a new one has been given
   * previous TraversalManagers are invalid to use).
   */
  private boolean isActive = true;

  /**
   * Creates a {@link DiffingConnectorTraversalManager}.
   *
   * @param snapshotRepositoryMonitorManager the
   *        {@link DocumentSnapshotRepositoryMonitorManager}
   *        for use accessing a {@link ChangeSource}
   * @param traversalContextManager {@link TraversalContextManager}
   *        that holds the current {@link TraversalContext}
   */
  public DiffingConnectorTraversalManager(
      DocumentSnapshotRepositoryMonitorManager snapshotRepositoryMonitorManager,
      TraversalContextManager traversalContextManager) {
    this.snapshotRepositoryMonitorManager = snapshotRepositoryMonitorManager;
    this.traversalContextManager = traversalContextManager;
  }
The resumeTraversal method starts the monitor manager object snapshotRepositoryMonitorManager and returns a DocumentList.
  /* @Override */
  public synchronized DocumentList resumeTraversal(String checkpoint)
      throws RepositoryException {
    /* Exhaustive list of method's use:
     resumeTraversal(null) from startTraversal:
       monitors get started from null
     resumeTraversal(null) from Connector Manager sometime after startTraversal:
       monitors already started from previous resumeTraversal call
     resumeTraversal(cp) from Connector Manager without a startTraversal:
       means there was a shutdown or turn off
       monitors get started from cp; should use state
     resumeTraversal(cp) from Connector Manager sometime after some uses:
       is most common case; roll
    */
    if (isActive()) {
      // Start snapshotRepositoryMonitorManager if it is not already running
      if (!snapshotRepositoryMonitorManager.isRunning()) {
        snapshotRepositoryMonitorManager.start(checkpoint);
      }
      return newDocumentList(checkpoint);
    } else {
      throw new RepositoryException(
          "Inactive FileTraversalManager referanced.");
    }
  }
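The guard logic in resumeTraversal can be isolated in a small stand-alone sketch: the monitors are started only on the first call while they are not running, and an inactive manager rejects every call. SketchMonitorManager and SketchTraversalManager are hypothetical names invented for this illustration; they are not part of the connector library, and the real method returns a DocumentList rather than a String.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the monitor manager; records every start call. */
class SketchMonitorManager {
  private boolean running = false;
  final List<String> startCalls = new ArrayList<>();

  boolean isRunning() {
    return running;
  }

  void start(String checkpoint) {
    running = true;
    // String.valueOf records a null checkpoint as the text "null".
    startCalls.add(String.valueOf(checkpoint));
  }

  void stop() {
    running = false;
  }
}

/** Hypothetical traversal manager that mirrors the guards in resumeTraversal. */
class SketchTraversalManager {
  private final SketchMonitorManager monitors;
  private boolean active = true;

  SketchTraversalManager(SketchMonitorManager monitors) {
    this.monitors = monitors;
  }

  synchronized void deactivate() {
    active = false;
  }

  /** Returns the checkpoint that the next batch would resume from. */
  synchronized String resumeTraversal(String checkpoint) {
    if (!active) {
      throw new IllegalStateException("Inactive traversal manager referenced.");
    }
    if (!monitors.isRunning()) {
      // Reached only when the monitors are not running yet.
      monitors.start(checkpoint);
    }
    return checkpoint;
  }
}
```

Calling resumeTraversal(null) and then resumeTraversal("cp-1") on this sketch starts the monitors once, from null, which matches the first two cases listed in the comment of the original method.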
It then calls the newDocumentList method to build and return the DocumentList.
  private DocumentList newDocumentList(String checkpoint)
      throws RepositoryException {
    // Get the CheckpointAndChangeQueue (the queue is referenced only by
    // snapshotRepositoryMonitorManager)
    CheckpointAndChangeQueue checkpointAndChangeQueue =
        snapshotRepositoryMonitorManager.getCheckpointAndChangeQueue();

    try {
      DiffingConnectorDocumentList documentList =
          new DiffingConnectorDocumentList(
              checkpointAndChangeQueue,
              CheckpointAndChangeQueue.initializeCheckpointStringIfNull(
                  checkpoint));
      // Map<String, MonitorCheckpoint>: the MonitorCheckpoint for each monitor
      Map<String, MonitorCheckpoint> guaranteesMade =
          checkpointAndChangeQueue.getMonitorRestartPoints();

      snapshotRepositoryMonitorManager.acceptGuarantees(guaranteesMade);

      return new ConfirmActiveDocumentList(documentList);
    } catch (IOException e) {
      throw new RepositoryException("Failure when making DocumentList.", e);
    }
  }
The constructor of the DiffingConnectorDocumentList documentList object wraps the CheckpointAndChangeQueue checkpointAndChangeQueue queue.
The complete implementation of the DiffingConnectorDocumentList class is as follows:
/**
 * An implementation of {@link DocumentList} for the {@link DiffingConnector}.
 *
 * @since 2.8
 */
public class DiffingConnectorDocumentList implements DocumentList {
  private final Iterator<CheckpointAndChange> checkpointAndChangeIterator;
  private String checkpoint;

  /**
   * Creates a document list that returns a batch of documents from the provided
   * {@link CheckpointAndChangeQueue}.
   *
   * @param queue a CheckpointAndChangeQueue containing document changes
   * @param checkpoint point into the change queue after which to start
   *        returning documents
   * @throws IOException if persisting fails
   */
  public DiffingConnectorDocumentList(CheckpointAndChangeQueue queue,
      String checkpoint) throws IOException {
    // queue.resume returns the List<CheckpointAndChange>;
    // the batch for this DocumentList is now loaded in memory
    List<CheckpointAndChange> guaranteedChanges = queue.resume(checkpoint);
    checkpointAndChangeIterator = guaranteedChanges.iterator();
    this.checkpoint = checkpoint;
  }

  /**
   * The caller reads this state and persists it; once iteration has
   * finished, it is the final checkpoint.
   */
  /* @Override */
  public String checkpoint() {
    return checkpoint;
  }

  /* @Override */
  public Document nextDocument() throws RepositoryException {
    if (checkpointAndChangeIterator.hasNext()) {
      CheckpointAndChange checkpointAndChange =
          checkpointAndChangeIterator.next();
      // Update the checkpoint
      checkpoint = checkpointAndChange.getCheckpoint().toString();
      return checkpointAndChange.getChange().getDocumentHandle().getDocument();
    } else {
      return null;
    }
  }
}
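Its constructor calls the resume method of the CheckpointAndChangeQueue queue parameter to obtain the List<CheckpointAndChange> guaranteedChanges. Its nextDocument() method iterates over that list to get each CheckpointAndChange checkpointAndChange object and updates the checkpoint state marker as it goes.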
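To make the checkpoint behaviour concrete, here is a minimal sketch of a document list whose checkpoint advances as each item is consumed. SketchEntry and SketchDocumentList are hypothetical names, and plain strings stand in for the Checkpoint and Document types of the real classes.

```java
import java.util.Iterator;
import java.util.List;

/** Hypothetical pairing of a checkpoint string with a document payload. */
class SketchEntry {
  final String checkpoint;
  final String document;

  SketchEntry(String checkpoint, String document) {
    this.checkpoint = checkpoint;
    this.document = document;
  }
}

/** Hypothetical document list; the batch is already in memory when it is built. */
class SketchDocumentList {
  private final Iterator<SketchEntry> entries;
  private String checkpoint;

  SketchDocumentList(List<SketchEntry> batch, String startCheckpoint) {
    this.entries = batch.iterator();
    this.checkpoint = startCheckpoint;
  }

  /** The checkpoint of the last document handed out, or the start checkpoint. */
  String checkpoint() {
    return checkpoint;
  }

  /** Returns the next document, or null once the batch is exhausted. */
  String nextDocument() {
    if (!entries.hasNext()) {
      return null;
    }
    SketchEntry entry = entries.next();
    // Advance the checkpoint to the entry that was just consumed.
    checkpoint = entry.checkpoint;
    return entry.document;
  }
}
```

If the caller stops after the first of two documents, checkpoint() in this sketch reports the first entry's checkpoint, so a later resume would pick up from the second document rather than replaying the batch.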
Finally, the mapping of MonitorCheckpoint objects associated with the monitors is obtained:
      // Map<String, MonitorCheckpoint>
      Map<String, MonitorCheckpoint> guaranteesMade =
          checkpointAndChangeQueue.getMonitorRestartPoints();
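Then the acceptGuarantees method of the monitor manager object snapshotRepositoryMonitorManager is called, and each corresponding monitor object receives and acknowledges its MonitorCheckpoint object.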
  /**
   * The monitor manager receives feedback from the CheckpointAndChangeQueue
   * and dispatches each MonitorCheckpoint to the corresponding monitor.
   */
  /* @Override */
  public void acceptGuarantees(Map<String, MonitorCheckpoint> guarantees) {
    for (Map.Entry<String, MonitorCheckpoint> entry :
        guarantees.entrySet()) {
      String monitorName = entry.getKey();
      MonitorCheckpoint checkpoint = entry.getValue();
      DocumentSnapshotRepositoryMonitor monitor =
          fileSystemMonitorsByName.get(monitorName);
      if (monitor != null) {
        // Signal is asynch.  Let monitor figure out how to use.
        // Callback
        monitor.acceptGuarantee(checkpoint);
      }
    }
  }
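The dispatch step above is a lookup by monitor name that silently skips names with no registered monitor. A minimal sketch of that pattern follows; SketchMonitor and SketchGuaranteeDispatcher are hypothetical names, a long snapshot number stands in for MonitorCheckpoint, and the returned delivery count is an addition made only so the behaviour is easy to observe.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical monitor that remembers the last guarantee it received. */
class SketchMonitor {
  Long lastGuarantee; // null until a guarantee arrives

  void acceptGuarantee(long snapshotNumber) {
    lastGuarantee = snapshotNumber;
  }
}

/** Hypothetical dispatcher that routes guarantees to monitors by name. */
class SketchGuaranteeDispatcher {
  private final Map<String, SketchMonitor> monitorsByName = new HashMap<>();

  void register(String name, SketchMonitor monitor) {
    monitorsByName.put(name, monitor);
  }

  /** Delivers each guarantee to its monitor; returns how many were delivered. */
  int acceptGuarantees(Map<String, Long> guarantees) {
    int delivered = 0;
    for (Map.Entry<String, Long> entry : guarantees.entrySet()) {
      SketchMonitor monitor = monitorsByName.get(entry.getKey());
      if (monitor != null) {
        // Unknown monitor names are ignored rather than treated as errors.
        monitor.acceptGuarantee(entry.getValue());
        delivered++;
      }
    }
    return delivered;
  }
}
```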
The concrete monitor that corresponds to the repository object receives the acknowledgement object:
  /**
   * The monitor receives feedback [acknowledges the MonitorCheckpoint].
   * @param cp
   */
  // Public for DocumentSnapshotRepositoryMonitorTest
  @VisibleForTesting
  public void acceptGuarantee(MonitorCheckpoint cp) {
    snapshotStore.acceptGuarantee(cp);
    guaranteeCheckpoint = cp;
  }
The storage object that corresponds to the repository sits at the end of the processing chain:
  /**
   * Handles the MonitorCheckpoint feedback.
   * @param cp
   */
  void acceptGuarantee(MonitorCheckpoint cp) {
    long readSnapshotNumber = cp.getSnapshotNumber();
    if (readSnapshotNumber < 0) {
      throw new IllegalArgumentException("Received invalid snapshot in: " + cp);
    }
    if (oldestSnapshotToKeep > readSnapshotNumber) {
      LOG.warning("Received an older snapshot than " + oldestSnapshotToKeep + ": " + cp);
    } else {
      oldestSnapshotToKeep = readSnapshotNumber;
    }
  }
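The rule in this last method is that oldestSnapshotToKeep only ever moves forward: a negative number is rejected, an older number is ignored (the original logs a warning), and anything else becomes the new value. The stand-alone sketch below shows the same rule. SketchSnapshotStore is a hypothetical name, a bare long replaces MonitorCheckpoint, the initial value of 0 is an assumption, and the boolean return value replaces the log call so the outcome is visible to the caller.

```java
/** Hypothetical snapshot store that tracks the oldest snapshot still needed. */
class SketchSnapshotStore {
  private long oldestSnapshotToKeep = 0; // assumed starting value

  long getOldestSnapshotToKeep() {
    return oldestSnapshotToKeep;
  }

  /**
   * Accepts a guaranteed snapshot number.
   *
   * @return true if the value was accepted, false if it was older than
   *         the current value and therefore ignored
   */
  boolean acceptGuarantee(long readSnapshotNumber) {
    if (readSnapshotNumber < 0) {
      throw new IllegalArgumentException(
          "Received invalid snapshot: " + readSnapshotNumber);
    }
    if (oldestSnapshotToKeep > readSnapshotNumber) {
      // Stale guarantee: keep the current value unchanged.
      return false;
    }
    oldestSnapshotToKeep = readSnapshotNumber;
    return true;
  }
}
```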
---------------------------------------------------------------------------
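This series, Enterprise Search Engine Development: Connector, is my own original work.
When reposting, please credit the source: 博客園 (cnblogs), 刺蝟的溫馴
My email: chenying998179@163#com (replace # with .)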