最近作的一個項目須要在服務端對鏈接端進行管理,故將方案記錄於此。java
由於服務端與客戶端實現的是長鏈接,因此須要對客戶端的鏈接狀況進行監控,防止無效鏈接佔用資源。redis
完成相似於心跳的接收以及處理apache
即:緩存
當鏈接過長事件(keep-alive Time)沒有發送新的消息時,則在服務端切斷其客戶端的鏈接。服務器
在處理鏈接(Accpet事件)時:socket
將SocketChannel存入HashSet;ide
以SocketChannel的HashCode做爲Key來存儲鏈接時間(以服務器時間爲準)函數
(開闢一個HashMap或者利用Redis進行緩存)this
在處理讀取(Readable)事件時:spa
以SocketChannel的HashCode做爲Key來存儲讀取事件發生的時間(以服務器時間爲準);
處理讀取事件
開啓一個定時反覆運行的管理線程,每次運行對HashSet中的SocketChannel進行輪詢,並以SocketChannel的HashCode去取對應的時間(LastSendTime)
獲取當前時間(CurrentTime),進行計算,若是大於Keep-Alive Time,則刪除HashMap(/Redis)中的鍵值對,以及HashSet中的SocketChannel對象,並關閉SocketChannel。
鏈接端
ServerSocketChannel serverChannel = ServerSocketChannel.open(); serverChannel.bind(new InetSocketAddress("127.0.0.2",1234)); serverChannel.configureBlocking(false); AnalyisUtil util=new AnalyisUtil(); RedisConnectionPool connectionPool=new RedisConnectionPool(); Selector selector = Selector.open(); SelectionKey key = serverChannel.register(selector, SelectionKey.OP_ACCEPT); while (true) { int select = selector.select(); if (select > 0) { Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectedKeys.iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); iterator.remove(); // 接收鏈接請求 if (selectionKey.isAcceptable()) { ServerSocketChannel channel = (ServerSocketChannel) selectionKey .channel(); SocketChannel socketChannel = channel.accept(); logger.info("接收到一個新的鏈接請求"+ socketChannel.getRemoteAddress().toString()); socketChannel.configureBlocking(false); //每接收請求,註冊到同一個selector中處理 socketChannel.register(selector, SelectionKey.OP_READ);
//在Redis中存儲鏈接的時間,以SocketChannel的HashCode做爲Key connectionPool.getJedis().set("LST_"+socketChannel.hashCode(),new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
//將SocketChannel放入HashSet中管理 connectedSokectList.add(socketChannel); } else if (selectionKey.isReadable()) { //執行讀事件,在讀事件的處理函數中,從新以SocketChannel的HashCode再次存儲事件,以刷新時間 util.handleReadEvent(selectionKey,messageQueue,logger); } } } }
鏈接處理線程
package ConnectionSystem; import Util.RedisConnectionPool; import org.apache.log4j.Logger; import redis.clients.jedis.Jedis; import java.io.IOException; import java.net.SocketAddress; import java.nio.channels.SocketChannel; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashSet; import java.util.Iterator; public class ConnectionManagerTask implements Runnable { private HashSet<SocketChannel> connectedSokectList; private long keepalive_Time=5000; private Logger logger=Logger.getLogger(ConnectionManagerTask.class); ConnectionManagerTask(HashSet<SocketChannel> list){ logger.info("TCP監聽已經啓動... ..."); this.connectedSokectList=list; } private long cucalateIsAlive(Date lastSendTime) throws ParseException { Date currentTime=new Date(); return currentTime.getTime()-lastSendTime.getTime(); } private boolean handleSocket(SocketChannel channel){ int channel_code= channel.hashCode(); RedisConnectionPool connectionPool=new RedisConnectionPool(); Jedis jedisCilent; SocketAddress ipLocation; try{ ipLocation=channel.getRemoteAddress(); jedisCilent=connectionPool.getJedis(); String SendTime=jedisCilent.get("LST_"+channel_code); if(SendTime!=null) { SimpleDateFormat dfs = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Date lastSendTime = dfs.parse(SendTime); if (cucalateIsAlive(lastSendTime) > keepalive_Time) { //超過期間 try { if(channel.isConnected()){ channel.close(); jedisCilent.del("LST_"+channel_code); logger.debug("鏈接被TCP管理線程關閉,ip:" + ipLocation + ",上次迴應時間:" + lastSendTime); }else { logger.debug("當前通道,ip:" + ipLocation + "已經關閉... ..."+ ",上次迴應時間:" + lastSendTime); } return true; } catch (IOException e) { logger.error("通道,ip:" + ipLocation + "關閉時發生了異常",e); } }else { return false; } } if(channel.isConnected()){ channel.close(); logger.debug("鏈接被TCP管理線程關閉,ip:" + ipLocation + ":未檢測到登錄時間... ..."); }else { logger.debug("當前通道,ip:" + ipLocation + "已經關閉... ..."); } }catch (Exception e){ logger.error("通道關閉時發生了異常",e); } return true; } @Override public void run() { logger.info("當前鏈接數"+connectedSokectList.size()); if(connectedSokectList.isEmpty()){ return; } Iterator<SocketChannel> iterator = connectedSokectList.iterator(); while (iterator.hasNext()){ SocketChannel socketChannel=iterator.next(); Boolean removeFlag=handleSocket(socketChannel); if(removeFlag){ iterator.remove(); } } } }