待完善java
Channel
、EventLoop
和ChannelFuture
這些類組合在一塊兒,能夠被認爲是Netty網絡抽象的表明:bootstrap
Channel
—Socket;EventLoop
—控制流、多線程處理、併發;ChannelFuture
—異步通知暫略promise
EventLoop
定義了Netty的核心抽象,用於處理鏈接的生命週期中所發生的事件。緩存
一個EventLoopGroup
包含一個或者多個EventLoop
;安全
一個EventLoop
在它的生命週期內只和一個Thread
綁定;服務器
全部由EventLoop
處理的I/O事件都將在它專有的Thread
上被處理;網絡
一個Channel
在它的生命週期內只註冊於一個EventLoop
;多線程
一個EventLoop
可能會被分配給一個或多個Channel
。併發
注意,在這種設計中,必定程度上消除了對於同步的須要。app
暫略
Handles an I/O event or intercepts an I/O operation, and forwards it to its next handler in its ChannelPipeline
.
ChannelHandler
public interface ChannelHandler {
// ChannelHandler添加到ChannelPipeline中時被調用
void handlerAdded(ChannelHandlerContext ctx) throws Exception;
// ChannelHandler從ChannelPipeline中移除時被調用
void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
// 處理過程當中在ChannelPipeline中有錯誤產生時被調用
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}
複製代碼
ChannelHandler
層次結構以下圖所示:
ChannelHandler
itself does not provide many methods, but you usually have to implement one of its subtypes:
ChannelInboundHandler
to handle inbound I/O events, andChannelOutboundHandler
to handle outbound I/O operations.Alternatively, the following adapter classes are provided for your convenience:
ChannelInboundHandlerAdapter
to handle inbound I/O events,ChannelOutboundHandlerAdapter
to handle outbound I/O operations, andChannelDuplexHandler
to handle both inbound and outbound eventsA ChannelHandler often needs to store some stateful information?
ChannelInboundHandler
public interface ChannelInboundHandler extends ChannelHandler {
void channelRegistered(ChannelHandlerContext ctx) throws Exception;
void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
void channelActive(ChannelHandlerContext ctx) throws Exception;
void channelInactive(ChannelHandlerContext ctx) throws Exception;
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}
複製代碼
ChannelInboundHandlerAdapter
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelRegistered();
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelUnregistered();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelInactive();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelWritabilityChanged();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause);
}
}
複製代碼
ChannelInboundHandlerAdapter
的channelRead
方法處理完消息後不會自動釋放消息,若想自動釋放收到的消息,可使用SimpleChannelInboundHandler
。
Usually, channelRead()
handler method is implemented like the following:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
// Do something with msg
} finally {
ReferenceCountUtil.release(msg);
}
}
複製代碼
ChannelOutboundHandler
public interface ChannelOutboundHandler extends ChannelHandler {
void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception;
void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
void read(ChannelHandlerContext ctx) throws Exception;
void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
void flush(ChannelHandlerContext ctx) throws Exception;
}
複製代碼
ChannelOutboundHandlerAdapter
public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
ctx.bind(localAddress, promise);
}
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
ctx.connect(remoteAddress, localAddress, promise);
}
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.disconnect(promise);
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.close(promise);
}
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.deregister(promise);
}
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
ctx.read();
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ctx.write(msg, promise);
}
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}
複製代碼
使ChannelHandler
可以與其ChannelPipeline
和其餘處理程序進行交互。除其餘事項外,處理程序能夠通知ChannelPipeline
中的下一個ChannelHandler
,也能夠動態修改它所屬的ChannelPipeline
。
Notify
You can notify the closest handler in the same ChannelPipeline by calling one of the various methods provided here.
Modifying a pipeline
You can get the ChannelPipeline
your handler belongs to by calling pipeline()
. A non-trivial application could insert, remove, or replace handlers in the pipeline dynamically at runtime.
Retrieving for later use
You can keep the ChannelHandlerContext for later use, such as triggering an event outside the handler methods, even from a different thread.
public class MyHandler extends ChannelDuplexHandler {
private ChannelHandlerContext ctx;
public void beforeAdd(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
public void login(String username, password) {
ctx.write(new LoginMessage(username, password));
}
...
}
複製代碼
Storing stateful information
attr(AttributeKey)
allow you to store and access stateful information that is related with a handler and its context. Please refer to ChannelHandler
to learn various recommended ways to manage stateful information.
A handler can have more than one context
Please note that a ChannelHandler
instance can be added to more than one ChannelPipeline
. It means a single ChannelHandler
instance can have more than one ChannelHandlerContext
and therefore the single instance can be invoked with different ChannelHandlerContexts
if it is added to one or more ChannelPipelines
more than once.
For example, the following handler will have as many independent AttributeKeys as how many times it is added to pipelines, regardless if it is added to the same pipeline multiple times or added to different pipelines multiple times:
public class FactorialHandler extends ChannelInboundHandlerAdapter {
private final AttributeKey<Integer> counter = AttributeKey.valueOf("counter");
// This handler will receive a sequence of increasing integers starting
// from 1.
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
Integer a = ctx.attr(counter).get();
if (a == null) {
a = 1;
}
attr.set(a * (Integer) msg);
}
}
// Different context objects are given to "f1", "f2", "f3", and "f4" even if
// they refer to the same handler instance. Because the FactorialHandler
// stores its state in a context object (using an AttributeKey), the factorial is
// calculated correctly 4 times once the two pipelines (p1 and p2) are active.
FactorialHandler fh = new FactorialHandler();
ChannelPipeline p1 = Channels.pipeline();
p1.addLast("f1", fh);
p1.addLast("f2", fh);
ChannelPipeline p2 = Channels.pipeline();
p2.addLast("f3", fh);
p2.addLast("f4", fh);
複製代碼
ChannelPipeline
提供了ChannelHandler
鏈的容器,並定義了用於在該鏈上傳播入站和出站事件流的API。當Channel
被建立時,它會被自動地分配到它專屬的ChannelPipeline
。
Creation of a pipeline
==Each channel has its own pipeline and it is created automatically== when a new channel is created.
How an event flows in a pipeline
The following diagram describes how I/O events are processed by ChannelHandler
s in a ChannelPipeline
typically. An I/O event is handled by either a ChannelInboundHandler
or a ChannelOutboundHandler
and be forwarded to its closest handler by calling the event propagation methods defined in ChannelHandlerContext
, such as ChannelHandlerContext.fireChannelRead(Object)
and ChannelHandlerContext.write(Object)
.
I/O Request via Channel or
ChannelHandlerContext
+---------------------------------------------------+---------------+
| ChannelPipeline | |
| \|/ |
| +---------------------+ +-----------+----------+ |
| | Inbound Handler N | | Outbound Handler 1 | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
| | \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler N-1 | | Outbound Handler 2 | |
| +----------+----------+ +-----------+----------+ |
| /|\ . |
| . . |
| ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
| [ method call] [method call] |
| . . |
| . \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler 2 | | Outbound Handler M-1 | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
| | \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler 1 | | Outbound Handler M | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
+---------------+-----------------------------------+---------------+
| \|/
+---------------+-----------------------------------+---------------+
| | | |
| [ Socket.read() ] [ Socket.write() ] |
| |
| Netty Internal I/O Threads (Transport Implementation) | +-------------------------------------------------------------------+ 複製代碼
An inbound event is handled by the inbound handlers in the ==bottom-up direction== as shown on the left side of the diagram. An inbound handler usually handles the inbound data generated by the I/O thread on the bottom of the diagram. The inbound data is often read from a remote peer via the actual input operation such as SocketChannel.read(ByteBuffer)
. If an inbound event goes beyond the top inbound handler, it is discarded silently, or logged if it needs your attention.
An outbound event is handled by the outbound handler in the ==top-down direction== as shown on the right side of the diagram. An outbound handler usually generates or transforms the outbound traffic such as write requests. If an outbound event goes beyond the bottom outbound handler, it is handled by an I/O thread associated with the Channel. The I/O thread often performs the actual output operation such as SocketChannel.write(ByteBuffer)
.
Forwarding an event to the next handler
a handler has to invoke the event propagation methods in ChannelHandlerContext
to forward an event to its next handler. Those methods include:
Inbound event propagation methods:
ChannelHandlerContext.fireChannelRegistered()
ChannelHandlerContext.fireChannelActive()
ChannelHandlerContext.fireChannelRead(Object)
ChannelHandlerContext.fireChannelReadComplete()
ChannelHandlerContext.fireExceptionCaught(Throwable)
ChannelHandlerContext.fireUserEventTriggered(Object)
ChannelHandlerContext.fireChannelWritabilityChanged()
ChannelHandlerContext.fireChannelInactive()
ChannelHandlerContext.fireChannelUnregistered()
Outbound event propagation methods:
ChannelHandlerContext.bind(SocketAddress, ChannelPromise)
ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
ChannelHandlerContext.write(Object, ChannelPromise)
ChannelHandlerContext.flush()
ChannelHandlerContext.read()
ChannelHandlerContext.disconnect(ChannelPromise)
ChannelHandlerContext.close(ChannelPromise)
ChannelHandlerContext.deregister(ChannelPromise)
Building a pipeline
A user is supposed to have one or more ChannelHandler
s in a pipeline to receive I/O events (e.g. read) and to request I/O operations (e.g. write and close). For example, a typical server will have the following handlers in each channel's pipeline, but your mileage may vary depending on the complexity and characteristics of the protocol and business logic:
ByteBuf
) into a Java object.and it could be represented as shown in the following example:
static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);
...
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("decoder", new MyProtocolDecoder());
pipeline.addLast("encoder", new MyProtocolEncoder());
// Tell the pipeline to run MyBusinessLogicHandler's event handler methods
// in a different thread than an I/O thread so that the I/O thread is not blocked by
// a time-consuming task.
// If your business logic is fully asynchronous or finished very quickly, you don't
// need to specify a group.
pipeline.addLast(group, "handler", new MyBusinessLogicHandler());
複製代碼
Thread safety
A ChannelHandler
can be added or removed at any time because a ChannelPipeline
is thread safe. For example, you can insert an encryption handler when sensitive information is about to be exchanged, and remove it after the exchange.
在深刻地學習了ChannelPipeline
、ChannelHandler
和EventLoop
以後,你接下來 的問題多是:「如何將這些部分組織起來,成爲一個可實際運行的應用程序呢?」
答案是?==「引導」(Bootstrapping)==。簡單來講,引導一個應用程序是指對它進行配置,並使它運行起來的過程—儘管該過程的具體細節可能並不如它的定義那樣簡單,尤爲是對於一個網絡應用程序來講。
引導類層次結構
ServerBootstrap
和Bootstrap
分別做用於==服務器==和==客戶端==。ServerBootstrap
致力於使用一個==父Channel==來接受來自客戶端的鏈接,並建立==子Channel==以用於它們之間的通訊;而客戶端只須要==一個單獨的、沒有父Channel的Channel==來用於全部的網絡交互(這也適用於無鏈接的傳輸協議,如UDP,由於它們並非每一個鏈接都須要一個單獨的Channel)。
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable 複製代碼
子類型B
是其父類型的一個類型參數,所以能夠返回到運行時實例的引用以 支持方法的鏈式調用(也就是所謂的流式語法)
爲何引導類是
Cloneable
?
你有時可能會須要建立多個具備相似配置或者徹底相同配置的 Channel 。爲了支持這種模式而又不 需 要 爲 每 個 Channel 都 創 建 並 配 置 一 個 新 的 引 導 類 實 例 , AbstractBootstrap 被 標 記 爲 了 Cloneable 。在一個已經配置完成的引導類實例上調用 clone() 方法將返回另外一個能夠當即使用的引 導類實例。
注意,這種方式只會建立引導類實例的 EventLoopGroup 的一個淺拷貝,因此,後者 將在全部克 隆的 Channel 實例之間共享。這是能夠接受的,由於一般這些克隆的 Channel 的生命週期都很短暫,一 個典型的場景是——建立一個 Channel 以進行一次HTTP請求。
Bootstrap
類被用於客戶端或者使用了無鏈接協議的應用程序中。Bootstrap
類的API以下:
Bootstrap group(EventLoopGroup)
設置用於處理Channel全部事件的EventLoopGroup
Bootstrap channel( Class<? extends C>)
Bootstrap channelFactory(ChannelFactory<? extends C>)
channel()
方法指定了Channel的實現類。若是該實現類沒提供默認的構造函數 , 能夠經過調用channelFactory()
方法來指定一個工廠類,它將會被bind()
方法調用。
Bootstrap localAddress(SocketAddress)
指定Channel應該綁定到的本地地址。若是沒有指定,則將由操做系統建立一個隨機的地址。或者,也能夠經過bind()
或者connect()
方法指定localAddress。
<T> Bootstrap option(ChannelOption<T> option, T value)
設置ChannelOption
, 其將被應用到每一個新建立的Channel的ChannelConfig。 這些選項將會經過bind()
或者connect()
方法設置到Channel ,無論哪一個先被調用。這個方法在Channel已經被建立後再調用將不會有任何的效果。支持的ChannelOption取決於使用的 Channel類型。
<T> Bootstrap attr( Attribute<T> key, T value)
指定新建立的Channel的屬性值。這些屬性值是經過bind()
或者connect()
方法設置到Channel的,具體取決於誰最早被調用。這個方法在Channel被建立後將不會有任何的效果。
Bootstrap handler(ChannelHandler)
設置將被添加到ChannelPipeline
以接收事件通知的ChannelHandler
。
Bootstrap clone()
建立一個當前Bootstrap的克隆,其具備和原始的Bootstrap相同的設置信息。
Bootstrap remoteAddress(SocketAddress)
設置遠程地址。或者,也能夠經過connect()
方法來指定它。
ChannelFuture connect()
鏈接到遠程節點並返回一個ChannelFuture,其將會在鏈接操做完成後接收到通知。
ChannelFuture bind()
綁定Channel並返回一個ChannelFuture,其將會在綁定操做完成後接收到通知,在那以後必須調用Channel.connect()
方法來創建鏈接。
Bootstrap
類負責爲客戶端和使用無鏈接協議的應用程序建立Channel,如圖所示:
在引導的過程當中,在調用
bind()
或者connect()
方法以前,必須調用如下方法來設置所需的組件:
- group();
- channel()或者channelFactory();
- handler()
若是不這樣作,則將會致使IllegalStateException 。對handler()方法的調用尤爲重要,由於它須要配置好ChannelPipeline 。
ServerBootstrap
的API以下:
group
設置ServerBootstrap要用的EventLoopGroup。這個EventLoopGroup將用於ServerChannel和被接受的子Channel的I/O處理。
channel
設置將要被實例化的ServerChannel類。
channelFactory
若是不能經過默認的構造函數建立Channel,那麼能夠提供一個ChannelFactory。
localAddress
指定ServerChannel應該綁定到的本地地址。若是沒有指定,則將由操做系統使用一個隨機地址。或者,能夠經過bind()
方法來指定該localAddress。
option
指定要應用到新建立的ServerChannel的ChannelConfig的ChannelOption
。這些選項將會經過bind()
方法設置到Channel。在bind()
方法被調用以後,設置或者改變 ChannelOption
都不會有任何的效果。所支持的ChannelOption取決於所使用的Channel類型。
childOption
指定當子Channel
被接受時,應用到==子Channel的ChannelConfig==的ChannelOption
。所支持的ChannelOption取決於所使用的Channel的類型。
attr
指定ServerChannel
上的屬性,屬性將會經過bind()
方法設置給Channel。在調用 bind()
方法以後改變它們將不會有任何的效果
childAttr
將屬性設置給已經被接受的子Channel
。
handler
設置被添加到ServerChannel的ChannelPipeline中的ChannelHandler。
childHandler
設置將被添加到已被接受的子Channel的ChannelPipeline中的ChannelHandler。 handler()
方法和childHandler()
方法之間的區別是:前者所添加的ChannelHandler由接受子Channel的ServerChannel處理,而childHandler()方法所添加ChannelHandler將由已被接受的子Channel處理,其表明一個綁定到遠程節點的套接字。
clone
克隆一個設置和原始的ServerBootstrap相同的ServerBootstrap。
bind
綁定ServerChannel而且返回一個ChannelFuture成後收到通知(帶着成功或者失敗的結果)
ServerBootstrap在bind()
方法被調用時建立了一個ServerChannel
,而且該ServerChannel
管理了多個子Channel
。
從Channel引導客戶端?
在引導的過程當中調用了handler()
或者childHandler()
方法來添加單個的ChannelHandler
。對於簡單的應用程序來講可能已經足夠了,可是它不能知足更加複雜的需求。
能夠經過在ChannelPipeline
中將它們連接在一塊兒來部署儘量多的ChannelHandler
,Netty提供了一個特殊的ChannelInitializer
類。
A special
ChannelInboundHandler
which offers an easy way to initialize aChannel
once it was registered to itsEventLoop
. Implementations are most often used in the context ofBootstrap.handler(ChannelHandler)
,ServerBootstrap.handler(ChannelHandler)
andServerBootstrap.childHandler(ChannelHandler)
to setup theChannelPipeline
of aChannel
.
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter 複製代碼
它定義了下面的方法:
protected abstract void initChannel(C ch) throws Exception;
複製代碼
這個方法提供了一種將多個ChannelHandler
添加到一個ChannelPipeline
中的簡便方法。只須要簡單地向Bootstrap
或ServerBootstrap
的實例提供你的ChannelInitializer
實現便可,而且一旦Channel
被註冊到了它的EventLoop
以後,就會調用你的initChannel
方法。在該方法返回以後,ChannelInitializer
的實例將會從ChannelPipeline
中移除它本身。
示例代碼以下:
public class MyChannelInitializer extends ChannelInitializer {
public void initChannel(Channel channel) {
channel.pipeline().addLast("myHandler", new MyHandler());
}
}
ServerBootstrap bootstrap = ...;
...
bootstrap.childHandler(new MyChannelInitializer());
...
複製代碼
使用option()
方法能夠將ChannelOption
應用到引導,你所提供的值將會被自動應用到引導所建立的全部Channel
(這樣就能夠不用在每一個Channel建立時都手動配置它。)。可用的ChannelOption
包括了底層鏈接的詳細信息,如keep-alive或者超時屬性以及緩存區設置。
Netty應用程序一般與組織的專有軟件集成在一塊兒,而像Channel
這樣的組件可能甚至會在正常的Netty生命週期以外被使用。 在某些經常使用的屬性和數據不可用時, Netty提供了 AttributeMap
抽象(一個由Channel
和引導類提供的集合)以及AttributeKey<T>
(一個用於插入和獲取屬性值的泛型類)。使用這些工具,即可以安全地將任何類型的數據項與客戶端和服務器Channel
(包含ServerChannel的子Channel)相關聯了。
有點重要?
優雅是指乾淨地釋放資源。關閉Netty應用程序並無太多的魔法,最重要的是須要關閉EventLoopGroup
,它將處理任何掛起的事件和任務,而且隨後釋放全部活動的線程。
// 建立處理 I/O 的 EventLoopGroup
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
// 建立一個 Bootstrap 類的實例並配置它
bootstrap.group(group)
.channel(NioSocketChannel.class);
...
Future<?> future = group.shutdownGracefully();
// block until the group has shutdown
future.syncUninterruptibly();
複製代碼
每當經過調用 ChannelInboundHandler.channelRead()或者 ChannelOutbound- Handler.write()方法來處理數據時,你都須要確保沒有任何的資源泄漏。
爲了幫助你診斷潛在的(資源泄漏)問題,Netty提供了class ResourceLeakDetector1, 它將對你應用程序的緩衝區分配作大約 1%的採樣來檢測內存泄露。