SyncRequestProcessor,該處理器將請求存入磁盤,其將請求批量的存入磁盤以提升效率,請求在寫入磁盤以前是不會被轉發到下個處理器的。數據庫
SyncRequestProcessor維護了ZooKeeperServer實例,其用於獲取ZooKeeper的數據庫和其餘信息;維護了一個處理請求的隊列,其用於存放請求;維護了一個處理快照的線程,用於處理快照;維護了一個running標識,標識SyncRequestProcessor是否在運行;同時還維護了一個等待被刷新到磁盤的請求隊列。服務器
// Zookeeper服務器 private final ZooKeeperServer zks; // 請求隊列 private final LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>(); // 下個處理器 private final RequestProcessor nextProcessor; // 快照處理線程 private Thread snapInProcess = null; // 是否在運行中 volatile private boolean running; /** * Transactions that have been written and are waiting to be flushed to * disk. Basically this is the list of SyncItems whose callbacks will be * invoked after flush returns successfully. */ // 等待被刷新到磁盤的請求隊列 private final LinkedList<Request> toFlush = new LinkedList<Request>(); // 隨機數生成器 private final Random r = new Random(); /** * The number of log entries to log before starting a snapshot */ // 快照個數 private static int snapCount = ZooKeeperServer.getSnapCount(); // 結束請求標識 private final Request requestOfDeath = Request.requestOfDeath;
構造函數首先會調用父類的構造函數,而後根據構造函數參數給類的屬性賦值,其中會肯定下個處理器,並會設置該處理器正在運行的標識。app
public SyncRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) { super("SyncThread:" + zks.getServerId(), zks .getZooKeeperServerListener()); this.zks = zks; this.nextProcessor = nextProcessor; running = true; }
@Override public void run() { try { // 寫日誌數量初始化爲0 int logCount = 0; // we do this in an attempt to ensure that not all of the servers // in the ensemble take a snapshot at the same time // 防止集羣中全部機器在同一時刻進行數據快照,對是否進行數據快照增長隨機因素 int randRoll = r.nextInt(snapCount/2); while (true) { Request si = null; // 沒有須要刷新到磁盤的請求 if (toFlush.isEmpty()) { // 從請求隊列中取出一個請求,若queuedRequests隊列爲空會阻塞 si = queuedRequests.take(); } else { // 從請求隊列中取出一個請求,若queuedRequests隊列爲空,則返回空,不會阻塞 si = queuedRequests.poll(); // 取出的請求爲空 if (si == null) { // 刷新數據磁盤 flush(toFlush); continue; } } // 在關閉處理器以後,會添加requestOfDeath請求到queuedRequests隊列,表示關閉後再也不處理請求 if (si == requestOfDeath) { break; } // 請求不爲空,處理請求 if (si != null) { // track the number of records written to the log // 將寫請求添加至事務日誌文件 FileTxnSnapLog.append(si) if (zks.getZKDatabase().append(si)) { // 日誌寫入,logCount加1 logCount++; //肯定是否須要進行數據快照 if (logCount > (snapCount / 2 + randRoll)) { randRoll = r.nextInt(snapCount/2); // roll the log // 滾動日誌,從當前日誌文件滾到下一個日誌文件,不是回滾 zks.getZKDatabase().rollLog(); // take a snapshot if (snapInProcess != null && snapInProcess.isAlive()) { // 正在進行快照 LOG.warn("Too busy to snap, skipping"); } else { // 建立線程來處理快照 snapInProcess = new ZooKeeperThread("Snapshot Thread") { public void run() { try { // 進行快照 zks.takeSnapshot(); } catch(Exception e) { LOG.warn("Unexpected exception", e); } } }; // 開始快照線程處理 snapInProcess.start(); } // 重置爲0 logCount = 0; } } else if (toFlush.isEmpty()) {// 讀請求會走到這裏,查看此時toFlush是否爲空,若是爲空,說明近段時間讀多寫少,直接響應 // optimization for read heavy workloads // iff this is a read, and there are no pending // flushes (writes), then just pass this to the next // processor if (nextProcessor != null) { // 下個處理器開始處理請求 nextProcessor.proce***equest(si); // 處理器是Flushable的,刷新數據到磁盤 if (nextProcessor instanceof Flushable) { ((Flushable)nextProcessor).flush(); } } continue; } // 將請求添加至被刷新至磁盤隊列 toFlush.add(si); if (toFlush.size() > 1000) {// 隊列大小大於1000,直接刷新到磁盤 flush(toFlush); } } } } catch (Throwable t) { handleException(this.getName(), t); } finally{ running = false; } LOG.info("SyncRequestProcessor exited!"); }
flush將toFlush隊列中的請求刷新到磁盤中。dom
private void flush(LinkedList<Request> toFlush) throws IOException, RequestProcessorException { if (toFlush.isEmpty()) return; // 提交事務至ZK數據庫 zks.getZKDatabase().commit(); while (!toFlush.isEmpty()) { // 從隊列移除請求 Request i = toFlush.remove(); // 下個處理器開始處理請求 if (nextProcessor != null) { nextProcessor.proce***equest(i); } } if (nextProcessor != null && nextProcessor instanceof Flushable) { ((Flushable)nextProcessor).flush(); } }
函數用於關閉SyncRequestProcessor處理器,其首先會在queuedRequests隊列中添加一個結束請求requestOfDeath,而後再判斷SyncRequestProcessor是否還在運行,如果,則會等待其結束;以後判斷toFlush隊列是否爲空,若不爲空,則刷新到磁盤中ide
public void shutdown() { LOG.info("Shutting down"); // 添加結束請求請求至隊列 queuedRequests.add(requestOfDeath); try { // 還在運行 if(running){ this.join();// 等待該線程終止 } if (!toFlush.isEmpty()) {// 隊列不爲空,刷新到磁盤 flush(toFlush); } } catch(InterruptedException e) { LOG.warn("Interrupted while wating for " + this + " to finish"); } catch (IOException e) { LOG.warn("Got IO exception during shutdown"); } catch (RequestProcessorException e) { LOG.warn("Got request processor exception during shutdown"); } if (nextProcessor != null) { nextProcessor.shutdown(); } }