zookeeper(5) 客戶端

  zookeeper客戶端主要負責與用戶進行交互,將命令發送到服務器,接收服務器的響應,反饋給用戶。主要分爲一下三層:node

用戶命令處理層服務器

   用戶命令處理層的功能是讀取用戶輸入的命令,解析用戶命令和輸入參數,根據命令和參數,進行一些校驗,而後執行節點操做。網絡

源碼實例(ZooKeeperMain):session

  1 public class ZooKeeperMain {
  2     // 命令解析器。用於解析命令
  3     protected MyCommandOptions cl = new MyCommandOptions();
  4 
  5     // 主函數
  6     public static void main(String args[]) throws KeeperException, IOException, InterruptedException {
  7         // 運行客戶端
  8         ZooKeeperMain main = new ZooKeeperMain(args);
  9         main.run();
 10     }
 11 
 12     public ZooKeeperMain(String args[]) throws IOException, InterruptedException {
 13         // 解析啓動參數
 14         cl.parseOptions(args);
 15         // 獲取server參數,鏈接服務器
 16         connectToZK(cl.getOption("server"));
 17 
 18     }
 19 
 20     // 鏈接服務器
 21     protected void connectToZK(String newHost) throws InterruptedException, IOException {
 22         host = newHost;
 23         zk = new ZooKeeper(host, Integer.parseInt(cl.getOption("timeout")), new MyWatcher());
 24     }
 25 
 26     void run() throws KeeperException, IOException, InterruptedException {
 27         // 循環讀取命令,
 28         BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
 29         String line;
 30         while ((line = br.readLine()) != null) {
 31             // 執行命令
 32             executeLine(line);
 33         }
 34     }
 35 
 36     public void executeLine(String line) throws InterruptedException, IOException, KeeperException {
 37         if (!line.equals("")) {
 38             // 解析命令
 39             cl.parseCommand(line);
 40             // 執行命令
 41             processZKCmd(cl);
 42         }
 43     }
 44 
 45     protected boolean processZKCmd(MyCommandOptions co) throws KeeperException, IOException, InterruptedException {
 46         // 讀取命令和參數
 47         Stat stat = new Stat();
 48         String[] args = co.getArgArray();
 49         String cmd = co.getCommand();
 50         boolean watch = args.length > 2;
 51         String path = null;
 52         List<ACL> acl = Ids.OPEN_ACL_UNSAFE;
 53         // 執行不一樣的命令,主要是進行一些校驗,而後調用zookeeper方法
 54         if (cmd.equals("quit")) {
 55             zk.close();
 56             System.exit(0);
 57         } else if (cmd.equals("redo") && args.length >= 2) {
 58             Integer i = Integer.decode(args[1]);
 59             if (commandCount <= i) {
 60                 return false;
 61             }
 62             cl.parseCommand(history.get(i));
 63             history.put(commandCount, history.get(i));
 64             processCmd(cl);
 65         } else if (cmd.equals("history")) {
 66             for (int i = commandCount - 10; i <= commandCount; ++i) {
 67                 if (i < 0)
 68                     continue;
 69                 System.out.println(i + " - " + history.get(i));
 70             }
 71         } else if (cmd.equals("printwatches")) {
 72             if (args.length == 1) {
 73                 System.out.println("printwatches is " + (printWatches ? "on" : "off"));
 74             } else {
 75                 printWatches = args[1].equals("on");
 76             }
 77         } else if (cmd.equals("connect")) {
 78             if (args.length >= 2) {
 79                 connectToZK(args[1]);
 80             } else {
 81                 connectToZK(host);
 82             }
 83         }
 84         if (cmd.equals("create") && args.length >= 3) {
 85             int first = 0;
 86             CreateMode flags = CreateMode.PERSISTENT;
 87             if ((args[1].equals("-e") && args[2].equals("-s")) || (args[1]).equals("-s") && (args[2].equals("-e"))) {
 88                 first += 2;
 89                 flags = CreateMode.EPHEMERAL_SEQUENTIAL;
 90             } else if (args[1].equals("-e")) {
 91                 first++;
 92                 flags = CreateMode.EPHEMERAL;
 93             } else if (args[1].equals("-s")) {
 94                 first++;
 95                 flags = CreateMode.PERSISTENT_SEQUENTIAL;
 96             }
 97             if (args.length == first + 4) {
 98                 acl = parseACLs(args[first + 3]);
 99             }
100             path = args[first + 1];
101             String newPath = zk.create(path, args[first + 2].getBytes(), acl, flags);
102         } else if (cmd.equals("delete") && args.length >= 2) {
103             path = args[1];
104             zk.delete(path, watch ? Integer.parseInt(args[2]) : -1);
105         } else if (cmd.equals("set") && args.length >= 3) {
106             path = args[1];
107             stat = zk.setData(path, args[2].getBytes(), args.length > 3 ? Integer.parseInt(args[3]) : -1);
108             printStat(stat);
109         } else if (cmd.equals("aget") && args.length >= 2) {
110             path = args[1];
111             zk.getData(path, watch, dataCallback, path);
112         } else if (cmd.equals("get") && args.length >= 2) {
113             path = args[1];
114             byte data[] = zk.getData(path, watch, stat);
115             data = (data == null) ? "null".getBytes() : data;
116             System.out.println(new String(data));
117             printStat(stat);
118         } else if (cmd.equals("ls") && args.length >= 2) {
119             path = args[1];
120             List<String> children = zk.getChildren(path, watch);
121             System.out.println(children);
122         } else if (cmd.equals("ls2") && args.length >= 2) {
123             path = args[1];
124             List<String> children = zk.getChildren(path, watch, stat);
125             System.out.println(children);
126             printStat(stat);
127         } else if (cmd.equals("getAcl") && args.length >= 2) {
128             path = args[1];
129             acl = zk.getACL(path, stat);
130             for (ACL a : acl) {
131                 System.out.println(a.getId() + ": " + getPermString(a.getPerms()));
132             }
133         } else if (cmd.equals("setAcl") && args.length >= 3) {
134             path = args[1];
135             stat = zk.setACL(path, parseACLs(args[2]), args.length > 4 ? Integer.parseInt(args[3]) : -1);
136             printStat(stat);
137         } else if (cmd.equals("stat") && args.length >= 2) {
138             path = args[1];
139             stat = zk.exists(path, watch);
140             printStat(stat);
141         } else if (cmd.equals("listquota") && args.length >= 2) {
142             path = args[1];
143             String absolutePath = Quotas.quotaZookeeper + path + "/" + Quotas.limitNode;
144             byte[] data = null;
145             try {
146                 data = zk.getData(absolutePath, false, stat);
147                 StatsTrack st = new StatsTrack(new String(data));
148                 data = zk.getData(Quotas.quotaZookeeper + path + "/" + Quotas.statNode, false, stat);
149                 System.out.println("Output stat for " + path + " " + new StatsTrack(new String(data)).toString());
150             } catch (KeeperException.NoNodeException ne) {
151                 System.err.println("quota for " + path + " does not exist.");
152             }
153         } else if (cmd.equals("setquota") && args.length >= 4) {
154             String option = args[1];
155             String val = args[2];
156             path = args[3];
157             System.err.println("Comment: the parts are " + "option " + option + " val " + val + " path " + path);
158             if ("-b".equals(option)) {
159                 // we are setting the bytes quota
160                 createQuota(zk, path, Long.parseLong(val), -1);
161             } else if ("-n".equals(option)) {
162                 // we are setting the num quota
163                 createQuota(zk, path, -1L, Integer.parseInt(val));
164             } else {
165                 usage();
166             }
167 
168         } else if (cmd.equals("delquota") && args.length >= 2) {
169             // if neither option -n or -b is specified, we delete
170             // the quota node for thsi node.
171             if (args.length == 3) {
172                 // this time we have an option
173                 String option = args[1];
174                 path = args[2];
175                 if ("-b".equals(option)) {
176                     delQuota(zk, path, true, false);
177                 } else if ("-n".equals(option)) {
178                     delQuota(zk, path, false, true);
179                 }
180             } else if (args.length == 2) {
181                 path = args[1];
182                 // we dont have an option specified.
183                 // just delete whole quota node
184                 delQuota(zk, path, true, true);
185             } else if (cmd.equals("help")) {
186                 usage();
187             }
188         } else if (cmd.equals("close")) {
189             zk.close();
190         } else if (cmd.equals("addauth") && args.length >= 2) {
191             byte[] b = null;
192             if (args.length >= 3)
193                 b = args[2].getBytes();
194 
195             zk.addAuthInfo(args[1], b);
196         } else {
197             usage();
198         }
199         return watch;
200     }
201 }
View Code

  除了基礎的節點操做外,用戶命令層還提供了節點配額的控制。節點配額的控制經過在/zookeeper/quaota對應的目錄下記錄當前節點數據大小和如今大小實現。併發

