This article was first published on 泊浮目's Jianshu page: https://www.jianshu.com/u/204...
Version | Date | Notes |
---|---|---|
1.0 | 2020.3.29 | First published |
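As we know, ZooKeeper is a distributed coordination system. In a large distributed system there will inevitably be a large number of clients connecting to ZooKeeper. So how does ZooKeeper manage the lifecycle of all these sessions? With that question in mind, let's get into today's topic.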
Let's start with the core session-related class, SessionTracker (the session manager), and its abstract definition:

    /**
     * This is the basic interface that ZooKeeperServer uses to track sessions. The
     * standalone and leader ZooKeeperServer use the same SessionTracker. The
     * FollowerZooKeeperServer uses a SessionTracker which is basically a simple
     * shell to track information to be forwarded to the leader.
     */
    public interface SessionTracker {
        public static interface Session {
            long getSessionId();
            int getTimeout();
            boolean isClosing();
        }

        public static interface SessionExpirer {
            void expire(Session session);

            long getServerId();
        }

        long createSession(int sessionTimeout);

        /**
         * Add a global session to those being tracked.
         * @param id sessionId
         * @param to sessionTimeout
         * @return whether the session was newly added (if false, already existed)
         */
        boolean addGlobalSession(long id, int to);

        /**
         * Add a session to those being tracked. The session is added as a local
         * session if they are enabled, otherwise as global.
         * @param id sessionId
         * @param to sessionTimeout
         * @return whether the session was newly added (if false, already existed)
         */
        boolean addSession(long id, int to);

        /**
         * @param sessionId
         * @param sessionTimeout
         * @return false if session is no longer active
         */
        boolean touchSession(long sessionId, int sessionTimeout);

        /**
         * Mark that the session is in the process of closing.
         * @param sessionId
         */
        void setSessionClosing(long sessionId);

        /**
         *
         */
        void shutdown();

        /**
         * @param sessionId
         */
        void removeSession(long sessionId);

        /**
         * @param sessionId
         * @return whether or not the SessionTracker is aware of this session
         */
        boolean isTrackingSession(long sessionId);

        /**
         * Checks whether the SessionTracker is aware of this session, the session
         * is still active, and the owner matches. If the owner wasn't previously
         * set, this sets the owner of the session.
         *
         * UnknownSessionException should never been thrown to the client. It is
         * only used internally to deal with possible local session from other
         * machine
         *
         * @param sessionId
         * @param owner
         */
        public void checkSession(long sessionId, Object owner)
                throws KeeperException.SessionExpiredException,
                KeeperException.SessionMovedException,
                KeeperException.UnknownSessionException;

        /**
         * Strictly check that a given session is a global session or not
         * @param sessionId
         * @param owner
         * @throws KeeperException.SessionExpiredException
         * @throws KeeperException.SessionMovedException
         */
        public void checkGlobalSession(long sessionId, Object owner)
                throws KeeperException.SessionExpiredException,
                KeeperException.SessionMovedException;

        void setOwner(long id, Object owner) throws SessionExpiredException;

        /**
         * Text dump of session information, suitable for debugging.
         * @param pwriter the output writer
         */
        void dumpSessions(PrintWriter pwriter);

        /**
         * Returns a mapping of time to session IDs that expire at that time.
         */
        Map<Long, Set<Long>> getSessionExpiryMap();
    }

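As you can roughly see, the interface defines a series of methods for controlling sessions, such as creating, activating and removing them.
Next, let's look at the more important interfaces, member variables and methods in its implementation, `SessionTrackerImpl`.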
First, which attributes does a session instance carry? Without further ado, here is the interface definition:

    public static interface Session {
        long getSessionId();
        int getTimeout();
        boolean isClosing();
    }

As you can see, the server records only three attributes for each client session: sessionId, timeout and isClosing.
On the client side things are a little more complex. For example, a session can be in quite a few states:

    @InterfaceAudience.Public
    public enum States {
        CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY,
        CLOSED, AUTH_FAILED, NOT_CONNECTED;

        public boolean isAlive() {
            return this != CLOSED && this != AUTH_FAILED;
        }

        /**
         * Returns whether we are connected to a server (which
         * could possibly be read-only, if this client is allowed
         * to go to read-only mode)
         * */
        public boolean isConnected() {
            return this == CONNECTED || this == CONNECTEDREADONLY;
        }
    }

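Normally, a client can get disconnected from the server because of a network blip or some other reason. Fortunately, the ZooKeeper client reconnects automatically: it moves to CONNECTING and, once it has reached a server, to CONNECTED. If the session times out, the permission check fails, or the client program exits, the client ends up in the CLOSED state.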
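To make the two helper methods concrete, here is a minimal sketch that prints what `isAlive()` and `isConnected()` report for every state. `ClientStates` and `StateDemo` are hypothetical names; the enum is a local copy of the one quoted above so the example runs without ZooKeeper on the classpath.

```java
// Local copy of the States enum quoted above (renamed; an illustration, not the ZooKeeper class itself).
enum ClientStates {
    CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY, CLOSED, AUTH_FAILED, NOT_CONNECTED;

    // A session is considered alive unless it was closed or authentication failed.
    boolean isAlive() {
        return this != CLOSED && this != AUTH_FAILED;
    }

    // Connected means attached to a server, possibly in read-only mode.
    boolean isConnected() {
        return this == CONNECTED || this == CONNECTEDREADONLY;
    }
}

class StateDemo {
    public static void main(String[] args) {
        for (ClientStates s : ClientStates.values()) {
            System.out.printf("%-18s alive=%-5b connected=%b%n", s, s.isAlive(), s.isConnected());
        }
    }
}
```

Note that CONNECTING counts as alive but not connected, which matches the reconnect phase described above.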
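`SessionTrackerImpl` holds three important member variables:

    protected final ConcurrentHashMap<Long, SessionImpl> sessionsById =
            new ConcurrentHashMap<Long, SessionImpl>();
    private final ExpiryQueue<SessionImpl> sessionExpiryQueue;
    private final ConcurrentMap<Long, Integer> sessionsWithTimeout;

- `sessionsById` is, quite obviously, a dictionary that maps a session id to the session object itself.
- `sessionExpiryQueue` sounds like an expiry queue, and it is one, but it uses a bucketing strategy internally, which we analyse a little later.
- `sessionsWithTimeout` does what its name says: it records each session's timeout, with the sessionId as key and the timeout as value. This structure is connected to ZooKeeper's in-memory database and is periodically persisted into snapshots.

Any discussion of session management has to cover how a session is created, otherwise it would feel hollow. I won't go over the client's initialization process here. Either way, we need a connection; a session cannot be built on thin air:

- The client uses `ClientCnxnSocket` to create the TCP connection to ZooKeeper.
- The `readConnectResult` method then handles the request.

That is roughly how a session gets created. We have left out the `SyncConnected-None` event notification logic, because it is not important for what we cover today.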
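Session expiry is checked in `SessionTrackerImpl.run`, which is the core method of a thread. In other words, ZooKeeper uses a dedicated thread to check for expired sessions.
Put simply, `ExpiryQueue` files sessions by the time at which they are due to expire. For example, if session1, session2 and session3 expire at 12:12:54 and session4, session5 and session6 expire at 12:12:55, then the time serves as the key, the sessions expiring at that time are collected into an array, and a dictionary maps one to the other: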
key | value |
---|---|
12:12:54 | [session1,session2,session3] |
12:12:55 | [session4,session5,session6] |
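Of course, the real interval is not 1 s; it is written this way only to keep the example simple. What actually happens is that ZooKeeper computes each session's expiry time and files the session into the corresponding session bucket. The expiry time is `CurrentTime + SessionTimeout`, rounded to the expiry interval used by the queue (see `update` in `ExpiryQueue`). To make this easier to follow, here are a few examples; ZooKeeper's default interval is 2000 ms: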
0 | 2000ms | 4000ms | 6000ms | 8000ms |
---|---|---|---|---|
sessionB | sessionA |
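The bucket arithmetic can be sketched as below. This is a minimal illustration, not ZooKeeper's code: `BucketRounding` is a hypothetical class, and the formula `(time / interval + 1) * interval` is my assumption of what `roundToNextInterval` does, based on the Javadoc of `update` ("rounding the timeout to the expiry interval"). The sample times are made up.

```java
class BucketRounding {
    // Assumed rounding rule: move the raw expiry time to the next bucket boundary.
    static long roundToNextInterval(long time, long interval) {
        return (time / interval + 1) * interval;
    }

    public static void main(String[] args) {
        long interval = 2000; // bucket width in ms, as in the text above
        long[][] cases = {    // {now, sessionTimeout}, illustrative values only
            {500, 3000},
            {1500, 4000},
            {2000, 4000},
        };
        for (long[] c : cases) {
            long raw = c[0] + c[1];
            System.out.println("now=" + c[0] + " timeout=" + c[1]
                    + " raw=" + raw + " bucket=" + roundToNextInterval(raw, interval));
        }
    }
}
```

With these inputs the raw expiry times 3500, 5500 and 6000 land in buckets 4000, 6000 and 8000. Under this rule a time that already sits exactly on a boundary is still pushed to the following bucket.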
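This way the thread does not have to walk through every session and check its expiry time one by one, which is rather neat. This is also a good place to briefly describe the session cleanup steps:

- The close-session request is handed to `PrepRequestProcessor` so that it takes effect across the whole ZooKeeper cluster.
- `sessionsWithTimeout` is shared with the in-memory database.
- The request is processed (see `FinalRequestProcessor.processRequest`).
- The session is removed from `sessionsById`, `sessionExpiryQueue` and `sessionsWithTimeout`.
- The session is closed (see `FinalRequestProcessor.closeSession`).

From the above, it may look as if a session simply expires at a precomputed time. That is not the case: the client keeps the session valid, that is, pushes the timeout back, by sending requests or heartbeat requests. This process is usually called TouchSession (and yes, that is what the code calls it too); its code is covered further below.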
Session ids are generated as follows:

    /**
     * Generates an initial sessionId. High order byte is serverId, next 5
     * 5 bytes are from timestamp, and low order 2 bytes are 0s.
     */
    public static long initializeNextSession(long id) {
        long nextSid;
        nextSid = (Time.currentElapsedTime() << 24) >>> 8;
        nextSid = nextSid | (id << 56);
        if (nextSid == EphemeralType.CONTAINER_EPHEMERAL_OWNER) {
            ++nextSid; // this is an unlikely edge case, but check it just in case
        }
        return nextSid;
    }

Put simply, the highest 8 bits identify the server the session was created on, the next 40 bits are taken from the current time in milliseconds, and the lowest 16 bits start out as zeros.
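The bit layout is easier to see when an id is taken apart again. The sketch below repeats the shift arithmetic of `initializeNextSession` with the clock value passed in as a parameter, and adds three decoding helpers. `SessionIdLayout` and all of its method names are hypothetical, and the `CONTAINER_EPHEMERAL_OWNER` edge case is left out for brevity.

```java
class SessionIdLayout {
    // Same shifts as the snippet above, with the elapsed time injected instead of read from a clock.
    static long initialSessionId(long serverId, long elapsedMillis) {
        long sid = (elapsedMillis << 24) >>> 8; // keep the low 40 bits of the time, placed at bits 16..55
        return sid | (serverId << 56);          // server id goes into the top byte
    }

    static long serverIdOf(long sid) { return sid >>> 56; }                        // top 8 bits
    static long timePartOf(long sid) { return (sid >>> 16) & 0xFFFFFFFFFFL; }      // middle 40 bits
    static long lowBitsOf(long sid)  { return sid & 0xFFFFL; }                     // low 16 bits

    public static void main(String[] args) {
        long sid = initialSessionId(3, 0x123456789AL); // illustrative inputs
        System.out.println(String.format("sessionId = 0x%016x", sid));
        System.out.println("serverId  = " + serverIdOf(sid));
        System.out.println(String.format("timePart  = 0x%x", timePartOf(sid)));
        System.out.println("lowBits   = " + lowBitsOf(sid));
    }
}
```

Because the time is shifted left by 24 and then unsigned-right by 8, only its low 40 bits survive, which leaves the top byte free for the server id.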
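The expiry check itself lives in `SessionTrackerImpl.run`:

    @Override
    public void run() {
        try {
            while (running) {
                long waitTime = sessionExpiryQueue.getWaitTime();
                if (waitTime > 0) {
                    Thread.sleep(waitTime);
                    continue;
                }

                for (SessionImpl s : sessionExpiryQueue.poll()) {
                    setSessionClosing(s.sessionId);
                    expirer.expire(s);
                }
            }
        } catch (InterruptedException e) {
            handleException(this.getName(), e);
        }
        LOG.info("SessionTrackerImpl exited loop!");
    }

The logic is simple. It asks `sessionExpiryQueue` how long it is until the nearest expiry time and, if there is still time left, sleeps for that long.
Otherwise, every session in the bucket that has come due is marked as closing and the expire operation is started for it.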
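The interplay of `getWaitTime` and `poll` can be imitated with a simulated clock. The following is a single-threaded sketch under my own assumptions, not ZooKeeper's `ExpiryQueue`: `ExpiryLoopSketch` is a hypothetical class, sessions are plain strings, and the "sleep" is replaced by advancing a `now` variable.

```java
import java.util.*;

class ExpiryLoopSketch {
    final long interval;                                   // bucket width in ms
    final Map<Long, Set<String>> buckets = new HashMap<>(); // expiry time -> sessions
    long nextExpirationTime;                               // next bucket boundary to inspect

    ExpiryLoopSketch(long interval, long now) {
        this.interval = interval;
        this.nextExpirationTime = (now / interval + 1) * interval;
    }

    void add(String session, long expiryBucket) {
        buckets.computeIfAbsent(expiryBucket, k -> new HashSet<>()).add(session);
    }

    // How long the checker may sleep before the next bucket comes due.
    long getWaitTime(long now) {
        return now < nextExpirationTime ? nextExpirationTime - now : 0L;
    }

    // Remove and return the bucket that is due, then move on to the next boundary.
    Set<String> poll(long now) {
        if (now < nextExpirationTime) {
            return Collections.<String>emptySet();
        }
        Set<String> due = buckets.remove(nextExpirationTime);
        nextExpirationTime += interval;
        return due == null ? Collections.<String>emptySet() : due;
    }

    // Drives the loop from 'start' to 'end' and returns sessions in the order they expired.
    static List<String> run(ExpiryLoopSketch q, long start, long end) {
        List<String> expired = new ArrayList<>();
        long now = start;
        while (now <= end) {
            long wait = q.getWaitTime(now);
            if (wait > 0) {
                now += wait;   // stands in for Thread.sleep(wait)
                continue;
            }
            expired.addAll(new TreeSet<>(q.poll(now))); // sorted only to make the output deterministic
        }
        return expired;
    }

    public static void main(String[] args) {
        ExpiryLoopSketch q = new ExpiryLoopSketch(2000, 0);
        q.add("s1", 4000);
        q.add("s2", 4000);
        q.add("s3", 6000);
        System.out.println(run(q, 0, 8000));
    }
}
```

The checker only ever looks at one bucket per wake-up, so its cost depends on the number of sessions that are actually due rather than on the total number of sessions.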
Next, let's look at `expirer.expire`:

    public void expire(Session session) {
        long sessionId = session.getSessionId();
        LOG.info("Expiring session 0x" + Long.toHexString(sessionId)
                + ", timeout of " + session.getTimeout() + "ms exceeded");
        close(sessionId);
    }

This jumps to `close`:

    private void close(long sessionId) {
        Request si = new Request(null, sessionId, 0, OpCode.closeSession, null, null);
        setLocalSessionFlag(si);
        submitRequest(si);
    }

This builds a new request and then sets the local-session flag on it. The key method is `submitRequest`:

    public void submitRequest(Request si) {
        if (firstProcessor == null) {
            synchronized (this) {
                try {
                    // Since all requests are passed to the request
                    // processor it should wait for setting up the request
                    // processor chain. The state will be updated to RUNNING
                    // after the setup.
                    while (state == State.INITIAL) {
                        wait(1000);
                    }
                } catch (InterruptedException e) {
                    LOG.warn("Unexpected interruption", e);
                }
                if (firstProcessor == null || state != State.RUNNING) {
                    throw new RuntimeException("Not started");
                }
            }
        }
        try {
            touch(si.cnxn);
            boolean validpacket = Request.isValid(si.type);
            if (validpacket) {
                firstProcessor.processRequest(si);
                if (si.cnxn != null) {
                    incInProcess();
                }
            } else {
                LOG.warn("Received packet at server of unknown type " + si.type);
                new UnimplementedRequestProcessor().processRequest(si);
            }
        } catch (MissingSessionException e) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Dropping request: " + e.getMessage());
            }
        } catch (RequestProcessorException e) {
            LOG.error("Unable to process request:" + e.getMessage(), e);
        }
    }

The first part waits until the processor chain is ready. Next the session is touched, but if the session has already been removed or has timed out, an exception is thrown. This is perfectly normal, because the client's session and the removal request here are not handled at the same moment.
After that, the remove-session request is submitted.
Now for session activation, `touchSession`:

    synchronized public boolean touchSession(long sessionId, int timeout) {
        SessionImpl s = sessionsById.get(sessionId);

        if (s == null) {
            logTraceTouchInvalidSession(sessionId, timeout);
            return false;
        }

        if (s.isClosing()) {
            logTraceTouchClosingSession(sessionId, timeout);
            return false;
        }

        updateSessionExpiry(s, timeout);
        return true;
    }

The lookup and validation logic needs no further explanation. Let's jump straight to the core method, `ExpiryQueue.update`:

    /**
     * Adds or updates expiration time for element in queue, rounding the
     * timeout to the expiry interval bucketed used by this queue.
     * @param elem     element to add/update
     * @param timeout  timout in milliseconds
     * @return         time at which the element is now set to expire if
     *                 changed, or null if unchanged
     */
    public Long update(E elem, int timeout) {
        Long prevExpiryTime = elemMap.get(elem);
        long now = Time.currentElapsedTime();
        Long newExpiryTime = roundToNextInterval(now + timeout);

        if (newExpiryTime.equals(prevExpiryTime)) {
            // No change, so nothing to update
            return null;
        }

        // First add the elem to the new expiry time bucket in expiryMap.
        Set<E> set = expiryMap.get(newExpiryTime);
        if (set == null) {
            // Construct a ConcurrentHashSet using a ConcurrentHashMap
            set = Collections.newSetFromMap(
                    new ConcurrentHashMap<E, Boolean>());
            // Put the new set in the map, but only if another thread
            // hasn't beaten us to it
            Set<E> existingSet = expiryMap.putIfAbsent(newExpiryTime, set);
            if (existingSet != null) {
                set = existingSet;
            }
        }
        set.add(elem);

        // Map the elem to the new expiry time. If a different previous
        // mapping was present, clean up the previous expiry bucket.
        prevExpiryTime = elemMap.put(elem, newExpiryTime);
        if (prevExpiryTime != null && !newExpiryTime.equals(prevExpiryTime)) {
            Set<E> prevSet = expiryMap.get(prevExpiryTime);
            if (prevSet != null) {
                prevSet.remove(elem);
            }
        }
        return newExpiryTime;
    }

The logic is very simple: compute the new expiry time, put the session into the new bucket, and then remove the session instance from the old bucket.
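To watch a session move between buckets, here is a stripped-down, single-threaded sketch of the same idea. `TouchSketch` is a hypothetical class: it takes `now` as a parameter instead of reading a clock, uses plain `HashMap`s instead of concurrent ones, and assumes the rounding rule `(time / interval + 1) * interval`.

```java
import java.util.*;

class TouchSketch {
    final long interval;                                      // bucket width in ms
    final Map<String, Long> elemMap = new HashMap<>();        // session -> its current bucket
    final Map<Long, Set<String>> expiryMap = new HashMap<>(); // bucket -> sessions in it

    TouchSketch(long interval) {
        this.interval = interval;
    }

    // Returns the new bucket, or null when the session stays in the bucket it is already in.
    Long update(String elem, int timeout, long now) {
        Long prev = elemMap.get(elem);
        Long next = ((now + timeout) / interval + 1) * interval; // assumed rounding rule
        if (next.equals(prev)) {
            return null;
        }
        expiryMap.computeIfAbsent(next, k -> new HashSet<>()).add(elem); // add to the new bucket first
        elemMap.put(elem, next);
        if (prev != null) {
            Set<String> old = expiryMap.get(prev);
            if (old != null) {
                old.remove(elem);                                        // then clean up the old bucket
            }
        }
        return next;
    }

    public static void main(String[] args) {
        TouchSketch q = new TouchSketch(2000);
        System.out.println(q.update("sessionA", 3000, 0));    // first registration
        System.out.println(q.update("sessionA", 3000, 500));  // touch that stays in the same bucket
        System.out.println(q.update("sessionA", 3000, 1200)); // touch that moves to a later bucket
        System.out.println(q.expiryMap);
    }
}
```

A touch that does not cross a bucket boundary returns early without modifying either map, so frequent heartbeats within one interval stay cheap.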
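In this article we dissected ZooKeeper's session management mechanism together. The bucketing strategy is very useful when there is a large number of client sessions, as it markedly improves the efficiency of cleaning up timed-out sessions.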