pipeline 有管道,流水線的意思,最先使用在 Unix 操做系統中,可讓不一樣功能的程序相互通信,使軟件更加」高內聚,低耦合」,它以一種」鏈式模型」來串起不一樣的程序或組件,使它們組成一條直線的工做流。promise
ChannelPipeline 是處理或攔截channel的進站事件和出站事件的雙向鏈表,事件在ChannelPipeline中流動和傳遞,能夠增長或刪除ChannelHandler來實現對不一樣業務邏輯的處理。通俗的說,ChannelPipeline是工廠裏的流水線,ChannelHandler是流水線上的工人。
ChannelPipeline在建立Channel時會自動建立,每一個Channel都擁有本身的ChannelPipeline。緩存
ChannelHandlerContext是將ChannelHandler和ChannelPipeline關聯起來的上下文環境,每添加一個handler都會建立ChannelHandlerContext實例,管理ChannelHandler在ChannelPipeline中的傳播流向。安全
ChannelPipeline依賴於Channel的建立而自動建立,保存了channel,將全部handler組織起來,至關於工廠的流水線。
ChannelHandler擁有獨立功能邏輯,能夠註冊到多個ChannelPipeline,是不保存channel的,至關於工廠的工人。
ChannelHandlerContext是關聯ChannelHandler和ChannelPipeline的上下文環境,保存了ChannelPipeline,控制ChannelHandler在ChannelPipeline中的傳播流向,至關於流水線上的小組長。bash
從前面《Netty 源碼解析-服務端啓動流程解析》和《Netty 源碼解析-客戶端鏈接接入及讀I/O解析》咱們知道,當有新鏈接接入時,咱們執行註冊流程,註冊成功後,會調用channelRegistered,咱們從這個方法開始併發
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
initChannel((C) ctx.channel());
ctx.pipeline().remove(this);
ctx.fireChannelRegistered();
}
複製代碼
initChannel是在服務啓動時配置的參數childHandler重寫了父類方法ide
private class IOChannelInitialize extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
System.out.println("initChannel");
ch.pipeline().addLast(new IdleStateHandler(1000, 0, 0));
ch.pipeline().addLast(new IOHandler());
}
}
複製代碼
咱們回憶一下,pipeline是在哪裏建立的oop
protected AbstractChannel(Channel parent) {
this.parent = parent;
unsafe = newUnsafe();
pipeline = new DefaultChannelPipeline(this);
}
複製代碼
當建立channel時會自動建立pipelinepost
public DefaultChannelPipeline(AbstractChannel channel) {
if (channel == null) {
throw new NullPointerException("channel");
}
this.channel = channel;
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
複製代碼
在這裏會建立兩個默認的handler,一個InboundHandler --> TailContext,一個OutboundHandler --> HeadContext
再看addLast方法ui
@Override
public ChannelPipeline addLast(ChannelHandler... handlers) {
return addLast(null, handlers);
}
複製代碼
在這裏生成一個handler名字,生成規則由handler類名加 」#0」this
@Override
public ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
…
for (ChannelHandler h: handlers) {
if (h == null) {
break;
}
addLast(executor, generateName(h), h);
}
return this;
}
複製代碼
@Override
public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) {
synchronized (this) {
checkDuplicateName(name);
AbstractChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler);
addLast0(name, newCtx);
}
return this;
}
複製代碼
因爲pipeline是線程非安全的,經過加鎖來保證併發訪問的安全,進行handler名稱重複性校驗,將handler包裝成DefaultChannelHandlerContext,最後再添加到pipeline
private void addLast0(final String name, AbstractChannelHandlerContext newCtx) {
checkMultiplicity(newCtx);
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
name2ctx.put(name, newCtx);
callHandlerAdded(newCtx);
}
複製代碼
這裏分三步
(1) 對DefaultChannelHandlerContext進行重複性校驗,若是DefaultChannelHandlerContext不是能夠在多個pipeline中共享的,且已經被添加到pipeline中,則拋出異常
(2) 修改pipeline中的指針
添加IdleStateHandler以前
HeadContext --> IOChannelInitialize --> TailContext
添加IdleStateHandler以後
HeadContext --> IOChannelInitialize --> IdleStateHandler --> TailContext
(3) 將handler名和DefaultChannelHandlerContext創建映射關係
(4) 回調handler添加完成監聽事件
最後刪除IOChannelInitialize
在這裏咱們選一個比較典型的讀事件解析,其餘事件流程基本相似
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
…
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
…
}
複製代碼
當boss線程監聽到讀事件,會調用**unsafe.read()**方法
@Override
public final void read() {
…
pipeline.fireChannelRead(byteBuf);
…
}
複製代碼
入站事件從head開始,tail結束
@Override
public ChannelPipeline fireChannelRead(Object msg) {
head.fireChannelRead(msg);
return this;
}
複製代碼
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
if (msg == null) {
throw new NullPointerException("msg");
}
final AbstractChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(msg);
} else {
executor.execute(new OneTimeTask() {
@Override
public void run() {
next.invokeChannelRead(msg);
}
});
}
return this;
}
複製代碼
查找pipeline中下一個Inbound事件
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}
複製代碼
HeadContext的下一個
Inbound事件是
IdleStateHandler
private void invokeChannelRead(Object msg) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
複製代碼
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
reading = true;
firstReaderIdleEvent = firstAllIdleEvent = true;
}
ctx.fireChannelRead(msg);
}
複製代碼
將這個channel讀事件標識爲true,並傳到下一個handler
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
super.channelRead(ctx, msg);
System.out.println(msg.toString());
}
複製代碼
這裏執行IOHandler重寫的channelRead()方法,並調用父類channelRead方法
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
複製代碼
繼續調用事件鏈上的下一個handler
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
logger.debug(
"Discarded inbound message {} that reached at the tail of the pipeline. " +
"Please check your pipeline configuration.", msg);
} finally {
ReferenceCountUtil.release(msg);
}
}
複製代碼
這裏會調用TailContext的Read方法,釋放msg緩存
總結:傳播Inbound事件是從HeadContext節點往上傳播,一直到TailContext節點結束
ByteBuf resp = Unpooled.copiedBuffer("hello".getBytes());
ctx.channel().write(resp);
複製代碼
咱們在項目中像上面這樣直接調用write寫數據,並不能直接寫進channel,而是寫到緩衝區,還要調用flush方法才能將數據刷進channel,或者直接調用writeAndFlush。
在這裏咱們選擇比較典型的write事件來解析Outbound流程,其餘事件流程相似
@Override
public ChannelFuture write(Object msg) {
return pipeline.write(msg);
}
複製代碼
經過上下文綁定的channel直接調用write方法,調用channel相對應的事件鏈上的handler
@Override
public ChannelFuture write(Object msg) {
return tail.write(msg);
}
複製代碼
寫事件是從tail向head調用,和讀事件恰好相反
@Override
public ChannelFuture write(Object msg) {
return write(msg, newPromise());
}
複製代碼
@Override
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
...
write(msg, false, promise);
...
}
複製代碼
private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeWrite(msg, promise);
if (flush) {
next.invokeFlush();
}
...
}
...
}
複製代碼
通過屢次跳轉,獲取上一個Ounbound事件鏈的handler
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}
複製代碼
IdleStateHandler既是
Inbound事件,又是
Outbound事件
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
}
複製代碼
@Override
public final void write(Object msg, ChannelPromise promise) {
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
...
outboundBuffer.addMessage(msg, size, promise);
...
}
複製代碼
從這裏咱們看到,最終是把數據丟到了緩衝區,自此netty 的pipeline模型咱們解析完畢
有關inbound事件和outbound事件的傳輸, 可經過下圖進行概括:
以爲對您有幫助請點 "贊"