java nio 網絡框架實現

maven項目
https://github.com/solq360/commongit

  • 鏈式編/解碼
  • 鏈路層鏈式處理
  • 管道管理socket
  • 多協議處理很是方便
  • 仿netty NioEventLoop 單線程串行處理

========
侍加功能 :github

  • 自動化編/解碼
  • rpc 接口加強使用

簡單聊天例子

server

TestNioServer服務器

//建立session管理工廠
ISessionFactory sessionFactory = new SessionFactory();
//建立編/解碼管理
ICoderParserManager coderParserManager = new CoderParserManager();
//註冊包編/解碼,處理業務
coderParserManager.register(CoderParser.valueOf("server chat", PackageDefaultCoder.valueOf(), new ChatTestServerHandle()));
//建立ServerSocket 實例
ServerSocket serverSocket=ServerSocket.valueOf(SocketChannelConfig.valueOf(6969), 10,20,coderParserManager, sessionFactory);

//啓動服務
serverSocket.start();
//阻塞當前線程
serverSocket.sync();
//關閉處理
serverSocket.stop();

client

TestNioClient
傳統方式鏈接session

//建立編/解碼管理
    ICoderParserManager coderParserManager = new CoderParserManager();
    //註冊包編/解碼,處理業務
    coderParserManager.register(CoderParser.valueOf("chat", PackageDefaultCoder.valueOf(), new ChatHandle()));
    //建立ClientSocket 實例
    final ClientSocket clientSocket = ClientSocket.valueOf(SocketChannelConfig.valueOf(6969), new SocketPool("client", null), coderParserManager, new EmptyHandle());

    //模擬鏈接以後發送消息
    Timer timer = new Timer();
    timer.schedule(new TimerTask() {

        @Override
        public void run() {
        clientSocket.send("鏈接服務器成功");
        System.out.println("send ");
        this.cancel();
        }
    }, 1000);
    
    //啓動服務
    clientSocket.start();
    //阻塞當前線程
    clientSocket.sync();
    //關閉處理
    clientSocket.stop();

服務器方式鏈接socket

//建立session管理工廠
    ISessionFactory sessionFactory = new SessionFactory();
    //建立編/解碼管理
    ICoderParserManager coderParserManager = new CoderParserManager();
    //註冊包編/解碼,處理業務
    coderParserManager.register(CoderParser.valueOf("chat", PackageDefaultCoder.valueOf(), new ChatHandle()));
    //建立ClientSocket 實例
    final ServerSocket serverSocket = ServerSocket.valueOf(SocketChannelConfig.valueOf(8888), 10, 20, coderParserManager, sessionFactory);

    //模擬鏈接以後發送消息
    Timer timer = new Timer();
    timer.schedule(new TimerTask() {

        @Override
        public void run() {
        System.out.println("registerClientSocket");
        //主動鏈接服務器
        ClientSocket clientSocket = serverSocket.registerClient(SocketChannelConfig.valueOf(6969));
        clientSocket.send("鏈接服務器成功");
        this.cancel();
        }
    }, 1000);
    
    //啓動服務
    serverSocket.start();
    //阻塞當前線程
    serverSocket.sync();
    //關閉處理
    serverSocket.stop();

源碼實現過程

鏈式編/解碼maven

  • 由 多個 ICoder 輸入/輸出轉換處理
  • CoderParser 類組裝多個 ICoder
  • 編/碼處理器 注意優先級
  • nio read -> packageCoder -> link coders -> handle
  • handle write -> link coders -> packageCoder -> nio write
  • 由 ICoderParserManager 管理調用處理
public interface ICoderParserManager {

    /**
     * 解碼處理
     * 
     * @return CoderResult
     * */
    CoderResult decode(ByteBuffer buffer, ICoderCtx ctx);

    /**
     * 編碼處理
     * */
    ByteBuffer encode(Object message, ICoderCtx ctx);

    void error(ByteBuffer buffer, ICoderCtx ctx);

    /** 註冊 編/碼處理器 */
    void register(CoderParser coderParser);
}

