基於NIO的消息路由的實現(四) 服務端通信主線程(1)

1、簡單介紹:
java

服務端通信主線程是消息路由服務的啓動類,其主要做用以下:網絡

一、初始化相關配置;併發

二、根據配置的ip和port建立tcp服務;dom

三、接收客戶端鏈接,並給客戶端分配令牌;socket

四、接收客戶端的登陸請求,並將客戶端相關信息(令牌、客戶端登陸標識、最後訪問時間、當前token所使用的通道,保存到緩衝區)tcp

五、接收客戶端的報文請求,並添加到通信隊列,等待處理;post

六、接收來自各處的指令發送請求,併發送至相關通道;測試

2、詳細介紹:線程

一、啓動方法:首先加載配置信息;而後啓動主線程、通信報文消費線程(處理通信類報文)、超時、失效通道回收線程(進行超時和失效通道的回收工做)、短消息消費者線程(專門針對短消息隊列進行處理的線程)。尤爲是OP_WRITE,在OP_WRITE以後,必須將selector註冊爲OP_READ,不然會一直循環下去,死循環。debug

public static void main(String arg[]) throws Exception {
    //初始化配置數據
    Config cfg = new Config(arg[0]);
    final GVServer gvServer = new GVServer();
    //啓動ServerSocket通道
    if (gvServer.initServer(cfg)) {

        ExecutorService threadPool = Executors.newCachedThreadPool();
        //啓動通信服務主線程
        threadPool.execute(gvServer);
        //啓動通信報文消費線程
        threadPool.execute(new CQueueConsumer(cfg.getWaitTime()));
        //啓動超時通道、失效通道回收線程
        threadPool.execute(new ConnTimeoutCleanThread(cfg.getCleanThreadOutTime(), cfg.getCleanThreadCycle()));
        threadPool.execute(new MQueueConsumer());
    }
}

二、初始化配置:打開tcp服務等待鏈接(略);

三、通信事件處理:通信主線程的run方法,主要對接收到的事件分別處理。這個地方尤爲要注意的是,我第一篇文章提到的,全部觸發的時間都必須被消費,不然會一直循環下去。

    public void run() {

        while (true) {
            try {

                //監聽事件key
                selector.select(2000);
                //迭代一組事件key
                Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                while (keys.hasNext()) {
                    //定義一個socket通道
                    SocketChannel socketChannel = null;

                    int count = 0;

                    SelectionKey key = keys.next();
                    //  Logs.info("有網絡事件被觸發,事件類型爲:" + key.interestOps());
                    //刪除Iterator中的當前key,避免重複處理
                    keys.remove();
                    if (!key.isValid()) {
                        continue;
                    } else if (key.isAcceptable()) {
                        //從客戶端送來的key中獲取ServerSocket通道
                        serverSocketChannel = (ServerSocketChannel) key.channel();
                        //接收此ServerSocket通道中的Socket通道,accept是一個阻塞方法,一直到獲取到鏈接纔會繼續
                        socketChannel = serverSocketChannel.accept();
                        //將此socket通道設置爲非阻塞模式
                        socketChannel.configureBlocking(false);
                        //將此通道註冊到selector,並等待接收客戶端的讀入數據
                        socketChannel.register(selector, SelectionKey.OP_READ);
                        allocToken(socketChannel);

                    } else if (key.isReadable()) {

                        //獲取事件key中的channel
                        socketChannel = (SocketChannel) key.channel();
                        ByteBuffer byteBuffer = ByteBuffer.allocate(Config.getReadBlock());
                        //清理緩衝區,便於使用
                        byteBuffer.clear();
                        //將channel中的字節流讀入緩衝區
                        count = socketChannel.read(byteBuffer);
                        byteBuffer.flip();
                        //處理粘包
                        if (count > 0) {
                            try {
                                handlePacket(socketChannel, byteBuffer);
                            } catch (Exception e) {
                                e.printStackTrace();
//                                continue;//若是當前包存在非法拋出異常,那麼再也不進行處理直接跳出循環,處理下一個包;此處存疑,測試階段暫時註釋
                            }
                        } else if (count == 0) {
                            continue;
                        } else {
                            socketChannel.close();
                            logger.info("客戶端"+socketChannel.toString()+"鏈接關閉!");

                        }

                    } else if (key.isWritable()) {
                        ((SocketChannel) key.channel()).register(selector, SelectionKey.OP_READ);
                    }
                }

            } catch (IOException e) {
                e.printStackTrace();
            }

        }
    }

