包括如下主要組件:java
這些組件表明了不一樣的構造:資源、邏輯、通知。編程
Channel是Java NIO的一個基本構造。
在網絡通信中,數據要從發送端應用程序->發送端硬件或則其餘實體->服務端硬件或則其餘實體->服務端應用,所以須要一個數據傳輸過程當中的載體,這個載體就是Channel,它能夠被打開或者被關閉,鏈接或則斷開鏈接。segmentfault
一個回調就是一個方法,好比A方法在某些特定的流程中,調用B方法,這個B方法,就是A方法的回調。安全
在java併發編程學習中,咱們也學到了這個對象,能夠看作是對異步操做結果的一個佔位符,在完成以前會一直阻塞,完成後提供對結果的訪問。Netty也提供了本身的ChannelFuture的實現,用於在執行異步操做的時候使用。服務器
在Netty中,使用不一樣的事件來通知咱們狀態的改變或則狀態的改變,以便咱們基於這些事件來觸發不一樣的動做。
這些事件包括出站入站等,咱們能夠針對這些事件定義不一樣的處理方式,好比記錄日誌、數據轉換、流控制、業務邏輯處理等,這些處理方式,都是在ChannelHandler中實現的。網絡
簡單的說,他就是一個線程池,EventLoopGroup接口最終基礎了ExecutorService接口。併發
服務端引導是ServerBootstrap,客戶端是Bootstrap。
ServerBootstrap用於配置Channel、EventLoopGroup、ChannelHandler以及監聽並接受傳入鏈接請求的端口。
Bootstrap與ServerBootstrap不一樣的是,使用服務端的地址和端口來鏈接服務端。異步
服務端流程:socket
EchoServer,經過ServerBootstrap引導類,配置不一樣的信息,這部分的代碼也是通用性的,咱們實際的業務處理邏輯在ChannelHandler上。ide
public class EchoServer { public static void main(String[] args) throws InterruptedException { final EchoServerHandler echoServerHandler = new EchoServerHandler(); // 建立NioEventLoopGroup類型的EventLoopGroup EventLoopGroup group = new NioEventLoopGroup(); try { // 建立ServerBootstrap ServerBootstrap sbs = new ServerBootstrap(); sbs.group(group) // 設置Channel爲NIO的服務端Channel .channel(NioServerSocketChannel.class) // 綁定本地端口 .localAddress(new InetSocketAddress(Const.PORT)) // 把echoServerHandler加入到ChannelPipeline中 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(echoServerHandler); } }); // 異步綁定服務器,阻塞到服務器綁定完成 ChannelFuture sync = sbs.bind().sync(); // 獲取channel的closeFuture,阻塞到關閉 sync.channel().closeFuture().sync(); } finally { // 優雅的關掉group並釋放全部的資源 group.shutdownGracefully().sync(); } } }
EchoServerHandler,實際的業務邏輯處理代碼,這部分是根據咱們的業務不一樣進行定製化開發的。
// 標記爲ChannelHandler.Sharable時,標識能被多個channel安全的共享,是線程安全的 @ChannelHandler.Sharable public class EchoServerHandler extends ChannelInboundHandlerAdapter { /** * 服務端接收客戶端信息的時候調用 * * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf byteBuf = (ByteBuf) msg; String request = byteBuf.toString(CharsetUtil.UTF_8); String resp = "Hello " + request; System.out.println("服務端收到信息:" + byteBuf.toString(CharsetUtil.UTF_8)); // 把消息發送給客戶端 ctx.write(Unpooled.copiedBuffer(resp, CharsetUtil.UTF_8)); } /** * 服務端處理客戶端最後一條消息後調用 * * @param ctx * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)// flush數據 .addListener(ChannelFutureListener.CLOSE);// 關閉Channel } /** * 服務端處理消息過程當中,對異常的處理 * * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // 打印異常 cause.printStackTrace(); // 關閉Channel ctx.close(); } }
客戶端流程:
EchoClient
public class EchoClient { public static void main(String[] args) throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup(); try { // 建立Bootstrap Bootstrap bs = new Bootstrap(); // bs.group(group) // 設置Channel爲NIO的客戶端Channel .channel(NioSocketChannel.class) // 設置服務器的地址端口信息 .remoteAddress(new InetSocketAddress(Const.IP, Const.PORT)) .handler(new EchoClientHandler()); // 鏈接遠程服務器,阻塞到鏈接完成 ChannelFuture cf = bs.connect().sync(); // 獲取channel的closeFuture,阻塞到關閉 cf.channel().closeFuture().sync(); } finally { // 優雅的關掉group並釋放全部的資源 group.shutdownGracefully().sync(); } } }
EchoClientHandler
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> { /** * 客戶端收到服務器消息後調用 * * @param channelHandlerContext * @param byteBuf * @throws Exception */ @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception { // 處理收到的消息 System.out.println("客戶端收到信息:" + byteBuf.toString(CharsetUtil.UTF_8)); } /** * 客戶端與服務器鏈接後調用 * * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 當channel是活躍的時候,往服務端發送一條消息 ctx.writeAndFlush(Unpooled.copiedBuffer("Netty", CharsetUtil.UTF_8)); } /** * 客戶端處理消息過程當中,對異常的處理 * * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }