netty搭建Tcp服務器實踐

netty基本組件介紹中,咱們大體瞭解了netty的一些基本組件,今天咱們來搭建一個基於netty的Tcp服務端程序,經過代碼來了解和熟悉這些組件的功能和使用方法。html

首先咱們本身建立一個Server類,命名爲TCPServer服務器

第一步初始化ServerBootstrap,ServerBootstrap是netty中的一個服務器引導類,對ServerBootstrap的實例化就是建立netty服務器的入口微信

public class TCPServer {
    private Logger log = LoggerFactory.getLogger(getClass());
    //端口號
    private int port=5080;
    //服務器運行狀態
    private volatile boolean isRunning = false; 
    //處理Accept鏈接事件的線程,這裏線程數設置爲1便可,netty處理連接事件默認爲單線程,過分設置反而浪費cpu資源
    private final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    //處理hadnler的工做線程,其實也就是處理IO讀寫 。線程數據默認爲 CPU 核心數乘以2
    private final EventLoopGroup workerGroup = new NioEventLoopGroup();
     
    public void init() throws Exception{
        //建立ServerBootstrap實例
        ServerBootstrap serverBootstrap=new ServerBootstrap();
        //初始化ServerBootstrap的線程組
        serverBootstrap.group(workerGroup,workerGroup);//
        //設置將要被實例化的ServerChannel類
        serverBootstrap.channel(NioServerSocketChannel.class);//
        //在ServerChannelInitializer中初始化ChannelPipeline責任鏈,並添加到serverBootstrap中
        serverBootstrap.childHandler(new ServerChannelInitializer());
        //標識當服務器請求處理線程全滿時,用於臨時存放已完成三次握手的請求的隊列的最大長度
        serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024);
        // 是否啓用心跳保活機機制
        serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);    
        //綁定端口後,開啓監聽
        ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
        if(channelFuture.isSuccess()){
            System.out.println("TCP服務啓動 成功---------------");
        }
    }
    
    /**
     * 服務啓動
     */
    public synchronized void startServer() {
        try {
              this.init();
        }catch(Exception ex) {
            
        }
    }
    
    /**
     * 服務關閉
     */
    public synchronized void stopServer() {
        if (!this.isRunning) {
            throw new IllegalStateException(this.getName() + " 未啓動 .");
        }
        this.isRunning = false;
        try {
            Future<?> future = this.workerGroup.shutdownGracefully().await();
            if (!future.isSuccess()) {
                log.error("workerGroup 沒法正常中止:{}", future.cause());
            }

            future = this.bossGroup.shutdownGracefully().await();
            if (!future.isSuccess()) {
                log.error("bossGroup 沒法正常中止:{}", future.cause());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        this.log.info("TCP服務已經中止...");
    }
    
    private String getName() {
        return "TCP-Server";
    }
}

上面的代碼中主要使用到的ServerBootstrap類的方法有如下這些:框架

group  :設置SeverBootstrap要用到的EventLoopGroup,也就是定義netty服務的線程模型,處理Acceptor連接的主"線程池"以及用於I/O工做的從"線程池";socket

channel:設置將要被實例化的SeverChannel類;ide

option :指定要應用到新建立SeverChannel的ChannelConfig的ChannelOption.其實也就是服務自己的一些配置;oop

chidOption:子channel的ChannelConfig的ChannelOption。也就是與客戶端創建的鏈接的一些配置;post

childHandler:設置將被添加到已被接收的子Channel的ChannelPipeline中的ChannelHandler,其實就是讓你在裏面定義處理鏈接收發數據,須要哪些ChannelHandler按什麼順序去處理;this

第二步接下來咱們實現ServerChannelInitializer類,這個類繼承實現自netty的ChannelInitializer抽象類,這個類的做用就是對channel(鏈接)的ChannelPipeline進行初始化工做,說白了就是你要把處理數據的方法添加到這個任務鏈中去,netty才知道每一步拿着socket鏈接和數據去作什麼。編碼

@ChannelHandler.Sharable
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
    static final EventExecutorGroup group = new DefaultEventExecutorGroup(2);
     
    public ServerChannelInitializer() throws InterruptedException {
    }
    
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {    
        ChannelPipeline pipeline = socketChannel.pipeline();
        //IdleStateHandler心跳機制,若是超時觸發Handle中userEventTrigger()方法
        pipeline.addLast("idleStateHandler",
                new IdleStateHandler(15, 0, 0, TimeUnit.MINUTES));
        // netty基於分割符的自帶解碼器,根據提供的分隔符解析報文,這裏是0x7e;1024表示單條消息的最大長度,解碼器在查找分隔符的時候,達到該長度還沒找到的話會拋異常
//        pipeline.addLast(
//                new DelimiterBasedFrameDecoder(1024, Unpooled.copiedBuffer(new byte[] { 0x7e }),
//                        Unpooled.copiedBuffer(new byte[] { 0x7e })));
         //自定義編解碼器
         pipeline.addLast(
                 new MessagePacketDecoder(),
                 new MessagePacketEncoder()
                );
        //自定義Hadler
        pipeline.addLast("handler",new TCPServerHandler());
        //自定義Hander,可用於處理耗時操做,不阻塞IO處理線程
        pipeline.addLast(group,"BussinessHandler",new BussinessHandler());
    }
}