四、將合法報文放入通信隊列:我在配置初始化的時候,明確規定了幾種報文類,並在協議中限定了報文的格式,凡是不符合我報文格式的報文均視爲非法報文,直接會給客戶端回覆一個錯誤指令。

/**
 * 處理不合法報文以及將合法報文放入隊列等待處理
 *
 * @param socketChannel
 * @param strPacket
 */
private void offerPacket(SocketChannel socketChannel, String strPacket) {
    IPacket packet = AnalyseTools.analysePacket(strPacket);
    if (packet.getHeader().equals(LoginPacket.HEADER)) {
        handleLoginPacket(socketChannel, packet);
    }
    //若是類爲空或者從handle單例map中沒法取到類,則證實報文非法
    if (packet == null || Config.getPacketInstance(packet.getHeader()) == null) {
        //不在服務端識別範圍內的報文,回覆E響應,告知客戶端不合法
        ErrorOrder errorOrder = (ErrorOrder) Config.getOrderInstance(ErrorOrder.HEADER);
        errorOrder.initErrorOrder(errorOrder.INVAILD_REQ_CODE, errorOrder.INVAILD_REQ_MSG);

        logger.info("客戶端發送非法報文:" + strPacket);
        GVServer.write2Client(errorOrder, socketChannel);
        //將合法報文放入消息隊列中等待處理
    } else {
        if (!GVQueue.CQUEUE.offer(packet)) {
            logger.error("消息隊列已滿,請增長隊列容量!");

        } else {
            if (logger.isDebugEnabled()) {
                logger.debug("將添加至消息隊列,隊列內消息數量爲:" + GVQueue.CQUEUE.size());
            }
        }
    }
}

五、分配客戶端的令牌:客戶端鏈接以後,服務端立刻會給客戶端回覆一個T指令,告訴客戶端它的令牌,今後之後,客戶端每次報文,都必須攜帶此令牌;在通信這一層裏,服務端根據token肯定客戶端;

    private void allocToken(SocketChannel socketChannel) {

        //給鏈接上來的通道分配token
        TokenOrder tokenOrder = (TokenOrder) Config.getOrderInstance(TokenOrder.HEADER);
        String token = UUID.randomUUID().toString();
        tokenOrder.initTokenOrder(token);
        //返回給客戶端token
        write2Client(tokenOrder, socketChannel);
        logger.info("客戶端:<" + token + ">已經鏈接!");
        //將鏈接後的channel保存到全局map中
        //2015.8.13修改,先把userId存爲null,等待用戶登陸後,在將,
//        GVConnection gvConnection = new GVConnection(token,null, socketChannel, CommonTools.systemTimeUtc());
//        GVConnTools.addConn2Cache(gvConnection);

    }

六、客戶端登陸處理:僅憑藉客戶端token,我無法將服務用於業務中,業務中每每會存在一個用戶的用戶標記,我須要可以根據用戶的標記,往通道里面寫入消息;因此,有了客戶端登陸過程,客戶端將本身惟一的業務標記提交到服務端。服務端創建一個token、用戶標記、用戶最後訪問時間、通道的緩衝區(統一成了一個類GVConnection),專門用語指令的發送,而且保持幾項內容的同步,GVConnTools爲操做這些內容的惟一入口;

/**
 * 專門處理客戶端登陸報文,保存GVConn到緩衝區
 * 【注】對於userId重複的狀況,在這裏不作處理了,由業務系統本身處理,
 * 這裏對userId重複至關於後登陸的用戶替換了先登陸用戶的通道。
 *
 * @param socketChannel
 * @param packet
 */
private void handleLoginPacket(SocketChannel socketChannel, IPacket packet) {

    GVConnection gvConn = new GVConnection(packet.getClientToken(),
            packet.getPacketBody(), socketChannel, CommonTools.systemTimeUtc());
    GVConnTools.addConn2Cache(gvConn);

}

七、消息寫入通道:其實我徹底能夠在奔雷的外部提供一個專門的寫入方法,可是當時好像腦子進水了,這個等之後迭代的時候在考慮如何處理吧。暫時放到這裏。須要注意的是,這個方法是惟一對協議的前四位包頭進行封裝的方法,在全部其餘的類中,都不須要對報文的前四位予以考慮。在客戶端讀取的時候,也會將前四位截取掉以後,或者將字符串放入隊列,或者將一個報文(指令)對象放入隊列。(爲何須要這四位,我將在下一個小部分——粘包、斷包中講解)

