一,總體工作流程圖:
1,ExpiryQueue作用
ZooKeeper服務端管理客戶端會話超時使用到ExpiryQueue數據結構
2,ExpiryQueue的類圖
該類中主要包含了以下變量:
nextExpirationTime(下一個過期的時間點),
expirationInterval(過期時間間隔),
elemMap(key是session對象,value爲過期時間),
expiryMap(key爲過期時間,value爲在該時間點到期的session集合)
該類中主要方法:
構造方法初始化nextExpirationTime
update增加或更新session的過期時間
remove將指定的session從隊列中移除
getWaitTime判斷當前時間是否已經超過了nextExpirationTime,超過返回0,沒有超過返回nextExpirationTime-now,zookeeper中通過不停地輪詢這個方法來判斷是否需要清除過期session
poll拉取過期session進行清除
數據結構補充(從網上找的一個圖,感覺還不錯)
ExpiryQueue根據expirationInterval將時間分段,將每段區間的時間放入對應的一個集合進行管理。如圖二所示,時間段在1503556830000-1503556860000中的數據將會放到1503556860000對應的集合中,1503556860000-1503556890000中的數據將會放到1503556890000的集合中,以此類推。
在ExpiryQueue的數據結構中,圖二中的集合由ConcurrentHashMap進行管理,其中的Key值爲到期時間。
數據分段使用公式爲:(時間(毫秒)/ expirationInterval + 1)* expirationInterval,其中的除法爲整數除法(捨去餘數)。該公式表示將時間按照expirationInterval間隔算出已經過的份數,算完後再加一個份額,最後再乘以expirationInterval間隔,就得出了下一個到期時間點。構造方法中代入的時間是當前時間,update方法中代入的時間是「當前時間 + 超時時間」。
源碼閱讀(本代碼是從源碼改寫而來,可以直接執行):
package com.weshare.eel.task.utils;

/**
 * Created by 陳穩 on 2018/11/27.
 */

import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.ConcurrentHashMap;

/**
 * ExpiryQueue tracks elements in time sorted fixed duration buckets.
 * It's used by SessionTrackerImpl to expire sessions and NIOServerCnxnFactory
 * to expire connections.
 *
 * <p>Every element is assigned to the bucket whose key is the element's
 * expiry time rounded up to the next multiple of {@code expirationInterval}.
 * Buckets are claimed one at a time, in time order, by {@link #poll()}.
 *
 * @param <E> type of the tracked elements; used as a hash key, so it must
 *            have consistent {@code equals}/{@code hashCode}
 */
public class ExpiryQueue<E> {

    /** Maps each tracked element to the key of the bucket it currently lives in. */
    private final ConcurrentHashMap<E, Long> elemMap =
        new ConcurrentHashMap<E, Long>();

    /**
     * The maximum number of buckets is equal to max timeout/expirationInterval,
     * so the expirationInterval should not be too small compared to the
     * max timeout that this expiry queue needs to maintain.
     */
    private final ConcurrentHashMap<Long, Set<E>> expiryMap =
        new ConcurrentHashMap<Long, Set<E>>();

    /** Key of the next bucket that {@link #poll()} will claim. */
    private final AtomicLong nextExpirationTime = new AtomicLong();

    /** Width of one bucket, in milliseconds. */
    private final int expirationInterval;

    /**
     * Creates an empty queue whose first bucket boundary is the current
     * elapsed time rounded up to the next interval.
     *
     * @param expirationInterval bucket width in milliseconds; it is used as a
     *                           divisor, so zero fails with an
     *                           {@link ArithmeticException}
     */
    public ExpiryQueue(int expirationInterval) {
        this.expirationInterval = expirationInterval;
        nextExpirationTime.set(roundToNextInterval(Time.currentElapsedTime()));
    }

    /**
     * Rounds a time up to the next bucket boundary. A time that already sits
     * exactly on a boundary is moved to the following boundary.
     *
     * @param time time in milliseconds
     * @return the next multiple of {@code expirationInterval} after {@code time}
     */
    private long roundToNextInterval(long time) {
        // Integer division drops the remainder, giving the count of whole
        // intervals; adding one moves to the boundary after it.
        long cnt = time / expirationInterval;
        return (cnt + 1) * expirationInterval;
    }

    /**
     * Removes element from the queue.
     *
     * @param elem element to remove
     * @return time at which the element was set to expire, or null if
     *         it wasn't present
     */
    public Long remove(E elem) {
        Long expiryTime = elemMap.remove(elem);
        if (expiryTime != null) {
            Set<E> set = expiryMap.get(expiryTime);
            if (set != null) {
                set.remove(elem);
                // We don't need to worry about removing empty sets,
                // they'll eventually be removed when they expire.
            }
        }
        return expiryTime;
    }