這裏咱們注意下

pipeline.addLast(group,"BussinessHandler",new BussinessHandler());

在這裏咱們能夠把一些比較耗時的操做(如存儲、入庫)等操做放在BussinessHandler中進行,由於咱們爲它單獨分配了EventExecutorGroup 線程池執行,因此說即便這裏發生阻塞,也不會影響TCPServerHandler中數據的接收。

最後就是各個部分的具體實現

解碼器的實現:

public class MessagePacketDecoder extends ByteToMessageDecoder
{

    public MessagePacketDecoder() throws Exception
    {
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception
    {
        try {
            if (buffer.readableBytes() > 0) {
                // 待處理的消息包
                byte[] bytesReady = new byte[buffer.readableBytes()];
                buffer.readBytes(bytesReady);
                //這之間能夠進行報文的解析處理
                out.add(bytesReady );
            }
        }finally {
            
        }
    }


}

編碼器的實現

public class MessagePacketEncoder extends MessageToByteEncoder<Object>
{
    public MessagePacketEncoder()
    {
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception
    {
        try {
            //在這以前能夠實現編碼工做。
            out.writeBytes((byte[])msg); 
        }finally {
            
        }  
    }
}

TCPServerHandler的實現

    public class TCPServerHandler extends ChannelInboundHandlerAdapter {
        public TCPServerHandler() {
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object source) throws Exception {
            // 拿到傳過來的msg數據,開始處理
            ByteBuf recvmg = (ByteBuf) source;// 轉化爲ByteBuf
            ctx.writeAndFlush(respmsg);// 收到及發送,這裏若是沒有writeAndFlush,上面聲明的ByteBuf須要ReferenceCountUtil.release主動釋放

        }
    }

BussinessHandler的實現與TCPServerHandler基本相似,它能夠處理一些相對比較耗時的操做,咱們這裏就不實現了。

經過以上的代碼咱們能夠看到,一個基於netty的TCP服務的搭建基本就是三大塊:

一、對引導服務器類ServerBootstrap的初始化;

二、對ChannelPipeline的定義,也就是把多個ChannelHandler組成一條任務鏈;

三、對 ChannelHandler的具體實現,其中能夠有編解碼器,能夠有對收發數據的業務處理邏輯;

以上代碼只是在基於netty框架搭建一個最基本的TCP服務,其中包含了一些netty基本的特性和功能,固然這只是netty運用的一個簡單的介紹,若有不正確的地方還望指出與海涵。

 

關注微信公衆號,查看更多技術文章。

相關文章
相關標籤/搜索