LearnerZooKeeperServer是全部Follower和Observer的父類,在LearnerZooKeeperServer裏有2個重要的屬性:
//提交請求處理器
protected CommitProcessor commitProcessor;
//同步處理器
protected SyncRequestProcessor syncProcessor;服務器
FollowerZooKeeperServer和ObserverZooKeeperServer都繼承了LearnerZooKeeperServer服務器。異步
//待同步的請求 ConcurrentLinkedQueue<Request> pendingSyncs; //待處理的事務請求 LinkedBlockingQueue<Request> pendingTxns = new LinkedBlockingQueue<Request>();
構建請求處理鏈,FollowerZooKeeperServer的請求處理鏈是:
FollowerRequestProcessor -> CommitProcessor ->FinalRequestProcessoride
@Override protected void setupRequestProcessors() { //最後的處理器 RequestProcessor finalProcessor = new FinalRequestProcessor(this); //第二個處理器 commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener()); commitProcessor.start(); //第一個請求處理器FollowerRequestProcessor firstProcessor = new FollowerRequestProcessor(this, commitProcessor); ((FollowerRequestProcessor) firstProcessor).start(); syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor((Learner)getFollower())); syncProcessor.start(); }
該函數將請求進行記錄(放入到對應的隊列中),等待處理。函數
public void logRequest(TxnHeader hdr, Record txn) { Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid()); //zxid不等於0,說明此服務器已經處理過請求 if ((request.zxid & 0xffffffffL) != 0) { // 將該請求放入pendingTxns中,等待事務處理 pendingTxns.add(request); } // 使用SyncRequestProcessor處理請求(其會將請求放在隊列中,異步進行處理) syncProcessor.proce***equest(request); }
函數會提交zxid對應的請求(pendingTxns的隊首元素),其首先會判斷隊首請求對應的zxid是否爲傳入的zxid,而後再進行移除和提交(放在committedRequests隊列中)。性能
public void commit(long zxid) { // 沒有還在等待處理的事務 if (pendingTxns.size() == 0) { LOG.warn("Committing " + Long.toHexString(zxid) + " without seeing txn"); return; } // 隊首元素的zxid long firstElementZxid = pendingTxns.element().zxid; // 若是隊首元素的zxid不等於須要提交的zxid,則退出程序 if (firstElementZxid != zxid) { LOG.error("Committing zxid 0x" + Long.toHexString(zxid) + " but next pending txn 0x" + Long.toHexString(firstElementZxid)); System.exit(12); } // 從待處理事務請求隊列中移除隊首請求 Request request = pendingTxns.remove(); // 提交該請求 commitProcessor.commit(request); }
// 同步處理器是否可用,系統參數控制 private boolean syncRequestProcessorEnabled = this.self.getSyncEnabled(); // 待同步請求隊列 ConcurrentLinkedQueue<Request> pendingSyncs = new ConcurrentLinkedQueue<Request>();
構建請求處理鏈,ObserverZooKeeperServer的請求處理鏈是:ObserverRequestProcessor->CommitProcessor->FinalRequestProcessor,可能會存在SyncRequestProcessor。ui
@Override protected void setupRequestProcessors() { // We might consider changing the processor behaviour of // Observers to, for example, remove the disk sync requirements. // Currently, they behave almost exactly the same as followers. RequestProcessor finalProcessor = new FinalRequestProcessor(this); commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener()); commitProcessor.start(); firstProcessor = new ObserverRequestProcessor(this, commitProcessor); ((ObserverRequestProcessor) firstProcessor).start(); /* * Observer should write to disk, so that the it won't request * too old txn from the leader which may lead to getting an entire * snapshot. * * However, this may degrade performance as it has to write to disk * and do periodic snapshot which may double the memory requirements */ //是否使用同步處理器,看系統參數配置,會影響性能 if (syncRequestProcessorEnabled) { syncProcessor = new SyncRequestProcessor(this, null); syncProcessor.start(); } }
同步處理器可用,則使用同步處理器進行處理(放入同步處理器的queuedRequests隊列中),而後提交請求(放入提交請求處理器的committedRequests隊列中)this
public void commitRequest(Request request) { if (syncRequestProcessorEnabled) { // Write to txnlog and take periodic snapshot //寫事務日誌,並按期快照 syncProcessor.proce***equest(request); } commitProcessor.commit(request); }