基於NIO的消息路由的實現(六)報文隊列的處理

1、報文隊列的處理:java

若是將多路複用器獲取到的全部事件,阻塞式的同步處理,那恐怕會嚴重影響selector的性能,因此我把從客戶端接收到的大部分消息,都放入了隊列中,而後另外啓動隊列的消費線程對消息進行異步的處理;具體以下:異步

1.通信報文隊列消費者:在selector對read事件的處理過程當中,我在最後都把客戶端發送的報文放入了一個叫CQUEUE的隊列中,具體定義以下,CQUEUE是全部客戶端發送報文的隊列,在CQUEUE隊列中的消費者線程中,我又對M類報文進行了對壘處理,放入了另外一個隊列MQUEUE。socket

public class GVQueue {
    //通信級別報文的隊列
    public static BlockingQueue CQUEUE = new ArrayBlockingQueue<Packet>(3000);
    //短消息級別報文的隊列
    public static BlockingQueue MQUEUE = new ArrayBlockingQueue<Packet>(3000);
}

2.CQUEUE隊列的消費者線程,專門針對通信層面的消息進行處理,好比:客戶端鏈路維護的迴應等;以下:性能

public class CQueueConsumer extends Thread {

    private int waitTime;

    private static Logger logger = LogManager.getLogger(CQueueConsumer.class.getName());

    public CQueueConsumer(int waitTime) {
        this.waitTime = waitTime;
    }

    public void run() {
        logger.info("通信隊列消費者線程啓動……");
        boolean isRunning = true;
        try {
            while (isRunning) {
                IPacket packet = (IPacket) GVQueue.CQUEUE.poll(1, TimeUnit.SECONDS);
                if (packet != null) {
                    handleQueue(packet);
/*                    if (logger.isDebugEnabled()) {
                        logger.debug("讀出消息隊列收到的客戶端消息:" + packet.getPacketStr());
                    }*/
                    logger.debug("讀出消息隊列收到的客戶端消息:" + packet.getPacketStr());
                } else {
                    Thread.sleep(waitTime);
                    if (logger.isDebugEnabled()) {
                        logger.debug("消息隊列中沒有消息,休息一下子……");
                    }
                }
            }
        } catch (InterruptedException e) {
            logger.info("通信隊列消費者處理線程終止……");
            e.printStackTrace();
        }
    }

    /**
     * 通信層處理(對除了M報文以外的報文進行處理)
     * @param packet
     */
    private void handleQueue(IPacket packet) {

        //若是是短消息類報文,則直接放入短消息隊列等待短消息消費者處理;
        if (packet.getHeader().equals(MsgPacket.HEADER)){
            GVQueue.MQUEUE.offer(packet);
        }
        if (!packet.getHeader().equals(ReplyPacket.HEADER)) {
            //須要更新通道的最後訪問時間
            GVConnection gvConn = GVConnTools.getConnByToken(packet.getClientToken());
            if (gvConn!=null){
                //更改最後訪問時間
                GVConnTools.updLastAccessTime(packet.getClientToken(),CommonTools.systemTimeUtc());
                SocketChannel socketChannel = gvConn.getChannel();
                //對客戶端的報文作出R相應
                if (socketChannel != null) {
                    ReplyOrder replyOrder = (ReplyOrder)Config.getOrderInstance(ReplyOrder.HEADER);
                    replyOrder.initReplyOrder(packet.getRid());
                    GVServer.write2Client(replyOrder, socketChannel);
                }
            }

        }

    }
}


3.而MQUEUE隊列的消費者線程,則專門針對M類報文進行處理,它的工做是拿出M報文,找到目標通道,而後將報文內容轉入目標通道(目前離線存儲還沒有實現)。以下:this

public class MQueueConsumer extends Thread {
    private static Logger logger = LogManager.getLogger(MQueueConsumer.class.getName());
    public void run() {
        logger.info("短消息隊列消費者線程啓動……");
        while (true) {

            try {
                Packet packet = (Packet) GVQueue.MQUEUE.poll(1, TimeUnit.SECONDS);
                if (packet != null) {
//                    Logs.info("讀出消息隊列收到的客戶端消息:" + packet.getPacketStr());
                    MsgInfo msgInfo = new MsgInfo();
                    msgInfo = msgInfo.generaterMsgInfo(packet.getPacketBody());

                    SocketChannel channel = GVConnTools.getChannelByUserId(msgInfo.getReceiver());

                    if(channel!=null && channel.isOpen()) {
                        MsgOrder msgOrder = (MsgOrder) Config.getOrderInstance(MsgOrder.HEADER);
                        msgOrder.initMsgOrder(packet.getPacketBody());
                        GVServer.write2Client(msgOrder, channel);
                        if (logger.isDebugEnabled()) {
                            logger.debug("短消息發送至:<" + msgInfo.getReceiver() + ">");
                        }
                    }else{
                        /*
                        此處將數據放入離線存儲隊列
                         */
                        if(logger.isDebugEnabled()) {
                            logger.debug("短消息放入離線短消息隊列:<" + msgInfo.getReceiver() + ">");
                        }
                    }


                } else {
                    Thread.sleep(200);
                    if(logger.isDebugEnabled()) {
                        logger.debug("消息隊列中沒有消息,休息一下子……");
                    }
                }
            } catch (InterruptedException e) {
                logger.info("短消息隊列消費者處理線程終止……");
                e.printStackTrace();
            }

        }
    }
}
相關文章
相關標籤/搜索