zookeeper源碼 — 4、session創建

目錄java

  • session創建的主要過程
  • 客戶端發起鏈接
  • 服務端建立session

session創建的主要過程

用一張圖來講明session創建過程當中client和server的交互算法

主要流程apache

  • 服務端啓動,客戶端啓動
  • 客戶端發起socket鏈接
  • 服務端accept socket鏈接,socket鏈接創建
  • 客戶端發送ConnectRequest給server
  • server收到後初始化ServerCnxn,表明一個和客戶端的鏈接,即session,server發送ConnectResponse給client
  • client處理ConnectResponse,session創建完成

客戶發起鏈接

和server創建socket鏈接

客戶端要發起鏈接要先啓動,不管是使用curator client仍是zkClient,都是初始化org.apache.zookeeper.ZooKeeper#ZooKeeper網絡

Zookeeper初始化的主要工做是初始化本身的一些關鍵組件session

  • Watcher,外部構造好傳入
  • 初始化StaticHostProvider,決定客戶端選擇鏈接哪個server
  • ClientCnxn,客戶端網絡通訊的組件,主要啓動邏輯就是啓動這個類

ClientCnxn包含兩個線程框架

  • SendThread,負責client端消息的發送和接收
  • EventThread,負責處理event

ClientCnxn初始化的過程就是初始化啓動這兩個線程,客戶端發起鏈接的主要邏輯在SendThread線程中異步

