目錄java
用一張圖來講明session創建過程當中client和server的交互算法
主要流程apache
客戶端要發起鏈接要先啓動,不管是使用curator client仍是zkClient,都是初始化org.apache.zookeeper.ZooKeeper#ZooKeeper
。網絡
Zookeeper初始化的主要工做是初始化本身的一些關鍵組件session
ClientCnxn包含兩個線程框架
ClientCnxn初始化的過程就是初始化啓動這兩個線程,客戶端發起鏈接的主要邏輯在SendThread線程中異步
// org.apache.zookeeper.ClientCnxn.SendThread#run @Override public void run() { clientCnxnSocket.introduce(this,sessionId); clientCnxnSocket.updateNow(); clientCnxnSocket.updateLastSendAndHeard(); int to; long lastPingRwServer = System.currentTimeMillis(); final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds while (state.isAlive()) { try { // client是否鏈接到server,若是沒有鏈接到則鏈接server if (!clientCnxnSocket.isConnected()) { if(!isFirstConnect){ try { Thread.sleep(r.nextInt(1000)); } catch (InterruptedException e) { LOG.warn("Unexpected exception", e); } } // don't re-establish connection if we are closing if (closing || !state.isAlive()) { break; } // 這個裏面去鏈接server startConnect(); clientCnxnSocket.updateLastSendAndHeard(); } // 省略中間代碼... clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this); // 省略中間代碼... }
SendThread#run是一個while循環,只要client沒有被關閉會一直循環,每次循環判斷當前client是否鏈接到server,若是沒有則發起鏈接,發起鏈接調用了startConnectsocket
private void startConnect() throws IOException { state = States.CONNECTING; InetSocketAddress addr; if (rwServerAddress != null) { addr = rwServerAddress; rwServerAddress = null; } else { // 經過hostProvider來獲取一個server地址 addr = hostProvider.next(1000); } // 省略中間代碼... // 創建client與server的鏈接 clientCnxnSocket.connect(addr); }
到這裏client發起了socket鏈接,server監聽的端口收到client的鏈接請求後和client創建鏈接。ide
socket鏈接創建後,client會向server發送一個ConnectRequest來創建session鏈接。兩種狀況會發送ConnectRequestthis
發送ConnectRequest是在下面的方法中
// org.apache.zookeeper.ClientCnxn.SendThread#primeConnection void primeConnection() throws IOException { LOG.info("Socket connection established to " + clientCnxnSocket.getRemoteSocketAddress() + ", initiating session"); isFirstConnect = false; long sessId = (seenRwServerBefore) ? sessionId : 0; ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd); // 省略中間代碼... // 將conReq封裝爲packet放入outgoingQueue等待發送 outgoingQueue.addFirst(new Packet(null, null, conReq, null, null, readOnly)); } clientCnxnSocket.enableReadWriteOnly(); if (LOG.isDebugEnabled()) { LOG.debug("Session establishment request sent on " + clientCnxnSocket.getRemoteSocketAddress()); } }
請求中帶的參數
在server啓動的過程當中除了會啓動用於選舉的網絡組件還會啓動用於處理client請求的網絡組件
org.apache.zookeeper.server.NIOServerCnxnFactory
主要啓動了三個線程:
client發起socket鏈接的時候,server監聽了該端口,接收到client的鏈接請求,而後把創建練級的SocketChannel放入隊列裏面,交給SelectorThread處理
// org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#addAcceptedConnection public boolean addAcceptedConnection(SocketChannel accepted) { if (stopped || !acceptedQueue.offer(accepted)) { return false; } wakeupSelector(); return true; }
SelectorThread是一個不斷循環的線程,每次循環都會處理剛剛創建的socket鏈接
// org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#run while (!stopped) { try { select(); // 處理對立中的socket processAcceptedConnections(); processInterestOpsUpdateRequests(); } catch (RuntimeException e) { LOG.warn("Ignoring unexpected runtime exception", e); } catch (Exception e) { LOG.warn("Ignoring unexpected exception", e); } } // org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#processAcceptedConnections private void processAcceptedConnections() { SocketChannel accepted; while (!stopped && (accepted = acceptedQueue.poll()) != null) { SelectionKey key = null; try { // 向該socket註冊讀事件 key = accepted.register(selector, SelectionKey.OP_READ); // 建立一個NIOServerCnxn維護session NIOServerCnxn cnxn = createConnection(accepted, key, this); key.attach(cnxn); addCnxn(cnxn); // 省略中間代碼... } }
說了這麼久,咱們說的session到底是什麼尚未解釋,session中文翻譯是會話,在這裏就是zk的server和client維護的一個具備一些特別屬性的網絡鏈接,網絡鏈接這裏就是socket鏈接,一些特別的屬性包括
因此session創建的兩步就是
server收到ConnectRequest以後,按照正常處理io的方式處理這個request,server端的主要操做是
整體流程是
其中有一個session生成算法咱們來看下
public static long initializeNextSession(long id) { // sessionId是long類型,共8個字節,64位 long nextSid; // 取時間戳的的低40位做爲初始化sessionId的第16-55位,這裏使用的是無符號右移,不會出現負數 nextSid = (Time.currentElapsedTime() << 24) >>> 8; // 使用serverId(配置文件中指定的myid)做爲高8位 nextSid = nextSid | (id <<56); // nextSid爲long的最小值,這中狀況不可能出現,這裏只是做爲一個case列在這裏 if (nextSid == EphemeralType.CONTAINER_EPHEMERAL_OWNER) { ++nextSid; // this is an unlikely edge case, but check it just in case } return nextSid; }
初始化sessionId的組成
myid(1字節)+截取的時間戳低40位(5個字節)+2個字節(初始化都是0)
每一個server再基於這個id不斷自增,這樣的算法就保證了每一個server的sessionId是全局惟一的。
session在zk框架中是一個重要概念,不少功能都依賴於session,好比臨時節點,session關閉後就自動刪除了。本文主要介紹了session的創建過程當中client和server各自的處理方式。