深刻淺出Zookeeper(三):會話管理

本文首發於 泊浮目的簡書: https://www.jianshu.com/u/204...
版本 日期 備註
1.0 2020.3.29 文章首發

前言

咱們知道Zookeeper是一個分佈式協同系統。在一個大型的分佈式系統中,必然會有大量的Client來鏈接Zookeeper。那麼Zookeeper是如何管理這些session的生命週期呢?帶着這個問題,咱們進入今天的正文。java

Session管理者:SessionTracker

咱們先來看看session相關的核心類——SessionTracker(會話管理器)的抽象定義:shell

/**
 * This is the basic interface that ZooKeeperServer uses to track sessions. The
 * standalone and leader ZooKeeperServer use the same SessionTracker. The
 * FollowerZooKeeperServer uses a SessionTracker which is basically a simple
 * shell to track information to be forwarded to the leader.
 */
public interface SessionTracker {
    public static interface Session {
        long getSessionId();
        int getTimeout();
        boolean isClosing();
    }
    public static interface SessionExpirer {
        void expire(Session session);

        long getServerId();
    }

    long createSession(int sessionTimeout);

    /**
     * Add a global session to those being tracked.
     * @param id sessionId
     * @param to sessionTimeout
     * @return whether the session was newly added (if false, already existed)
     */
    boolean addGlobalSession(long id, int to);

    /**
     * Add a session to those being tracked. The session is added as a local
     * session if they are enabled, otherwise as global.
     * @param id sessionId
     * @param to sessionTimeout
     * @return whether the session was newly added (if false, already existed)
     */
    boolean addSession(long id, int to);

    /**
     * @param sessionId
     * @param sessionTimeout
     * @return false if session is no longer active
     */
    boolean touchSession(long sessionId, int sessionTimeout);

    /**
     * Mark that the session is in the process of closing.
     * @param sessionId
     */
    void setSessionClosing(long sessionId);

    /**
     *
     */
    void shutdown();

    /**
     * @param sessionId
     */
    void removeSession(long sessionId);

    /**
     * @param sessionId
     * @return whether or not the SessionTracker is aware of this session
     */
    boolean isTrackingSession(long sessionId);

    /**
     * Checks whether the SessionTracker is aware of this session, the session
     * is still active, and the owner matches. If the owner wasn't previously
     * set, this sets the owner of the session.
     *
     * UnknownSessionException should never been thrown to the client. It is
     * only used internally to deal with possible local session from other
     * machine
     *
     * @param sessionId
     * @param owner
     */
    public void checkSession(long sessionId, Object owner)
            throws KeeperException.SessionExpiredException,
            KeeperException.SessionMovedException,
            KeeperException.UnknownSessionException;

    /**
     * Strictly check that a given session is a global session or not
     * @param sessionId
     * @param owner
     * @throws KeeperException.SessionExpiredException
     * @throws KeeperException.SessionMovedException
     */
    public void checkGlobalSession(long sessionId, Object owner)
            throws KeeperException.SessionExpiredException,
            KeeperException.SessionMovedException;

    void setOwner(long id, Object owner) throws SessionExpiredException;

    /**
     * Text dump of session information, suitable for debugging.
     * @param pwriter the output writer
     */
    void dumpSessions(PrintWriter pwriter);

    /**
     * Returns a mapping of time to session IDs that expire at that time.
     */
    Map<Long, Set<Long>> getSessionExpiryMap();
}

大體能夠看到,該interface定義對會話一系列的控制方法:好比會話的建立、激活及刪除等等。數據庫

那麼咱們來看下其SessionTrackerImpl實現中比較重要的接口和成員變量以及方法。數組

會話的屬性與狀態

接下來咱們來看看一個會話實例會包含哪些屬性,話很少說,直接看接口定義:服務器

public static interface Session {
        long getSessionId();
        int getTimeout();
        boolean isClosing();
    }

咱們能夠看到,在服務端,僅僅記錄了client這樣的三個屬性:sessionId,timeout,isClosing。網絡

但在client,還會更復雜一點。好比session的狀態就有好多個:session