// org.apache.zookeeper.ClientCnxn.SendThread#run
@Override
public void run() {
    clientCnxnSocket.introduce(this,sessionId);
    clientCnxnSocket.updateNow();
    clientCnxnSocket.updateLastSendAndHeard();
    int to;
    long lastPingRwServer = System.currentTimeMillis();
    final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
    while (state.isAlive()) {
        try {
            // client是否鏈接到server,若是沒有鏈接到則鏈接server
            if (!clientCnxnSocket.isConnected()) {
                if(!isFirstConnect){
                    try {
                        Thread.sleep(r.nextInt(1000));
                    } catch (InterruptedException e) {
                        LOG.warn("Unexpected exception", e);
                    }
                }
                // don't re-establish connection if we are closing
                if (closing || !state.isAlive()) {
                    break;
                }
                // 這個裏面去鏈接server
                startConnect();
                clientCnxnSocket.updateLastSendAndHeard();
            }

            // 省略中間代碼...
            clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
            // 省略中間代碼...
}

SendThread#run是一個while循環,只要client沒有被關閉會一直循環,每次循環判斷當前client是否鏈接到server,若是沒有則發起鏈接,發起鏈接調用了startConnectsocket

private void startConnect() throws IOException {
    state = States.CONNECTING;

    InetSocketAddress addr;
    if (rwServerAddress != null) {
        addr = rwServerAddress;
        rwServerAddress = null;
    } else {
        // 經過hostProvider來獲取一個server地址
        addr = hostProvider.next(1000);
    }
    // 省略中間代碼...

    // 創建client與server的鏈接
    clientCnxnSocket.connect(addr);
}

到這裏client發起了socket鏈接,server監聽的端口收到client的鏈接請求後和client創建鏈接。ide

經過一個request來創建session鏈接

socket鏈接創建後,client會向server發送一個ConnectRequest來創建session鏈接。兩種狀況會發送ConnectRequestthis

  • 在上面的connect方法中會判斷是否socket已經創建成功,若是創建成功就會發送ConnectRequest
  • 若是socket沒有當即創建成功(socket鏈接創建是異步的),則發送這個packet要延後到doTransport中

發送ConnectRequest是在下面的方法中

// org.apache.zookeeper.ClientCnxn.SendThread#primeConnection
void primeConnection() throws IOException {
    LOG.info("Socket connection established to "
             + clientCnxnSocket.getRemoteSocketAddress()
             + ", initiating session");
    isFirstConnect = false;
    long sessId = (seenRwServerBefore) ? sessionId : 0;
    ConnectRequest conReq = new ConnectRequest(0, lastZxid,
                                               sessionTimeout, sessId, sessionPasswd);
        // 省略中間代碼...
        // 將conReq封裝爲packet放入outgoingQueue等待發送
        outgoingQueue.addFirst(new Packet(null, null, conReq,
                                          null, null, readOnly));
    }
    clientCnxnSocket.enableReadWriteOnly();
    if (LOG.isDebugEnabled()) {
        LOG.debug("Session establishment request sent on "
                  + clientCnxnSocket.getRemoteSocketAddress());
    }
}

請求中帶的參數

  • lastZxid:上一個事務的id
  • sessionTimeout:client端配置的sessionTimeout
  • sessId:sessionId,若是以前創建過鏈接取的是上一次鏈接的sessionId
  • sessionPasswd:session的密碼

服務端建立session

和client創建socket鏈接

在server啓動的過程當中除了會啓動用於選舉的網絡組件還會啓動用於處理client請求的網絡組件

org.apache.zookeeper.server.NIOServerCnxnFactory

主要啓動了三個線程:

  • AcceptThread:用於接收client的鏈接請求,創建鏈接後交給SelectorThread線程處理
  • SelectorThread:用於處理讀寫請求
  • ConnectionExpirerThread:檢查session鏈接是否過時

client發起socket鏈接的時候,server監聽了該端口,接收到client的鏈接請求,而後把創建練級的SocketChannel放入隊列裏面,交給SelectorThread處理

// org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#addAcceptedConnection
public boolean addAcceptedConnection(SocketChannel accepted) {
    if (stopped || !acceptedQueue.offer(accepted)) {
        return false;
    }
    wakeupSelector();
    return true;
}

創建session鏈接

SelectorThread是一個不斷循環的線程,每次循環都會處理剛剛創建的socket鏈接

// org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#run
while (!stopped) {
    try {
        select();
        //  處理對立中的socket
        processAcceptedConnections();
        processInterestOpsUpdateRequests();
    } catch (RuntimeException e) {
        LOG.warn("Ignoring unexpected runtime exception", e);
    } catch (Exception e) {
        LOG.warn("Ignoring unexpected exception", e);
    }
}

// org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#processAcceptedConnections
private void processAcceptedConnections() {
    SocketChannel accepted;
    while (!stopped && (accepted = acceptedQueue.poll()) != null) {
        SelectionKey key = null;
        try {
            // 向該socket註冊讀事件
            key = accepted.register(selector, SelectionKey.OP_READ);
            // 建立一個NIOServerCnxn維護session
            NIOServerCnxn cnxn = createConnection(accepted, key, this);
            key.attach(cnxn);
            addCnxn(cnxn);
        // 省略中間代碼...
    }
}

說了這麼久,咱們說的session到底是什麼尚未解釋,session中文翻譯是會話,在這裏就是zk的server和client維護的一個具備一些特別屬性的網絡鏈接,網絡鏈接這裏就是socket鏈接,一些特別的屬性包括

  • sessionId:惟一標示一個會話
  • sessionTimeout:這個鏈接的超時時間,超過這個時間server就會把鏈接斷開

因此session創建的兩步就是

  • 創建socket鏈接
  • client發起創建session請求,server創建一個實例來維護這個鏈接

server收到ConnectRequest以後,按照正常處理io的方式處理這個request,server端的主要操做是

  • 反序列化爲ConnectRequest
  • 根據request中的sessionId來判斷是新的session鏈接仍是session重連
    • 若是是新鏈接
      • 生成sessionId
      • 建立新的SessionImpl並放入org.apache.zookeeper.server.SessionTrackerImpl#sessionExpiryQueue
      • 封裝該請求爲新的request在processorChain中傳遞,最後交給FinalRequestProcessor處理
    • 若是是重連
      • 關閉sessionId對應的原來的session
        • 關閉原來的socket鏈接
        • sessionImp會在sessionExpiryQueue中因爲過時被清理
      • 從新打開一個session
        • 將原來的sessionId設置到當前的NIOServerCnxn實例中,做爲新的鏈接的sessionId
        • 校驗密碼是否正確密碼錯誤的時候直接返回給客戶端,不可用的session
        • 密碼正確的話,新建SessionImpl
        • 返回給客戶端sessionId

整體流程是

其中有一個session生成算法咱們來看下

public static long initializeNextSession(long id) {
    // sessionId是long類型,共8個字節,64位
    long nextSid;
    // 取時間戳的的低40位做爲初始化sessionId的第16-55位,這裏使用的是無符號右移,不會出現負數
    nextSid = (Time.currentElapsedTime() << 24) >>> 8;
    // 使用serverId(配置文件中指定的myid)做爲高8位
    nextSid =  nextSid | (id <<56);
    // nextSid爲long的最小值,這中狀況不可能出現,這裏只是做爲一個case列在這裏
    if (nextSid == EphemeralType.CONTAINER_EPHEMERAL_OWNER) {
        ++nextSid;  // this is an unlikely edge case, but check it just in case
    }
    return nextSid;
}

初始化sessionId的組成

myid(1字節)+截取的時間戳低40位(5個字節)+2個字節(初始化都是0)

每一個server再基於這個id不斷自增,這樣的算法就保證了每一個server的sessionId是全局惟一的。

總結

session在zk框架中是一個重要概念,不少功能都依賴於session,好比臨時節點,session關閉後就自動刪除了。本文主要介紹了session的創建過程當中client和server各自的處理方式。

相關文章
相關標籤/搜索