Netty項目是一個提供異步事件驅動網絡應用框架和快速開發可維護的高性能高擴展性服務端和客戶端協議工具集的成果。
換句話說,Netty是一個NIO客戶端服務端框架,它使得快速而簡單的開發像服務端客戶端協議的網絡應用成爲了可能。它它極大的簡化並流線化了如TCP和UDP套接字服務器開發的網絡編程。
「快速且簡便」不意味着目標應用將容忍維護性和性能上的問題。Netty在吸收了大量協議實現(如FTP,SMTP,HTTP以及各類二進制,基於文本的傳統協議)的經驗上進行了精心的設計。由此,Netty成功找到了一個無需折衷妥協而讓開發、性能、穩定性和靈活性相互協調的方法。html
a) http協議是咱們接觸最多的,定義的API都是基於Http協議的,Http協議屬於應用層協議。傳輸層協議,TCP / UDP協議,TCP是經過三次握手,保障通訊的可信,也就是咱們常說的長鏈接。Netty是對TCP協議的封裝。java
b) JAVA BIO / NIO / AIO linux
Java1.4版本引入NIO概念,實現了對「多路複用IO」的支持,Java1.7版本引入AIO概念。AIO是最晚提出的,理應是更先進的技術,可是並無大規模的在商業領域應用。Unix提供了五種參考網絡模型,在linux領域,並無異步IO網絡模型的成熟方案(linux發行版內核不支持AIO,須要本身安裝擴展包)。JAVA的新版本的AIO,仍是用採用多路複用IO實現。在Windows平臺經過IOCP協議實現,可參考 IOCP淺析。編程
如今從最簡單的Echo程序入手,逐步深刻地分享利用Netty如何編程。windows
Netty Server Exempleapi
EventLoopGroup group = new NioEventLoopGroup(); EventLoopGroup workGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(group, workGroup) .channel(NioServerSocketChannel.class) .localAddress(new InetSocketAddress(port)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new EchoServerHandler()); } }); ChannelFuture f = b.bind().sync(); System.out.println(EchoServer.class.getName() + " started and listen on " + f.channel().localAddress()); f.channel().closeFuture().sync(); } finally { group.shutdownGracefully().sync(); }
Netty Server Handler Exemple數組
public class EchoServerHandler extends SimpleChannelInboundHandler<ByteBuf> { @Override public void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) { ByteBuf in = (ByteBuf) msg; System.out.println("Server received: " + in.toString(CharsetUtil.UTF_8)); ctx.write(Unpooled.copiedBuffer("Response from server. You have input \"" + in.toString(CharsetUtil.UTF_8) + "\"!", CharsetUtil.UTF_8)); ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); }
Netty Client Exemple服務器
EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .remoteAddress(new InetSocketAddress(host, port)) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( new EchoClientHandler()); } }); ChannelFuture f = b.connect().sync(); if (f.channel().isActive()) { f.channel().writeAndFlush(Unpooled.copiedBuffer("Hello Casper!", CharsetUtil.UTF_8)); } Thread.sleep(1000); } finally { group.shutdownGracefully().sync(); }
Netty Client Handler Exemple網絡
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> { @Override public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) { System.out.println("Client received: " + in.toString(CharsetUtil.UTF_8)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
一個Netty程序開始於Bootstrap類,Bootstrap類是Netty提供的一個能夠經過簡單配置來設置或"引導"程序的一個很重要的類。啓動Sever端,須要初始化ServerBootStrap。定義兩個EventLoopGroup,分別處理鏈接請求和socket數據傳輸。Netty巧妙的把接收請求和處理請求,都抽象成ChannelHanndler處理,模式上更加統一。經過閱讀代碼,接收請求的處理以下:接到鏈接請求後,設置其初始化參數,而後註冊到childGroup處理。數據結構
void init(Channel channel) throws Exception { ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = this.childGroup; final ChannelHandler currentChildHandler = this.childHandler; p.addLast(new ChannelHandler[]{new ChannelInitializer<Channel>() { public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ch.eventLoop().execute(new Runnable() { public void run() { pipeline.addLast(new ChannelHandler[]{new ServerBootstrap.ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)}); } }); } }}); } private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter { private final EventLoopGroup childGroup; private final ChannelHandler childHandler; ServerBootstrapAcceptor(final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler, Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) { this.childGroup = childGroup; this.childHandler = childHandler; } public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel)msg; child.pipeline().addLast(new ChannelHandler[]{this.childHandler}); try { this.childGroup.register(child).addListener(new ChannelFutureListener() { }); } catch (Throwable var8) { } } }
SingleThreadEventLoopGroup 一個線程處理全部的Channel
ThreadPerChannelLoopGroup 每一個線程處理一個channel
MultiThreadEventLoopGroup 經過線程組處理channel
NIOEventLoopGroup
EpollEventLoopGroup 根據Selecter的不一樣實現,不一樣的處理策略。NIOEventLoopGroup,默認採用sellect方式,參考JAVA NIO實現。EpollEventLoopGroup只能在linux平臺使用,更高效。
宏觀上,ChannelPipline串聯起全部的ChannelHandler,在特定的時間節點,調用Handler的函數,完成處理流程。處理流程以下圖所示:
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler { public void channelRegistered(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelRegistered(); } public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelUnregistered(); } public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelActive(); } public void channelInactive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelInactive(); } public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.fireChannelRead(msg); } public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelReadComplete(); } public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { ctx.fireUserEventTriggered(evt); } public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelWritabilityChanged(); } public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.fireExceptionCaught(cause); } }
ChannelPipline:內部包含一個ChannelHandlerContext的鏈表數據結構,每次從header item開始,調用context.invoke函數,開始處理流程。
ChannelHandlerContext:在Channelhandler中,調用context的fire函數;fire函數在pipline中查找須要執行的下一個context,調用context.invoke函數;調用channelhandler的函數。
ChannelHandler:channelChandler分爲inbound和outbond,inbound處理接收消息,outbound處理傳出。如上所示,channelInboundHandler中定義了一系列的函數,通常狀況下,編寫Netty程序,只須要在ChannelHandler中處理業務邏輯,編程模型至關簡單。
代碼實例中,channelHandler中讀到的是ByteBuf對象,其實就是byte數組,在程序內部,把byte數組轉換成了字符串。
問題一、半包問題,沒有讀到完整的數據,就進行了轉換,致使錯誤。
問題二、編解碼與業務柔和在一塊兒,設計上不完美。
Netty提供了編碼器,解碼器以及編解碼器。
public class RpcChannelHandler extends SimpleChannelInboundHandler<RpcResponse> implements RpcChannel { // 讀取數據,讀的是對象 @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcResponse rpcResponse) throws Exception { String requestId = rpcResponse.getRequestId(); if (rpcFutureMap.containsKey(requestId)) { RpcFuture rpcFuture = rpcFutureMap.get(requestId); rpcFutureMap.remove(requestId); rpcFuture.finish(rpcResponse); } } @Override public RpcFuture call(RpcRequest request, RpcCallback callback) { this.channel.writeAndFlush(request); // 寫數據,寫的也是對象 } }
編碼器
public class RpcEncoder extends MessageToByteEncoder { @Override public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception { if (genericClass.isInstance(in)) { byte[] data = SerializationUtil.serialize(in); out.writeInt(data.length); out.writeBytes(data); } } }
解碼器
public class RpcDecoder extends ByteToMessageDecoder { @Override public final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if (in.readableBytes() < 4) { return; } in.markReaderIndex(); int dataLength = in.readInt(); if (in.readableBytes() < dataLength) { in.resetReaderIndex(); return; } byte[] data = new byte[dataLength]; in.readBytes(data); Object obj = SerializationUtil.deserialize(data, genericClass); out.add(obj); } }
處理流程以下:
編碼解碼器類結構
編碼器是一個ChannelHandler,放在pipline中,做爲處理請求消息的一個環節。處理傳入參數的是Decoder,負責把二進制的數據,轉換成程序可識別的數據結構,其實現了InboundChannelHandler接口;通常狀況下,decoder要放在pip的header位置,即addFrist。處理傳出參數的是Encoder,負責把程序內部的數據結構,轉換成可在網絡傳輸的二進制數據;通常狀況下,encoder須要放在pipline的最後處理。
分別實現Encoder和Decoder,多是代碼放在兩個類裏實現;Netty提供了Codec,在一個類裏實現編碼和解碼。
Netty對協議的支持,都是經過提供編碼解碼器實現的。例如:http協議
HttpRequestEncoder,將HttpRequest或HttpContent編碼成ByteBuf
HttpRequestDecoder,將ByteBuf解碼成HttpRequest和HttpContent
HttpResponseEncoder,將HttpResponse或HttpContent編碼成ByteBuf
HttpResponseDecoder,將ByteBuf解碼成HttpResponse和HttpContent
1)、線程模型
如上的幾種模式,在ServerBootStrap.goup初始化時設置,咱們的例子,採用的是主從Reactor多線程模型。
2)、Zero Copy
public ByteBuf ioBuffer() {
return PlatformDependent.hasUnsafe() ? this.directBuffer(256) : this.heapBuffer(256);
}
private int addComponent0(boolean increaseWriterIndex, int cIndex, ByteBuf buffer) {
......
wasAdded = this.components.add(c);
......
return var11;
}
private native long transferTo0(FileDescriptor var1, long var2, long var4, FileDescriptor var6);
3)、Select VS Epoll
多路複用IO有多種實現方式,其中,select/poll是全部操做系統都支持的方式,提出時間較早。JAVA NIO默認實現是用的Select,windows操做系統只支持Select。Epoll提出較晚,在linux系統提供,是目前比較成熟穩定的方案。
1)、Rpc框架,表明 Dubbo
Http VS TCP
Tips:
2)、服務代理,表明 NRedis-Proxy 參考: http://www.javashuo.com/article/p-aimzncum-hx.html
NRedis-Proxy 是一個Redis中間件服務,第一個Java 版本開源Redis中間件,無須修改業務應用程序任何代碼與配置,與業務解耦;以Spring爲基礎開發自定義標籤,讓它可配置化,使其更加容易上手;以netty 做爲通訊傳輸工具,讓它具備高性能,高併發,可分佈式擴展部署等特色,單片性能損耗約5%左右。
3)、中間件Vert.x 參考 https://vertx.io/
clipse Vert.x is event driven and non blocking. This means your app can handle a lot of concurrency using a small number of kernel threads. Vert.x lets your app scale with minimal hardware.
參考文章