Zookeeper源碼分析(三) ----- 單機模式(standalone)運行

zookeeper源碼分析系列文章:html

原創博客,純手敲,轉載請註明出處,謝謝!java

1、Zookeeper單機啓動原理

Zookeeper屬於C/S架構,也就是傳統的客戶端-服務器模式,客戶端發送請求,服務器響應請求。這和高性能網絡框架Netty是同樣的,所以咱們也能夠猜測到它的啓動方式無非就是從main()方法開始,客戶端和服務器各有一個main()方法。算法

那咱們先來看看Zookeeper服務器端的啓動過程,當你打開Zookeeper目錄下/bin目錄中zkServer.cmd文件你就會發現,其實Zookeeper的啓動入口爲org.apache.zookeeper.server.quorum.QuorumPeerMain類的main方法,不管你是單機模式啓動Zookeeper仍是複製模式啓動Zookeeper,執行入口都是這個類,至於如何區別是哪一種模式啓動,該類會根據你配置文件的配置進行判斷,具體的判斷接下來將會詳細講解。apache

zkServer.cmd詳細源代碼:緩存

setlocal
call "%~dp0zkEnv.cmd"

set ZOOMAIN=org.apache.zookeeper.server.quorum.QuorumPeerMain // 設置主類入口
echo on
call %JAVA% "-Dzookeeper.log.dir=%ZOO_LOG_DIR%" "-Dzookeeper.root.logger=%ZOO_LOG4J_PROP%" -cp "%CLASSPATH%"  %ZOOMAIN% "%ZOOCFG%" %*  // 執行該類的main()方法

endlocal
複製代碼

下面先看看單機啓動時序圖:bash

一、首先執行main方法 二、解析傳進來的配置文件路徑,默認會去${baseDir}\conf\zoo.cfg找配置文件 三、建立NIOServerCnxnFactory進行監聽客戶端的鏈接請求,在Zookeeper中有兩種ServerCnxnFactory,一種是NIOServerCnxnFactory,另外一種是NettyServerCnxnFactory,前者爲默認工廠,後者除非你在啓動main方法時指定Systemzookeeper.serverCnxnFactory屬性值爲NettyServerCnxnFactory服務器

下面將詳細深刻源碼分析各個階段是如何實現以及工做的。網絡

2、Zookeeper單機模式(standalone)啓動

  • 一、Zookeeper是如何解析配置文件的?

zk的屬性配置分爲兩種:session

一、Java System property:Java系統環境變量,也就是System.setProperty()設置的參數架構

二、No Java system property配置文件屬性,也就是你在配置文件中配置的屬性

配置文件的解析原理很簡單,無非就是解析一些.properties文件中的鍵值對,其實Java已經提供了Properties類來表明.properties文件中全部鍵值對集合,咱們可使用Properties對象的load()方法將一個配置文件裝載進內存,而後對該對象進行遍歷就獲得咱們鎖配置的屬性值集合了。

說到Zookeeper中的配置文件解析,原理也和上面差很少,只不過是在變量鍵值對的時候多了一些Zookeeper自身的邏輯判斷。ZooKeeper中的配置文件解析從QuorumPeerConfig類的parse()方法提及,源代碼以下:

/**
 * Parse a ZooKeeper configuration file 解析一個配置文件
 * @param path the patch of the configuration file
 * @throws ConfigException error processing configuration
 */
public void parse(String path) throws ConfigException {
    File configFile = new File(path);
    
    LOG.info("Reading configuration from: " + configFile);
    
    try {
    	if (!configFile.exists()) {
    		throw new IllegalArgumentException(configFile.toString() + " file is missing");
    	}
        // 聲明一個Properties對象
    	Properties cfg = new Properties();
    	FileInputStream in = new FileInputStream(configFile);
    	try {
    	    // 傳入一個配置文件輸入流便可裝載全部配置
    		cfg.load(in);
    	} finally {
    	    // 涉及到流的操做記得最後將流關閉
    		in.close();
    	}
        // 此處是zk自身的邏輯處理
    	parseProperties(cfg);
    } catch (IOException e) {
    	throw new ConfigException("Error processing " + path, e);
    } catch (IllegalArgumentException e) {
    	throw new ConfigException("Error processing " + path, e);
    }
}
複製代碼

接下來咱們來看看上面的parseProperties(cfg)方法,該方法太長了,硬着頭皮啃完:

/**
 * Parse config from a Properties.
 * @param zkProp Properties to parse from.
 * @throws IOException
 * @throws ConfigException
 */
public void parseProperties(Properties zkProp) throws IOException, ConfigException {
    int clientPort = 0;
    String clientPortAddress = null;
    // 遍歷全部的key-value鍵值對
    for (Entry<Object, Object> entry : zkProp.entrySet()) {
        // 注意這裏要把首尾空格去掉
        String key = entry.getKey().toString().trim();
        String value = entry.getValue().toString().trim();
        // 存儲快照文件snapshot的目錄配置
        if (key.equals("dataDir")) {
        	dataDir = value;
        	// 事務日誌存儲目錄
        } else if (key.equals("dataLogDir")) {
        	dataLogDir = value;
        	// 客戶端鏈接server的端口,zk啓動總得有個端口吧!若是你沒有配置,則會報錯!通常咱們會將端口配置爲2181
        } else if (key.equals("clientPort")) {
        	clientPort = Integer.parseInt(value);
        	// 服務器IP地址
        } else if (key.equals("clientPortAddress")) {
        	clientPortAddress = value.trim();
        	// zk中的基本事件單位,用於心跳和session最小過時時間爲2*tickTime
        } else if (key.equals("tickTime")) {
        	tickTime = Integer.parseInt(value);
        	// 客戶端併發鏈接數量,注意是一個客戶端跟一臺服務器的併發鏈接數量,也就是說,假設值爲3,那麼某個客戶端不能同時併發鏈接3次到同一臺服務器(併發嘛!),不然會出現下面錯誤too many connections from /127.0.0.1 - max is 3
        } else if (key.equals("maxClientCnxns")) {
        	maxClientCnxns = Integer.parseInt(value);
        } else if (key.equals("minSessionTimeout")) {
        	minSessionTimeout = Integer.parseInt(value);
        } else if (key.equals("maxSessionTimeout")) {
        	maxSessionTimeout = Integer.parseInt(value);
        } else if (key.equals("initLimit")) {
        	initLimit = Integer.parseInt(value);
        } else if (key.equals("syncLimit")) {
        	syncLimit = Integer.parseInt(value);
        } else if (key.equals("electionAlg")) {
        	electionAlg = Integer.parseInt(value);
        } else if (key.equals("quorumListenOnAllIPs")) {
        	quorumListenOnAllIPs = Boolean.parseBoolean(value);
        } else if (key.equals("peerType")) {
            if (value.toLowerCase().equals("observer")) {
        	peerType = LearnerType.OBSERVER;
            } else if (value.toLowerCase().equals("participant")) {
        	peerType = LearnerType.PARTICIPANT;
            } else {
        	throw new ConfigException("Unrecognised peertype: " + value);
        }
	......
複製代碼

下面對解析的全部配置項用表格總結下: 全部的配置項均可以在官網查詢到。

下面咱們一塊兒看看Zookkeeper的配置文件屬性:

配置項 說明 異常狀況 是否報錯? 錯誤 or 備註
clientPort 服務端server監聽客戶端鏈接的端口 不配置 clientPort is not set
clientPortAddress 客戶端鏈接的服務器ip地址 不配置 默認使用網卡的地址
dataDir 數據快照目錄 不配置 dataDir is not set
dataLogDir 事務日誌存放目錄 不配置 默認跟dataDir目錄相同
tickTime ZK基本時間單元(毫秒),用於心跳和超時.minSessionTimeout默認是兩倍ticket 不配置 tickTime is not set
maxClientCnxns 同一ip地址最大併發鏈接數(也就是說同一個ip最多能夠同時維持與服務器連接的個數) 不配置 默認最大鏈接數爲60,設置爲0則無限制
minSessionTimeout 最小會話超時時間,默認2*ticket 不配置 否,若minSessionTimeout > maxSessionTimeout,則報錯 minSessionTimeout must not be larger than maxSessionTimeout
maxSessionTimeout 最大會話超時時間,默認20*ticket 不配置 不能小於minSessionTimeout
initLimit 容許follower同步和鏈接到leader的時間總量,以ticket爲單位 不配置 initLimit is not set,若是zk管理的數據量特別大,則辭職應該調大
syncLimit followerleader之間同步的世間量 不配置 syncLimit is not set
electionAlg zk選舉算法選擇,默認值爲3,表示採用快速選舉算法 不配置 若是沒有配置選舉地址server,則拋Missing election port for server: serverid
quorumListenOnAllIPs 當設置爲true時,ZooKeeper服務器將偵聽來自全部可用IP地址的對等端的鏈接,而不只僅是在配置文件的服務器列表中配置的地址(即集羣中配置的server.1,server.2。。。。)。 它會影響處理ZAB協議和Fast Leader Election協議的鏈接。 默認值爲false 不配置
peerType 服務器的角色,是觀察者observer仍是參與選舉或成爲leader,默認爲PARTICIPANT 不配置 若配置了不知支持的角色,則報Unrecognised peertype:
autopurge.snapRetainCount 數據快照保留個數,默認是3,最小也是3 不配置
autopurge.purgeInterval 執行日誌、快照清除任務的時間間隔(小時) 不配置 默認是 0
server.x=[hostname]:nnnnn[:nnnnn] 集羣服務器配置 不配置 單機:否;集羣:是 zk集羣啓動將加載該該配置,每臺zk服務器必須有一個myid文件,裏邊存放服務器的id,該id值必須匹配server.x中的x ; 第一個端口表示與leader鏈接的端口,第二個端口表示用於選舉的端口,第二個端口是可選的
  • 二、Zookeeper是如何判斷何種模式啓動服務器的?

由於ZookeeperZkServer.cmd啓動文件指定的統一入口爲org.apache.zookeeper.server.quorum.QuorumPeerMain,那麼咱們就要問了,那ZK是怎麼判斷我要單機模式啓動仍是集羣方式啓動呢?答案是明顯的,也就是取決於你在配置文件zoo.cfg中是否有配置server.x=hostname:port1:port2,以上的配置項代表咱們想讓ZK以集羣模式運行,那麼在代碼中是如何體現的呢?

上面講到ZK解析配置文件的原理,咱們依舊走進parseProperties()方法,看看以下代碼:

.....
// 此處解析配置文件以server.開頭的配置
} else if (key.startsWith("server.")) {
    // server.3
    int dot = key.indexOf('.');
    long sid = Long.parseLong(key.substring(dot + 1));
    String parts[] = splitWithLeadingHostname(value);
    if ((parts.length != 2) && (parts.length != 3) && (parts.length != 4)) {
    LOG.error(value + " does not have the form host:port or host:port:port "
    		+ " or host:port:port:type");
}
    LearnerType type = null;
    String hostname = parts[0];
    Integer port = Integer.parseInt(parts[1]);
    Integer electionPort = null;
    if (parts.length > 2) {
    electionPort = Integer.parseInt(parts[2]);
    }
if (parts.length > 3) {
    if (parts[3].toLowerCase().equals("observer")) {
    	type = LearnerType.OBSERVER;
    } else if (parts[3].toLowerCase().equals("participant")) {
    	type = LearnerType.PARTICIPANT;
    } else {
    	throw new ConfigException("Unrecognised peertype: " + value);
    }
}
if (type == LearnerType.OBSERVER) {
    observers.put(Long.valueOf(sid),
    		new QuorumServer(sid, hostname, port, electionPort, type));
    } else {
    // 若是配置了,那麼就加進servers中,其中servers是一個本地緩存Map,用於存儲配置的ip地址
    servers.put(Long.valueOf(sid), new QuorumServer(sid, hostname, port, electionPort, type));
}
複製代碼

若是配置了,那麼serverssize>0,解析完成以後,回到QuorumPeerMaininitializeAndRun()方法:

// 若是servers長度大於0,則集羣方式啓動,不然,單機啓動
 if (args.length == 1 && config.servers.size() > 0) {
        runFromConfig(config);
    } else {
        LOG.warn("Either no config or no quorum defined in config, running "
                + " in standalone mode");
        // there is only server in the quorum -- run as standalone
        ZooKeeperServerMain.main(args);
    }
複製代碼

從上面能夠看出,單機啓動的入口爲ZooKeeperServerMain類,而統一的入口類爲QuorumPeerMain,因此,在ZK中,服務器端的啓動類就只有這兩個了。

  • 三、單機模式下,Zookeeper是如何處理客戶端請求的?

不管是哪一種方式啓動Zookeeper,它都必須對客戶端的請求進行處理,那麼ZK是如何處理客戶端請求的呢?讓咱們一塊兒來看看源碼是怎麼寫的!

上面說到,Zk單機啓動的入口類爲ZooKeeperServerMain,咱們一塊兒看下其runFromConfig()方法:

/**
 * Run from a ServerConfig.
 * @param config ServerConfig to use.
 * @throws IOException
 */
public void runFromConfig(ServerConfig config) throws IOException {
    LOG.info("Starting server");
    FileTxnSnapLog txnLog = null;
    try {
        // 建立一個ZooKeeperServer,ZooKeeperServer表明具體運行的zk服務器,包含監聽客戶端請求
        final ZooKeeperServer zkServer = new ZooKeeperServer();
        // 這個是代表上面建立的ZooKeeperServer線程執行完以後,當前主線程才結束,相似Thread的join()方法
        final CountDownLatch shutdownLatch = new CountDownLatch(1);
        // 關閉服務器時的回調處理器
        zkServer.registerServerShutdownHandler(
                new ZooKeeperServerShutdownHandler(shutdownLatch));
        // 執行快照數據,日誌的定時保存操做,指定保存路徑
        txnLog = new FileTxnSnapLog(new File(config.dataLogDir), new File(
                config.dataDir));
        zkServer.setTxnLogFactory(txnLog);
        zkServer.setTickTime(config.tickTime);
        zkServer.setMinSessionTimeout(config.minSessionTimeout);
        zkServer.setMaxSessionTimeout(config.maxSessionTimeout);
        // 建立ServerCnxnFactory,默認實現爲NIOServerCnxnFactory,也能夠指定爲NettyServerCnxnFactory
        cnxnFactory = ServerCnxnFactory.createFactory();
        cnxnFactory.configure(config.getClientPortAddress(),
                config.getMaxClientCnxns());
        // 啓動服務器,將一個服務器zkServer丟給工廠,而後啓動
        cnxnFactory.startup(zkServer);
        // 這裏將會等待,除非調用shutdown()方法
        shutdownLatch.await();
        shutdown();
        // 這裏會等待直到zkServer線程完成
        cnxnFactory.join();
        if (zkServer.canShutdown()) {
            zkServer.shutdown(true);
        }
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Server interrupted", e);
    } finally {
        if (txnLog != null) {
            txnLog.close();
        }
    }
}
複製代碼

瞭解完上面的代碼,咱們明白單機啓動ZooKeeperServerZK作了什麼工做,主要點在zk建立的是哪一種工廠,至於NIOServerCnxnFactory的代碼,我就不說了,你們有興趣能夠去看看。

迴歸正題,讓咱們進入NIOServerCnxnFactoryrun()方法中看看:

public void run() {
while (!ss.socket().isClosed()) {
try {
    // 每一秒輪詢一次
	selector.select(1000);
	Set<SelectionKey> selected;
	synchronized (this) {
		selected = selector.selectedKeys();
	}
	ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected);
	Collections.shuffle(selectedList);
	for (SelectionKey k : selectedList) {
	    // 若是有讀請求或者鏈接請求,則接收請求
		if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
			SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
			InetAddress ia = sc.socket().getInetAddress();
			int cnxncount = getClientCnxnCount(ia);
			// 這裏對maxClientCnxns作出判斷,防止DOS攻擊
			if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns) {
				LOG.warn("Too many connections from " + ia + " - max is " + maxClientCnxns);
				sc.close();
			} else {
				LOG.info("Accepted socket connection from " + sc.socket().getRemoteSocketAddress());
				sc.configureBlocking(false);
				SelectionKey sk = sc.register(selector, SelectionKey.OP_READ);
				NIOServerCnxn cnxn = createConnection(sc, sk);
				sk.attach(cnxn);
				addCnxn(cnxn);
			}
		// 若是有讀請求且客戶端以前有鏈接過的,則直接處理
		} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
			NIOServerCnxn c = (NIOServerCnxn) k.attachment();
			c.doIO(k);
		} else {
			if (LOG.isDebugEnabled()) {
				LOG.debug("Unexpected ops in select " + k.readyOps());
			}
		}
	}
	selected.clear();
} catch (RuntimeException e) {
	LOG.warn("Ignoring unexpected runtime exception", e);
} catch (Exception e) {
	LOG.warn("Ignoring exception", e);
}
}
closeAll();
LOG.info("NIOServerCnxn factory exited run method");
}
複製代碼

看到這,我以爲對於Zk如何監聽處理客戶端的請求就清晰多了,上面的代碼主要採用輪詢機制,每一秒輪詢一次,經過selector.select(1000)方法指定,這裏的監聽方式和傳統的BIO不一樣,傳統的網絡監聽採用阻塞的accept()方法,zk採用java的nio實現。

謝謝閱讀~~

相關文章
相關標籤/搜索