zookeeper
源碼分析系列文章:html
原創博客,純手敲,轉載請註明出處,謝謝!java
Zookeeper
單機啓動原理Zookeeper
屬於C/S
架構,也就是傳統的客戶端-服務器模式,客戶端發送請求,服務器響應請求。這和高性能網絡框架Netty
是同樣的,所以咱們也能夠猜測到它的啓動方式無非就是從main()
方法開始,客戶端和服務器各有一個main()
方法。算法
那咱們先來看看Zookeeper
服務器端的啓動過程,當你打開Zookeeper
目錄下/bin
目錄中zkServer.cmd
文件你就會發現,其實Zookeeper
的啓動入口爲org.apache.zookeeper.server.quorum.QuorumPeerMain
類的main
方法,不管你是單機模式啓動Zookeeper
仍是複製模式啓動Zookeeper
,執行入口都是這個類,至於如何區別是哪一種模式啓動,該類會根據你配置文件的配置進行判斷,具體的判斷接下來將會詳細講解。apache
zkServer.cmd
詳細源代碼:緩存
setlocal
call "%~dp0zkEnv.cmd"
set ZOOMAIN=org.apache.zookeeper.server.quorum.QuorumPeerMain // 設置主類入口
echo on
call %JAVA% "-Dzookeeper.log.dir=%ZOO_LOG_DIR%" "-Dzookeeper.root.logger=%ZOO_LOG4J_PROP%" -cp "%CLASSPATH%" %ZOOMAIN% "%ZOOCFG%" %* // 執行該類的main()方法
endlocal
複製代碼
下面先看看單機啓動時序圖:bash
一、首先執行main
方法 二、解析傳進來的配置文件路徑,默認會去${baseDir}\conf\zoo.cfg
找配置文件 三、建立NIOServerCnxnFactory
進行監聽客戶端的鏈接請求,在Zookeeper
中有兩種ServerCnxnFactory
,一種是NIOServerCnxnFactory
,另外一種是NettyServerCnxnFactory
,前者爲默認工廠,後者除非你在啓動main
方法時指定System
的zookeeper.serverCnxnFactory
屬性值爲NettyServerCnxnFactory
。服務器
下面將詳細深刻源碼分析各個階段是如何實現以及工做的。網絡
Zookeeper
單機模式(standalone
)啓動Zookeeper
是如何解析配置文件的?zk的屬性配置分爲兩種:session
一、
Java System property
:Java系統環境變量,也就是System.setProperty()
設置的參數架構二、
No Java system property
配置文件屬性,也就是你在配置文件中配置的屬性
配置文件的解析原理很簡單,無非就是解析一些.properties
文件中的鍵值對,其實Java
已經提供了Properties
類來表明.properties
文件中全部鍵值對集合,咱們可使用Properties
對象的load()
方法將一個配置文件裝載進內存,而後對該對象進行遍歷就獲得咱們鎖配置的屬性值集合了。
說到Zookeeper
中的配置文件解析,原理也和上面差很少,只不過是在變量鍵值對的時候多了一些Zookeeper
自身的邏輯判斷。ZooKeeper
中的配置文件解析從QuorumPeerConfig
類的parse()
方法提及,源代碼以下:
/**
* Parse a ZooKeeper configuration file 解析一個配置文件
* @param path the patch of the configuration file
* @throws ConfigException error processing configuration
*/
public void parse(String path) throws ConfigException {
File configFile = new File(path);
LOG.info("Reading configuration from: " + configFile);
try {
if (!configFile.exists()) {
throw new IllegalArgumentException(configFile.toString() + " file is missing");
}
// 聲明一個Properties對象
Properties cfg = new Properties();
FileInputStream in = new FileInputStream(configFile);
try {
// 傳入一個配置文件輸入流便可裝載全部配置
cfg.load(in);
} finally {
// 涉及到流的操做記得最後將流關閉
in.close();
}
// 此處是zk自身的邏輯處理
parseProperties(cfg);
} catch (IOException e) {
throw new ConfigException("Error processing " + path, e);
} catch (IllegalArgumentException e) {
throw new ConfigException("Error processing " + path, e);
}
}
複製代碼
接下來咱們來看看上面的parseProperties(cfg)
方法,該方法太長了,硬着頭皮啃完:
/**
* Parse config from a Properties.
* @param zkProp Properties to parse from.
* @throws IOException
* @throws ConfigException
*/
public void parseProperties(Properties zkProp) throws IOException, ConfigException {
int clientPort = 0;
String clientPortAddress = null;
// 遍歷全部的key-value鍵值對
for (Entry<Object, Object> entry : zkProp.entrySet()) {
// 注意這裏要把首尾空格去掉
String key = entry.getKey().toString().trim();
String value = entry.getValue().toString().trim();
// 存儲快照文件snapshot的目錄配置
if (key.equals("dataDir")) {
dataDir = value;
// 事務日誌存儲目錄
} else if (key.equals("dataLogDir")) {
dataLogDir = value;
// 客戶端鏈接server的端口,zk啓動總得有個端口吧!若是你沒有配置,則會報錯!通常咱們會將端口配置爲2181
} else if (key.equals("clientPort")) {
clientPort = Integer.parseInt(value);
// 服務器IP地址
} else if (key.equals("clientPortAddress")) {
clientPortAddress = value.trim();
// zk中的基本事件單位,用於心跳和session最小過時時間爲2*tickTime
} else if (key.equals("tickTime")) {
tickTime = Integer.parseInt(value);
// 客戶端併發鏈接數量,注意是一個客戶端跟一臺服務器的併發鏈接數量,也就是說,假設值爲3,那麼某個客戶端不能同時併發鏈接3次到同一臺服務器(併發嘛!),不然會出現下面錯誤too many connections from /127.0.0.1 - max is 3
} else if (key.equals("maxClientCnxns")) {
maxClientCnxns = Integer.parseInt(value);
} else if (key.equals("minSessionTimeout")) {
minSessionTimeout = Integer.parseInt(value);
} else if (key.equals("maxSessionTimeout")) {
maxSessionTimeout = Integer.parseInt(value);
} else if (key.equals("initLimit")) {
initLimit = Integer.parseInt(value);
} else if (key.equals("syncLimit")) {
syncLimit = Integer.parseInt(value);
} else if (key.equals("electionAlg")) {
electionAlg = Integer.parseInt(value);
} else if (key.equals("quorumListenOnAllIPs")) {
quorumListenOnAllIPs = Boolean.parseBoolean(value);
} else if (key.equals("peerType")) {
if (value.toLowerCase().equals("observer")) {
peerType = LearnerType.OBSERVER;
} else if (value.toLowerCase().equals("participant")) {
peerType = LearnerType.PARTICIPANT;
} else {
throw new ConfigException("Unrecognised peertype: " + value);
}
......
複製代碼
下面對解析的全部配置項用表格總結下: 全部的配置項均可以在官網查詢到。
下面咱們一塊兒看看Zookkeeper
的配置文件屬性:
配置項 | 說明 | 異常狀況 | 是否報錯? | 錯誤 or 備註 |
---|---|---|---|---|
clientPort |
服務端server監聽客戶端鏈接的端口 | 不配置 | 是 | clientPort is not set |
clientPortAddress |
客戶端鏈接的服務器ip地址 | 不配置 | 否 | 默認使用網卡的地址 |
dataDir |
數據快照目錄 | 不配置 | 是 | dataDir is not set |
dataLogDir |
事務日誌存放目錄 | 不配置 | 否 | 默認跟dataDir 目錄相同 |
tickTime |
ZK基本時間單元(毫秒),用於心跳和超時.minSessionTimeout 默認是兩倍ticket |
不配置 | 是 | tickTime is not set |
maxClientCnxns |
同一ip地址最大併發鏈接數(也就是說同一個ip最多能夠同時維持與服務器連接的個數) | 不配置 | 否 | 默認最大鏈接數爲60,設置爲0則無限制 |
minSessionTimeout |
最小會話超時時間,默認2*ticket | 不配置 | 否,若minSessionTimeout > maxSessionTimeout ,則報錯 |
minSessionTimeout must not be larger than maxSessionTimeout |
maxSessionTimeout |
最大會話超時時間,默認20*ticket | 不配置 | 否 | 不能小於minSessionTimeout |
initLimit |
容許follower 同步和鏈接到leader 的時間總量,以ticket 爲單位 |
不配置 | 是 | initLimit is not set ,若是zk管理的數據量特別大,則辭職應該調大 |
syncLimit |
follower 與leader 之間同步的世間量 |
不配置 | 是 | syncLimit is not set |
electionAlg |
zk選舉算法選擇,默認值爲3,表示採用快速選舉算法 |
不配置 | 否 | 若是沒有配置選舉地址server ,則拋Missing election port for server: serverid |
quorumListenOnAllIPs |
當設置爲true時,ZooKeeper服務器將偵聽來自全部可用IP地址的對等端的鏈接,而不只僅是在配置文件的服務器列表中配置的地址(即集羣中配置的server.1,server.2。。。。)。 它會影響處理ZAB協議和Fast Leader Election協議的鏈接。 默認值爲false | 不配置 | 否 | |
peerType |
服務器的角色,是觀察者observer 仍是參與選舉或成爲leader ,默認爲PARTICIPANT |
不配置 | 否 | 若配置了不知支持的角色,則報Unrecognised peertype: |
autopurge.snapRetainCount |
數據快照保留個數,默認是3,最小也是3 | 不配置 | 否 | |
autopurge.purgeInterval |
執行日誌、快照清除任務的時間間隔(小時) | 不配置 | 否 | 默認是 0 |
server.x=[hostname]:nnnnn[:nnnnn] |
集羣服務器配置 | 不配置 | 單機:否;集羣:是 | zk集羣啓動將加載該該配置,每臺zk服務器必須有一個myid文件,裏邊存放服務器的id,該id值必須匹配server.x中的x ; 第一個端口表示與leader 鏈接的端口,第二個端口表示用於選舉的端口,第二個端口是可選的 |
Zookeeper
是如何判斷何種模式啓動服務器的?由於Zookeeper
的ZkServer.cmd
啓動文件指定的統一入口爲org.apache.zookeeper.server.quorum.QuorumPeerMain
,那麼咱們就要問了,那ZK
是怎麼判斷我要單機模式啓動仍是集羣方式啓動呢?答案是明顯的,也就是取決於你在配置文件zoo.cfg
中是否有配置server.x=hostname:port1:port2
,以上的配置項代表咱們想讓ZK
以集羣模式運行,那麼在代碼中是如何體現的呢?
上面講到ZK
解析配置文件的原理,咱們依舊走進parseProperties()
方法,看看以下代碼:
.....
// 此處解析配置文件以server.開頭的配置
} else if (key.startsWith("server.")) {
// server.3
int dot = key.indexOf('.');
long sid = Long.parseLong(key.substring(dot + 1));
String parts[] = splitWithLeadingHostname(value);
if ((parts.length != 2) && (parts.length != 3) && (parts.length != 4)) {
LOG.error(value + " does not have the form host:port or host:port:port "
+ " or host:port:port:type");
}
LearnerType type = null;
String hostname = parts[0];
Integer port = Integer.parseInt(parts[1]);
Integer electionPort = null;
if (parts.length > 2) {
electionPort = Integer.parseInt(parts[2]);
}
if (parts.length > 3) {
if (parts[3].toLowerCase().equals("observer")) {
type = LearnerType.OBSERVER;
} else if (parts[3].toLowerCase().equals("participant")) {
type = LearnerType.PARTICIPANT;
} else {
throw new ConfigException("Unrecognised peertype: " + value);
}
}
if (type == LearnerType.OBSERVER) {
observers.put(Long.valueOf(sid),
new QuorumServer(sid, hostname, port, electionPort, type));
} else {
// 若是配置了,那麼就加進servers中,其中servers是一個本地緩存Map,用於存儲配置的ip地址
servers.put(Long.valueOf(sid), new QuorumServer(sid, hostname, port, electionPort, type));
}
複製代碼
若是配置了,那麼servers
的size>0
,解析完成以後,回到QuorumPeerMain
的initializeAndRun()
方法:
// 若是servers長度大於0,則集羣方式啓動,不然,單機啓動
if (args.length == 1 && config.servers.size() > 0) {
runFromConfig(config);
} else {
LOG.warn("Either no config or no quorum defined in config, running "
+ " in standalone mode");
// there is only server in the quorum -- run as standalone
ZooKeeperServerMain.main(args);
}
複製代碼
從上面能夠看出,單機啓動的入口爲ZooKeeperServerMain
類,而統一的入口類爲QuorumPeerMain
,因此,在ZK
中,服務器端的啓動類就只有這兩個了。
Zookeeper
是如何處理客戶端請求的?不管是哪一種方式啓動Zookeeper
,它都必須對客戶端的請求進行處理,那麼ZK
是如何處理客戶端請求的呢?讓咱們一塊兒來看看源碼是怎麼寫的!
上面說到,Zk
單機啓動的入口類爲ZooKeeperServerMain
,咱們一塊兒看下其runFromConfig()
方法:
/**
* Run from a ServerConfig.
* @param config ServerConfig to use.
* @throws IOException
*/
public void runFromConfig(ServerConfig config) throws IOException {
LOG.info("Starting server");
FileTxnSnapLog txnLog = null;
try {
// 建立一個ZooKeeperServer,ZooKeeperServer表明具體運行的zk服務器,包含監聽客戶端請求
final ZooKeeperServer zkServer = new ZooKeeperServer();
// 這個是代表上面建立的ZooKeeperServer線程執行完以後,當前主線程才結束,相似Thread的join()方法
final CountDownLatch shutdownLatch = new CountDownLatch(1);
// 關閉服務器時的回調處理器
zkServer.registerServerShutdownHandler(
new ZooKeeperServerShutdownHandler(shutdownLatch));
// 執行快照數據,日誌的定時保存操做,指定保存路徑
txnLog = new FileTxnSnapLog(new File(config.dataLogDir), new File(
config.dataDir));
zkServer.setTxnLogFactory(txnLog);
zkServer.setTickTime(config.tickTime);
zkServer.setMinSessionTimeout(config.minSessionTimeout);
zkServer.setMaxSessionTimeout(config.maxSessionTimeout);
// 建立ServerCnxnFactory,默認實現爲NIOServerCnxnFactory,也能夠指定爲NettyServerCnxnFactory
cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns());
// 啓動服務器,將一個服務器zkServer丟給工廠,而後啓動
cnxnFactory.startup(zkServer);
// 這裏將會等待,除非調用shutdown()方法
shutdownLatch.await();
shutdown();
// 這裏會等待直到zkServer線程完成
cnxnFactory.join();
if (zkServer.canShutdown()) {
zkServer.shutdown(true);
}
} catch (InterruptedException e) {
// warn, but generally this is ok
LOG.warn("Server interrupted", e);
} finally {
if (txnLog != null) {
txnLog.close();
}
}
}
複製代碼
瞭解完上面的代碼,咱們明白單機啓動ZooKeeperServer
時ZK
作了什麼工做,主要點在zk
建立的是哪一種工廠,至於NIOServerCnxnFactory
的代碼,我就不說了,你們有興趣能夠去看看。
迴歸正題,讓咱們進入NIOServerCnxnFactory
的run()
方法中看看:
public void run() {
while (!ss.socket().isClosed()) {
try {
// 每一秒輪詢一次
selector.select(1000);
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys();
}
ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected);
Collections.shuffle(selectedList);
for (SelectionKey k : selectedList) {
// 若是有讀請求或者鏈接請求,則接收請求
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
InetAddress ia = sc.socket().getInetAddress();
int cnxncount = getClientCnxnCount(ia);
// 這裏對maxClientCnxns作出判斷,防止DOS攻擊
if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns) {
LOG.warn("Too many connections from " + ia + " - max is " + maxClientCnxns);
sc.close();
} else {
LOG.info("Accepted socket connection from " + sc.socket().getRemoteSocketAddress());
sc.configureBlocking(false);
SelectionKey sk = sc.register(selector, SelectionKey.OP_READ);
NIOServerCnxn cnxn = createConnection(sc, sk);
sk.attach(cnxn);
addCnxn(cnxn);
}
// 若是有讀請求且客戶端以前有鏈接過的,則直接處理
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
NIOServerCnxn c = (NIOServerCnxn) k.attachment();
c.doIO(k);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Unexpected ops in select " + k.readyOps());
}
}
}
selected.clear();
} catch (RuntimeException e) {
LOG.warn("Ignoring unexpected runtime exception", e);
} catch (Exception e) {
LOG.warn("Ignoring exception", e);
}
}
closeAll();
LOG.info("NIOServerCnxn factory exited run method");
}
複製代碼
看到這,我以爲對於Zk
如何監聽處理客戶端的請求就清晰多了,上面的代碼主要採用輪詢機制,每一秒輪詢一次,經過selector.select(1000)
方法指定,這裏的監聽方式和傳統的BIO不一樣,傳統的網絡監聽採用阻塞的accept()
方法,zk採用java的nio實現。
謝謝閱讀~~