@InterfaceAudience.Public
    public enum States {
        CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY,
        CLOSED, AUTH_FAILED, NOT_CONNECTED;

        public boolean isAlive() {
            return this != CLOSED && this != AUTH_FAILED;
        }

        /**
         * Returns whether we are connected to a server (which
         * could possibly be read-only, if this client is allowed
         * to go to read-only mode)
         * */
        public boolean isConnected() {
            return this == CONNECTED || this == CONNECTEDREADONLY;
        }
    }

一般狀況下,由於網絡閃斷或其餘緣由,client會出現和server斷開的狀況。所幸的是,zkClient會自動重連,這時client會變爲connecting,直到連上服務器,則變connected。若是會話超時、權限檢查失敗或client退出程序等異常狀況,則客戶端會變成close狀態。數據結構

重要成員變量

protected final ConcurrentHashMap<Long, SessionImpl> sessionsById =
        new ConcurrentHashMap<Long, SessionImpl>();

    private final ExpiryQueue<SessionImpl> sessionExpiryQueue;

    private final ConcurrentMap<Long, Integer> sessionsWithTimeout;
  • 第一個sessionsById很顯然,就是經過session的id與session本體作映射的一個字典。
  • 第二個sessionExpiryQueue,聽名字像是一個過時隊列,沒錯,不過裏面使用了分桶策略 ,稍後咱們會作分析。
  • 第三個sessionsWithTimeout,名字說明一切。用於標示session的超時時間,k是sessionId,v是超時時間。該數據結構和Zk的內存數據庫相連通,會被按期持久化到快照裏去。

會話管理

會話的建立

要談會話管理,必然要談到會話是怎麼建立的,否則則顯得有些空洞。這裏不會贅述client的初始化過程。不管如何,咱們須要一個連接,畢竟不能讓會話基於空氣創建:併發

  1. 咱們的client會隨機選一個咱們提供的地址,而後委託給ClientCnxnSocket去建立與zk之間的TCP連接。
  2. 接下來SendThread(Client的網絡發送線程)構造出一個ConnectRequest請求(表明客戶端與服務器建立一個會話)。同時,Zookeeper客戶端還會進一步將請求包裝成網絡IO的Packet對象,放入請求發送隊列——outgoingQueue中去。
  3. ClientCnxnSocket從outgoingQueue中取出Packet對象,將其序列化成ByteBuffer後,向服務器進行發送。
  4. 服務端的SessionTracker爲該會話分配一個SessionId,併發送響應。
  5. Client收到響應後,而且早已明白本身沒有初始化,所以會用readConnectResult方法來處理請求。
  6. ClientCnxnSocket會對接受到的服務端響應進行反序列化,獲得ConnectResponse對象,並從中獲取到Zookeeper服務端分配的會話SessionId。
  7. 通知SendThread,更新Client會話參數(好比重要的connectTimeout),並更新Client狀態;另外,通知地址管理器HostProvider當前成功連接的服務器地址。

這就是會話的大體建立流程了,固然咱們還省去了SyncConnected-None的事件通知邏輯,由於這在咱們今天要將講的內容裏並不重要。app

會話過時檢查

會話過時檢查是經過SessionTrackerImpl.run來作的,這是一個線程的核心方法——顯然,zk的session過時檢查是經過一個線程來作的。

簡單來講,ExpiryQueue會根據時間將會要過時的sessions進行歸檔。好比在12:12:54將會有session一、session二、session3會過時,12:12:55會有session四、session五、session6會過時,那麼時間會做爲一個k,而對應的過時sessions會被做爲一個數組,用字典將它們映射起來:

key value
12:12:54 [session1,session2,session3]
12:12:55 [session4,session5,session6]

固然,實際中間隔不會是1s,這裏爲了便於表達,才這麼寫的。真實的狀況是,zk會計算每一個session的過時時間,並將其歸檔到對應的會話桶中。

  • 計算一個會話的過時時間大體爲:CurrentTime+SessionTimeout(見ExpiryQueue的update)。
  • 而歸檔到Zk的時間節點爲:(會話過時時間/ExpirationInterval+1) * ExpirationInterval。

