Netty是一個NIO的客服端服務器框架,它能夠簡單、快速的搭建器一個協議包客服端服務器的應用程序。它極大的簡化了TCP和UDP這類的網絡編程。
java
「快速」和「簡單」並不意味着會讓你的最終應用產生維護性或性能上的問題。Netty 是一個吸取了多種協議的實現經驗,這些協議包括FTP,SMTP,HTTP,各類二進制,文本協議,並通過至關精心設計的項目,最終,Netty 成功的找到了一種方式,在保證易於開發的同時還保證了其應用的性能,穩定性和伸縮性。git
這裏簡單記錄下學習要點,詳細的講解。能夠看官網(github:https://github.com/netty/netty )或者查看李林鋒的的系列文章http://ifeve.com/author/linfeng/ 。github
體系結構圖:
編程
由李林鋒講解的易懂的架構圖:
promise
一、兩個selector線程:mainReactor處理accpet事件、subReactor處理connection、read、send事件
服務器
二、業務處理線程池:包括編碼、解碼、業務處理。
網絡
a、處理bytes的serverhandler
架構
/** * @see 進入的channel:用於處理接受時候的事件處理 */ public class TimeServerHandler extends ChannelInboundHandlerAdapter { /** * @see 當一個channel準備好的時候,發送一個32位的數字 */ public void channelActive(final ChannelHandlerContext ctx) { // ByteBuf:沒有了flip()。它只有2個功能:讀、寫 // 讀: // 寫:當你寫的時候,若是讀取下標沒有改變,則繼續增加 final ByteBuf time = ctx.alloc().buffer(4); time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L)); final ChannelFuture f = ctx.writeAndFlush(time); // 當寫如完成的時候,執行 f.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) throws Exception { // TODO Auto-generated method stub assert f == future; ctx.close(); } }); } public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
sever的啓動部分框架
public class TimeServer { private int port; public TimeServer() { this.port = port; } public void runn() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup); b.channel(NioServerSocketChannel.class); b.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // TODO Auto-generated method stub ch.pipeline().addLast(new TimeServerHandler()); } }); b.option(ChannelOption.SO_BACKLOG, 128); b.childOption(ChannelOption.SO_KEEPALIVE, true); ChannelFuture f = b.bind(port).sync(); f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } public static void main(String[] args) { try { new TimeServer().runn(); } catch (Exception e) { e.printStackTrace(); } } }
b、client部分:處理字節
socket
public class TimeDecoder extends ByteToMessageDecoder { /** * @see 定義一個回調的數據累加buff * @see 若是有out,則表示解析成功。 */ @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if (in.readableBytes() < 4) { return; } out.add(in.readBytes(4)); } }
client的 channel處理類
public class TimeClientHandler extends ChannelInboundHandlerAdapter { public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf buf = (ByteBuf) msg; try { long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L; System.out.println(new Date(currentTimeMillis)); ctx.close(); } catch (Exception e) { e.printStackTrace(); } finally { buf.release(); } } public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
client的啓動類:
public class TimeClient { public static void main(String[] args) throws InterruptedException { String host = args[0]; int port = Integer.parseInt(args[1]); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap();// 啓動客服端鏈接 b.group(workerGroup);// 同時用於主線程和工做線程 b.channel(NioSocketChannel.class);// 客服端須要的channel b.option(ChannelOption.SO_KEEPALIVE, true); // socketChannel沒有父類 b.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // TODO Auto-generated method stub ch.pipeline().addLast(new TimeClientHandler()); } }); ChannelFuture f = b.connect(host, port).sync(); f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); } } }
小型buffer的socket傳送流傳輸依據TCP/IP,接受的數據是儲存在一個接受的socket的buffer中。可是,傳送的buffer不是一個隊列包、而是一個隊列btyes。這就意味着,即便你使用兩個包去傳送兩端信息,系統不會將它們視爲兩端信息,而是做爲一串bytes。所以,這不能保證你讀去的數據是你遠程寫入的數據。例如,咱們須要使用系統的TCP/IP棧接受到3個數據包。
由於根據流協議,你頗有可能在你的應用中讀取到你下面的部分
因次,在服務器和客服端的接受部分,對接受數據必須定義一個協議的框架(處理方式),這個框架可以被應用程序使用。接收到的部分必須是下面這種方式。
a、第一種解決方式:
在TIME client的實例中。咱們一樣是有一個類似的問題,一個很是小的32位bit的整數數據,它不太可能分散。然而,隨着流量的增長,問題是它會碎片化。
簡單的解決方式,增長一個內部的累加buffer,而後將接受的4bytes傳輸到這個buffer中。在TimeClientHandler直接修改
public class TimeClientHandler2 extends ChannelInboundHandlerAdapter { private ByteBuf buf; @Override public void handlerAdded(ChannelHandlerContext ctx) { buf = ctx.alloc().buffer(4); } @Override public void handlerRemoved(ChannelHandlerContext ctx) { buf.release(); buf = null; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf m = (ByteBuf) msg; buf.writeBytes(m); m.release(); if (buf.readableBytes() >= 4) { long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L; System.out.println(new Date(currentTimeMillis)); ctx.close(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
b、第二種就是前面的實例方式、將decode分離出來處理。看起來清晰、方便
object
public class UnixTime { private final long value; public UnixTime() { this(System.currentTimeMillis() / 1000L + 2208988800L); } public UnixTime(long value) { this.value = value; } public long value() { return this.value; } public String toString() { return new Date((value() - 2208988800L) * 1000L).toString(); } }
object:decode
public class TimeDecoder2 extends ByteToMessageDecoder { /** * @see 定義一個回調的數據累加buff * @see 若是有out,則表示解析成功。 */ @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if (in.readableBytes() < 4) { return; } out.add(new UnixTime(in.readUnsignedInt())); } }
objec:encode
public class TimeEncoder extends ChannelOutboundHandlerAdapter { public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { UnixTime m = (UnixTime) msg; ByteBuf encoded = ctx.alloc().buffer(4); encoded.writeInt((int) m.value()); ctx.write(encoded, promise); }
server:handler
public class TimeServerHandler2 extends ChannelInboundHandlerAdapter { /** * @see 當一個channel準備好的時候,發送一個32位的數字 */ public void channelActive(final ChannelHandlerContext ctx) { // ByteBuf:沒有了flip()。它只有2個功能:讀、寫 // 讀: // 寫:當你寫的時候,若是讀取下標沒有改變,則繼續增加 final ByteBuf time = ctx.alloc().buffer(4); time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L)); final ChannelFuture f = ctx.writeAndFlush(new UnixTime()); // 當寫如完成的時候,執行 f.addListener(ChannelFutureListener.CLOSE); } public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
client:handler
public class TimeClientHandler3 extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { UnixTime m = (UnixTime) msg; System.out.println(m); ctx.close(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }