基於NIO的消息路由的實現(五) 服務端鏈接管理

1、對客戶端鏈接(也就是SocketChannel)的管理java

  1. 客戶端與服務端創建鏈接後,服務端來維護其鏈接,首先定義了一個GVConnection類,用來定義鏈接;socket

此類包括四個成員:token、userId、channel、lastAccessTime;以下:學習

public class GVConnection {

    private String token;
    private String userId;
    private SocketChannel channel;
    private int lastAccessTime;

    public GVConnection(String token,String userId, SocketChannel channel, int lastAccessTime) {

        this.token = token;
        this.userId = userId;
        this.channel = channel;
        this.lastAccessTime = lastAccessTime;

    }

    public void setLastAccessTime(int lastAccessTime){
        this.lastAccessTime = lastAccessTime;
    }

    public SocketChannel getChannel() {
        return channel;
    }

    public String getToken() {
        return token;
    }

    public String getUserId(){
        return userId;
    }

    public int getLastAccessTime() {
        return lastAccessTime;
    }
}

2.對GVConnection的操做單獨封裝一個類,GVConnTools,在這個類中,我定義了兩個map,用來保存userId與GVConnection的對應關係,以及token與GVConnection的對應關係。這兩個Map須要保持同步,在刪除的時候,能夠經過userId或者token進行刪除。這兩個方法有可能同步執行會遭遇異常,因此我增長了一個互斥鎖(我不肯定這樣作的合理性,敬請指導)。其餘新增和更新的方法我均採用了同步方法。
測試

public class GVConnTools {

    private static Map<String, GVConnection> CONNECTION_BY_USERID = new ConcurrentHashMap<String, GVConnection>();
    private static Map<String, GVConnection> CONNECTION_BY_TOKEN = new ConcurrentHashMap<String, GVConnection>();

    private static Lock lock = new ReentrantLock();

    public static GVConnection getConnByToken(String token) {

        if (CONNECTION_BY_TOKEN.get(token) != null) {
            return CONNECTION_BY_TOKEN.get(token);
        } else {
            return null;
        }

    }

    public static GVConnection getConnByUserId(String userId) {

        return CONNECTION_BY_USERID.get(userId);

    }

    public static SocketChannel getChannelByUserId(String userId) {
        if (CONNECTION_BY_USERID.get(userId) != null) {
            return CONNECTION_BY_USERID.get(userId).getChannel();
        } else {
            return null;
        }

    }

    public static SocketChannel getChannelByToken(String token) {

        return CONNECTION_BY_TOKEN.get(token).getChannel();

    }

    public static synchronized void addConn2Cache(GVConnection conn) {

        CONNECTION_BY_TOKEN.put(conn.getToken(), conn);
        CONNECTION_BY_USERID.put(conn.getUserId(), conn);

    }

    public static void removeConnByUserId(String userId) {

        lock.lock();
        try {

            String token = CONNECTION_BY_USERID.get(userId).getToken();
            CONNECTION_BY_USERID.remove(userId);
            CONNECTION_BY_TOKEN.remove(token);

        } finally {
            lock.unlock();
        }
    }

    public static void removeConnByToken(String token) {

        lock.lock();
        try {

            String userId = CONNECTION_BY_TOKEN.get(token).getToken();
            CONNECTION_BY_TOKEN.remove(token);
            CONNECTION_BY_USERID.remove(userId);

        } finally {
            lock.unlock();
        }

    }

    public static String getUserIdByToken(String token) {

        return CONNECTION_BY_TOKEN.get(token).getUserId();

    }

    public static String getTokenByUserId(String userId) {

        return CONNECTION_BY_USERID.get(userId).getToken();

    }

    public static synchronized void updLastAccessTime(String token, int lastAccessTime) {

        String userId = CONNECTION_BY_TOKEN.get(token).getUserId();
        CONNECTION_BY_USERID.get(userId).setLastAccessTime(lastAccessTime);
        CONNECTION_BY_TOKEN.get(token).setLastAccessTime(lastAccessTime);

    }

    public static Map getConnCache() {
        return CONNECTION_BY_TOKEN;
    }

}

3.通道清理線程,目前的鏈接清理採用的是超時清理:此處我遇到一個問題,就是我根本沒法判斷客戶端的異常中斷,因此我只能依靠客戶端發給服務端的心跳,不斷的更新GVConnection中的lastAccessTime,在個人超時線程中,對GVConnection的集合進行遍歷,一旦發現有channel關閉的以及超時的,就將其進行清理。而客戶端主動關閉,我是能夠獲取的,在GVServerthis

public class ConnTimeoutCleanThread implements Runnable {
    /**
     * 超時時間
     */
    private int outTime;
    /**
     * 執行週期
     */
    private int cycle;

    private static Logger logger = LogManager.getLogger(ConnTimeoutCleanThread.class.getName());

    public ConnTimeoutCleanThread(int outTime, int cycle) {
        this.outTime = outTime;
        this.cycle = cycle;
    }

    public void run() {

        logger.info("按期通道清理線程啓動……");
        while (true) {
            logger.info("按期通道清理開啓……");

            Map<String, GVConnection> connCache = GVConnTools.getConnCache();

            for (String token : connCache.keySet()) {
                int currentTime = CommonTools.systemTimeUtc();
                GVConnection gvConn = GVConnTools.getConnByToken(token);
                SocketChannel socketChannel = gvConn.getChannel();
                if (!socketChannel.isOpen()) {
                    try {
                        socketChannel.close();
                        GVConnTools.removeConnByToken(token);
                        logger.info("清理了關閉的鏈接:token:<" + token + ">");
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }else if (currentTime - gvConn.getLastAccessTime() > outTime) {
                    if (socketChannel.isOpen()) {
                        try {
                            socketChannel.close();
                            GVConnTools.removeConnByToken(token);
                            logger.info("清理了超時的鏈接:token:<" + token + ">");
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                }
                try {
                    Thread.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
            logger.info("按期通道清理執行完畢!");
            try {
                Thread.sleep(cycle * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}


3、再談發博文目的:spa

我之因此發博文是由於:線程

  1. 個人代碼其實大部分是我四處拼拼湊湊,有些是我本身想的,畢竟9年沒有寫過代碼了,個人java思惟仍然停留在jdk1.4剛剛出現的時候,因此我但願你們可以一塊兒學習進步,代碼中有不少幼稚的、不科學的、不肯定的、甚至我本身也不明白的地方,可是是可執行,而且通過我測試的。但願有時間、有精力的朋友可以幫助我,或者想學習的能夠跟我一塊兒學習。code

  2. 個人代碼仍然在不斷完善中,由於我自己不是搞開發的,徹底是一種興趣,因此更新的頻率不會太快。可能會出現四、5天都不更新的狀況。orm

  3. 自從看了少幫主的znet以後,以爲本身作的簡直不堪入目,因此我會對個人代碼進行改寫。不過改寫以前會講以前已經作的都整理出來,至少能夠看當作長的經歷。token

相關文章
相關標籤/搜索