爲了便於理解,咱們能夠舉幾個例子,Zk默認的間隔時間是2000ms:

  • 好比咱們計算出來一個sessionA在3000ms後過時,那麼其會坐落在(3000/2000+1)*2000=4000ms這個key裏。
  • 好比咱們計算出來一個sessionB在1500ms後過時,那麼其會坐落在(3000/2000+1)*2000=2000ms這個key裏。
0 2000ms 4000ms 6000ms 8000ms
sessionB sessionA

這樣線程就不用遍歷全部的會話去逐一檢查它們的過時時間了,有點妙。在這裏,也能夠簡單的講一下會話清理步驟:

  1. 標記會話爲"isClosing」。這樣在會話清理期間接收到客戶端的新請求也沒法繼續處理了。
  2. 發起關閉會話請求給PrepRequestProcessor,使其在整個Zk集羣裏生效。
  3. 收集須要清理的臨時節點——在上面提到過sessionsWithTimeout 和內存數據庫是共通的。
  4. 發起「節點刪除」請求,這個事務會被髮到outstandingChanges中去。
  5. 刪除臨時節點,該邏輯由FinalRequestProcessor觸發Zk內存數據庫(見FinalRequestProcessor.processRequest)。
  6. 移除會話。從sessionsById sessionExpiryQueue sessionsWithTimeout 中移除。
  7. 關閉ServerCnxn:從ServerCnxnFactory找出對應的ServerCnxn,將其關閉(見FinalRequestProcessor.closeSession)。

會話激活

從上面看來,session彷佛是到了事先計算好的時間就會過時。其實並不是如此——client會經過發送請求or心跳請求來保持會話的有效性,即延遲超時時間。這個過程通常叫作TouchSession(沒錯,代碼裏也是這麼叫的)。咱們來簡單的講一下流程:

  1. 檢查該會話是否被關閉,若是關閉,則再也不激活。
  2. 計算新的超時時間(參考上面提到的會話超時計算方法,也能夠看ExpiryQueue.update)
  3. 遷移會話(從老桶到新桶)

重要源碼解析

SessionId的分配

/**
     * Generates an initial sessionId. High order byte is serverId, next 5
     * 5 bytes are from timestamp, and low order 2 bytes are 0s.
     */
    public static long initializeNextSession(long id) {
        long nextSid;
        nextSid = (Time.currentElapsedTime() << 24) >>> 8;
        nextSid =  nextSid | (id <<56);
        if (nextSid == EphemeralType.CONTAINER_EPHEMERAL_OWNER) {
            ++nextSid;  // this is an unlikely edge case, but check it just in case
        }
        return nextSid;
    }

簡單來講,前7位肯定了所在的機器,後57位使用當前時間的毫秒錶示進行隨機。

SessionTrackerImpl.run

@Override
    public void run() {
        try {
            while (running) {
                long waitTime = sessionExpiryQueue.getWaitTime();
                if (waitTime > 0) {
                    Thread.sleep(waitTime);
                    continue;
                }

                for (SessionImpl s : sessionExpiryQueue.poll()) {
                    setSessionClosing(s.sessionId);
                    expirer.expire(s);
                }
            }
        } catch (InterruptedException e) {
            handleException(this.getName(), e);
        }
        LOG.info("SessionTrackerImpl exited loop!");
    }

邏輯很簡單。去sessionExpiryQueue 裏看一下離最近的過時時間還要多久,有的話就等一下子。

接下來是標記成Closing,並開始作使過時操做。

咱們接着看expirer.expire

public void expire(Session session) {
        long sessionId = session.getSessionId();
        LOG.info("Expiring session 0x" + Long.toHexString(sessionId)
                + ", timeout of " + session.getTimeout() + "ms exceeded");
        close(sessionId);
    }

跳向close:

private void close(long sessionId) {
        Request si = new Request(null, sessionId, 0, OpCode.closeSession, null, null);
        setLocalSessionFlag(si);
        submitRequest(si);
    }

