【Zookeeper】源碼分析之服務器(五)之ObserverZooKeeperServer

1、前言函數

  前面分析了FollowerZooKeeperServer,接着分析ObserverZooKeeperServer。源碼分析

2、ObserverZooKeeperServer源碼分析ui

  2.1 類的繼承關係 this

public class ObserverZooKeeperServer extends LearnerZooKeeperServer {}

  說明:ObserverZooKeeperServer也繼承了LearnerZooKeeperServer抽象類,角色爲Observer,其請求處理鏈爲ObserverRequestProcessor -> CommitProcessor -> FinalRequestProcessor。可能會存在SyncRequestProcessor。spa

  2.2 類的屬性  日誌

public class ObserverZooKeeperServer extends LearnerZooKeeperServer {
    // 日誌
    private static final Logger LOG =
        LoggerFactory.getLogger(ObserverZooKeeperServer.class);        
    
    /**
     * Enable since request processor for writing txnlog to disk and
     * take periodic snapshot. Default is ON.
     */
    // 同步處理器是否可用
    private boolean syncRequestProcessorEnabled = this.self.getSyncEnabled();
    
    /*
     * Request processors
     */
    // 提交請求處理器
    private CommitProcessor commitProcessor;
    // 同步請求處理器
    private SyncRequestProcessor syncProcessor;
    
    /*
     * Pending sync requests
     */
    // 待同步請求隊列
    ConcurrentLinkedQueue<Request> pendingSyncs = 
        new ConcurrentLinkedQueue<Request>();
}

  說明:該類維護了提交請求處理器和同步請求處理器,同時維護一個待同步請求的隊列,是否使用同步請求處理器要根據其標誌而定。code

  2.3 類的構造函數  server

    ObserverZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self,
            DataTreeBuilder treeBuilder, ZKDatabase zkDb) throws IOException {
        // 父類構造函數
        super(logFactory, self.tickTime, self.minSessionTimeout,
                self.maxSessionTimeout, treeBuilder, zkDb, self);
        LOG.info("syncEnabled =" + syncRequestProcessorEnabled);
    }

  說明:其會調用父類構造函數進行初始化操做,同時可肯定此時同步請求處理器是否可用。blog

  2.4 核心函數分析繼承

  1. commitRequest函數  

    public void commitRequest(Request request) {     
        if (syncRequestProcessorEnabled) { // 同步處理器可用
            // Write to txnlog and take periodic snapshot
            // 使用同步處理器處理請求
            syncProcessor.processRequest(request);
        }
        // 提交請求
        commitProcessor.commit(request);        
    }

  說明:若同步處理器可用,則使用同步處理器進行處理(放入同步處理器的queuedRequests隊列中),而後提交請求(放入提交請求處理器的committedRequests隊列中)。

  2. sync函數 

    synchronized public void sync(){
        if(pendingSyncs.size() ==0){ // 沒有未完成的同步請求
            LOG.warn("Not expecting a sync.");
            return;
        }
        // 移除隊首元素        
        Request r = pendingSyncs.remove();
        // 提交請求
        commitProcessor.commit(r);
    }

  說明:若還有未完成的同步請求,則移除該請求,而且進行提交。

3、總結

  本篇博文分析了ObserverZooKeeperServer的源碼,其核心也是請求處理鏈對於請求的處理。至此,ZooKeeper的源碼分析就告一段落了,其中之分析了部分源碼,還有不少的沒有分析到,以後在使用過程當中遇到則再進行分析,也謝謝各位園友的觀看~

相關文章
相關標籤/搜索