1、簡單介紹:
java
服務端通信主線程是消息路由服務的啓動類,其主要做用以下:網絡
一、初始化相關配置;併發
二、根據配置的ip和port建立tcp服務;dom
三、接收客戶端鏈接,並給客戶端分配令牌;socket
四、接收客戶端的登陸請求,並將客戶端相關信息(令牌、客戶端登陸標識、最後訪問時間、當前token所使用的通道,保存到緩衝區)tcp
五、接收客戶端的報文請求,並添加到通信隊列,等待處理;post
六、接收來自各處的指令發送請求,併發送至相關通道;測試
2、詳細介紹:線程
一、啓動方法:首先加載配置信息;而後啓動主線程、通信報文消費線程(處理通信類報文)、超時、失效通道回收線程(進行超時和失效通道的回收工做)、短消息消費者線程(專門針對短消息隊列進行處理的線程)。尤爲是OP_WRITE,在OP_WRITE以後,必須將selector註冊爲OP_READ,不然會一直循環下去,死循環。debug
public static void main(String arg[]) throws Exception { //初始化配置數據 Config cfg = new Config(arg[0]); final GVServer gvServer = new GVServer(); //啓動ServerSocket通道 if (gvServer.initServer(cfg)) { ExecutorService threadPool = Executors.newCachedThreadPool(); //啓動通信服務主線程 threadPool.execute(gvServer); //啓動通信報文消費線程 threadPool.execute(new CQueueConsumer(cfg.getWaitTime())); //啓動超時通道、失效通道回收線程 threadPool.execute(new ConnTimeoutCleanThread(cfg.getCleanThreadOutTime(), cfg.getCleanThreadCycle())); threadPool.execute(new MQueueConsumer()); } }
二、初始化配置:打開tcp服務等待鏈接(略);
三、通信事件處理:通信主線程的run方法,主要對接收到的事件分別處理。這個地方尤爲要注意的是,我第一篇文章提到的,全部觸發的時間都必須被消費,不然會一直循環下去。
public void run() { while (true) { try { //監聽事件key selector.select(2000); //迭代一組事件key Iterator<SelectionKey> keys = selector.selectedKeys().iterator(); while (keys.hasNext()) { //定義一個socket通道 SocketChannel socketChannel = null; int count = 0; SelectionKey key = keys.next(); // Logs.info("有網絡事件被觸發,事件類型爲:" + key.interestOps()); //刪除Iterator中的當前key,避免重複處理 keys.remove(); if (!key.isValid()) { continue; } else if (key.isAcceptable()) { //從客戶端送來的key中獲取ServerSocket通道 serverSocketChannel = (ServerSocketChannel) key.channel(); //接收此ServerSocket通道中的Socket通道,accept是一個阻塞方法,一直到獲取到鏈接纔會繼續 socketChannel = serverSocketChannel.accept(); //將此socket通道設置爲非阻塞模式 socketChannel.configureBlocking(false); //將此通道註冊到selector,並等待接收客戶端的讀入數據 socketChannel.register(selector, SelectionKey.OP_READ); allocToken(socketChannel); } else if (key.isReadable()) { //獲取事件key中的channel socketChannel = (SocketChannel) key.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(Config.getReadBlock()); //清理緩衝區,便於使用 byteBuffer.clear(); //將channel中的字節流讀入緩衝區 count = socketChannel.read(byteBuffer); byteBuffer.flip(); //處理粘包 if (count > 0) { try { handlePacket(socketChannel, byteBuffer); } catch (Exception e) { e.printStackTrace(); // continue;//若是當前包存在非法拋出異常,那麼再也不進行處理直接跳出循環,處理下一個包;此處存疑,測試階段暫時註釋 } } else if (count == 0) { continue; } else { socketChannel.close(); logger.info("客戶端"+socketChannel.toString()+"鏈接關閉!"); } } else if (key.isWritable()) { ((SocketChannel) key.channel()).register(selector, SelectionKey.OP_READ); } } } catch (IOException e) { e.printStackTrace(); } } }
四、將合法報文放入通信隊列:我在配置初始化的時候,明確規定了幾種報文類,並在協議中限定了報文的格式,凡是不符合我報文格式的報文均視爲非法報文,直接會給客戶端回覆一個錯誤指令。
/** * 處理不合法報文以及將合法報文放入隊列等待處理 * * @param socketChannel * @param strPacket */ private void offerPacket(SocketChannel socketChannel, String strPacket) { IPacket packet = AnalyseTools.analysePacket(strPacket); if (packet.getHeader().equals(LoginPacket.HEADER)) { handleLoginPacket(socketChannel, packet); } //若是類爲空或者從handle單例map中沒法取到類,則證實報文非法 if (packet == null || Config.getPacketInstance(packet.getHeader()) == null) { //不在服務端識別範圍內的報文,回覆E響應,告知客戶端不合法 ErrorOrder errorOrder = (ErrorOrder) Config.getOrderInstance(ErrorOrder.HEADER); errorOrder.initErrorOrder(errorOrder.INVAILD_REQ_CODE, errorOrder.INVAILD_REQ_MSG); logger.info("客戶端發送非法報文:" + strPacket); GVServer.write2Client(errorOrder, socketChannel); //將合法報文放入消息隊列中等待處理 } else { if (!GVQueue.CQUEUE.offer(packet)) { logger.error("消息隊列已滿,請增長隊列容量!"); } else { if (logger.isDebugEnabled()) { logger.debug("將添加至消息隊列,隊列內消息數量爲:" + GVQueue.CQUEUE.size()); } } } }
五、分配客戶端的令牌:客戶端鏈接以後,服務端立刻會給客戶端回覆一個T指令,告訴客戶端它的令牌,今後之後,客戶端每次報文,都必須攜帶此令牌;在通信這一層裏,服務端根據token肯定客戶端;
private void allocToken(SocketChannel socketChannel) { //給鏈接上來的通道分配token TokenOrder tokenOrder = (TokenOrder) Config.getOrderInstance(TokenOrder.HEADER); String token = UUID.randomUUID().toString(); tokenOrder.initTokenOrder(token); //返回給客戶端token write2Client(tokenOrder, socketChannel); logger.info("客戶端:<" + token + ">已經鏈接!"); //將鏈接後的channel保存到全局map中 //2015.8.13修改,先把userId存爲null,等待用戶登陸後,在將, // GVConnection gvConnection = new GVConnection(token,null, socketChannel, CommonTools.systemTimeUtc()); // GVConnTools.addConn2Cache(gvConnection); }
六、客戶端登陸處理:僅憑藉客戶端token,我無法將服務用於業務中,業務中每每會存在一個用戶的用戶標記,我須要可以根據用戶的標記,往通道里面寫入消息;因此,有了客戶端登陸過程,客戶端將本身惟一的業務標記提交到服務端。服務端創建一個token、用戶標記、用戶最後訪問時間、通道的緩衝區(統一成了一個類GVConnection),專門用語指令的發送,而且保持幾項內容的同步,GVConnTools爲操做這些內容的惟一入口;
/** * 專門處理客戶端登陸報文,保存GVConn到緩衝區 * 【注】對於userId重複的狀況,在這裏不作處理了,由業務系統本身處理, * 這裏對userId重複至關於後登陸的用戶替換了先登陸用戶的通道。 * * @param socketChannel * @param packet */ private void handleLoginPacket(SocketChannel socketChannel, IPacket packet) { GVConnection gvConn = new GVConnection(packet.getClientToken(), packet.getPacketBody(), socketChannel, CommonTools.systemTimeUtc()); GVConnTools.addConn2Cache(gvConn); }
七、消息寫入通道:其實我徹底能夠在奔雷的外部提供一個專門的寫入方法,可是當時好像腦子進水了,這個等之後迭代的時候在考慮如何處理吧。暫時放到這裏。須要注意的是,這個方法是惟一對協議的前四位包頭進行封裝的方法,在全部其餘的類中,都不須要對報文的前四位予以考慮。在客戶端讀取的時候,也會將前四位截取掉以後,或者將字符串放入隊列,或者將一個報文(指令)對象放入隊列。(爲何須要這四位,我將在下一個小部分——粘包、斷包中講解)
/** * 向客戶端寫入信息的方法 * * @param iOrder 報文處理類接口 * @param socketChannel */ public static synchronized void write2Client(IOrder iOrder, SocketChannel socketChannel) { try { socketChannel.register(selector, SelectionKey.OP_WRITE); //建立一個byteBuffer用來存儲要寫入的buffer ByteBuffer byteBuffer = ByteBuffer.allocate(Config.getWriteBlock()); //得出整個包體的長度 String packetSize = Integer.toString(iOrder.generateOrderStr().getBytes().length); //講包體長度放入buffer的前四位 byteBuffer.put(packetSize.getBytes()); //移動buffer的postion指針到第四位,包體將從第四位開始寫入 byteBuffer.position(PACKET_HEAD_LENGTH); String str = iOrder.generateOrderStr(); //寫入包體 if (logger.isDebugEnabled()) { logger.debug("服務端寫入通道的包體:" + str); } byteBuffer.put(str.getBytes()); byteBuffer.flip(); socketChannel.write(byteBuffer); } catch (IOException e) { e.printStackTrace(); } }
上面的代碼是有問題的,我修改了一下,可是仍然不肯定是否存在問題。就是在此處將iOrder放入隊列,以下:
public static synchronized void write2Client(IOrder iOrder, SocketChannel socketChannel) { try { iOrder.setChannel(socketChannel); writeOrderQueue.offer(iOrder); socketChannel.register(selector, SelectionKey.OP_WRITE); } catch (IOException e) { e.printStackTrace(); } }
而後,在主線程run方法中
} else if (key.isWritable()) { doWrite(); }
doWrite代碼:
private void doWrite() { while (true) { IOrder iOrder = writeOrderQueue.peek(); if (iOrder == null) { break; } if (iOrder != null) { //建立一個byteBuffer用來存儲要寫入的buffer ByteBuffer byteBuffer = ByteBuffer.allocate(Config.getWriteBlock()); //得出整個包體的長度 String packetSize = Integer.toString(iOrder.generateOrderStr().getBytes().length); //講包體長度放入buffer的前四位 byteBuffer.put(packetSize.getBytes()); //移動buffer的postion指針到第四位,包體將從第四位開始寫入 byteBuffer.position(PACKET_HEAD_LENGTH); String str = iOrder.generateOrderStr(); //寫入包體 if (logger.isDebugEnabled()) { logger.debug("服務端寫入通道的包體:" + str); } byteBuffer.put(str.getBytes()); byteBuffer.flip(); //若是上次寫入的是斷包,則將byteBuffer移動到上次寫入的位置, if (writePos > 0) { byteBuffer.position(writePos); } SocketChannel channel = iOrder.getChannel(); try { channel.write(byteBuffer); //若是buffer沒有所有被寫入,那麼記錄寫入位置後,中斷循環,等待下次OP_WRITE if (byteBuffer.remaining() > 0) { writePos = byteBuffer.position(); break; } else { writePos = 0; writeOrderQueue.remove(); channel.register(selector, SelectionKey.OP_READ); } // logger.info("讀完的byteBuffer:" + byteBuffer); } catch (IOException e) { e.printStackTrace(); } } } }
這麼作的目的是nio自己的機制我當初沒有理解,通道的OP_WRITE狀態表明的是準備好寫入,若是我強行寫入是能夠的,可是不保證其會準備好被寫入,當其沒準備好的時候,就能夠寫入失敗。因此我要先將指令對象放入隊列,等通道準備好寫入,再從隊列中拿出指令。可是在寫入的時候,有可能一次寫入不全,因此我判斷了buffer的剩餘字節,以肯定所有寫入,若是沒有所有寫入,那麼就記錄buffer的位置,下次OP_WRITE的時候再次從這個位置開始寫入。不知道這樣作有什麼問題沒有?敬請指導!