就是build一個新的請求,而後set本地的flag。關鍵方法是submitRequest:

public void submitRequest(Request si) {
        if (firstProcessor == null) {
            synchronized (this) {
                try {
                    // Since all requests are passed to the request
                    // processor it should wait for setting up the request
                    // processor chain. The state will be updated to RUNNING
                    // after the setup.
                    while (state == State.INITIAL) {
                        wait(1000);
                    }
                } catch (InterruptedException e) {
                    LOG.warn("Unexpected interruption", e);
                }
                if (firstProcessor == null || state != State.RUNNING) {
                    throw new RuntimeException("Not started");
                }
            }
        }
        try {
            touch(si.cnxn);
            boolean validpacket = Request.isValid(si.type);
            if (validpacket) {
                firstProcessor.processRequest(si);
                if (si.cnxn != null) {
                    incInProcess();
                }
            } else {
                LOG.warn("Received packet at server of unknown type " + si.type);
                new UnimplementedRequestProcessor().processRequest(si);
            }
        } catch (MissingSessionException e) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Dropping request: " + e.getMessage());
            }
        } catch (RequestProcessorException e) {
            LOG.error("Unable to process request:" + e.getMessage(), e);
        }
    }

第一段邏輯是等待Processor的chain準備好。接下來是激活一下會話,但會話若是已經被移除或超時,則會拋出異常。這個狀況很正常,由於client的session和這裏的移除請求並非同時作的。

接下來則是提交移除會話的請求。

SessionTrackerImpl.touch

synchronized public boolean touchSession(long sessionId, int timeout) {
        SessionImpl s = sessionsById.get(sessionId);

        if (s == null) {
            logTraceTouchInvalidSession(sessionId, timeout);
            return false;
        }

        if (s.isClosing()) {
            logTraceTouchClosingSession(sessionId, timeout);
            return false;
        }

        updateSessionExpiry(s, timeout);
        return true;
    }

獲取和校驗邏輯再也不贅述。直接跳向核心方法ExpiryQueue.update:

/**
     * Adds or updates expiration time for element in queue, rounding the
     * timeout to the expiry interval bucketed used by this queue.
     * @param elem     element to add/update
     * @param timeout  timout in milliseconds
     * @return         time at which the element is now set to expire if
     *                 changed, or null if unchanged
     */
    public Long update(E elem, int timeout) {
        Long prevExpiryTime = elemMap.get(elem);
        long now = Time.currentElapsedTime();
        Long newExpiryTime = roundToNextInterval(now + timeout);

        if (newExpiryTime.equals(prevExpiryTime)) {
            // No change, so nothing to update
            return null;
        }

        // First add the elem to the new expiry time bucket in expiryMap.
        Set<E> set = expiryMap.get(newExpiryTime);
        if (set == null) {
            // Construct a ConcurrentHashSet using a ConcurrentHashMap
            set = Collections.newSetFromMap(
                new ConcurrentHashMap<E, Boolean>());
            // Put the new set in the map, but only if another thread
            // hasn't beaten us to it
            Set<E> existingSet = expiryMap.putIfAbsent(newExpiryTime, set);
            if (existingSet != null) {
                set = existingSet;
            }
        }
        set.add(elem);

        // Map the elem to the new expiry time. If a different previous
        // mapping was present, clean up the previous expiry bucket.
        prevExpiryTime = elemMap.put(elem, newExpiryTime);
        if (prevExpiryTime != null && !newExpiryTime.equals(prevExpiryTime)) {
            Set<E> prevSet = expiryMap.get(prevExpiryTime);
            if (prevSet != null) {
                prevSet.remove(elem);
            }
        }
        return newExpiryTime;
    }

邏輯很是簡單。計算最新的過時時間,並放置到新的歸檔區間裏,再移除掉老歸檔區間裏的會話實例。

小結

在本文中,筆者和你們一塊兒了剖析了Zk的Session管理機制。其中的分桶策略在這種大量Client會話場景下顯得很是有用,顯著提高了會話超時的清理效率。

相關文章
相關標籤/搜索