源碼實例(ZooKeeperMain.createQuota):app

 1 public static boolean createQuota(ZooKeeper zk, String path,
 2             long bytes, int numNodes)
 3         throws KeeperException, IOException, InterruptedException
 4     {
 5         //判斷指定路徑是否存在
 6         Stat initStat = zk.exists(path, false);
 7         if (initStat == null) {
 8             throw new IllegalArgumentException(path + " does not exist.");
 9         }
10         String quotaPath = Quotas.quotaZookeeper;
11         String realPath = Quotas.quotaZookeeper + path;
12         try {
13             //判斷在子節點中是否有限量設置
14             List<String> children = zk.getChildren(realPath, false);
15             for (String child: children) {
16                 if (!child.startsWith("zookeeper_")) {
17                     throw new IllegalArgumentException(path + " has child " +
18                             child + " which has a quota");
19                 }
20             }
21         } catch(KeeperException.NoNodeException ne) {
22             // this is fine
23         }
24         //判斷夫節點中是否有限量設置
25         checkIfParentQuota(zk, path);
26         //若是當前節點限量設置爲空,逐級建立節點數據
27         if (zk.exists(quotaPath, false) == null) {
28             try {
29                 zk.create(Quotas.procZookeeper, null, Ids.OPEN_ACL_UNSAFE,
30                         CreateMode.PERSISTENT);
31                 zk.create(Quotas.quotaZookeeper, null, Ids.OPEN_ACL_UNSAFE,
32                         CreateMode.PERSISTENT);
33             } catch(KeeperException.NodeExistsException ne) {
34                 // do nothing
35             }
36         }
37         String[] splits = path.split("/");
38         StringBuilder sb = new StringBuilder();
39         sb.append(quotaPath);
40         for (int i=1; i<splits.length; i++) {
41             sb.append("/" + splits[i]);
42             quotaPath = sb.toString();
43             try {
44                 zk.create(quotaPath, null, Ids.OPEN_ACL_UNSAFE ,
45                         CreateMode.PERSISTENT);
46             } catch(KeeperException.NodeExistsException ne) {
47                 //do nothing
48             }
49         }
50         //建立限量設置節點
51         String statPath = quotaPath + "/" + Quotas.statNode;
52         quotaPath = quotaPath + "/" + Quotas.limitNode;
53         StatsTrack strack = new StatsTrack(null);
54         strack.setBytes(bytes);
55         strack.setCount(numNodes);
56         try {
57             zk.create(quotaPath, strack.toString().getBytes(),
58                     Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
59             StatsTrack stats = new StatsTrack(null);
60             stats.setBytes(0L);
61             stats.setCount(0);
62             zk.create(statPath, stats.toString().getBytes(),
63                     Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
64         } catch(KeeperException.NodeExistsException ne) {
65             byte[] data = zk.getData(quotaPath, false , new Stat());
66             StatsTrack strackC = new StatsTrack(new String(data));
67             if (bytes != -1L) {
68                 strackC.setBytes(bytes);
69             }
70             if (numNodes != -1) {
71                 strackC.setCount(numNodes);
72             }
73             zk.setData(quotaPath, strackC.toString().getBytes(), -1);
74         }
75         return true;
76     }
View Code

節點處理層異步

  節點處理層主要是提供節點操做功能,將節點操做參數封裝成數據對象,而後經過網絡層發送數據對象,並返回結果。網絡層提供了同步和異步兩種網絡請求方式。ide

建立節點(ZooKeeper):函數

public void create(final String path, byte data[], List<ACL> acl,
            CreateMode createMode,  StringCallback cb, Object ctx)
    {
        final String clientPath = path;
//解析client相對路徑到全路徑
        final String serverPath = prependChroot(clientPath);
//設置請求頭
        RequestHeader h = new RequestHeader();
        h.setType(ZooDefs.OpCode.create);
//設置建立節點請求體
        CreateRequest request = new CreateRequest();
        CreateResponse response = new CreateResponse();
        ReplyHeader r = new ReplyHeader();
        request.setData(data);
        request.setFlags(createMode.toFlag());
        request.setPath(serverPath);
        request.setAcl(acl);
//經過網絡層發送請求
        cnxn.queuePacket(h, r, request, response, cb, clientPath,
                serverPath, ctx, null);
    }
View Code

刪除節點(ZooKeeper):ui

 1 public void delete(final String path, int version)
 2         throws InterruptedException, KeeperException
 3     {
 4         final String clientPath = path;
 5       //解析client相對路徑到全路徑
 6         final String serverPath = prependChroot(clientPath);
 7        //設置請求頭
 8         RequestHeader h = new RequestHeader();
 9         h.setType(ZooDefs.OpCode.delete);
10         //設置刪除節點請求體
11         DeleteRequest request = new DeleteRequest();
12         request.setPath(serverPath);
13         request.setVersion(version);
14         cnxn.queuePacket(h, new ReplyHeader(), request, null, cb, clientPath,
15                 serverPath, ctx, null);
16     }
View Code

其餘方法(ZooKeeper):

  1 public void exists(final String path, Watcher watcher,
  2             StatCallback cb, Object ctx)
  3     {
  4         final String clientPath = path;
  5         PathUtils.validatePath(clientPath);
  6 
  7         // the watch contains the un-chroot path
  8         WatchRegistration wcb = null;
  9         if (watcher != null) {
 10             wcb = new ExistsWatchRegistration(watcher, clientPath);
 11         }
 12 
 13         final String serverPath = prependChroot(clientPath);
 14 
 15         RequestHeader h = new RequestHeader();
 16         h.setType(ZooDefs.OpCode.exists);
 17         ExistsRequest request = new ExistsRequest();
 18         request.setPath(serverPath);
 19         request.setWatch(watcher != null);
 20         SetDataResponse response = new SetDataResponse();
 21         cnxn.queuePacket(h, new ReplyHeader(), request, response, cb,
 22                 clientPath, serverPath, ctx, wcb);
 23     }
 24     public void getData(final String path, Watcher watcher,
 25             DataCallback cb, Object ctx)
 26     {
 27         final String clientPath = path;
 28         PathUtils.validatePath(clientPath);
 29 
 30         // the watch contains the un-chroot path
 31         WatchRegistration wcb = null;
 32         if (watcher != null) {
 33             wcb = new DataWatchRegistration(watcher, clientPath);
 34         }
 35 
 36         final String serverPath = prependChroot(clientPath);
 37 
 38         RequestHeader h = new RequestHeader();
 39         h.setType(ZooDefs.OpCode.getData);
 40         GetDataRequest request = new GetDataRequest();
 41         request.setPath(serverPath);
 42         request.setWatch(watcher != null);
 43         GetDataResponse response = new GetDataResponse();
 44         cnxn.queuePacket(h, new ReplyHeader(), request, response, cb,
 45                 clientPath, serverPath, ctx, wcb);
 46     }
 47     public void setData(final String path, byte data[], int version,
 48             StatCallback cb, Object ctx)
 49     {
 50         final String clientPath = path;
 51         PathUtils.validatePath(clientPath);
 52 
 53         final String serverPath = prependChroot(clientPath);
 54 
 55         RequestHeader h = new RequestHeader();
 56         h.setType(ZooDefs.OpCode.setData);
 57         SetDataRequest request = new SetDataRequest();
 58         request.setPath(serverPath);
 59         request.setData(data);
 60         request.setVersion(version);
 61         SetDataResponse response = new SetDataResponse();
 62         cnxn.queuePacket(h, new ReplyHeader(), request, response, cb,
 63                 clientPath, serverPath, ctx, null);
 64     }
 65 
 66     public void getACL(final String path, Stat stat, ACLCallback cb,
 67             Object ctx)
 68     {
 69         final String clientPath = path;
 70         PathUtils.validatePath(clientPath);
 71 
 72         final String serverPath = prependChroot(clientPath);
 73 
 74         RequestHeader h = new RequestHeader();
 75         h.setType(ZooDefs.OpCode.getACL);
 76         GetACLRequest request = new GetACLRequest();
 77         request.setPath(serverPath);
 78         GetACLResponse response = new GetACLResponse();
 79         cnxn.queuePacket(h, new ReplyHeader(), request, response, cb,
 80                 clientPath, serverPath, ctx, null);
 81     }
 82     public void setACL(final String path, List<ACL> acl, int version,
 83             StatCallback cb, Object ctx)
 84     {
 85         final String clientPath = path;
 86         PathUtils.validatePath(clientPath);
 87 
 88         final String serverPath = prependChroot(clientPath);
 89 
 90         RequestHeader h = new RequestHeader();
 91         h.setType(ZooDefs.OpCode.setACL);
 92         SetACLRequest request = new SetACLRequest();
 93         request.setPath(serverPath);
 94         request.setAcl(acl);
 95         request.setVersion(version);
 96         SetACLResponse response = new SetACLResponse();
 97         cnxn.queuePacket(h, new ReplyHeader(), request, response, cb,
 98                 clientPath, serverPath, ctx, null);
 99     }
100     public void getChildren(final String path, Watcher watcher,
101             Children2Callback cb, Object ctx)
102     {
103         final String clientPath = path;
104         final String serverPath = prependChroot(clientPath);
105         
106         WatchRegistration wcb = null;
107         if (watcher != null) {
108             wcb = new ChildWatchRegistration(watcher, clientPath);
109         }
110         
111         RequestHeader h = new RequestHeader();
112         h.setType(ZooDefs.OpCode.getChildren2);
113         GetChildren2Request request = new GetChildren2Request();
114         request.setPath(serverPath);
115         request.setWatch(watcher != null);
116         GetChildren2Response response = new GetChildren2Response();
117         cnxn.queuePacket(h, new ReplyHeader(), request, response, cb,
118                 clientPath, serverPath, ctx, wcb);
119     }
120     public void sync(final String path, VoidCallback cb, Object ctx){
121         final String clientPath = path;
122         PathUtils.validatePath(clientPath);
123 
124         final String serverPath = prependChroot(clientPath);
125 
126         RequestHeader h = new RequestHeader();
127         h.setType(ZooDefs.OpCode.sync);
128         SyncRequest request = new SyncRequest();
129         SyncResponse response = new SyncResponse();
130         request.setPath(serverPath);
131         cnxn.queuePacket(h, new ReplyHeader(), request, response, cb,
132                 clientPath, serverPath, ctx, null);
133     }
View Code

網絡請求層

  網絡請求層最爲複雜,主要實現nio異步網絡請求以及結果回調,watcher管理。

  提供了同步和異步兩種通訊方式。同步通訊其實也是經過異步通訊實現,首先會使用異步通訊發送請求,而後判斷返回結果是否ready,若是沒有則經過wait進入阻塞狀態。當異步通訊返回請求時,會設置返回結果狀態,而且喚醒阻塞的線程。

同步請求(ClientCnxn.submitRequest):

 1 public ReplyHeader submitRequest(RequestHeader h, Record request,
 2             Record response, WatchRegistration watchRegistration)
 3             throws InterruptedException {
 4         //異步發送請求包
 5         ReplyHeader r = new ReplyHeader();
 6         Packet packet = queuePacket(h, r, request, response, null, null, null,
 7                     null, watchRegistration);
 8         //若是請求包沒有返回數據,則線上等待
 9         synchronized (packet) {
10             while (!packet.finished) {
11                 packet.wait();
12             }
13         }
14         return r;
15     }
View Code

  異步請求的參數會被封裝成一個Packet對象放入outgoingQueue隊列中。會有一個發送線程從outgoingQueue隊列中取出一個可發送的Packet對象,併發送序列化信息,而後把該Packet放入到pendingQueue隊列中,當接收到服務端響應,反序列號出結果數據,而後在pendingQueue中找到對應的Packet,設置結果,最後對於有回調和watcher的命令封裝成事件放入事件隊列中,會有另外一個事件線程,從事件隊列中讀取事件消息,,執行回調和watcher邏輯。

異步請求(ClientCnxn.queuePacket):

 1 Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
 2             Record response, AsyncCallback cb, String clientPath,
 3             String serverPath, Object ctx, WatchRegistration watchRegistration)
 4     {
 5         
 6         Packet packet = null;
 7         synchronized (outgoingQueue) {
 8             //設置一個全局惟一的id,做爲數據包的id
 9             if (h.getType() != OpCode.ping && h.getType() != OpCode.auth) {
10                 h.setXid(getXid());
11             }
12             //將請求頭,請求體,返回結果,watcher等封裝成數據包。
13             packet = new Packet(h, r, request, response, null,
14                     watchRegistration);
15             packet.cb = cb;
16             packet.ctx = ctx;
17             packet.clientPath = clientPath;
18             packet.serverPath = serverPath;
19             //將數據包添加到outgoing隊列中。
20             outgoingQueue.add(packet);
21         }
22         sendThread.wakeup();
23         return packet;
24     }
View Code

  發送線程執行流程以下:

  1.啓動線程,創建服務器鏈接。(狀態爲Connecting)

  2.創建鏈接後,進行初始化,主要是向服務器發送默認watcher命令、auth命令、connect命令。(狀態爲Connected) 

  3. 從outgoing隊列中讀取數據包,發送到服務端。

  4.接收服務端請求,處理返回結構,connect命令記錄sessionid、sessionpwd、timeout等;若是是其餘命令,而後在pendingQueue中找到對應的Packet,設置結果。

  5.對於有回調和watcher的命令封裝成事件放入事件隊列中。

創建鏈接,進行初始化(ClientCnxn.SendThread.primeConnection):

 1         private void primeConnection(SelectionKey k) throws IOException {
 2             ConnectRequest conReq = new ConnectRequest(0, lastZxid,
 3                     sessionTimeout, sessionId, sessionPasswd);
 4             //序列化鏈接命令
 5             ByteArrayOutputStream baos = new ByteArrayOutputStream();
 6             BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
 7             boa.writeInt(-1, "len");
 8             conReq.serialize(boa, "connect");
 9             baos.close();
10             ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
11             bb.putInt(bb.capacity() - 4);
12             bb.rewind();
13             synchronized (outgoingQueue) {
14                 //發送設置監聽器請求,將請求封裝成數據包,放入outgoing隊列中
15                 if (!disableAutoWatchReset) {
16                     List<String> dataWatches = zooKeeper.getDataWatches();
17                     List<String> existWatches = zooKeeper.getExistWatches();
18                     List<String> childWatches = zooKeeper.getChildWatches();
19                     if (!dataWatches.isEmpty()
20                                 || !existWatches.isEmpty() || !childWatches.isEmpty()) {
21                         SetWatches sw = new SetWatches(lastZxid,
22                                 prependChroot(dataWatches),
23                                 prependChroot(existWatches),
24                                 prependChroot(childWatches));
25                         RequestHeader h = new RequestHeader();
26                         h.setType(ZooDefs.OpCode.setWatches);
27                         h.setXid(-8);
28                         Packet packet = new Packet(h, new ReplyHeader(), sw, null, null,
29                                 null);
30                         outgoingQueue.addFirst(packet);
31                     }
32                 }
33                 //發送認證信息
34                 for (AuthData id : authInfo) {
35                     outgoingQueue.addFirst(new Packet(new RequestHeader(-4,
36                             OpCode.auth), null, new AuthPacket(0, id.scheme,
37                             id.data), null, null, null));
38                 }
39                 //發送鏈接命令請求
40                 outgoingQueue.addFirst((new Packet(null, null, null, null, bb,
41                         null)));
42             }
43             //註冊通道
44             synchronized (this) {
45                 k.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
46             }
47         }
View Code

處理返回結果,主要處理connect返回結果和其餘請求返回結果。

connect命令主要返回sessionID, sessonpwd,timeout,(ClientCnxn.SendThread.readConnectResult):

 1 //讀取connect命令的結果
 2 void readConnectResult() throws IOException {
 3 //反序列化connect命令結果
 4             ByteBufferInputStream bbis = new ByteBufferInputStream(
 5                     incomingBuffer);
 6             BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
 7             ConnectResponse conRsp = new ConnectResponse();
 8             conRsp.deserialize(bbia, "connect");
 9             //獲取timeout,session等信息
10             readTimeout = negotiatedSessionTimeout * 2 / 3;
11             connectTimeout = negotiatedSessionTimeout / serverAddrs.size();
12             sessionId = conRsp.getSessionId();
13             sessionPasswd = conRsp.getPasswd();
14             zooKeeper.state = States.CONNECTED;
15 //向消息隊列放入鏈接成功消息
16             eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,
17                     Watcher.Event.KeeperState.SyncConnected, null));
18         }
View Code
相關文章
相關標籤/搜索