/**
 * 向客戶端寫入信息的方法
 *
 * @param iOrder        報文處理類接口
 * @param socketChannel
 */
public static synchronized void write2Client(IOrder iOrder, SocketChannel socketChannel) {
    try {
        socketChannel.register(selector, SelectionKey.OP_WRITE);
        //建立一個byteBuffer用來存儲要寫入的buffer
        ByteBuffer byteBuffer = ByteBuffer.allocate(Config.getWriteBlock());
        //得出整個包體的長度
        String packetSize = Integer.toString(iOrder.generateOrderStr().getBytes().length);
        //講包體長度放入buffer的前四位
        byteBuffer.put(packetSize.getBytes());
        //移動buffer的postion指針到第四位,包體將從第四位開始寫入
        byteBuffer.position(PACKET_HEAD_LENGTH);
        String str = iOrder.generateOrderStr();
        //寫入包體
        if (logger.isDebugEnabled()) {
            logger.debug("服務端寫入通道的包體:" + str);
        }
        byteBuffer.put(str.getBytes());
        byteBuffer.flip();
        socketChannel.write(byteBuffer);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

上面的代碼是有問題的,我修改了一下,可是仍然不肯定是否存在問題。就是在此處將iOrder放入隊列,以下:

public static synchronized void write2Client(IOrder iOrder, SocketChannel socketChannel) {
    try {
        iOrder.setChannel(socketChannel);
        writeOrderQueue.offer(iOrder);
        socketChannel.register(selector, SelectionKey.OP_WRITE);
            } catch (IOException e) {
        e.printStackTrace();
    }
}

而後,在主線程run方法中

} else if (key.isWritable()) {
    doWrite();
}

doWrite代碼:

private void doWrite() {

        while (true) {
            IOrder iOrder = writeOrderQueue.peek();
            if (iOrder == null) {
                break;
            }
            if (iOrder != null) {
                //建立一個byteBuffer用來存儲要寫入的buffer
                ByteBuffer byteBuffer = ByteBuffer.allocate(Config.getWriteBlock());
                //得出整個包體的長度
                String packetSize = Integer.toString(iOrder.generateOrderStr().getBytes().length);
                //講包體長度放入buffer的前四位
                byteBuffer.put(packetSize.getBytes());
                //移動buffer的postion指針到第四位,包體將從第四位開始寫入
                byteBuffer.position(PACKET_HEAD_LENGTH);
                String str = iOrder.generateOrderStr();
                //寫入包體
                if (logger.isDebugEnabled()) {
                    logger.debug("服務端寫入通道的包體:" + str);
                }
                byteBuffer.put(str.getBytes());

                byteBuffer.flip();
                //若是上次寫入的是斷包,則將byteBuffer移動到上次寫入的位置,
                if (writePos > 0) {
                    byteBuffer.position(writePos);
                }

                SocketChannel channel = iOrder.getChannel();
                try {
                    channel.write(byteBuffer);
                    //若是buffer沒有所有被寫入,那麼記錄寫入位置後,中斷循環,等待下次OP_WRITE
                    if (byteBuffer.remaining() > 0) {
                        writePos = byteBuffer.position();
                        break;
                    } else {
                        writePos = 0;
                        writeOrderQueue.remove();
                        channel.register(selector, SelectionKey.OP_READ);
                    }
//                logger.info("讀完的byteBuffer:" + byteBuffer);
                } catch (IOException e) {
                    e.printStackTrace();
                }


            }

        }
    }

這麼作的目的是nio自己的機制我當初沒有理解,通道的OP_WRITE狀態表明的是準備好寫入,若是我強行寫入是能夠的,可是不保證其會準備好被寫入,當其沒準備好的時候,就能夠寫入失敗。因此我要先將指令對象放入隊列,等通道準備好寫入,再從隊列中拿出指令。可是在寫入的時候,有可能一次寫入不全,因此我判斷了buffer的剩餘字節,以肯定所有寫入,若是沒有所有寫入,那麼就記錄buffer的位置,下次OP_WRITE的時候再次從這個位置開始寫入。不知道這樣作有什麼問題沒有?敬請指導!

相關文章
相關標籤/搜索