分析netty從源碼開始java
準備工做:git
1.下載源代碼:https://github.com/netty/netty.gitgithub
我下載的版本爲4.1
netty提供了一個netty-example工程,數據庫
分類以下:bootstrap
Fundamentalapi
Echo ‐ the very basic client and server
Discard ‐ see how to send an infinite data stream asynchronously without flooding the write buffer
Uptime ‐ implement automatic reconnection mechanism
Text protocolspromise
Telnet ‐ a classic line-based network application
Quote of the Moment ‐ broadcast a UDP/IP packet
SecureChat ‐ an TLS-based chat server, derived from the Telnet example
Binary protocols緩存
ObjectEcho ‐ exchange serializable Java objects
Factorial ‐ write a stateful client and server with a custom binary protocol
WorldClock ‐ rapid protocol protyping with Google Protocol Buffers integration
HTTP安全
Snoop ‐ build your own extremely light-weight HTTP client and server
File server ‐ asynchronous large file streaming in HTTP
Web Sockets (Client & Server) ‐ add a two-way full-duplex communication channel to HTTP using Web Sockets
SPDY (Client & Server) ‐ implement SPDY protocol
CORS demo ‐ implement cross-origin resource sharing
Advanced服務器
Proxy server ‐ write a highly efficient tunneling proxy server
Port unification ‐ run services with different protocols on a single TCP/IP port
UDT
Byte streams ‐ use UDT in TCP-like byte streaming mode
Message flow ‐ use UDT in UDP-like message delivery mode
Byte streams in symmetric peer-to-peer rendezvous connect mode
Message flow in symmetric peer-to-peer rendezvous connect mode
咱們的分析從這裏開始,netty是client-server形式的,咱們以最簡單的discard示例開始,其服務器端代碼以下:
/** * Discards any incoming data. */ public final class DiscardServer { static final boolean SSL = System.getProperty("ssl") != null; static final int PORT = Integer.parseInt(System.getProperty("port", "8009")); public static void main(String[] args) throws Exception { // Configure SSL. final SslContext sslCtx; if (SSL) { SelfSignedCertificate ssc = new SelfSignedCertificate(); sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build(); } else { sslCtx = null; } EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc())); } p.addLast(new DiscardServerHandler()); } }); // Bind and start to accept incoming connections. ChannelFuture f = b.bind(PORT).sync(); // Wait until the server socket is closed. // In this example, this does not happen, but you can do that to gracefully // shut down your server. f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
上面的代碼中使用了下面幾個類:
實現類爲NioEventLoopGroup,其層次結構爲:
EventExecutorGroup爲全部類的父接口,它經過next()方法來提供EventExecutor供使用。除此之外,它還負責處理它們的生命週期,容許以優雅的方式關閉。
EventExecutor是一種特殊的EventExcutorGroup,它提供了一些便利的方法來查看一個線程是否在一個事件循環中執行過,除此之外,它也擴展了EventExecutorGroup,從而提供了一個通用的獲取方法的方式。
EventLoopGroup是一種特殊的EventExcutorGroup,它提供了channel的註冊功能,channel在事件循環中將被後面的selection來獲取到。
NioEventLoopGroup繼承自MultithreadEventLoopGroup,基於channel的NIO selector會使用該類。
group(EventLoopGroup parentGroup, EventLoopGroup childGroup)方法設置父EventLoopGroup和子EventLoopGroup。這些EventLoopGroup用來處理全部的事件和ServerChannel和Channel的IO。
channel(Class<? extends C> channelClass)方法用來建立一個Channel實例。建立Channel實例要不使用此方法,若是channel實現是無參構造要麼可使用channelFactory來建立。
handler(ChannelHandler handler)方法,channelHandler用來處理請求的。
childHandler(ChannelHandler childHandler)方法,設置用來處理請求的channelHandler。
當Channel註冊到它的eventLoop中時,ChannelInitializer提供了一個方便初始化channel的方法。該類的實現一般用來設置ChannelPipeline的channel,一般使用在Bootstrap#handler(ChannelHandler),ServerBootstrap#handler(ChannelHandler)和ServerBootstrap#childHandler(ChannelHandler)三個場景中。示例:
public class MyChannelInitializer extends ChannelInitializer{ public void initChannel({@link Channel} channel) { channel.pipeline().addLast("myHandler", new MyHandler()); } }
而後:
ServerBootstrap bootstrap = ...; ... bootstrap.childHandler(new MyChannelInitializer());
注意:這個類標示爲可共享的,所以實現類重用時須要時安全的。
理解ChannelPipeline須要先理解ChannelHandler,
4.1 ChannelHandler
處理一個IO事件或者翻譯一個IO操做,而且傳遞給ChannelPineline的下一個handler。
由於這個接口有太多接口須要實現,所以你只有實現ChannelHandlerAdapter就能夠代替實現這個接口。
ChannelHandlerContext封裝了ChannelHandler。ChannelHandler應該經過context對象與它所屬的ChannelPipeLine進行交互。經過使用context對象,ChannelHandler能夠傳遞上行或者下行事件,或者動態修改pipeline,或者存儲特定handler的信息(使用AttributeKey)。
一個channelHandler一般須要存儲一些狀態信息。最簡單最值得推薦的方法是使用member變量:
public interface Message { // your methods here } public class DataServerHandler extends SimpleChannelInboundHandler<Message> { private boolean loggedIn; {@code @Override} protected void messageReceived( ChannelHandlerContext ctx, Message message) { Channel ch = e.getChannel(); if (message instanceof LoginMessage) { authenticate((LoginMessage) message); loggedIn = true; } else (message instanceof GetDataMessage) { if (loggedIn) { ch.write(fetchSecret((GetDataMessage) message)); } else { fail(); } } } ... }
注意:handler的狀態附在ChannePipelineContext上,所以能夠增長相同的handler實例到不一樣的pipeline上:
public class DataServerInitializer extends ChannelInitializer<Channel> { private static final DataServerHandler SHARED = new DataServerHandler(); @Override public void initChannel(Channel channel) { channel.pipeline().addLast("handler", SHARED); } }
在上面的示例中,使用了一個AttributeKey,你可能注意到了@Sharable註解。
若是一個ChannelHandler使用@sharable進行註解,那就意味着你僅僅建立了一個handler一次,能夠添加到一個或者多個ChannelPipeline屢次而不會產生競爭。
若是沒有指定該註解,你必須每次都建立一個新的handler實例,而且增長到一個ChannelPipeline,由於它沒有像member變量同樣,它有一個非共享的狀態。
4.2 ChannelPipeline
ChanelPipeline是一組ChanelHandler的集合,它處理或者解析Channel的Inbound事件和OutBound操做。ChannelPipeline的實現是Intercepting Filter的一種高級形式,這樣用戶能夠控制事件如何處理,一個pipeline內部ChannelHandler如何交互。
pipeline事件流程
上圖描述了IO事件如何被一個ChannelPipeline的ChannelHandler處理的。一個IO事件被一個ChannelInBoundHandler處理或者ChannelOutboundHandler,而後經過調用在ChannelHandlerContext中定義的事件傳播方法傳遞給最近的handler,傳播方法有ChannelHandlerContext#filreChannelRead(Object)和ChannelHandlerContext#write(Object)。
一個Inbound事件一般由Inbound handler來處理,如上如左上。一個Inbound handler一般處理在上圖底部IO線程產生的Inbound數據。Inbound數據經過真實的輸入操做如SocketChannel#read(ByteBuffer)來獲取。若是一個inbound事件越過了最上面的inbound handler,該事件將會被拋棄到而不會通知你或者若是你須要關注,打印出日誌。
一個outbound事件由上圖的右下的outbound handler來處理。一個outbound handler一般由outbound流量如寫請求產生或者轉變的。若是一個outbound事件越過了底部的outbound handler,它將由channel關聯的IO線程處理。IO線程一般運行的是真實的輸出操做如SocketChannel#write(byteBuffer).
示例,假設咱們建立下面這樣一個pipeline:
ChannelPipeline} p = ...;
p.addLast("1", new InboundHandlerA());
p.addLast("2", new InboundHandlerB());
p.addLast("3", new OutboundHandlerA());
p.addLast("4", new OutboundHandlerB());
p.addLast("5", new InboundOutboundHandlerX());
在上例中,inbound開頭的handler意味着它是一個inbound handler。outbound開頭的handler意味着它是一個outbound handler。上例的配置中當一個事件進入inbound時handler的順序是1,2,3,4,5;當一個事件進入outbound時,handler的順序是5,4,3,2,1.在這個最高準則下,ChannelPipeline跳過特定handler的處理來縮短stack的深度:
3,4沒有實現ChannelInboundHandler,於是一個inbound事件的處理順序是1,2,5.
1,2沒有實現ChannelOutBoundhandler,於是一個outbound事件的處理順序是5,4,3
若5同時實現了ChannelInboundHandler和channelOutBoundHandler,一個inbound和一個outbound事件的執行順序分別是125和543.
如上圖所示,一個handler觸發ChannelHandlerContext中的事件傳播方法,而後傳遞到下一個handler。這些方法有:
inbound 事件傳播方法:
ChannelHandlerContext#fireChannelRegistered() ChannelHandlerContext#fireChannelActive() ChannelHandlerContext#fireChannelRead(Object) ChannelHandlerContext#fireChannelReadComplete() ChannelHandlerContext#fireExceptionCaught(Throwable) ChannelHandlerContext#fireUserEventTriggered(Object) ChannelHandlerContext#fireChannelWritabilityChanged() ChannelHandlerContext#fireChannelInactive() ChannelHandlerContext#fireChannelUnregistered()
outbound事件傳播方法:
ChannelHandlerContext#bind(SocketAddress, ChannelPromise) ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise) ChannelHandlerContext#write(Object, ChannelPromise) ChannelHandlerContext#flush() ChannelHandlerContext#read() ChannelHandlerContext#disconnect(ChannelPromise) ChannelHandlerContext#close(ChannelPromise) ChannelHandlerContext#deregister(ChannelPromise)
下面的示例展現了事件是如何傳播的:
public class MyInboundHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext} ctx) { System.out.println("Connected!"); ctx.fireChannelActive(); } } public clas MyOutboundHandler extends ChannelOutboundHandlerAdapter { @Override public void close(ChannelHandlerContext} ctx, ChannelPromise} promise) { System.out.println("Closing .."); ctx.close(promise); } }
在pipeline中,一個用戶通常由一個或者多個ChannelHandler來接收IO事件(例如讀)和IO操做請求(如寫或者close)。例如,一個典型的服務器pipeline一般具備如下幾個handler,但最多有多少handler取決於協議和業務邏輯的複雜度:
Protocol Decoder--將二進制數據(如ByteBuffer)轉換成一個java對象
Protocol Encoder--將一個java對象轉換成二進制數據。
Business Logic Handler--處理真實的業務邏輯(如數據庫訪問)。
讓咱們用下面的示例展現:
static final EventExecutorGroup group = new DefaultEventExecutorGroup(16); ... ChannelPipeline} pipeline = ch.pipeline(); pipeline.addLast("decoder", new MyProtocolDecoder()); pipeline.addLast("encoder", new MyProtocolEncoder()); // Tell the pipeline to run MyBusinessLogicHandler's event handler methods // in a different thread than an I/O thread so that the I/O thread is not blocked by // a time-consuming task. // If your business logic is fully asynchronous or finished very quickly, you don't // need to specify a group. pipeline.addLast(group, "handler", new MyBusinessLogicHandler());
由於ChannelPipeline是線程安全的,一個channelhandler能夠在任意時間內增長或者刪除。例如,當有敏感信息交換時,你能夠插入一個加密handler,而後當信息交換結束後刪除該handler。
4.3 Channel
Channel是網絡socket的一個紐帶或者一個處理IO操做如讀、寫、鏈接、綁定的組件。一個Channel提供以下信息:
當前channel的狀態,如它是否開啓?是否鏈接?
Channel的ChannelConfig的配置參數,如接受緩存大小;
channel支持的IO操做,如讀、寫、鏈接、綁定;
channel支持的ChannelPipeline,它處理全部的IO事件和channel關聯的請求。
在Netty中全部的IO操做都是異步的。這意味着全部的IO調用將當即返回,但不保證在調用結束時請求的IO操做都已經執行完畢。而是在請求操做處理完成、失敗或者取消時返回一個ChannelFuture來通知。
Channel是繼承性的。
一個Channel能夠它如何建立的來獲取它的父Channel(#parent()方法)。例如:一個由ServerSocketChannel接受的SocketChannel調用parent()方法時返回ServerSocketChannel。
繼承的結構依賴於Channel的所屬transport實現。例如,你能夠新寫一個Channel實現,它建立了一個共享同一個socket鏈接的子channel,如BEEP和SSH
一些transport會暴露一些該transport特定的操做。Channel向下轉換到子類型能夠觸發這些操做。例如:老的IO datargram transport,DatagramChannel提供了多播的join和leave操做。
當Channel處理完後,必定記得調用close()或者close(ChannelPromise)來釋放資源。
channelFuture是異步IO操做的返回值。
在Netty中全部的IO操做都是異步的。這意味着全部的IO調用將當即返回,但不保證在調用結束時請求的IO操做都已經執行完畢。而是在請求操做處理完成、失敗或者取消時返回一個ChannelFuture來通知。
當一個IO操做開始時,建立一個新的future。ChannelFuture要麼是uncompleted,要麼是completed。新的future開始時是uncompleted---既不是成功、失敗,也不是取消,由於IO操做尚未開始呢。若IO操做結束時future要麼成功,要麼失敗或者取消,標記爲completed的future有更多特殊的意義,例如失敗的緣由。請注意:即便失敗和取消也屬於completed狀態。
有不少方法能夠查詢IO操做是否完成:等待完成,檢索IO操做的結果。一樣也容許你增長ChannelFutureListenner,這樣你能夠在IO操做完成後得到通知。
在儘量的狀況下,推薦addListenner()方法而不是await()方法,當IO操做完成後去完成接下來的其它任務時去獲取通知。
6.ChannelHandlerContext
對ChannelHandler相關信息的包裝。
netty處理請求的總流程是通過ChannelPipeline中的多個ChannelHandler後,返回結果ChannelFuture。以下圖所示:
具體I/O操做調用的流程,
應用->Channel的I/O操做->調用Pipeline相應的I/O操做->調用ChannelHandlerContext的相應I/O操做->調用ChannelHandler的相應操做->Channel.UnSafe中相關的I/O操做。
應用爲何不直接調用Channel.UnSafe接口中的I/O操做呢,而要繞一個大圈呢?由於它是框架,要支持擴展。
執行者完成操做後,是如何通知命令者的呢?通常流程是這樣的:
Channel.UnSafe中執行相關的I/O操做,根據操做結果->調用ChannelPipeline中相應發fireXXXX()接口->調用ChannelHandlerContext中相應的fireXXXX()接口->調用ChannelHandler中相應方法->調用應用中的相關邏輯。
參考文獻: