企業搜索引擎開發之鏈接器connector(二十九)

在哪裏調用監控器管理對象snapshotRepositoryMonitorManager的start方法及stop方法,而後又在哪裏調用CheckpointAndChangeQueue對象的resume方法獲取List<CheckpointAndChange> guaranteedChanges集合html

下面跟蹤到DiffingConnectorTraversalManager類的相關方法,在該類實現的方法中,調用了監控器管理對象snapshotRepositoryMonitorManager的相關方法實現對其操做async

private final DocumentSnapshotRepositoryMonitorManager
      snapshotRepositoryMonitorManager;
  private final TraversalContextManager traversalContextManager;
  /**
   * Boolean to mark TraversalManager as invalid.
   * It's possible for Connector Manager to keep a reference to
   * an outdated TraversalManager (after a new one has been given
   * previous TraversalManagers are invalid to use).
   */
  private boolean isActive = true;

  /**
   * Creates a {@link DiffingConnectorTraversalManager}.
   *
   * @param snapshotRepositoryMonitorManager the
   *        {@link DocumentSnapshotRepositoryMonitorManager}
   *        for use accessing a {@link ChangeSource}
   * @param traversalContextManager {@link TraversalContextManager}
   *        that holds the current {@link TraversalContext}
   */
  public DiffingConnectorTraversalManager(
      DocumentSnapshotRepositoryMonitorManager snapshotRepositoryMonitorManager,
      TraversalContextManager traversalContextManager) {
    this.snapshotRepositoryMonitorManager = snapshotRepositoryMonitorManager;
    this.traversalContextManager = traversalContextManager;
  }

resumeTraversal方法啓動監視器管理對象snapshotRepositoryMonitorManager,並返回DocumentList集合ide

/* @Override */
  public synchronized DocumentList resumeTraversal(String checkpoint)
      throws RepositoryException {
    /* Exhaustive list of method's use:
     resumeTraversal(null) from startTraversal:
       monitors get started from null
     resumeTraversal(null) from Connector Manager sometime after startTraversal:
       monitors already started from previous resumeTraversal call
     resumeTraversal(cp) from Connector Manager without a startTraversal:
       means there was a shutdown or turn off
       monitors get started from cp; should use state
     resumeTraversal(cp) from Connector Manager sometime after some uses:
       is most common case; roll
    */
    if (isActive()) {
        //啓動snapshotRepositoryMonitorManager
      if (!snapshotRepositoryMonitorManager.isRunning()) {
        snapshotRepositoryMonitorManager.start(checkpoint);
      }
      return newDocumentList(checkpoint);
    } else {
      throw new RepositoryException(
          "Inactive FileTraversalManager referanced.");
    }
  }

進一步調用newDocumentList方法返回DocumentList集合函數

private DocumentList newDocumentList(String checkpoint)
      throws RepositoryException {
    //獲取隊列 CheckpointAndChangeQueue(隊列 CheckpointAndChangeQueue只由snapshotRepositoryMonitorManager引用)
    CheckpointAndChangeQueue checkpointAndChangeQueue =
        snapshotRepositoryMonitorManager.getCheckpointAndChangeQueue();

    try {
      DiffingConnectorDocumentList documentList = new DiffingConnectorDocumentList(
          checkpointAndChangeQueue,
          CheckpointAndChangeQueue.initializeCheckpointStringIfNull(
              checkpoint));
      //Map<String, MonitorCheckpoint>
      Map<String, MonitorCheckpoint> guaranteesMade =
          checkpointAndChangeQueue.getMonitorRestartPoints();
      
      snapshotRepositoryMonitorManager.acceptGuarantees(guaranteesMade);

      return new ConfirmActiveDocumentList(documentList);
    } catch (IOException e) {
      throw new RepositoryException("Failure when making DocumentList.", e);
    }
  }

DiffingConnectorDocumentList documentList對象的構造函數裏面封裝了CheckpointAndChangeQueue checkpointAndChangeQueue隊列集合this

DiffingConnectorDocumentList 類完整實現以下:搜索引擎

/**
 * An implementation of {@link DocumentList} for the {@link DiffingConnector}.
 *
 * @since 2.8
 */
public class DiffingConnectorDocumentList implements DocumentList {
  private final Iterator<CheckpointAndChange> checkpointAndChangeIterator;
  private String checkpoint;

