Netty-登堂篇

簡介:java

        Netty是一款高效的基於reactor線程模型、異步非阻塞事件驅動的網絡編程框架,主要用於網絡編程,適用於中間件開發組建、傳統應用通信開發。react

       Reactor線程模型:線程模型線程模型中有兩種角色,一個是主處理線程(組),一個是工做線程(組),在netty中主處理線程和工做線程都由線程組來維護,主處理線程主要用於處理網絡事件中的Accept事件,工做線程主要用來處理主線程下發下來的網絡連接處理處理任務,如對channel中數據的讀寫等。編程

使用Netty開發網絡應用程序的優點:bootstrap

    (1)採用java NIO原生開發網絡應用,成本相對較大,須要熟練掌握NIO編程中各組件的正確使用(如:多路複用器、channel、緩存機制等)、架構設計上線程模型的選擇及實現、編解碼技術、TCP半包處理等等。api

    (2)Netty已解決或規避原生java NIO的各類問題或bug,比較醒目的就是多路複用的空輪詢。緩存

    (3)Netty趨於成熟,已成爲不少大型底層中間件網絡通信組件(如:Hadoop的RPC框架avro)。網絡

   (4)易用性,netty已儘量地對外屏蔽了底層網絡實現的具體細節,已相對友好的方式對外提供API(具體爲ServerBootstrap、Bootstrap、自實現編解碼抽象類、自定義Handler等)。架構

開發案例:框架

        Client發出查詢時間的指令,Server端接收指令並簡單判斷後,將當前時間寫入channel。異步

maven pom.xml中引入:

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.0.Final</version>
</dependency>

輔助類:

package com.best.diamond.netty.time;

/**
 * Created by hengluwen on 17/10/1.
 */
public class NettyConstants {

    public static final String BAD_ORDER = "BAD ORDER";

    public static final String QUERY_TIME_ORDER = "QUERY TIME ORDER";

}

Netty Server端:

package com.best.diamond.netty.time;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.util.CharsetUtil;


/**
 * Created by hengluwen on 17/10/1.
 */
public class TimeServer {

    private static final StringDecoder DECODER = new StringDecoder(CharsetUtil.UTF_8);

    public void bind(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .option(ChannelOption.SO_REUSEADDR, true)
                            //服務端的ServerBootstrap中增長了一個方法childHandler,它的目的是添加handler(包括用戶自定義的handler),用來監聽已經鏈接的客戶端的Channel的動做和狀態。
                    .childHandler(new ChildChannelHandler());
            //綁定端口,同步等待成功
            ChannelFuture channelFuture = b.bind(port).sync();

            //等待服務端監聽端口關閉
            channelFuture.channel().closeFuture().sync();
        } finally {
            //優雅退出,釋放線程池資源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            ChannelPipeline pipeline = socketChannel.pipeline();
            //如下兩行代碼爲了解決半包讀問題
            pipeline.addLast("framer", new LineBasedFrameDecoder(1024));
            pipeline.addLast("decoder", DECODER);
            pipeline.addLast(new TimeServerHandler());
        }
    }


    public static void main(String[] args) throws Exception {
        new TimeServer().bind(8080);
    }
}

       服務端代碼中首先建立了兩個NioEventLoopGroup實例,NioEventLoopGroup是個線程組,它包含了一組NIO線程,專門用於網絡事件的處理,實際上它們就是Reactor線程組,建立兩個的緣由是:一個用於服務端接收客戶端的連接,一個用於進行SocketChannel的讀寫。設置建立Channel爲NioServerSocketChannel,它的功能對應於JDK NIO類庫中的ServerSocketChannel類,而後配置NioServerSocketChannel的TCP參數,此處將它的backlog設置爲1024(該參數表示未完成TCP三次握手的連接數與完成TCP三次握手的鏈接數的總和,這個參數會限制client的總連接數),ServerBootstrap是netty服務端啓動輔助類,目的爲下降服務端開發的複雜度。綁定I/O事件的處理類ChailChannelHandler,它的做用相似於Reactor模式中的Handler類,主要用於處理網絡I/O事件。

package com.best.diamond.netty.time;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.Date;

/**
 * Created by hengluwen on 17/10/1.
 */
public class TimeServerHandler extends SimpleChannelInboundHandler<String> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String request) throws Exception {
        System.out.println("The time server receive order : " + request);
        String currentTime = NettyConstants.QUERY_TIME_ORDER.equalsIgnoreCase(request) ? new Date().toString() : NettyConstants.BAD_ORDER;
        sendMessage(ctx, currentTime);
    }

    private void sendMessage(ChannelHandlerContext ctx, String data) {
        ByteBuf byteBuf = Unpooled.copiedBuffer(data.getBytes());
        ctx.writeAndFlush(byteBuf);
    }
}

       當接收到client發送的數據時,因爲在啓動時已經加入了String類型的消息解碼器(StringDecoder,此消息解碼器由Netty提供,不須要自定義),所以用戶自定義的Handler中接收到的request的數據類型爲String,沒必要再次進行手工解碼。

Netty Client端:

package com.best.diamond.netty.time;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.net.InetSocketAddress;

/**
 * Created by hengluwen on 17/10/1.
 */
public class TimeClient {

    public void connect(String host, int port) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioServerSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new TimeClientHandler());
                        }
                    });

            ChannelFuture channelFuture = b.connect(new InetSocketAddress(host, port)).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        try {
            new TimeClient().connect("127.0.0.1", 8080);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

client端的啓動代碼與server相似,因爲client更多的是要處理網絡事件的I/O,且正常狀況下client只會處理一個或兩個channel(channel失效重連時),所以只須要一個線程組便可。

package com.best.diamond.netty.time;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * Created by hengluwen on 17/10/1.
 */
public class TimeClientHandler extends SimpleChannelInboundHandler<String> {

    private final ByteBuf firstMessage;

    public TimeClientHandler() {
        firstMessage = Unpooled.buffer().writeBytes(NettyConstants.QUERY_TIME_ORDER.getBytes());
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.writeAndFlush(firstMessage).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    System.out.println("error" + future.cause());
                }
            }
        });
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String request) throws Exception {
        System.out.println("Now is : " + request);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

channel激活時,發出指定,等待服務端的相應。

總結:

    (1)netty是一個高效的異步非阻塞的網絡通信框架,開發簡單。

    (2)netty易用性強,提供了大量的編碼器/解碼器,已經處理TCP半包問題的輔助類。

    (3)netty的api設計的強大而友好。

相關文章
相關標籤/搜索