其中核心
decode
encodeide

@Override
    public CoderResult decode(ByteBuffer buffer, ICoderCtx ctx) {
    final SocketChannelCtx socketChannelCtx = (SocketChannelCtx) ctx;
    final ClientSocket clientSocket = socketChannelCtx.getClientSocket();

    for (CoderParser coderParser : coderParsers.values()) {
        final IPackageCoder packageCoder = coderParser.getPackageCoder();
        final ICoder<?, ?>[] linkCoders = coderParser.getCoders();
        final IHandle handle = coderParser.getHandle();
        Object value = null;
        synchronized (buffer) {
        // 已解析完
        if (socketChannelCtx.getCurrPackageIndex() >= buffer.limit()) {
            return CoderResult.valueOf(ResultValue.UNFINISHED);
        }
        // 包協議處理
        if (!packageCoder.verify(buffer, ctx)) {
            continue;
        }
        // 包解析
        value = packageCoder.decode(buffer, ctx);
        if (value == null) {
            // 包未讀完整
            return CoderResult.valueOf(ResultValue.UNFINISHED);
        }
        }
        // 鏈式處理
        if (linkCoders != null) {
        for (ICoder coder : linkCoders) {
            value = coder.decode(value, ctx);
            if (value == null) {
            throw new CoderException("解碼出錯 : " + coder.getClass());
            }
        }
        }
        // 業務解碼處理
        value = handle.decode(value, ctx);
        clientSocket.readBefore(socketChannelCtx, value);
        handle.handle(value, ctx);
        clientSocket.readAfter(socketChannelCtx, value);

        return CoderResult.valueOf(ResultValue.SUCCEED);
    }
    return CoderResult.valueOf(ResultValue.NOT_FIND_CODER);
    }

    @Override
    public ByteBuffer encode(Object message, ICoderCtx ctx) {

    for (CoderParser coderParser : coderParsers.values()) {
        final IPackageCoder packageCoder = coderParser.getPackageCoder();
        final ICoder<?, ?>[] linkCoders = coderParser.getCoders();
        final IHandle handle = coderParser.getHandle();
        // 業務檢查
        if (!handle.verify(message, ctx)) {
        continue;
        }
        // 業務編碼處理
        Object value = handle.encode(message, ctx);
        // 鏈式處理
        if (linkCoders != null) {
        for (int i = linkCoders.length - 1; i >= 0; i--) {
            ICoder coder = linkCoders[i];
            value = coder.encode(value, ctx);
            if (value == null) {
            throw new CoderException("編碼出錯 : " + coder.getClass());
            }
        }
        }
        // 打包消息處理
        value = packageCoder.encode(value, ctx);
        if (value != null) {
        return (ByteBuffer) value;
        }
        throw new CoderException("編碼出錯  :" + packageCoder.getClass());
    }

    throw new CoderException("未找到編/解碼處理器 ");
   }
  • 半包/帖包處理 : AbstractISocketChannel doRead方法摘要,根據解碼返回的狀態作處理。
  • 半包:當不是完成狀態時,繼續解碼,從最後一次包索引開始處理
  • 帖包:當完成包解碼移動包索引,等侍下輪解碼處理
boolean run = true;
        // 粘包處理
        while (run) {
        ByteBuffer cpbuffer = socketChannelCtx.coderBegin();
        cpbuffer.mark();
        CoderResult coderResult = coderParserManager.decode(cpbuffer, socketChannelCtx);
        switch (coderResult.getValue()) {
        case SUCCEED:
            break;
        case NOT_FIND_CODER:
            final int readySize = socketChannelCtx.getWriteIndex() - socketChannelCtx.getCurrPackageIndex();
            final int headLimit = 255;
            if (readySize >= headLimit) {
            throw new CoderException("未找到編/解碼處理器 ");
            }
            run = false;
            break;
        case UNFINISHED:
        case UNKNOWN:
        case ERROR:
        default:
            run = false;
            // TODO throw
            break;
        }
      }

未完侍加oop

相關文章
相關標籤/搜索