maven項目
https://github.com/solq360/commongit
========
待加功能 :github
TestNioServer服務器
//建立session管理工廠 ISessionFactory sessionFactory = new SessionFactory(); //建立編/解碼管理 ICoderParserManager coderParserManager = new CoderParserManager(); //註冊包編/解碼,處理業務 coderParserManager.register(CoderParser.valueOf("server chat", PackageDefaultCoder.valueOf(), new ChatTestServerHandle())); //建立ServerSocket 實例 ServerSocket serverSocket=ServerSocket.valueOf(SocketChannelConfig.valueOf(6969), 10,20,coderParserManager, sessionFactory); //啓動服務 serverSocket.start(); //阻塞當前線程 serverSocket.sync(); //關閉處理 serverSocket.stop();
TestNioClient
傳統方式鏈接session
//建立編/解碼管理 ICoderParserManager coderParserManager = new CoderParserManager(); //註冊包編/解碼,處理業務 coderParserManager.register(CoderParser.valueOf("chat", PackageDefaultCoder.valueOf(), new ChatHandle())); //建立ClientSocket 實例 final ClientSocket clientSocket = ClientSocket.valueOf(SocketChannelConfig.valueOf(6969), new SocketPool("client", null), coderParserManager, new EmptyHandle()); //模擬鏈接以後發送消息 Timer timer = new Timer(); timer.schedule(new TimerTask() { @Override public void run() { clientSocket.send("鏈接服務器成功"); System.out.println("send "); this.cancel(); } }, 1000); //啓動服務 clientSocket.start(); //阻塞當前線程 clientSocket.sync(); //關閉處理 clientSocket.stop();
服務器方式鏈接socket
//建立session管理工廠 ISessionFactory sessionFactory = new SessionFactory(); //建立編/解碼管理 ICoderParserManager coderParserManager = new CoderParserManager(); //註冊包編/解碼,處理業務 coderParserManager.register(CoderParser.valueOf("chat", PackageDefaultCoder.valueOf(), new ChatHandle())); //建立ClientSocket 實例 final ServerSocket serverSocket = ServerSocket.valueOf(SocketChannelConfig.valueOf(8888), 10, 20, coderParserManager, sessionFactory); //模擬鏈接以後發送消息 Timer timer = new Timer(); timer.schedule(new TimerTask() { @Override public void run() { System.out.println("registerClientSocket"); //主動鏈接服務器 ClientSocket clientSocket = serverSocket.registerClient(SocketChannelConfig.valueOf(6969)); clientSocket.send("鏈接服務器成功"); this.cancel(); } }, 1000); //啓動服務 serverSocket.start(); //阻塞當前線程 serverSocket.sync(); //關閉處理 serverSocket.stop();
鏈式編/解碼
public interface ICoderParserManager {
    /**
     * Decode processing.
     *
     * @return CoderResult
     */
    CoderResult decode(ByteBuffer buffer, ICoderCtx ctx);

    /**
     * Encode processing.
     */
    ByteBuffer encode(Object message, ICoderCtx ctx);

    /*
     * NOTE(review): undocumented in the original; presumably an error hook for
     * the given buffer and context - confirm against the implementation.
     */
    void error(ByteBuffer buffer, ICoderCtx ctx);

    /** Registers an encode/decode processor. */
    void register(CoderParser coderParser);
}
其中核心
decode
encode
@Override public CoderResult decode(ByteBuffer buffer, ICoderCtx ctx) { final SocketChannelCtx socketChannelCtx = (SocketChannelCtx) ctx; final ClientSocket clientSocket = socketChannelCtx.getClientSocket(); for (CoderParser coderParser : coderParsers.values()) { final IPackageCoder packageCoder = coderParser.getPackageCoder(); final ICoder<?, ?>[] linkCoders = coderParser.getCoders(); final IHandle handle = coderParser.getHandle(); Object value = null; synchronized (buffer) { // 已解析完 if (socketChannelCtx.getCurrPackageIndex() >= buffer.limit()) { return CoderResult.valueOf(ResultValue.UNFINISHED); } // 包協議處理 if (!packageCoder.verify(buffer, ctx)) { continue; } // 包解析 value = packageCoder.decode(buffer, ctx); if (value == null) { // 包未讀完整 return CoderResult.valueOf(ResultValue.UNFINISHED); } } // 鏈式處理 if (linkCoders != null) { for (ICoder coder : linkCoders) { value = coder.decode(value, ctx); if (value == null) { throw new CoderException("解碼出錯 : " + coder.getClass()); } } } // 業務解碼處理 value = handle.decode(value, ctx); clientSocket.readBefore(socketChannelCtx, value); handle.handle(value, ctx); clientSocket.readAfter(socketChannelCtx, value); return CoderResult.valueOf(ResultValue.SUCCEED); } return CoderResult.valueOf(ResultValue.NOT_FIND_CODER); } @Override public ByteBuffer encode(Object message, ICoderCtx ctx) { for (CoderParser coderParser : coderParsers.values()) { final IPackageCoder packageCoder = coderParser.getPackageCoder(); final ICoder<?, ?>[] linkCoders = coderParser.getCoders(); final IHandle handle = coderParser.getHandle(); // 業務檢查 if (!handle.verify(message, ctx)) { continue; } // 業務編碼處理 Object value = handle.encode(message, ctx); // 鏈式處理 if (linkCoders != null) { for (int i = linkCoders.length - 1; i >= 0; i--) { ICoder coder = linkCoders[i]; value = coder.encode(value, ctx); if (value == null) { throw new CoderException("編碼出錯 : " + coder.getClass()); } } } // 打包消息處理 value = packageCoder.encode(value, ctx); if (value != null) { return 
(ByteBuffer) value; } throw new CoderException("編碼出錯 :" + packageCoder.getClass()); } throw new CoderException("未找到編/解碼處理器 "); }
boolean run = true; // 粘包處理 while (run) { ByteBuffer cpbuffer = socketChannelCtx.coderBegin(); cpbuffer.mark(); CoderResult coderResult = coderParserManager.decode(cpbuffer, socketChannelCtx); switch (coderResult.getValue()) { case SUCCEED: break; case NOT_FIND_CODER: final int readySize = socketChannelCtx.getWriteIndex() - socketChannelCtx.getCurrPackageIndex(); final int headLimit = 255; if (readySize >= headLimit) { throw new CoderException("未找到編/解碼處理器 "); } run = false; break; case UNFINISHED: case UNKNOWN: case ERROR: default: run = false; // TODO throw break; } }
未完待加oop