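Netty has three other important components: ChannelHandler, ChannelHandlerContext, and ChannelPipeline. Together they handle the propagation of I/O events in Netty as well as the filtering and writing out of data.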
// AbstractChannel(..)
protected AbstractChannel(Channel parent) {
    ...
    // create the pipeline
    pipeline = newChannelPipeline();
}

// newChannelPipeline()
protected DefaultChannelPipeline newChannelPipeline() {
    return new DefaultChannelPipeline(this);
}

// DefaultChannelPipeline(..)
protected DefaultChannelPipeline(Channel channel) {
    ...
    tail = new TailContext(this);
    head = new HeadContext(this);
    head.next = tail;
    tail.prev = head;
}
final class HeadContext extends AbstractChannelHandlerContext
        implements ChannelOutboundHandler, ChannelInboundHandler {
    ...
    HeadContext(DefaultChannelPipeline pipeline) {
        super(pipeline, null, HEAD_NAME, false, true);
        ...
    }
    ...
}

final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
    TailContext(DefaultChannelPipeline pipeline) {
        super(pipeline, null, TAIL_NAME, true, false);
        ...
    }
    ...
}
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint {
    ...
    AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
                                  String name, boolean inbound, boolean outbound) {
        this.name = ObjectUtil.checkNotNull(name, "name");
        this.pipeline = pipeline;
        this.executor = executor;
        this.inbound = inbound;
        this.outbound = outbound;
        ...
    }
    ...
}
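Both head and tail call the constructor of their parent class, AbstractChannelHandlerContext, to complete initialization, so we can expect a ChannelPipeline to hold a series of ChannelHandlerContext objects. The DefaultChannelPipeline constructor shows that they are organized as a doubly linked list, and the AbstractChannelHandlerContext constructor shows that head is flagged as an outbound handler while tail is flagged as an inbound handler.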
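We will verify the event propagation mechanism inside the pipeline shortly, but we can already make a guess. Inbound events start propagating at head; because head is flagged as an outbound handler, it simply passes the event on without processing it, and propagation ends at tail. Outbound events start at tail; because tail is flagged as an inbound handler, it likewise just passes the event on without processing it. So inbound events travel from head to tail, and outbound events travel the opposite way, from tail to head.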
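The guess above can be tried out on a small model. The following is a minimal sketch, assuming a hand-written `Node` class and a `PipelineShapeSketch` class that are hypothetical and only mimic the shape of the excerpts (two sentinel nodes, `prev`/`next` links, an `inbound` and an `outbound` flag); it is not Netty's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the pipeline's linked list; not Netty's actual classes.
final class PipelineShapeSketch {
    // One node of the list, carrying the same two flags the constructor excerpt receives.
    static final class Node {
        final String name;
        final boolean inbound;
        final boolean outbound;
        Node prev;
        Node next;

        Node(String name, boolean inbound, boolean outbound) {
            this.name = name;
            this.inbound = inbound;
            this.outbound = outbound;
        }
    }

    // Sentinels, flagged as in the excerpt: head is outbound, tail is inbound.
    final Node head = new Node("head", false, true);
    final Node tail = new Node("tail", true, false);

    PipelineShapeSketch() {
        head.next = tail;
        tail.prev = head;
    }

    // Insert a node just before tail.
    void addLast(Node node) {
        Node prev = tail.prev;
        node.prev = prev;
        node.next = tail;
        prev.next = node;
        tail.prev = node;
    }

    // Inbound walk: start after head, follow next, visit inbound nodes only.
    List<String> inboundOrder() {
        List<String> names = new ArrayList<>();
        for (Node n = head.next; n != null; n = n.next) {
            if (n.inbound) {
                names.add(n.name);
            }
        }
        return names;
    }

    // Outbound walk: start before tail, follow prev, visit outbound nodes only.
    List<String> outboundOrder() {
        List<String> names = new ArrayList<>();
        for (Node n = tail.prev; n != null; n = n.prev) {
            if (n.outbound) {
                names.add(n.name);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        PipelineShapeSketch p = new PipelineShapeSketch();
        p.addLast(new Node("in1", true, false));
        p.addLast(new Node("out1", false, true));
        p.addLast(new Node("in2", true, false));
        System.out.println(p.inboundOrder());  // [in1, in2, tail]
        System.out.println(p.outboundOrder()); // [out1, head]
    }
}
```

In this model the inbound walk ends at tail and the outbound walk ends at head, which matches the direction of travel guessed above.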
// filterName(..)
private String filterName(String name, ChannelHandler handler) {
    if (name == null) {
        return generateName(handler);
    }
    checkDuplicateName(name);
    return name;
}

// check for a duplicate name
private void checkDuplicateName(String name) {
    if (context0(name) != null) {
        throw new IllegalArgumentException("Duplicate handler name: " + name);
    }
}

// look for an existing context with the same name
private AbstractChannelHandlerContext context0(String name) {
    AbstractChannelHandlerContext context = head.next;
    while (context != tail) {
        if (context.name().equals(name)) {
            return context;
        }
        context = context.next;
    }
    return null;
}
// insert into the linked list just before the tail node
private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;
}
final void callHandlerAdded() throws Exception {
    ...
    if (setAddComplete()) {
        // invoke the concrete handler's handlerAdded method
        handler().handlerAdded(this);
    }
}
private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
    AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
    if (ctx == null) {
        throw new NoSuchElementException(handler.getClass().getName());
    } else {
        return ctx;
    }
}

// a match means the same object reference (identity comparison, not equals)
public final ChannelHandlerContext context(ChannelHandler handler) {
    if (handler == null) {
        throw new NullPointerException("handler");
    }
    AbstractChannelHandlerContext ctx = head.next;
    for (;;) {
        if (ctx == null) {
            return null;
        }
        if (ctx.handler() == handler) {
            return ctx;
        }
        ctx = ctx.next;
    }
}
private static void remove0(AbstractChannelHandlerContext ctx) {
    AbstractChannelHandlerContext prev = ctx.prev;
    AbstractChannelHandlerContext next = ctx.next;
    prev.next = next;
    next.prev = prev;
}
final void callHandlerRemoved() throws Exception {
    try {
        // Only call handlerRemoved(...) if we called handlerAdded(...) before.
        if (handlerState == ADD_COMPLETE) {
            handler().handlerRemoved(this);
        }
    } finally {
        // Mark the handler as removed in any case.
        setRemoved();
    }
}
// code omitted
...
serverBootstrap
        ...
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) {
                ChannelPipeline pipeline = ch.pipeline();
                pipeline.addLast(new Inbound1())
                        .addLast(new Inbound2())
                        .addLast(new Inbound3());
            }
        });
...

public class Inbound1 extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("at InBound1: " + msg);
        ctx.fireChannelRead(msg);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.channel().pipeline().fireChannelRead("hello cj");
    }
}

public class Inbound2 extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("at InBound2: " + msg);
        ctx.fireChannelRead(msg);
    }
}

public class Inbound3 extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("at InBound3: " + msg);
        ctx.fireChannelRead(msg);
    }
}
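Propagation starts at head and moves from one inbound handler to the next until it ends at tail. We can also see that ChannelHandlerContext acts as the connecting link: it gives access to the handler and, going up, to the channel and the pipeline. A channel has exactly one pipeline, a pipeline can hold multiple inbound and outbound handlers, and every handler is wrapped in a ChannelHandlerContext. Event propagation relies on the fire* methods of ChannelHandlerContext.
As described above, the order is Inbound1 -> Inbound2 -> Inbound3.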
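To make the role of the fire* methods concrete, here is a minimal sketch under stated assumptions: `InboundHandler`, `Context`, and `InboundChainSketch` are hypothetical stand-ins written for illustration, not Netty types. Each context wraps one handler, and a message only reaches the next handler when the current one explicitly forwards it.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of inbound propagation; all names here are hypothetical.
final class InboundChainSketch {
    interface InboundHandler {
        void channelRead(Context ctx, Object msg);
    }

    // A context wraps exactly one handler and knows its successor.
    static final class Context {
        final InboundHandler handler;
        Context next;

        Context(InboundHandler handler) {
            this.handler = handler;
        }

        // Hand the message to the next context, if there is one.
        void fireChannelRead(Object msg) {
            if (next != null) {
                next.handler.channelRead(next, msg);
            }
        }
    }

    // Build a three-handler chain, push one message in, and return the visit log.
    static List<String> run(boolean secondForwards) {
        List<String> log = new ArrayList<>();
        Context c1 = new Context((ctx, msg) -> {
            log.add("Inbound1");
            ctx.fireChannelRead(msg);
        });
        Context c2 = new Context((ctx, msg) -> {
            log.add("Inbound2");
            if (secondForwards) {
                ctx.fireChannelRead(msg); // without this call the chain stops here
            }
        });
        Context c3 = new Context((ctx, msg) -> {
            log.add("Inbound3");
            ctx.fireChannelRead(msg);
        });
        c1.next = c2;
        c2.next = c3;
        c1.handler.channelRead(c1, "hello cj");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // [Inbound1, Inbound2, Inbound3]
        System.out.println(run(false)); // [Inbound1, Inbound2]
    }
}
```

The second run shows why each handler in the example above ends with `ctx.fireChannelRead(msg)`: a handler that does not forward the message ends the propagation at that point.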
public class Outbound1 extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        System.out.println("outbound1 write:" + msg);
        ctx.write(msg, promise);
    }
}

public class Outbound2 extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        System.out.println("outbound2 write:" + msg);
        ctx.write(msg, promise);
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        // trigger a write through the pipeline 5 seconds after this handler is added
        ctx.executor().schedule(() -> {
            ctx.channel().pipeline().write("hello cj...");
        }, 5, TimeUnit.SECONDS);
    }
}

public class Outbound3 extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        System.out.println("outbound3 write:" + msg);
        ctx.write(msg, promise);
    }
}
The order is exactly the opposite of inbound propagation: outbound events start from the tail of the linked list.
public class Inbound1 extends ChannelInboundHandlerAdapter {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("Inbound1...");
        super.exceptionCaught(ctx, cause);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        throw new RuntimeException("cj test throw caught...");
    }
}

public class Inbound3 extends ChannelInboundHandlerAdapter {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("Inbound3...");
        super.exceptionCaught(ctx, cause);
    }
}

public class Outbound1 extends ChannelOutboundHandlerAdapter {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("Outbound1...");
        super.exceptionCaught(ctx, cause);
    }
}

public class Outbound2 extends ChannelOutboundHandlerAdapter {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("Outbound2...");
        super.exceptionCaught(ctx, cause);
    }
}

public class Outbound3 extends ChannelOutboundHandlerAdapter {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("Outbound3...");
        super.exceptionCaught(ctx, cause);
    }
}
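Exception propagation walks the list in the head-to-tail direction: it starts at the handler that raised the exception (Inbound1 here), passes through inbound and outbound handlers alike, and ends at tail, which prints it.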
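The walk described above can be modeled in a few lines. This is a minimal sketch, not Netty code: `ExceptionWalkSketch` and its `Node` class are hypothetical, and the registration order used in `samplePipeline()` is an assumption, since the text does not show the `addLast` calls for this example.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of exception propagation; the class and method names are hypothetical.
final class ExceptionWalkSketch {
    static final class Node {
        final String name;
        final boolean inbound; // recorded only to show that the walk never checks it
        Node next;

        Node(String name, boolean inbound) {
            this.name = name;
            this.inbound = inbound;
        }
    }

    // Link the nodes in the given order and return them for convenience.
    static Node[] chain(Node... nodes) {
        for (int i = 0; i + 1 < nodes.length; i++) {
            nodes[i].next = nodes[i + 1];
        }
        return nodes;
    }

    // Start at the node whose handler threw and follow next to the end of the list.
    static List<String> propagate(Node origin) {
        List<String> visited = new ArrayList<>();
        for (Node n = origin; n != null; n = n.next) {
            visited.add(n.name);
        }
        return visited;
    }

    // Assumed registration order; the source does not show it for this example.
    static Node[] samplePipeline() {
        return chain(
                new Node("Inbound1", true),
                new Node("Inbound3", true),
                new Node("Outbound1", false),
                new Node("Outbound2", false),
                new Node("Outbound3", false),
                new Node("tail", true));
    }

    public static void main(String[] args) {
        Node[] nodes = samplePipeline();
        // Thrown in Inbound1: every later node sees it, tail last.
        System.out.println(propagate(nodes[0]));
        // Thrown in Outbound1: earlier nodes are never visited.
        System.out.println(propagate(nodes[2]));
    }
}
```

Unlike the inbound and outbound walks, this one ignores the handler type entirely, which is why the outbound handlers in the example also print their names.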
ctx.write("hello cj...");
ctx.pipeline().write("hello cj...");
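ctx.write(..) behaves as the discussion above suggests: it passes the write directly from the current node to the next outbound node in the direction of head, so it does not start at the tail and walk back through every outbound handler. ctx.pipeline().write(..) is different: it first calls the pipeline's write method, and tracing the source (figure below) shows that it starts traversing from tail, so all outbound handlers run in turn. The same applies to inbound events.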
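The difference between the two entry points can be illustrated with a small model. This is a sketch under stated assumptions: `WriteEntryPointSketch` and `Ctx` are hypothetical classes, every node is treated as outbound, and only the `prev` links are modeled because an outbound walk needs nothing else.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the two write entry points; names are hypothetical, not Netty's classes.
final class WriteEntryPointSketch {
    static final class Ctx {
        final String name;
        Ctx prev;

        Ctx(String name) {
            this.name = name;
        }

        // Models ctx.write(..): the walk begins at the node before this one.
        List<String> write() {
            return walkBack(prev);
        }
    }

    final Ctx head = new Ctx("head");
    final Ctx tail = new Ctx("tail");

    WriteEntryPointSketch() {
        tail.prev = head;
    }

    // Insert just before tail (only prev links are needed for outbound walks).
    Ctx addLast(String name) {
        Ctx ctx = new Ctx(name);
        ctx.prev = tail.prev;
        tail.prev = ctx;
        return ctx;
    }

    // Models pipeline.write(..): the walk begins at the node before tail.
    List<String> write() {
        return walkBack(tail.prev);
    }

    // Collect the nodes an outbound event would visit, ending at head.
    static List<String> walkBack(Ctx start) {
        List<String> visited = new ArrayList<>();
        for (Ctx c = start; c != null; c = c.prev) {
            visited.add(c.name);
        }
        return visited;
    }

    public static void main(String[] args) {
        WriteEntryPointSketch pipeline = new WriteEntryPointSketch();
        pipeline.addLast("Outbound1");
        Ctx second = pipeline.addLast("Outbound2");
        pipeline.addLast("Outbound3");
        System.out.println(pipeline.write()); // [Outbound3, Outbound2, Outbound1, head]
        System.out.println(second.write());   // [Outbound1, head]
    }
}
```

In the model, writing through the context of Outbound2 never reaches Outbound3 or Outbound2 itself, so choosing between the two calls decides which outbound handlers get a chance to process the message.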