QuorumPeerMain
public static void main(String[] args) { // QuorumPeerMain main = new QuorumPeerMain(); try { // 初始化服務端,並運行服務端 // todo 跟進去看他如何處理 服務端的配置文件,以及根據服務端的配置文件作出來那些動做 main.initializeAndRun(args);
跟進initializeAndRun()
方法 , 這個方法中主要作了以下三件事node
args[0]
解析出配置文件的位置,建立QuorumPeerConfig
配置類對象(能夠把這個對象理解成單個ZK server的配置對象),而後將配置文件中的內容加載進內存,並完成對java配置類的屬性的賦值protected void initializeAndRun(String[] args) throws ConfigException, IOException { // todo 這個類是關聯配置文件的類, 咱們在配置文件中輸入的各類配置都是他的屬性 QuorumPeerConfig config = new QuorumPeerConfig(); if (args.length == 1) { // todo config.parse(args[0]); } // Start and schedule the the purge task // todo 啓動並清除計劃任務 DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config .getDataDir(), config.getDataLogDir(), config .getSnapRetainCount(), config.getPurgeInterval()); purgeMgr.start(); // todo config.servers.size() > 0 說明添加了關於集羣的配置 if (args.length == 1 && config.servers.size() > 0) { // todo 根據配置啓動服務器, 跟進去, 就在下面 runFromConfig(config); } else { // todo 沒添加集羣的配置 LOG.warn("Either no config or no quorum defined in config, running " + " in standalone mode"); // there is only server in the quorum -- run as standalone // todo 啓動單機 ZooKeeperServerMain.main(args); } }
下面跟進parse
, 這個方法的目的是將磁盤上的配置信息讀取到文件中,完成對QuorumPeerConfig
的初始化主要作了以下兩件事ios
.properties
結尾的,所以呢選擇了Properties.java
(格式是 key=value)來解析讀取配置文件parseProperties()
方法,對解析出來的配置文件進行進一步的處理public void parse(String path) throws ConfigException { File configFile = new File(path); LOG.info("Reading configuration from: " + configFile); try { if (!configFile.exists()) { throw new IllegalArgumentException(configFile.toString() + " file is missing"); } Properties cfg = new Properties(); FileInputStream in = new FileInputStream(configFile); try { // todo 使用 Properties 按行讀取出配置文件內容 cfg.load(in); } finally { in.close(); } // todo 將按行讀取處理出來的進行分隔處理, 對當前的配置類進行賦值 parseProperties(cfg); } catch (IOException e) { throw new ConfigException("Error processing " + path, e); } catch (IllegalArgumentException e) { throw new ConfigException("Error processing " + path, e); } }
看一看,他是如何處理已經被加載到內存的配置文件的,編程
peerType=observer
,可是這是爲了人們查看方便設計的,換句話說,一個普通的Follower的配置文件,即使是添加上了這條配置文件,它一樣不是observer,後續還會有進一步的檢驗,由於zk集羣的配置文件大同小異,一開始即使是咱們不添加這個配置,observer角色的server依然會成爲observer,可是對於人們來講,就不用點開dataDir中的myid文件查看究竟當前的server是否是Observer了else if (key.startsWith("server."))
標記着配置文件中有關集羣的配置信息開始了,它根據不一樣的配置信息,將不一樣身份的server存放進兩個map中,就像下面那樣,若是是Observer類型的,就存放在observers
中,若是是Follower類型的就添加進servers
map中
QuorumVerifer
時,使用servers
的容量public void parseProperties(Properties zkProp) throws IOException, ConfigException { int clientPort = 0; String clientPortAddress = null; for (Entry<Object, Object> entry : zkProp.entrySet()) { String key = entry.getKey().toString().trim(); String value = entry.getValue().toString().trim(); if (key.equals("dataDir")) { dataDir = value; } else if (key.equals("dataLogDir")) { dataLogDir = value; } else if (key.equals("clientPort")) { clientPort = Integer.parseInt(value); } else if (key.equals("clientPortAddress")) { clientPortAddress = value.trim(); } else if (key.equals("tickTime")) { . . . . } else if (key.equals("peerType")) { if (value.toLowerCase().equals("observer")) { // todo 這是推薦配置作法在 observer 的配置文件中配置上添加 peerType=observer //todo 可是若是給一臺不是observer的機器加上了這個配置, 它也不會是observer. 在這個函數的最後會有校驗 peerType = LearnerType.OBSERVER; } else if (value.toLowerCase().equals("participant")) { peerType = LearnerType.PARTICIPANT; } else { throw new ConfigException("Unrecognised peertype: " + value); } . . . } else if (key.startsWith("server.")) { // todo 所有以server.開頭的配置所有放到了 servers int dot = key.indexOf('.'); long sid = Long.parseLong(key.substring(dot + 1)); String parts[] = splitWithLeadingHostname(value); if ((parts.length != 2) && (parts.length != 3) && (parts.length !=4)) { LOG.error(value + " does not have the form host:port or host:port:port " + " or host:por . . . // todo 不管是普通節點,仍是觀察者節點,都是 QuorumServer, 只不過添加進到不一樣的容器 if (type == LearnerType.OBSERVER){ // todo 若是不是觀察者的話,就不會放在 servers, // todo server.1=localhost:2181:3887 // todo server.2=localhost:2182:3888 // todo server.3=localhost:2183:3889 // todo port是對外提供服務的端口 electionPort是用於選舉的port // todo 查看zk的數據一致性咱們使用的端口是 port observers.put(Long.valueOf(sid), new QuorumServer(sid, hostname, port, electionPort, type)); } else { // todo 其餘的普通節點放在 servers servers.put(Long.valueOf(sid), new QuorumServer(sid, hostname, port, electionPort, type)); } . . . . /* * Default of quorum config is majority */ if(serverGroup.size() > 0){ if(servers.size() != serverGroup.size()) throw new ConfigException("Every server must be in exactly one group"); /* * The deafult weight of a server is 1 */ for(QuorumServer s : servers.values()){ if(!serverWeight.containsKey(s.id)) serverWeight.put(s.id, (long) 1); } /* * Set the quorumVerifier to be QuorumHierarchical */ quorumVerifier = new QuorumHierarchical(numGroups, serverWeight, serverGroup); } else { /* * The default QuorumVerifier is QuorumMaj */ // todo 默認的仲裁方式, 過半機制中,是不包含 observer 的數量的 LOG.info("Defaulting to majority quorums"); quorumVerifier = new QuorumMaj(servers.size()); } // Now add observers to servers, once the quorums have been // figured out // todo 最後仍是將 Observers 添加進了 servers servers.putAll(observers); /** * todo 當時搭建僞集羣時,在每個節點的dataDir文件中都添加進去了一個 myid文件 * 分別在zk、zk二、zk三、的dataDir中新建myid文件, 寫入一個數字, 該數字表示這是第幾號server. * 該數字必須和zoo.cfg文件中的server.X中的X一一對應. * myid的值是zoo.cfg文件裏定義的server.A項A的值, * Zookeeper 啓動時會讀取這個文件,拿到裏面的數據與 zoo.cfg 裏面的配置信息比較從而判斷究竟是那個server,只是一個標識做用。 * */ // todo 找到當前節點的dataDir 下面的 myid文件 File myIdFile = new File(dataDir, "myid"); if (!myIdFile.exists()) { throw new IllegalArgumentException(myIdFile.toString() + " file is missing"); } BufferedReader br = new BufferedReader(new FileReader(myIdFile)); String myIdString; try { // todo 讀取出myid裏面的內容 myIdString = br.readLine(); } finally { br.close(); } try { // todo myid文件中存到的數據就是 配置文件中server.N 中的 N這個數字 serverId = Long.parseLong(myIdString); MDC.put("myid", myIdString); } catch (NumberFormatException e) { throw new IllegalArgumentException("serverid " + myIdString + " is not a number"); } // todo 經過檢查上面的Observers map 中是否存在 serverId, 這個serverId其實就是myid, 對應上了後,就將它的 // Warn about inconsistent peer type LearnerType roleByServersList = observers.containsKey(serverId) ? LearnerType.OBSERVER : LearnerType.PARTICIPANT; if (roleByServersList != peerType) { LOG.warn("Peer type from servers list (" + roleByServersList + ") doesn't match peerType (" + peerType + "). Defaulting to servers list."); peerType = roleByServersList; }
在一開始的QuorumPeerMain.java
類中的Initializer()
方法中,存在以下的邏輯,判斷是單機版本啓動仍是集羣的啓動緩存
if (args.length == 1 && config.servers.size() > 0) { // todo 根據配置啓動服務器, 跟進去, 就在下面 runFromConfig(config); } else { // todo 沒添加集羣的配置 LOG.warn("Either no config or no quorum defined in config, running " + " in standalone mode"); // there is only server in the quorum -- run as standalone // todo 啓動單機 ZooKeeperServerMain.main(args); }
若是是單機版本的話,會進入else塊今後構建ZookeeperServerMain
對象, 能夠把這個ZooKeeperServerMain
理解成一個輔助類,通過它,初始化並啓動一個ZooKeeperServer.java的對象服務器
繼續跟進網絡
public static void main(String[] args) { // todo 使用無參的構造方法實例化服務端, 單機模式 ZooKeeperServerMain main = new ZooKeeperServerMain(); try { // todo 跟進去看他如何解析配置文件 main.initializeAndRun(args);
繼續跟進session
protected void initializeAndRun(String[] args) throws ConfigException, IOException { try { ManagedUtil.registerLog4jMBeans(); } catch (JMException e) { LOG.warn("Unable to register log4j JMX control", e); } // todo 這個配置類, 對應着單機模式的配置類 , 裏面的配置信息不多 ServerConfig config = new ServerConfig(); if (args.length == 1) { config.parse(args[0]); } else { // todo 單機版本 config.parse(args); } // todo 讀取配置,啓動單機節點 runFromConfig(config); }
此次再進入這個方法,咱們直接跳過它是若是從配置文件中讀取出配置信息了,而後直接看它的啓動方法app
runFromConfig方法
主要作了以下幾件事框架
ZooKeeperServer
它是單機ZK服務端的實例以下的ZooKeeperServer相關的屬性 private FileTxnSnapLog txnLogFactory = null; private ZKDatabase zkDb; protected RequestProcessor firstProcessor 以及它能夠構建DataTree
ZooKeeperServerShutdownHandler
監控ZkServer關閉狀態的處理器FileTxnSnapLog
文件快照相關的工具類單位時間trickTime
(節點心跳交流的時間)處理事務,快照相關的工具類
public void runFromConfig(ServerConfig config) throws IOException { LOG.info("Starting server"); FileTxnSnapLog txnLog = null; try { // Note that this thread isn't going to be doing anything else, // so rather than spawning another thread, we will just call run() in this thread. // todo 請注意,當前線程不會作其餘任何事情,所以咱們只在當前線程中調用Run方法,而不是開啓新線程 // create a file logger url from the command line args // todo 根據命令中的args 建立一個logger文件 final ZooKeeperServer zkServer = new ZooKeeperServer(); // Registers shutdown handler which will be used to know the server error or shutdown state changes. // todo 註冊一個shutdown handler, 經過他了解server發生的error或者瞭解shutdown 狀態的更改 final CountDownLatch shutdownLatch = new CountDownLatch(1); zkServer.registerServerShutdownHandler( new ZooKeeperServerShutdownHandler(shutdownLatch)); // todo FileTxnSnapLog工具類, 與 文件快照相關 txnLog = new FileTxnSnapLog(new File(config.dataLogDir), new File(config.dataDir)); txnLog.setServerStats(zkServer.serverStats()); zkServer.setTxnLogFactory(txnLog); zkServer.setTickTime(config.tickTime); zkServer.setMinSessionTimeout(config.minSessionTimeout); zkServer.setMaxSessionTimeout(config.maxSessionTimeout); // todo 建立Server上下文的工廠,工廠方法模式 // todo ServerCnxnFactory是個抽象類,他有不一樣是實現, NIO版本的 Netty版本的 cnxnFactory = ServerCnxnFactory.createFactory(); // todo 創建socket,默認是NIOServerCnxnFactory(是一個線程) cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns()); // todo 跟進這個方法 cnxnFactory.startup(zkServer);
FileSnap
和FileTxnLog
對象中public FileTxnSnapLog(File dataDir, File snapDir) throws IOException { LOG.debug("Opening datadir:{} snapDir:{}", dataDir, snapDir); // todo 關聯上指定數據文件和日誌文件 // todo 給FileTxnSnapLog賦值 this.dataDir = new File(dataDir, version + VERSION); this.snapDir = new File(snapDir, version + VERSION); if (!this.dataDir.exists()) { ... . . // todo 將這兩個文件封裝進 FileTxnLog 給當前類維護的兩種事務快照( TnxnSnap ) 賦值 txnLog = new FileTxnLog(this.dataDir); snapLog = new FileSnap(this.snapDir);
如上圖,將ServerCnxnFactory.java
的繼承圖,不一樣的上下文工廠的實現能夠建立出不一樣的上下文,經過這個圖能夠看到,netty不只支持傳統的NIO,還有一套Netty的實現,當前我選擇的是原生的實現NIOServerCnxnFactory的實現,那麼由他建立出來的就是NIOServerCnxn
啓動流程以下圖
NIOSocket
在這個方法中建立了ZooKeeperThread
,這個類ZK中設計的線程類,幾乎所有的線程都由此類完成,當前方法中的作法是將建立的Thread賦值給了當前的類的引用,實際上約等於當前類就是線程類,還有須要注意的地方就是雖然進行了初始化,可是並無開啓
此處看到的就是java原生的NIO Socket編程, 當前線程類被設置成守護線程
Thread thread; @Override public void configure(InetSocketAddress addr, int maxcc) throws IOException { configureSaslLogin(); // todo 把當前類做爲線程 thread = new ZooKeeperThread(this, "NIOServerCxn.Factory:" + addr); //todo 因此這裏的這個線程是爲了和JVM生命週期綁定,只剩下這個線程時已經沒有意義了,應該關閉掉。 thread.setDaemon(true); maxClientCnxns = maxcc; // todo 看到了NIO原生的代碼,使用打開服務端的 Channel, 綁定端口,設置爲非阻塞,註冊上感興趣的事件是 accept 鏈接事件 this.ss = ServerSocketChannel.open(); ss.socket().setReuseAddress(true); LOG.info("binding to port " + addr); ss.socket().bind(addr); ss.configureBlocking(false); ss.register(selector, SelectionKey.OP_ACCEPT); }
NIOServerCnxn
下面是它的屬性,能夠看到其實這個上下文涵蓋的很全面,甚至服務端的ZK都被他維護着,
NIOServerCnxnFactory factory; final SocketChannel sock; protected final SelectionKey sk; boolean initialized; ByteBuffer lenBuffer = ByteBuffer.allocate(4); ByteBuffer incomingBuffer = lenBuffer; LinkedBlockingQueue<ByteBuffer> outgoingBuffers = new LinkedBlockingQueue<ByteBuffer>(); int sessionTimeout; protected final ZooKeeperServer zkServer;
看完了ZooKeeperServerMain
中runFromConfig
方法中的建立ZKServer,FileTxnSnapLog
等重要對象的邏輯,下面,上下文啓動, 直接點擊去查看這個方法,確定直接進入ServerFactoryCnxn
,咱們選擇的是它的實現了NIOServerCnxnFactory
public void runFromConfig(ServerConfig config) throws IOException { . . . cnxnFactory.startup(zkServer);
下面是NIOServerCnxnFactory
的實現,它作的第一件事就是開啓上面實例化的所說的線程類,這條線程的開啓標記着,服務端今後能夠接收客戶端發送的請求了
這個方法還作了以下三件事
public void startup(ZooKeeperServer zks) throws IOException, InterruptedException { // todo start(); ==> run() 開啓線程 start(); //todo 實如今上面, 到目前爲止服務端已經能夠接受客戶端的請求了 // todo 將ZKS 交給NIOServerCnxnFactory管理,意味着NIOServerCnxnFactory是目前來講,服務端功能最多的對象 setZooKeeperServer(zks); // todo 由於是服務端剛剛啓動,須要從從disk將數據恢復到內存 zks.startdata(); // todo 繼續跟進 zks.startup(); }
跟進startData()方法
, 看到先建立ZKDatabase
,這個對象就是存在於內存中的對象,對磁盤中數據可視化描述
// todo 將數據加載進緩存中 public void startdata() throws IOException, InterruptedException { //check to see if zkDb is not null if (zkDb == null) { // todo 若是沒初始化的話就初始化 zkDb = new ZKDatabase(this.txnLogFactory); } if (!zkDb.isInitialized()) { // todo 恢復數據 loadData(); } }
跟進建立ZKDataBase的邏輯, 最直觀的能夠看見,這個DB維護了DataTree和SnapLog
public ZKDatabase(FileTxnSnapLog snapLog) { // todo 建立了DataTree 數據樹的空對象 dataTree = new DataTree(); sessionsWithTimeouts = new ConcurrentHashMap<Long, Integer>(); //todo 用初始化好了的存有關於系統事務日誌將snaplog初始化 this.snapLog = snapLog; }
loaddata()
public void loadData() throws IOException, InterruptedException { // todo zkDatabase 已經初始化了 if(zkDb.isInitialized()){ // todo zxid = 最近的一次znode的事務id setZxid(zkDb.getDataTreeLastProcessedZxid()); } else { //todo zkDB 沒有初始化就使用 zkDb.loadDataBase() , 跟進去看, 他從快照中獲取數據 setZxid(zkDb.loadDataBase()); } // Clean up dead sessions LinkedList<Long> deadSessions = new LinkedList<Long>(); for (Long session : zkDb.getSessions()) { if (zkDb.getSessionWithTimeOuts().get(session) == null) { deadSessions.add(session); } } zkDb.setDataTreeInit(true); for (long session : deadSessions) { // XXX: Is lastProcessedZxid really the best thing to use? killSession(session, zkDb.getDataTreeLastProcessedZxid()); } }
zks.startup();
它的源碼在下面,其中的計時器類也是一個線程類// todo 繼續啓動, 服務端和客戶端創建鏈接後會保留一個session, 其中這個sessiion的生命週期倒計時就在下面的 createSessionTracker(); public synchronized void startup() { if (sessionTracker == null) { // todo 建立session計時器 createSessionTracker(); } // todo 開啓計時器 startSessionTracker(); // todo 設置請求處理器, zookeeper中存在不一樣的請求處理器, 就在下面 setupRequestProcessors(); //todo 是一個爲應用程序、設備、系統等植入管理功能的框架。 //todo JMX能夠跨越一系列異構操做系統平臺、系統體系結構和網絡傳輸協議,靈活的開發無縫集成的系統、網絡和服務管理應用 registerJMX(); // todo 修改狀態 --> running setState(State.RUNNING); // todo 喚醒全部線程, 由於前面有一個線程等待處理器 睡了一秒 notifyAll(); }
着重看一下它的setupRequestProcessors()
添加請求處理器,單機模式下僅僅存在三個處理器,除了最後一個不是線程類以外,其餘兩個都是線程類
protected void setupRequestProcessors() { // todo 下面的三個處理器的第二個參數是在指定 下一個處理器是誰 RequestProcessor finalProcessor = new FinalRequestProcessor(this); RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor); // todo 在服務端, 數據的處理 socket -> packet -> request -> queue // todo 而後由下面的requestprocessor 鏈 進行下一步處理request // todo 開啓新線程, 服務端接收的客戶端的請求都放在了 隊列中,用處理器異步處理 ((SyncRequestProcessor)syncProcessor).start(); //todo 第一個處理器 , 下一個處理器是 syncProcessor 最後一個處理器 finalProcessor firstProcessor = new PrepRequestProcessor(this, syncProcessor); // todo 開啓新線程 服務端接收的客戶端的請求都放在了 隊列中,用處理器異步處理 ((PrepRequestProcessor)firstProcessor).start(); }
代碼看到這裏,從新調整一下思路接着往下看,首先做爲服務端咱們看到了上面的NIOServerCnxnFactory.java
類中的開啓了本類維護的新線程,讓服務端有了接收新鏈接的能力
既然是線程類,就存有Run方法,ZK的設計思路就是在NIOServerCnxnFactory.java
的run()方法中檢測客戶端有感興趣的事件時,就進入DoIO()
從bytebuffer中將用戶的請求解析出來,而後交由最後面的三個處理器排隊處理
NIOServerCnxnFactory.java
的run方法部分代碼以下
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) { // todo 接收數據,這裏會間歇性的接收到客戶端ping NIOServerCnxn c = (NIOServerCnxn) k.attachment(); // todo 跟進去, 和客戶端的那一套很類似了 c.doIO(k); } else {
繼續跟進readPayload()
-->readRequest()
-->zkServer.processPacket(this, incomingBuffer)
, 以下是processPacket()
方法的部分源碼
else { // todo 將上面的信息包裝成 request Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo()); si.setOwner(ServerCnxn.me); // todo 提交request, 其實就是提交給服務端的 process處理器進行處理 submitRequest(si); }
繼續跟進submitRequest()
,終於能夠看到它嘗試將這個request交給第一個處理器處理,可是由於這是在服務器啓動的過程當中,服務端並不肯定服務器的第一個處理器線程到底有沒有開啓,所以它先驗證,甚至會等一秒,直處處理器線程完成了啓動的邏輯
// todo 交由服務器作出request的處理動做 public void submitRequest(Request si) { // todo 若是 firstProcessor 不存在,就報錯了 if (firstProcessor == null) { synchronized (this) { try { while (state == State.INITIAL) { wait(1000); } } catch (InterruptedException e) { LOG.warn("Unexpected interruption", e); } if (firstProcessor == null || state != State.RUNNING) { throw new RuntimeException("Not started"); } } } try { touch(si.cnxn); // todo 驗證合法性 boolean validpacket = Request.isValid(si.type); if (validpacket) { // todo request合法的化,交給firstProcessor (實際是PrepRequestProcessor)處理 跟進去 firstProcessor.processRequest(si); if (si.cnxn != null) { incInProcess(); }
通過上面的閱讀,不難發現,最終來自於客戶端的request都將會流經服務端的三個處理器,下面就看看它們到底作了哪些事
由於他自己就是線程類,咱們直接看他的run()
,最直接的能夠看到,它將請求交給了pRequest(req)
處理
public void run() { try { while (true) { // todo 取出請求 Request request = submittedRequests.take(); long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK; //todo 處理請求 if (request.type == OpCode.ping) { traceMask = ZooTrace.CLIENT_PING_TRACE_MASK; } if (LOG.isTraceEnabled()) { ZooTrace.logRequest(LOG, traceMask, 'P', request, ""); } if (Request.requestOfDeath == request) { break; } // todo 着重看這裏, 跟進去 pRequest(request); }
下面跟進它的pRequest()
,下面是它的源碼,經過switch分支針對不一樣類型的請求作出不一樣的處理,下面用create類型的請求舉例
protected void pRequest(Request request) throws RequestProcessorException { // LOG.info("Prep>>> cxid = " + request.cxid + " type = " + // request.type + " id = 0x" + Long.toHexString(request.sessionId)); request.hdr = null; request.txn = null; // todo 下面的不一樣類型的信息, 對應這不一樣的處理器方式 try { switch (request.type) { case OpCode.create: // todo 建立每條記錄對應的bean , 如今仍是空的, 在面的pRequest2Txn 完成賦值 CreateRequest createRequest = new CreateRequest(); // todo 跟進這個方法, 再從這個方法出來,往下運行,能夠看到調用了下一個處理器 pRequest2Txn(request.type, zks.getNextZxid(), request, createRequest, true); break; . . . request.zxid = zks.getZxid(); // todo 調用下一個處理器處理器請求 SyncRequestProcessor nextProcessor.processRequest(request);
總覽思路,如今當前的處理器進行狀態的相關處理,處理完以後移交給下一個處理器
跟進pRequest2Txn(request.type, zks.getNextZxid(), request, createRequest, true);
依然是用create類型距離, 它在下面的方法中作了以下幾件事
CreateRequest
類中outstandingChanges
集合中// todo 第二個參數位置上的 record 是上一步new 出來的空對象--> protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize) throws KeeperException, IOException, RequestProcessorException { // todo 使用request的相關屬性,建立出 事務Header request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid, Time.currentWallTime(), type); switch (type) { case OpCode.create: // todo 校驗session的狀況 zks.sessionTracker.checkSession(request.sessionId, request.getOwner()); CreateRequest createRequest = (CreateRequest)record; if(deserialize) // todo 反序列化 ByteBufferInputStream.byteBuffer2Record(request.request, createRequest); // todo 獲取出request中的path String path = createRequest.getPath(); int lastSlash = path.lastIndexOf('/'); if (lastSlash == -1 || path.indexOf('\0') != -1 || failCreate) { LOG.info("Invalid path " + path + " with session 0x" + Long.toHexString(request.sessionId)); throw new KeeperException.BadArgumentsException(path); } // todo 進行權限的驗證 List<ACL> listACL = removeDuplicates(createRequest.getAcl()); if (!fixupACL(request.authInfo, listACL)) { throw new KeeperException.InvalidACLException(path); } // todo 獲取父級路徑 String parentPath = path.substring(0, lastSlash); // todo 跟進這個方法, 跟進父節點的路徑找到 parentRecord ChangeRecord parentRecord = getRecordForPath(parentPath); // todo 校驗 checkACL(zks, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo); // todo 取出父節點的C version (子節點的version) int parentCVersion = parentRecord.stat.getCversion(); CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags()); if (createMode.isSequential()) { path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion); } validatePath(path, request.sessionId); try { if (getRecordForPath(path) != null) { throw new KeeperException.NodeExistsException(path); } } catch (KeeperException.NoNodeException e) { // ignore this one } // todo 判斷當前的父節點 是否是臨時節點 boolean ephemeralParent = parentRecord.stat.getEphemeralOwner() != 0; if (ephemeralParent) { // todo 父節點若是是臨時節點, 直接拋異常結束 throw new KeeperException.NoChildrenForEphemeralsException(path); } // todo 父節點不是臨時節點, 將建立的節點的VCersion 就是在父節點的基礎上+1 int newCversion = parentRecord.stat.getCversion()+1; request.txn = new CreateTxn(path, createRequest.getData(), listACL, createMode.isEphemeral(), newCversion); StatPersisted s = new StatPersisted(); if (createMode.isEphemeral()) { s.setEphemeralOwner(request.sessionId); } // todo 修改了父節點的一些元信息 parentRecord = parentRecord.duplicate(request.hdr.getZxid()); parentRecord.childCount++; parentRecord.stat.setCversion(newCversion); //todo 添加兩條修改記錄 addChangeRecord(parentRecord); addChangeRecord(new ChangeRecord(request.hdr.getZxid(), path, s, 0, listACL)); break;
一個create請求通過第一個處理器進行狀態相關的處理以後,就來到當前這個第二個處理器, 當前處理器的主要做用就是負責同步持久化,將request持久化到磁盤,人們說的打快照,也就是將DataTree序列化後持久化的工做,他的主要邏輯都在下面的Run方法中
while(true)
保證了做爲線程類的它能夠無休止的一直運行下去if-else
分支進行不一樣的處理
public void run() { try { // todo 寫日誌的初始數量 int logCount = 0; // we do this in an attempt to ensure that not all of the serversin the ensemble take a snapshot at the same time // todo 設置RandRoll的大小, 確保全部服務器在同一個時間不使用同一個快照 setRandRoll(r.nextInt(snapCount / 2)); //todo 這個處理器擁有本身的無限循環 while (true) { // todo 初始請求爲null Request si = null; // todo toFlush是一個LinkedList, 裏面存放着須要 持久化到磁盤中的request if (toFlush.isEmpty()) { // todo 沒有須要刷新進disk的 // todo 這個take()是LinkedList原生的方法 // todo 從請求隊列中取出一個請求,若是隊列爲空就會阻塞在這裏 si = queuedRequests.take(); } else { // todo 若是隊列爲空,直接取出request, 並不會阻塞 si = queuedRequests.poll(); if (si == null) { //todo 刷新進磁盤 flush(toFlush); continue; } } // todo 在關閉處理器以前,會添加requestOfDeadth,表示關閉後再也不接收任何請求 if (si == requestOfDeath) { break; } //todo 成功的從隊列中取出了請求 if (si != null) { // track the number of records written to the log // todo 將request 追加到日誌文件, 只有事物性的請求才會返回true if (zks.getZKDatabase().append(si)) { // todo 剛纔的事物日誌放到請求成功後,添加一次, log數+1 logCount++; // todo 當持久化的request數量 > (快照數/2 +randRoll) 時, 建立新的日誌文件 if (logCount > (snapCount / 2 + randRoll)) { setRandRoll(r.nextInt(snapCount / 2)); // todo roll the log // todo 跟進去這個方法, 最終也會執行 this.logStream.flush(); // todo 新生成一個日誌文件 // todo 調用rollLog函數翻轉日誌文件 zks.getZKDatabase().rollLog(); // todo 拍攝日誌快照 if (snapInProcess != null && snapInProcess.isAlive()) { LOG.warn("Too busy to snap, skipping"); } else { // todo 建立線程處理快照 snapInProcess = new ZooKeeperThread("Snapshot Thread") { public void run() { try { // todo 打快照, 跟進去 zks.takeSnapshot(); } catch (Exception e) { LOG.warn("Unexpected exception", e); } } }; // todo 開啓快照線程 snapInProcess.start(); } // todo 重置爲0 logCount = 0; } } else if (toFlush.isEmpty()) { // todo 若是等待被刷新進disk的request爲空 // optimization for read heavy workloads // iff this is a read, and there are no pending // flushes (writes), then just pass this to the next // processor // todo 查看此時toFlush是否爲空,若是爲空,說明近段時間讀多寫少,直接響應 if (nextProcessor != null) { // todo 最終也會調用 nextProcessor 處理request FinalRequestProcess nextProcessor.processRequest(si); if (nextProcessor instanceof Flushable) { ((Flushable) nextProcessor).flush(); } } continue; } // todo 流裏面的內容不了當即刷新, 調用 toFlush.add(si); 累積request toFlush.add(si); if (toFlush.size() > 1000) { // todo 當toFlush中的 request數量 > 1000 將會flush flush(toFlush); } } }
到底是不是 事務類型的req,是在上面的代碼中的zks.getZKDatabase().append(si)
實現的,true表示屬於事務類型,跟進這個方法,最終回來到FileTxnLog.java
的append()
,源碼以下
代碼是挺長的,可是邏輯也算是請求,以下
continue
沒有一點持久化到磁盤的邏輯if (logStream==null) {
if (logCount > (snapCount / 2 + randRoll))
以後,就會進行一第二天志文件的滾動,說白了,就是如今的日誌文件體積太大了,而後得保存原來的就日誌文件,建立一個新的空的日誌文件繼續使用public synchronized boolean append(TxnHeader hdr, Record txn) throws IOException { if (hdr == null) { return false; } if (hdr.getZxid() <= lastZxidSeen) { LOG.warn("Current zxid " + hdr.getZxid() + " is <= " + lastZxidSeen + " for " + hdr.getType()); } else { lastZxidSeen = hdr.getZxid(); } // todo 第一次來==null。 再執行過來就不進來了,等着在 SyncRequestProcessor中批量處理 // todo logStream == BufferedOutputStream if (logStream==null) { if(LOG.isInfoEnabled()){ LOG.info("Creating new log file: " + Util.makeLogName(hdr.getZxid())); } // todo 關聯上 咱們指定的logdir位置的日誌文件 logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid())); // todo 包裝進文件輸出流 fos = new FileOutputStream(logFileWrite); logStream=new BufferedOutputStream(fos); oa = BinaryOutputArchive.getArchive(logStream); FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId); fhdr.serialize(oa, "fileheader"); // Make sure that the magic number is written before padding. logStream.flush(); filePadding.setCurrentSize(fos.getChannel().position()); streamsToFlush.add(fos); } filePadding.padFile(fos.getChannel()); byte[] buf = Util.marshallTxnEntry(hdr, txn); if (buf == null || buf.length == 0) { throw new IOException("Faulty serialization for header " + "and txn"); } Checksum crc = makeChecksumAlgorithm(); crc.update(buf, 0, buf.length); oa.writeLong(crc.getValue(), "txnEntryCRC"); Util.writeTxnBytes(oa, buf); return true; }
終於來到了FinalRequestProcessor
處理器,它並非線程類,可是它確實是和前兩個線程類並列的,單機模式下最後一個處理器類
它處理request的邏輯那是至關長我挑着貼在下面,只是關注下面的幾個點,代碼並不完整哦
它的解釋我寫在源碼的下面
public void processRequest(Request request) { ProcessTxnResult rc = null; // 看一看!!!!!!!!! // 看一看!!!!!!!!! // 看一看!!!!!!!!! // 它在消費 outstandingChanges 隊列, 沒錯,這個隊列中對象, 就是第一個個處理器調用addChange()方法添加進去的record // 看一看!!!!!!!!! // 看一看!!!!!!!!! // 看一看!!!!!!!!! synchronized (zks.outstandingChanges) { // todo outstandingChanges不爲空且首個元素的zxid小於等於請求的zxid while (!zks.outstandingChanges.isEmpty() && zks.outstandingChanges.get(0).zxid <= request.zxid) { //todo 移除並返回第一個元素 ChangeRecord cr = zks.outstandingChanges.remove(0); // todo 若是record的zxid < request.zxid 警告 if (cr.zxid < request.zxid) { LOG.warn("Zxid outstanding " + cr.zxid + " is less than current " + request.zxid); } // todo 根據路徑獲得Record並判斷是否爲cr if (zks.outstandingChangesForPath.get(cr.path) == cr) { // 移除cr的路徑對應的記錄 zks.outstandingChangesForPath.remove(cr.path); } } //todo 請求頭不爲空 if (request.hdr != null) { // 獲取請求頭 TxnHeader hdr = request.hdr; // 獲取事務 Record txn = request.txn; // todo 跟進這個方法-----<--!!!!!!-----處理事務的邏輯,在這裏面有向客戶端發送事件的邏輯, 回調客戶端的watcher----!!!!!!--> // todo 在這個方法裏面更新了內存 rc = zks.processTxn(hdr, txn); } // do not add non quorum packets to the queue. // todo 只將quorum包(事務性請求)添加進隊列 if (Request.isQuorum(request.type)) { zks.getZKDatabase().addCommittedProposal(request); } } if (request.cnxn == null) { return; } ServerCnxn cnxn = request.cnxn; String lastOp = "NA"; zks.decInProcess(); Code err = Code.OK; Record rsp = null; boolean closeSession = false; // todo 根據請求頭的不一樣類型進行不一樣的處理 switch (request.type) { //todo PING case OpCode.ping: { //todo 更新延遲 zks.serverStats().updateLatency(request.createTime); lastOp = "PING"; //todo 更新響應的狀態 cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp, request.createTime, Time.currentElapsedTime()); cnxn.sendResponse(new ReplyHeader(-2, zks.getZKDatabase().getDataTreeLastProcessedZxid(), 0), null, "response"); return; } . . . // todo 若是是create , 在這裏返回給客戶端 結果 case OpCode.create: { lastOp = "CREA"; rsp = new CreateResponse(rc.path); // todo 在下面代碼的最後 返回出去 rsp err = Code.get(rc.err); break; } long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid(); ReplyHeader hdr = new ReplyHeader(request.cxid, lastZxid, err.intValue()); zks.serverStats().updateLatency(request.createTime); cnxn.updateStatsForResponse(request.cxid, lastZxid, lastOp, request.createTime, Time.currentElapsedTime()); // todo 在這裏將向客戶端返回信息, 跟進去查看就能看到socket相關的內容 cnxn.sendResponse(hdr, rsp, "response");
rc = zks.processTxn(hdr, txn);
cnxn.sendResponse(hdr, rsp, "response");