1、前言java
前面介紹了Zookeeper的系統模型,下面進一步學習Zookeeper的底層序列化機制,Zookeeper的客戶端與服務端之間會進行一系列的網絡通訊來實現數據傳輸,Zookeeper使用Jute組件來完成數據的序列化和反序列化操做。node
2、Juteexpress
Jute是Zookeeper底層序列化組件,其用於Zookeeper進行網絡數據傳輸和本地磁盤數據存儲的序列化和反序列化工做。apache
2.1 Jute序列化 編程
MockReHeader實體類服務器
package com.hust.grid.leesf.jute.examples; import org.apache.jute.InputArchive; import org.apache.jute.OutputArchive; import org.apache.jute.Record; public class MockReHeader implements Record { private long sessionId; private String type; public MockReHeader() { } public MockReHeader(long sessionId, String type) { this.sessionId = sessionId; this.type = type; } public void setSessionId(long sessionId) { this.sessionId = sessionId; } public void setType(String type) { this.type = type; } public long getSessionId() { return sessionId; } public String getType() { return type; } public void serialize(OutputArchive oa, String tag) throws java.io.IOException { oa.startRecord(this, tag); oa.writeLong(sessionId, "sessionId"); oa.writeString(type, "type"); oa.endRecord(this, tag); } public void deserialize(InputArchive ia, String tag) throws java.io.IOException { ia.startRecord(tag); this.sessionId = ia.readLong("sessionId"); this.type = ia.readString("type"); ia.endRecord(tag); } @Override public String toString() { return "sessionId = " + sessionId + ", type = " + type; } }
Main 網絡
package com.hust.grid.leesf.jute.examples; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import org.apache.jute.BinaryInputArchive; import org.apache.jute.BinaryOutputArchive; import org.apache.zookeeper.server.ByteBufferInputStream; public class Main { public static void main(String[] args) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); new MockReHeader(0x3421eccb92a34el, "ping").serialize(boa, "header"); ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray()); ByteBufferInputStream bbis = new ByteBufferInputStream(bb); BinaryInputArchive bia = BinaryInputArchive.getArchive(bbis); MockReHeader header2 = new MockReHeader(); System.out.println(header2); header2.deserialize(bia, "header"); System.out.println(header2); bbis.close(); baos.close(); } }
運行結果session
sessionId = 0, type = null
sessionId = 14673999700337486, type = ping
說明:能夠看到MockReHeader實體類須要實現Record接口而且實現serialize和deserialize方法。app
OutputArchive和InputArchive分別是Jute底層的序列化器和反序列化器。less
在Zookeeper的src文件夾下有zookeeper.jute文件,其內容以下
/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ module org.apache.zookeeper.data { class Id { ustring scheme; ustring id; } class ACL { int perms; Id id; } // information shared with the client class Stat { long czxid; // created zxid long mzxid; // last modified zxid long ctime; // created long mtime; // last modified int version; // version int cversion; // child version int aversion; // acl version long ephemeralOwner; // owner id if ephemeral, 0 otw int dataLength; //length of the data in the node int numChildren; //number of children of this node long pzxid; // last modified children } // information explicitly stored by the server persistently class StatPersisted { long czxid; // created zxid long mzxid; // last modified zxid long ctime; // created long mtime; // last modified int version; // version int cversion; // child version int aversion; // acl version long ephemeralOwner; // owner id if ephemeral, 0 otw long pzxid; // last modified children } // information explicitly stored by the version 1 database of servers class StatPersistedV1 { long czxid; //created zxid long mzxid; //last modified zxid long ctime; //created long mtime; //last modified int version; //version int cversion; //child version int aversion; //acl version long ephemeralOwner; //owner id if ephemeral. 0 otw } } module org.apache.zookeeper.proto { class ConnectRequest { int protocolVersion; long lastZxidSeen; int timeOut; long sessionId; buffer passwd; } class ConnectResponse { int protocolVersion; int timeOut; long sessionId; buffer passwd; } class SetWatches { long relativeZxid; vector<ustring>dataWatches; vector<ustring>existWatches; vector<ustring>childWatches; } class RequestHeader { int xid; int type; } class MultiHeader { int type; boolean done; int err; } class AuthPacket { int type; ustring scheme; buffer auth; } class ReplyHeader { int xid; long zxid; int err; } class GetDataRequest { ustring path; boolean watch; } class SetDataRequest { ustring path; buffer data; int version; } class SetDataResponse { org.apache.zookeeper.data.Stat stat; } class GetSASLRequest { buffer token; } class SetSASLRequest { buffer token; } class SetSASLResponse { buffer token; } class CreateRequest { ustring path; buffer data; vector<org.apache.zookeeper.data.ACL> acl; int flags; } class DeleteRequest { ustring path; int version; } class GetChildrenRequest { ustring path; boolean watch; } class GetChildren2Request { ustring path; boolean watch; } class CheckVersionRequest { ustring path; int version; } class GetMaxChildrenRequest { ustring path; } class GetMaxChildrenResponse { int max; } class SetMaxChildrenRequest { ustring path; int max; } class SyncRequest { ustring path; } class SyncResponse { ustring path; } class GetACLRequest { ustring path; } class SetACLRequest { ustring path; vector<org.apache.zookeeper.data.ACL> acl; int version; } class SetACLResponse { org.apache.zookeeper.data.Stat stat; } class WatcherEvent { int type; // event type int state; // state of the Keeper client runtime ustring path; } class ErrorResponse { int err; } class CreateResponse { ustring path; } class ExistsRequest { ustring path; boolean watch; } class ExistsResponse { org.apache.zookeeper.data.Stat stat; } class GetDataResponse { buffer data; org.apache.zookeeper.data.Stat stat; } class GetChildrenResponse { vector<ustring> children; } class GetChildren2Response { vector<ustring> children; org.apache.zookeeper.data.Stat stat; } class GetACLResponse { vector<org.apache.zookeeper.data.ACL> acl; org.apache.zookeeper.data.Stat stat; } } module org.apache.zookeeper.server.quorum { class LearnerInfo { long serverid; int protocolVersion; } class QuorumPacket { int type; // Request, Ack, Commit, Ping long zxid; buffer data; // Only significant when type is request vector<org.apache.zookeeper.data.Id> authinfo; } } module org.apache.zookeeper.server.persistence { class FileHeader { int magic; int version; long dbid; } } module org.apache.zookeeper.txn { class TxnHeader { long clientId; int cxid; long zxid; long time; int type; } class CreateTxnV0 { ustring path; buffer data; vector<org.apache.zookeeper.data.ACL> acl; boolean ephemeral; } class CreateTxn { ustring path; buffer data; vector<org.apache.zookeeper.data.ACL> acl; boolean ephemeral; int parentCVersion; } class DeleteTxn { ustring path; } class SetDataTxn { ustring path; buffer data; int version; } class CheckVersionTxn { ustring path; int version; } class SetACLTxn { ustring path; vector<org.apache.zookeeper.data.ACL> acl; int version; } class SetMaxChildrenTxn { ustring path; int max; } class CreateSessionTxn { int timeOut; } class ErrorTxn { int err; } class Txn { int type; buffer data; } class MultiTxn { vector<org.apache.zookeeper.txn.Txn> txns; } }
其定義了全部的實體類的所屬包名、類名及類的全部成員變量和類型,該文件會在源代碼編譯時,Jute會使用不一樣的代碼生成器爲這些類定義生成實際編程語言的類文件,如java語言生成的類文件保存在src/java/generated目錄下,每一個類都會實現Record接口。
3、通訊協議
基於TCP/IP協議,Zookeeper實現了本身的通訊協議來玩按成客戶端與服務端、服務端與服務端之間的網絡通訊,對於請求,主要包含請求頭和請求體,對於響應,主要包含響應頭和響應體。
3.1 請求協議
對於請求協議而言,以下爲獲取節點數據請求的完整協議定義
class RequestHeader { int xid; int type; }
從zookeeper.jute中可知RequestHeader包含了xid和type,xid用於記錄客戶端請求發起的前後序號,用來確保單個客戶端請求的響應順序,type表明請求的操做類型,如建立節點(OpCode.create)、刪除節點(OpCode.delete)、獲取節點數據(OpCode.getData)。
協議的請求主體內容部分,包含了請求的全部操做內容,不一樣的請求類型請求體不一樣。對於會話建立而言,其請求體以下
class ConnectRequest { int protocolVersion; long lastZxidSeen; int timeOut; long sessionId; buffer passwd; }
Zookeeper客戶端和服務器在建立會話時,會發送ConnectRequest請求,該請求包含協議版本號protocolVersion、最近一次接收到服務器ZXID lastZxidSeen、會話超時時間timeOut、會話標識sessionId和會話密碼passwd。
對於獲取節點數據而言,其請求體以下
class GetDataRequest { ustring path; boolean watch; }
Zookeeper客戶端在向服務器發送節點數據請求時,會發送GetDataRequest請求,該請求包含了數據節點路徑path、是否註冊Watcher的標識watch。
對於更新節點數據而言,其請求體以下
class SetDataRequest { ustring path; buffer data; int version; }
Zookeeper客戶端在向服務器發送更新節點數據請求時,會發送SetDataRequest請求,該請求包含了數據節點路徑path、數據內容data、節點數據的指望版本號version。
針對不一樣的請求類型,Zookeeper都會定義不一樣的請求體,能夠在zookeeper.jute中查看。
3.2 響應協議
對於響應協議而言,以下爲獲取節點數據響應的完整協議定義
響應頭中包含了每一個響應最基本的信息,包括xid、zxid和err:
class ReplyHeader { int xid; long zxid; int err; }
xid與請求頭中的xid一致,zxid表示Zookeeper服務器上當前最新的事務ID,err則是一個錯誤碼,表示當請求處理過程出現異常狀況時,就會在錯誤碼中標識出來,常見的包括處理成功(Code.OK)、節點不存在(Code.NONODE)、沒有權限(Code.NOAUTH)。
協議的響應主體內容部分,包含了響應的全部數據,不一樣的響應類型請求體不一樣。對於會話建立而言,其響應體以下
class ConnectResponse { int protocolVersion; int timeOut; long sessionId; buffer passwd; }
針對客戶端的會話建立請求,服務端會返回客戶端一個ConnectResponse響應,該響應體包含了版本號protocolVersion、會話的超時時間timeOut、會話標識sessionId和會話密碼passwd。
對於獲取節點數據而言,其響應體以下
class GetDataResponse { buffer data; org.apache.zookeeper.data.Stat stat; }
針對客戶端的獲取節點數據請求,服務端會返回客戶端一個GetDataResponse響應,該響應體包含了數據節點內容data、節點狀態stat。
對於更新節點數據而言,其響應體以下
class SetDataResponse { org.apache.zookeeper.data.Stat stat; }
針對客戶端的更新節點數據請求,服務端會返回客戶端一個SetDataResponse響應,該響應體包含了最新的節點狀態stat。
針對不一樣的響應類型,Zookeeper都會定義不一樣的響應體,能夠在zookeeper.jute中查看。
4、總結
本篇博客講解了Zookeeper中的序列化機制和客戶端與服務端、服務端與服務端的通訊協議,內容相對較爲簡單,容易理解,謝謝各位園友的觀看~