    /**
     * Adds or updates expiration time for element in queue, rounding the
     * timeout to the expiry interval bucketed used by this queue.
     *
     * @param elem    element to add/update
     * @param timeout timeout in milliseconds
     * @return time at which the element is now set to expire if
     *         changed, or null if unchanged
     */
    public Long update(E elem, int timeout) {
        Long prevExpiryTime = elemMap.get(elem);
        long now = Time.currentElapsedTime();
        Long newExpiryTime = roundToNextInterval(now + timeout);

        if (newExpiryTime.equals(prevExpiryTime)) {
            // No change, so nothing to update
            return null;
        }

        // First add the elem to the new expiry time bucket in expiryMap.
        Set<E> set = expiryMap.get(newExpiryTime);
        if (set == null) {
            // Construct a ConcurrentHashSet using a ConcurrentHashMap
            set = Collections.newSetFromMap(
                new ConcurrentHashMap<E, Boolean>());
            // Put the new set in the map, but only if another thread
            // hasn't beaten us to it
            Set<E> existingSet = expiryMap.putIfAbsent(newExpiryTime, set);
            if (existingSet != null) {
                set = existingSet;
            }
        }
        set.add(elem);

        // Map the elem to the new expiry time. If a different previous
        // mapping was present, clean up the previous expiry bucket.
        prevExpiryTime = elemMap.put(elem, newExpiryTime);
        if (prevExpiryTime != null && !newExpiryTime.equals(prevExpiryTime)) {
            Set<E> prevSet = expiryMap.get(prevExpiryTime);
            if (prevSet != null) {
                prevSet.remove(elem);
            }
        }
        return newExpiryTime;
    }

    /**
     * @return milliseconds until next expiration time, or 0 if has already past
     */
    public long getWaitTime() {
        long now = Time.currentElapsedTime();
        long expirationTime = nextExpirationTime.get();
        return now < expirationTime ? (expirationTime - now) : 0L;
    }

    /**
     * Remove the next expired set of elements from expireMap. This method needs
     * to be called frequently enough by checking getWaitTime(), otherwise there
     * will be a backlog of empty sets queued up in expiryMap.
     *
     * <p>This method does not touch {@code elemMap}; the returned elements
     * stay mapped there until {@link #remove(Object)} is called for them.
     *
     * @return next set of expired elements, or an empty set if none are
     *         ready
     */
    public Set<E> poll() {
        long now = Time.currentElapsedTime();
        long expirationTime = nextExpirationTime.get();
        if (now < expirationTime) {
            return Collections.emptySet();
        }

        Set<E> set = null;
        long newExpirationTime = expirationTime + expirationInterval;
        // Only the caller whose compare-and-set succeeds advances the
        // boundary and takes the bucket; any other caller gets an empty set.
        if (nextExpirationTime.compareAndSet(
                expirationTime, newExpirationTime)) {
            set = expiryMap.remove(expirationTime);
        }
        if (set == null) {
            return Collections.emptySet();
        }
        return set;
    }

    /**
     * Writes a human-readable listing of every bucket, in ascending expiry
     * order, together with the elements it holds.
     *
     * @param pwriter destination for the listing
     */
    public void dump(PrintWriter pwriter) {
        pwriter.print("Sets (");
        pwriter.print(expiryMap.size());
        pwriter.print(")/(");
        pwriter.print(elemMap.size());
        pwriter.println("):");
        ArrayList<Long> keys = new ArrayList<Long>(expiryMap.keySet());
        Collections.sort(keys);
        for (long time : keys) {
            // The bucket may have been removed since the keys were copied.
            Set<E> set = expiryMap.get(time);
            if (set != null) {
                pwriter.print(set.size());
                pwriter.print(" expire at ");
                pwriter.print(Time.elapsedTimeToDate(time));
                pwriter.println(":");
                for (E elem : set) {
                    pwriter.print("\t");
                    pwriter.println(elem.toString());
                }
            }
        }
    }

    /**
     * Returns an unmodifiable view of the expiration time -> elements mapping.
     */
    public Map<Long, Set<E>> getExpiryMap() {
        return Collections.unmodifiableMap(expiryMap);
    }
}
package com.weshare.eel.task.utils;
/**
* Created by 陳穩 on 2018/11/27.
*/
import java.util.Date;
public class Time {

    /** Number of nanoseconds in one millisecond. */
    private static final long NANOS_PER_MILLI = 1000000L;

    /**
     * Returns a millisecond reading taken from {@link System#nanoTime()}.
     * The value counts from an arbitrary origin, so it is only meaningful
     * when compared with other readings from this method, not as a wall
     * clock time such as {@link System#currentTimeMillis()} returns.
     *
     * @return The time in milliseconds from some arbitrary point in time.
     */
    public static long currentElapsedTime() {
        long nanos = System.nanoTime();
        return nanos / NANOS_PER_MILLI;
    }

    /**
     * Explicitly returns system dependent current wall time.
     *
     * @return Current time in msec.
     */
    public static long currentWallTime() {
        return System.currentTimeMillis();
    }

    /**
     * Converts an elapsed-time reading to a Date by shifting the current
     * wall time by the distance between the given reading and the current
     * elapsed time.
     *
     * @param elapsedTime a value on the {@link #currentElapsedTime()} scale
     * @return A date object indicated by the elapsedTime.
     */
    public static Date elapsedTimeToDate(long elapsedTime) {
        // Sample the wall clock first, then the elapsed clock.
        long wallNow = currentWallTime();
        long elapsedNow = currentElapsedTime();
        long offsetFromNow = elapsedTime - elapsedNow;
        return new Date(wallNow + offsetFromNow);
    }
}