NIO 服務端TCP鏈接管理的方案

最近作的一個項目須要在服務端對鏈接端進行管理,故將方案記錄於此。java

方案實現的結果與背景

   由於服務端與客戶端實現的是長鏈接,因此須要對客戶端的鏈接狀況進行監控,防止無效鏈接佔用資源。redis

   完成相似於心跳的接收以及處理apache

    即:緩存

      當鏈接過長事件(keep-alive Time)沒有發送新的消息時,則在服務端切斷其客戶端的鏈接。服務器

具體細節

    在處理鏈接(Accpet事件)時:socket

      將SocketChannel存入HashSet;ide

         以SocketChannel的HashCode做爲Key來存儲鏈接時間(以服務器時間爲準)函數

      (開闢一個HashMap或者利用Redis進行緩存)this

   在處理讀取(Readable)事件時:spa

       以SocketChannel的HashCode做爲Key來存儲讀取事件發生的時間(以服務器時間爲準);

       處理讀取事件


 

    開啓一個定時反覆運行的管理線程,每次運行對HashSet中的SocketChannel進行輪詢,並以SocketChannel的HashCode去取對應的時間(LastSendTime)

    獲取當前時間(CurrentTime),進行計算,若是大於Keep-Alive Time,則刪除HashMap(/Redis)中的鍵值對,以及HashSet中的SocketChannel對象,並關閉SocketChannel。

 

     鏈接端

       ServerSocketChannel serverChannel = ServerSocketChannel.open();
            serverChannel.bind(new InetSocketAddress("127.0.0.2",1234));
            serverChannel.configureBlocking(false);
            AnalyisUtil util=new AnalyisUtil();
            RedisConnectionPool connectionPool=new RedisConnectionPool();
            Selector selector = Selector.open();
            SelectionKey key = serverChannel.register(selector, SelectionKey.OP_ACCEPT);
            while (true) {
                int select = selector.select();
                if (select > 0) {
                    Set<SelectionKey> selectedKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectedKeys.iterator();
                    while (iterator.hasNext()) {
                        SelectionKey selectionKey = iterator.next();
                        iterator.remove();
                        // 接收鏈接請求
                        if (selectionKey.isAcceptable()) {
                            ServerSocketChannel channel = (ServerSocketChannel) selectionKey
                                    .channel();
                            SocketChannel socketChannel = channel.accept();
                            logger.info("接收到一個新的鏈接請求"+ socketChannel.getRemoteAddress().toString());
                            socketChannel.configureBlocking(false);
                            //每接收請求,註冊到同一個selector中處理
                            socketChannel.register(selector, SelectionKey.OP_READ);
                 //在Redis中存儲鏈接的時間,以SocketChannel的HashCode做爲Key connectionPool.getJedis().set(
"LST_"+socketChannel.hashCode(),new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
                 //將SocketChannel放入HashSet中管理 connectedSokectList.add(socketChannel); }
else if (selectionKey.isReadable()) { //執行讀事件,在讀事件的處理函數中,從新以SocketChannel的HashCode再次存儲事件,以刷新時間 util.handleReadEvent(selectionKey,messageQueue,logger); } } } }

    鏈接處理線程

    

package ConnectionSystem;

import Util.RedisConnectionPool;
import org.apache.log4j.Logger;
import redis.clients.jedis.Jedis;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.SocketChannel;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;

public class ConnectionManagerTask implements Runnable {
    private HashSet<SocketChannel> connectedSokectList;
    private long keepalive_Time=5000;
    private Logger logger=Logger.getLogger(ConnectionManagerTask.class);

    ConnectionManagerTask(HashSet<SocketChannel> list){
        logger.info("TCP監聽已經啓動... ...");
        this.connectedSokectList=list;
    }

    private long cucalateIsAlive(Date lastSendTime) throws ParseException {
        Date currentTime=new Date();
        return currentTime.getTime()-lastSendTime.getTime();
    }

    private boolean handleSocket(SocketChannel channel){
        int channel_code= channel.hashCode();
        RedisConnectionPool connectionPool=new RedisConnectionPool();
        Jedis jedisCilent;
        SocketAddress ipLocation;
        try{
            ipLocation=channel.getRemoteAddress();
            jedisCilent=connectionPool.getJedis();
            String SendTime=jedisCilent.get("LST_"+channel_code);
            if(SendTime!=null) {
                SimpleDateFormat dfs = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                Date lastSendTime = dfs.parse(SendTime);
                if (cucalateIsAlive(lastSendTime) > keepalive_Time) {
                    //超過期間
                    try {
                        if(channel.isConnected()){
                            channel.close();
                            jedisCilent.del("LST_"+channel_code);
                            logger.debug("鏈接被TCP管理線程關閉,ip:" + ipLocation + ",上次迴應時間:" + lastSendTime);
                        }else {
                            logger.debug("當前通道,ip:" + ipLocation + "已經關閉... ..."+ ",上次迴應時間:" + lastSendTime);
                        }
                        return true;
                    } catch (IOException e) {
                        logger.error("通道,ip:" + ipLocation + "關閉時發生了異常",e);
                    }
                }else {
                    return false;
                }
            }
            if(channel.isConnected()){
                channel.close();
                logger.debug("鏈接被TCP管理線程關閉,ip:" + ipLocation + ":未檢測到登錄時間... ...");
            }else {
                logger.debug("當前通道,ip:" + ipLocation + "已經關閉... ...");
            }

        }catch (Exception e){
            logger.error("通道關閉時發生了異常",e);
        }
        return true;
    }

    @Override
    public void run() {
        logger.info("當前鏈接數"+connectedSokectList.size());
        if(connectedSokectList.isEmpty()){
            return;
        }
        Iterator<SocketChannel> iterator = connectedSokectList.iterator();
        while (iterator.hasNext()){
            SocketChannel socketChannel=iterator.next();
            Boolean removeFlag=handleSocket(socketChannel);
            if(removeFlag){
                iterator.remove();
            }
        }
    }
}
相關文章
相關標籤/搜索