Some basic concepts of Netty

A Netty primer

The goal of this article is to give a broad impression of Netty and cover a few basic concepts before studying it in depth.

Overview

  1. An NIO communication framework
  2. The number of connections a single node supports depends on machine memory (roughly 100K+ with 1 GB), rather than being limited by the system's maximum file-handle count (65536)
  3. A simple API (compared with the JDK's NIO)
  4. Event-driven
  5. Commonly used for gaming, finance, and other businesses that need long-lived connections and high real-time responsiveness

Data flow in NIO

sequenceDiagram
Channel->>Buffer: read
Buffer->>Program: read
Program->>Buffer: write
Buffer->>Channel: write
  1. In NIO, all data is read from a Channel and buffered in a Buffer; the user's own code then reads it from the Buffer
  2. To write data, you must first write into a Buffer and then write the Buffer to the Channel.
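The two steps above can be sketched with the JDK's own NIO classes; the FileChannel below is only a stand-in for any Channel (the class name and temp file are illustrative, not from the original article):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Round-trips a message through a FileChannel to show the
// Channel -> Buffer -> program -> Buffer -> Channel flow.
public class NioFlowDemo {
    public static String roundTrip(String msg) throws IOException {
        Path tmp = Files.createTempFile("nio-demo", ".bin");
        try (FileChannel ch = FileChannel.open(tmp,
                StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            // Program -> Buffer -> Channel: fill a buffer, then write it out.
            ByteBuffer out = ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8));
            while (out.hasRemaining()) {
                ch.write(out);
            }
            // Channel -> Buffer -> program: read it back into another buffer.
            ByteBuffer in = ByteBuffer.allocate(64);
            ch.read(in, 0);
            in.flip(); // switch the buffer from writing mode to reading mode
            byte[] bytes = new byte[in.remaining()];
            in.get(bytes);
            return new String(bytes, StandardCharsets.UTF_8);
        } finally {
            Files.deleteIfExists(tmp);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(roundTrip("hello nio"));
    }
}
```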

Several important objects in Netty

  1. Channel
  2. ByteBuf
  3. ChannelHandler
  4. ChannelHandlerContext
  5. ChannelPipeline

Channel

Channel is the abstraction of a connection. All data that is read or written ultimately flows through a Channel. The main implementations are NioSocketChannel and NioServerSocketChannel.


ByteBuf

ByteBuf is Netty's own buffer implementation. Compared with the JDK's ByteBuffer it has the following advantages:

  1. Its length can grow dynamically (when a write finds that capacity is insufficient, a larger buffer is allocated and the data in the old buffer is copied into the new one)
  2. It is simpler to use. The JDK's ByteBuffer has only a single position pointer; Netty's ByteBuf has a readerIndex (read position) and a writerIndex (write position), so operating on the buffer never requires flip() or rewind().
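For contrast, here is a minimal JDK ByteBuffer example showing the flip() step that ByteBuf's separate read and write indexes make unnecessary (the class name is illustrative):

```java
import java.nio.ByteBuffer;

// JDK ByteBuffer has a single position pointer, so you must flip()
// before reading back what you just wrote; Netty's ByteBuf tracks
// readerIndex and writerIndex separately and skips this step.
public class FlipDemo {
    public static int readBackFirstInt() {
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.putInt(42);      // position is now 4
        buf.flip();          // limit = 4, position = 0: ready to read
        return buf.getInt(); // without flip() this would read past the data
    }

    public static void main(String[] args) {
        System.out.println(readBackFirstInt()); // prints 42
    }
}
```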

ChannelHandler

Once data has been read from a Channel, it is handed to a ChannelHandler for processing.


ChannelHandlerContext

ChannelHandlerContext records the environment and context of the current ChannelHandler, roughly covering the information below. Each ChannelHandler has exactly one ChannelHandlerContext associated with it (a one-to-one relationship).

  1. A reference to the current ChannelHandler
  2. Whether the handler is inbound or outbound
  3. The ChannelPipeline it belongs to
  4. The previous and next ChannelHandlerContext, which together form a processing chain similar to the interceptor chain in Spring MVC.

ChannelPipeline

Netty is event-driven. When different events (data) arrive, different business logic runs: sometimes several handlers must cooperate to finish a job, some handlers may not care about the current event at all, and sometimes a handler finishes and does not want any later handler to run. How, then, should events be propagated? This is what ChannelPipeline is for. The ChannelPipeline holds the head ChannelHandlerContext (inbound events start from the head) and the tail ChannelHandlerContext (outbound events start from the tail), linked together serially.
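As a toy sketch (not Netty's real API; all names below are made up for illustration), the propagation idea can be shown as a chain where each handler may transform the event, pass it on, or stop it:

```java
import java.util.ArrayList;
import java.util.List;

// A simplified model of inbound event propagation: events enter at the
// head of the pipeline and travel toward the tail; any handler can
// consume the event and stop later handlers from seeing it.
public class MiniPipeline {
    interface Handler {
        // Returns the (possibly transformed) event, or null to stop propagation.
        String onRead(String event);
    }

    private final List<Handler> handlers = new ArrayList<>();

    public MiniPipeline addLast(Handler h) {
        handlers.add(h);
        return this;
    }

    // Inbound events start at the head and travel toward the tail.
    public String fireChannelRead(String event) {
        String current = event;
        for (Handler h : handlers) {
            current = h.onRead(current);
            if (current == null) {
                break; // a handler consumed the event; later handlers never see it
            }
        }
        return current;
    }

    public static void main(String[] args) {
        MiniPipeline p = new MiniPipeline()
                .addLast(e -> e.trim())          // decoder-like handler
                .addLast(e -> e.toUpperCase());  // business handler
        System.out.println(p.fireChannelRead("  hello  ")); // prints HELLO
    }
}
```

In real Netty, the links are the prev/next references between ChannelHandlerContexts, and a handler propagates explicitly by calling methods such as ctx.fireChannelRead(msg) rather than returning a value.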

The relationship among ChannelHandler, ChannelHandlerContext, and ChannelPipeline


ChannelHandlers are linked to one another through their ChannelHandlerContexts.

This topic is covered in much more detail in this blog post.


Netty's data flow diagram

(figure: Netty's data flow)

Official Netty demo

Server side

package io.netty.example.discard;
    
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
    
/**
 * Discards any incoming data.
 */
public class DiscardServer {
    
    private int port;
    
    public DiscardServer(int port) {
        this.port = port;
    }
    
    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // (3)
             .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(new DiscardServerHandler());
                 }
             })
             .option(ChannelOption.SO_BACKLOG, 128)          // (5)
             .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
    
            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(port).sync(); // (7)
    
            // Wait until the server socket is closed.
            // In this example, this does not happen, but you can do that to gracefully
            // shut down your server.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
    
    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new DiscardServer(port).run();
    }
}
package io.netty.example.discard;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

/**
 * Handles a server-side channel.
 */
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
        try {
            // Do something with msg
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}
package io.netty.example.time;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(final ChannelHandlerContext ctx) { // (1)
        final ByteBuf time = ctx.alloc().buffer(4); // (2)
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
        
        final ChannelFuture f = ctx.writeAndFlush(time); // (3)
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                assert f == future;
                ctx.close();
            }
        }); // (4)
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
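As an aside, the magic constant 2208988800L used above is the number of seconds from the RFC 868 time-protocol epoch (1900-01-01 UTC) to the Unix epoch (1970-01-01 UTC). The class name below is illustrative; the check uses only java.time:

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

// Verifies that 2208988800 is the second count from 1900-01-01T00:00Z
// (the RFC 868 time-protocol epoch) to 1970-01-01T00:00Z (the Unix epoch):
// 70 years, 17 of which are leap years, so (70 * 365 + 17) * 86400 seconds.
public class EpochOffset {
    public static long secondsBetweenEpochs() {
        Instant timeProtocolEpoch =
                LocalDate.of(1900, 1, 1).atStartOfDay().toInstant(ZoneOffset.UTC);
        return Duration.between(timeProtocolEpoch, Instant.EPOCH).getSeconds();
    }

    public static void main(String[] args) {
        System.out.println(secondsBetweenEpochs()); // prints 2208988800
    }
}
```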

Client side

package io.netty.example.time;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class TimeClient {
    public static void main(String[] args) throws Exception {
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        
        try {
            Bootstrap b = new Bootstrap(); // (1)
            b.group(workerGroup); // (2)
            b.channel(NioSocketChannel.class); // (3)
            b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });
            
            // Start the client.
            ChannelFuture f = b.connect(host, port).sync(); // (5)

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}
package io.netty.example.time;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg; // (1)
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        } finally {
            m.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}