  /**
   * Creates a document list that returns a batch of documents from the provided
   * {@link CheckpointAndChangeQueue}.
   *
   * @param queue a CheckpointAndChangeQueue containing document changes
   * @param checkpoint point into the change queue after which to start
   *        returning documents
   * @throws IOException if persisting fails
   */
  public DiffingConnectorDocumentList(CheckpointAndChangeQueue queue,
      String checkpoint) throws IOException {
      //CheckpointAndChangeQueue queued的resume方法獲取List<CheckpointAndChange>
      //本DocumentList批次數據已經加載於內存
    List<CheckpointAndChange> guaranteedChanges = queue.resume(checkpoint);
    checkpointAndChangeIterator = guaranteedChanges.iterator();
    this.checkpoint = checkpoint;
  }
  
  /**
   * 調用方獲取該狀態並持久化,迭代完畢即爲最後的checkpoint
   */
  /* @Override */
  public String checkpoint() {
    return checkpoint;
  }

  /* @Override */
  public Document nextDocument() throws RepositoryException {
    if (checkpointAndChangeIterator.hasNext()) {
      CheckpointAndChange checkpointAndChange =
        checkpointAndChangeIterator.next();
      //更新checkpoint
      checkpoint = checkpointAndChange.getCheckpoint().toString();
      return checkpointAndChange.getChange().getDocumentHandle().getDocument();
    } else {
      return null;
    }
  }
}

在其構造方法中調用參數CheckpointAndChangeQueue queue的resume方法獲取List<CheckpointAndChange> guaranteedChanges,在其nextDocument()方法中經過迭代獲取CheckpointAndChange checkpointAndChange對象,同時更新checkpoint狀態標識spa

最後獲取與監視器關聯的MonitorCheckpoint對象映射code

//Map<String, MonitorCheckpoint>
      Map<String, MonitorCheckpoint> guaranteesMade =
          checkpointAndChangeQueue.getMonitorRestartPoints();

而後調用監控器管理對象snapshotRepositoryMonitorManager的acceptGuarantees方法,相應的監視器對象接收並確認MonitorCheckpoint對象htm

 /**
   * 監視器管理對象收到CheckpointAndChangeQueue對象反饋,分發給對應的監視器處理MonitorCheckpoint
   */
  /* @Override */
  public void acceptGuarantees(Map<String, MonitorCheckpoint> guarantees) {
    for (Map.Entry<String, MonitorCheckpoint> entry : guarantees.entrySet()) {
      String monitorName = entry.getKey();
      MonitorCheckpoint checkpoint = entry.getValue();
      DocumentSnapshotRepositoryMonitor monitor = fileSystemMonitorsByName.get(monitorName);
      if (monitor != null) {
        // Signal is asynch.  Let monitor figure out how to use.
          //回調
        monitor.acceptGuarantee(checkpoint);
      }
    }
  }

與倉庫對象相對應的具體監視器接收確認對象

/**
   * 監視器收到反饋 [MonitorCheckpoint接收確認]
   * @param cp
   */
  // Public for DocumentSnapshotRepositoryMonitorTest
  @VisibleForTesting
  public void acceptGuarantee(MonitorCheckpoint cp) {
    snapshotStore.acceptGuarantee(cp);
    guaranteeCheckpoint = cp;
  }

倉庫對應的存儲對象處於處理鏈的末端

/**
   * 反饋MonitorCheckpoint處理
   * @param cp
   */
  void acceptGuarantee(MonitorCheckpoint cp) {
    long readSnapshotNumber = cp.getSnapshotNumber();
    if (readSnapshotNumber < 0) {
      throw new IllegalArgumentException("Received invalid snapshot in: " + cp);
    }
    if (oldestSnapshotToKeep > readSnapshotNumber) {
      LOG.warning("Received an older snapshot than " + oldestSnapshotToKeep + ": " + cp);
    } else {
      oldestSnapshotToKeep = readSnapshotNumber;
    }
  }

---------------------------------------------------------------------------

本系列企業搜索引擎開發之鏈接器connector系本人原創

轉載請註明出處 博客園 刺蝟的溫馴

本人郵箱: chenying998179@163#com (#改成.)

本文連接 http://www.cnblogs.com/chenying99/p/3789650.html

相關文章
相關標籤/搜索