1、報文隊列的處理:java
若是將多路複用器獲取到的全部事件,阻塞式的同步處理,那恐怕會嚴重影響selector的性能,因此我把從客戶端接收到的大部分消息,都放入了隊列中,而後另外啓動隊列的消費線程對消息進行異步的處理;具體以下:異步
1.通信報文隊列消費者:在selector對read事件的處理過程當中,我在最後都把客戶端發送的報文放入了一個叫CQUEUE的隊列中,具體定義以下,CQUEUE是全部客戶端發送報文的隊列,在CQUEUE隊列中的消費者線程中,我又對M類報文進行了對壘處理,放入了另外一個隊列MQUEUE。socket
public class GVQueue { //通信級別報文的隊列 public static BlockingQueue CQUEUE = new ArrayBlockingQueue<Packet>(3000); //短消息級別報文的隊列 public static BlockingQueue MQUEUE = new ArrayBlockingQueue<Packet>(3000); }
2.CQUEUE隊列的消費者線程,專門針對通信層面的消息進行處理,好比:客戶端鏈路維護的迴應等;以下:性能
public class CQueueConsumer extends Thread { private int waitTime; private static Logger logger = LogManager.getLogger(CQueueConsumer.class.getName()); public CQueueConsumer(int waitTime) { this.waitTime = waitTime; } public void run() { logger.info("通信隊列消費者線程啓動……"); boolean isRunning = true; try { while (isRunning) { IPacket packet = (IPacket) GVQueue.CQUEUE.poll(1, TimeUnit.SECONDS); if (packet != null) { handleQueue(packet); /* if (logger.isDebugEnabled()) { logger.debug("讀出消息隊列收到的客戶端消息:" + packet.getPacketStr()); }*/ logger.debug("讀出消息隊列收到的客戶端消息:" + packet.getPacketStr()); } else { Thread.sleep(waitTime); if (logger.isDebugEnabled()) { logger.debug("消息隊列中沒有消息,休息一下子……"); } } } } catch (InterruptedException e) { logger.info("通信隊列消費者處理線程終止……"); e.printStackTrace(); } } /** * 通信層處理(對除了M報文以外的報文進行處理) * @param packet */ private void handleQueue(IPacket packet) { //若是是短消息類報文,則直接放入短消息隊列等待短消息消費者處理; if (packet.getHeader().equals(MsgPacket.HEADER)){ GVQueue.MQUEUE.offer(packet); } if (!packet.getHeader().equals(ReplyPacket.HEADER)) { //須要更新通道的最後訪問時間 GVConnection gvConn = GVConnTools.getConnByToken(packet.getClientToken()); if (gvConn!=null){ //更改最後訪問時間 GVConnTools.updLastAccessTime(packet.getClientToken(),CommonTools.systemTimeUtc()); SocketChannel socketChannel = gvConn.getChannel(); //對客戶端的報文作出R相應 if (socketChannel != null) { ReplyOrder replyOrder = (ReplyOrder)Config.getOrderInstance(ReplyOrder.HEADER); replyOrder.initReplyOrder(packet.getRid()); GVServer.write2Client(replyOrder, socketChannel); } } } } }
3.而MQUEUE隊列的消費者線程,則專門針對M類報文進行處理,它的工做是拿出M報文,找到目標通道,而後將報文內容轉入目標通道(目前離線存儲還沒有實現)。以下:this
public class MQueueConsumer extends Thread { private static Logger logger = LogManager.getLogger(MQueueConsumer.class.getName()); public void run() { logger.info("短消息隊列消費者線程啓動……"); while (true) { try { Packet packet = (Packet) GVQueue.MQUEUE.poll(1, TimeUnit.SECONDS); if (packet != null) { // Logs.info("讀出消息隊列收到的客戶端消息:" + packet.getPacketStr()); MsgInfo msgInfo = new MsgInfo(); msgInfo = msgInfo.generaterMsgInfo(packet.getPacketBody()); SocketChannel channel = GVConnTools.getChannelByUserId(msgInfo.getReceiver()); if(channel!=null && channel.isOpen()) { MsgOrder msgOrder = (MsgOrder) Config.getOrderInstance(MsgOrder.HEADER); msgOrder.initMsgOrder(packet.getPacketBody()); GVServer.write2Client(msgOrder, channel); if (logger.isDebugEnabled()) { logger.debug("短消息發送至:<" + msgInfo.getReceiver() + ">"); } }else{ /* 此處將數據放入離線存儲隊列 */ if(logger.isDebugEnabled()) { logger.debug("短消息放入離線短消息隊列:<" + msgInfo.getReceiver() + ">"); } } } else { Thread.sleep(200); if(logger.isDebugEnabled()) { logger.debug("消息隊列中沒有消息,休息一下子……"); } } } catch (InterruptedException e) { logger.info("短消息隊列消費者處理線程終止……"); e.printStackTrace(); } } } }