1、前言數據庫
在分析了PrepRequestProcessor處理器後,接着來分析SyncRequestProcessor,該處理器將請求存入磁盤,其將請求批量的存入磁盤以提升效率,請求在寫入磁盤以前是不會被轉發到下個處理器的。服務器
2、SyncRequestProcessor源碼分析app
2.1 類的繼承關係 dom
public class SyncRequestProcessor extends Thread implements RequestProcessor {}
說明:與PrepRequestProcessor同樣,SyncRequestProcessor也繼承了Thread類並實現了RequestProcessor接口,表示其能夠做爲線程使用。函數
2.2 類的屬性 源碼分析
public class SyncRequestProcessor extends Thread implements RequestProcessor { // 日誌 private static final Logger LOG = LoggerFactory.getLogger(SyncRequestProcessor.class); // Zookeeper服務器 private final ZooKeeperServer zks; // 請求隊列 private final LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>(); // 下個處理器 private final RequestProcessor nextProcessor; // 快照處理線程 private Thread snapInProcess = null; // 是否在運行中 volatile private boolean running; /** * Transactions that have been written and are waiting to be flushed to * disk. Basically this is the list of SyncItems whose callbacks will be * invoked after flush returns successfully. */ // 等待被刷新到磁盤的請求隊列 private final LinkedList<Request> toFlush = new LinkedList<Request>(); // 隨機數生成器 private final Random r = new Random(System.nanoTime()); /** * The number of log entries to log before starting a snapshot */ // 快照個數 private static int snapCount = ZooKeeperServer.getSnapCount(); /** * The number of log entries before rolling the log, number * is chosen randomly */ // 日誌滾動以前記錄的日誌號,編號是隨機選擇的 private static int randRoll; // 結束請求標識 private final Request requestOfDeath = Request.requestOfDeath; }
說明:其中,SyncRequestProcessor維護了ZooKeeperServer實例,其用於獲取ZooKeeper的數據庫和其餘信息;維護了一個處理請求的隊列,其用於存放請求;維護了一個處理快照的線程,用於處理快照;維護了一個running標識,標識SyncRequestProcessor是否在運行;同時還維護了一個等待被刷新到磁盤的請求隊列。this
2.3 類的構造函數 spa
public SyncRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) { // 調用父類構造函數 super("SyncThread:" + zks.getServerId()); // 給字段賦值 this.zks = zks; this.nextProcessor = nextProcessor; running = true; }
說明:構造函數首先會調用Thread類的構造函數,而後根據構造函數參數給類的屬性賦值,其中會肯定下個處理器,並會設置該處理器正在運行的標識。.net
2.4 類的核心函數分析線程
1. run函數
public void run() { try { // 寫日誌數量初始化爲0 int logCount = 0; // we do this in an attempt to ensure that not all of the servers // in the ensemble take a snapshot at the same time // 確保全部的服務器在同一時間不是使用的同一個快照 setRandRoll(r.nextInt(snapCount/2)); while (true) { // // 初始請求爲null Request si = null; if (toFlush.isEmpty()) { // 沒有須要刷新到磁盤的請求 // 從請求隊列中取出一個請求,若隊列爲空會阻塞 si = queuedRequests.take(); } else { // 隊列不爲空,即有須要刷新到磁盤的請求 // 從請求隊列中取出一個請求,若隊列爲空,則返回空,不會阻塞 si = queuedRequests.poll(); if (si == null) { // 取出的請求爲空 // 刷新到磁盤 flush(toFlush); // 跳事後面的處理 continue; } } if (si == requestOfDeath) { // 在關閉處理器以後,會添加requestOfDeath,表示關閉後再也不處理請求 break; } if (si != null) { // 請求不爲空 // track the number of records written to the log if (zks.getZKDatabase().append(si)) { // 將請求添加至日誌文件,只有事務性請求才會返回true // 寫入一條日誌,logCount加1 logCount++; if (logCount > (snapCount / 2 + randRoll)) { // 知足roll the log的條件 randRoll = r.nextInt(snapCount/2); // roll the log zks.getZKDatabase().rollLog(); // take a snapshot if (snapInProcess != null && snapInProcess.isAlive()) { // 正在進行快照 LOG.warn("Too busy to snap, skipping"); } else { // 未被處理 snapInProcess = new Thread("Snapshot Thread") { // 建立線程來處理快照 public void run() { try { // 進行快照 zks.takeSnapshot(); } catch(Exception e) { LOG.warn("Unexpected exception", e); } } }; // 開始處理 snapInProcess.start(); } // 重置爲0 logCount = 0; } } else if (toFlush.isEmpty()) { // 等待被刷新到磁盤的請求隊列爲空 // optimization for read heavy workloads // iff this is a read, and there are no pending // flushes (writes), then just pass this to the next // processor // 查看此時toFlush是否爲空,若是爲空,說明近段時間讀多寫少,直接響應 if (nextProcessor != null) { // 下個處理器不爲空 // 下個處理器開始處理請求 nextProcessor.processRequest(si); if (nextProcessor instanceof Flushable) { // 處理器是Flushable的 // 刷新到磁盤 ((Flushable)nextProcessor).flush(); } } // 跳事後續處理 continue; } // 將請求添加至被刷新至磁盤隊列 toFlush.add(si); if (toFlush.size() > 1000) { // 隊列大小大於1000,直接刷新到磁盤 flush(toFlush); } } } } catch (Throwable t) { // 出現異常 LOG.error("Severe unrecoverable error, exiting", t); // 設置運行標識爲false,表示該處理器再也不運行 running = false; // 退出程序 System.exit(11); } LOG.info("SyncRequestProcessor exited!"); }
說明:該函數是整個處理器的核心,其邏輯大體以下
(1) 設置randRoll大小,確保全部的服務器在同一時間不是使用的同一個快照。
(2) 判斷toFlush隊列是否爲空,如果,則表示沒有須要刷新到磁盤的請求,進入(3),若否,進入(4)。
(3) 從queuedRequests中取出一個請求,進入(6)。
(4) 從queuedRequests中取出一個請求,判斷該請求是否爲null,如果,則進入(5),若否,則進入(6)。
(5) 調用flush函數,將toFlush中的請求刷新到磁盤,跳過以後的處理部分。
(6) 判斷請求是不是結束請求(在調用shutdown以後,會在隊列中添加一個requestOfDeath)。如果,則退出,不然,進入(7)。
(7) 判斷請求是否爲null,若否,則進入(8),不然進入(2)。
(8) 若寫入日誌成功,返回true(表示爲事務性請求),進入(9),不然進入(18)。
(9) logCount加1,並判斷是否大於了閾值,如果,則進入(10),不然進入(18)。
(10) 調用rollLog函數翻轉日誌文件。
(11) 判斷snapInProcess是否爲空而且是否存活,如果,則輸出日誌,不然,進入(12)。
(12) 建立snapInProcess線程並啓動。
(13) 重置logCount爲0。
(14) 判斷toFlush隊列是否爲空,如果,進入(15)。
(15) 判斷nextProcessor是否爲空,若否,則使用nextProcessor處理請求,不然進入(16)。
(16) 判斷nextProcessor是不是Flushable的,如果,則調用flush函數刷新請求至磁盤,不然進入(17)
(17) 跳過以後的處理步驟。
(18) 將請求添加至toFlush隊列。
(19) 若toFlush隊列大小大於1000,則刷新至磁盤,進入(2)。
其中會調用flush函數,其源碼以下
// 刷新到磁盤 private void flush(LinkedList<Request> toFlush) throws IOException, RequestProcessorException { if (toFlush.isEmpty()) // 隊列爲空,返回 return; // 提交至ZK數據庫 zks.getZKDatabase().commit(); while (!toFlush.isEmpty()) { // 隊列不爲空 // 從隊列移除請求 Request i = toFlush.remove(); if (nextProcessor != null) { // 下個處理器不爲空 // 下個處理器開始處理請求 nextProcessor.processRequest(i); } } if (nextProcessor != null && nextProcessor instanceof Flushable) { // 下個處理器不爲空而且是Flushable的 // 刷新到磁盤 ((Flushable)nextProcessor).flush(); } }
說明:該函數主要用於將toFlush隊列中的請求刷新到磁盤中。
2. shutdown函數
public void shutdown() { LOG.info("Shutting down"); // 添加結束請求請求至隊列 queuedRequests.add(requestOfDeath); try { if(running){ // 還在運行 // 等待該線程終止 this.join(); } if (!toFlush.isEmpty()) { // 隊列不爲空 // 刷新到磁盤 flush(toFlush); } } catch(InterruptedException e) { LOG.warn("Interrupted while wating for " + this + " to finish"); } catch (IOException e) { LOG.warn("Got IO exception during shutdown"); } catch (RequestProcessorException e) { LOG.warn("Got request processor exception during shutdown"); } if (nextProcessor != null) { nextProcessor.shutdown(); } }
說明:該函數用於關閉SyncRequestProcessor處理器,其首先會在queuedRequests隊列中添加一個結束請求,而後再判斷SyncRequestProcessor是否還在運行,如果,則會等待其結束;以後判斷toFlush隊列是否爲空,若不爲空,則刷新到磁盤中。
3、總結
本篇講解了SyncRequestProcessor的工做原理,其主要做用包含將事務性請求刷新到磁盤,而且對請求進行快照處理。也謝謝各位園友的觀看~
參考連接:http://blog.csdn.net/pwlazy/article/details/8137121