運算符 | 描述 |
---|---|
& | 與 |
| | 或 |
~ | 非 |
^ | 異或 |
<< | 左移 |
>> | 右移 |
&(與)緩存
十進制 | 二進制 |
---|---|
3 | 0 0 1 1 |
5 | 0 1 0 1 |
& 後結果:1 | 0 0 0 1 |
即:對應位都爲 1 時,才爲 1,不然全爲 0。安全
|(或)ide
十進制 | 二進制 |
---|---|
3 | 0 0 1 1 |
5 | 0 1 0 1 |
| 後結果 :7 | 0 1 1 1 |
即:對應位只要有 1 時,即爲 1,不然全爲 0。oop
~(非)學習
十進制 | 二進制 |
---|---|
3 | 0 0 1 1 |
~ 後結果:12 | 1 1 0 0 |
即:對應位取反。this
異或 ^atom
十進制 | 二進制 |
---|---|
3 | 0 0 1 1 |
5 | 0 1 0 1 |
^ 後結果:6 | 0 1 1 0 |
即:只要對應爲不一樣即爲 1。spa
咱們在以往學習Netty中見到過相似於如下代碼:.net
selectionKey.interestOps(interestOps | readInterestOp);
咱們重點關注位運算:interestOps | readInterestOp線程
該行代碼的意思是位運算計算一個數字,該數字包含 | 先後的數字!
//初始化一個值 int interestOps = 0; //給當前這個值增長一個可讀事件 interestOps |= OP_READ; //給當前的值增長一個可寫的事件 interestOps |= OP_WRITE; //判斷當前的事件是否是包含可讀事件 true boolean isRead = (interestOps & OP_READ) == OP_READ; //判斷當前的事件是否是不包含可讀事件 false boolean isRead = (interestOps & OP_READ) == 0; //剔除可讀事件 interestOps &= ~OP_READ; //剔除可寫事件 interestOps &= ~OP_WRITE;
io.netty.channel.DefaultChannelPipeline#DefaultChannelPipeline
protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true); //建立一個管道上下文 尾部節點 tail = new TailContext(this); //建立一個管道上下文 頭部節點 head = new HeadContext(this); //頭部節點的下一個節點設置爲尾部節點 head.next = tail; //尾部節點的上一個節點設置爲頭部節點 tail.prev = head; }
能夠看到,這裏初始化管道的時候,管道內部存在兩個Handler tail和head節點,兩個節點組成雙向鏈表!
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { System.out.println("channelRegistered"); super.channelRegistered(ctx); } });
上述代碼再一個Netty開發中是很常見的一個代碼,這裏向通道內添加了一個 ChannelInboundHandlerAdapter
,咱們進入到addLast方法:
@Override public final ChannelPipeline addLast(ChannelHandler... handlers) { return addLast(null, handlers); } //進入到 addLast @Override public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) { ObjectUtil.checkNotNull(handlers, "handlers"); for (ChannelHandler h: handlers) { if (h == null) { break; } addLast(executor, null, h); } return this; } //進入到 addLast(executor, null, h); @Override public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { //驗證是否重複添加改handler checkMultiplicity(handler); //將handler封裝爲上下文對象 newCtx = newContext(group, filterName(name, handler), handler); //將該節點添加到雙向鏈表中 addLast0(newCtx); ........................忽略其餘代碼.............. } ........................忽略其餘代碼.............. return this; }
這裏總共分爲兩步:
驗證Handler是否被重複添加
checkMultiplicity(handler);
private static void checkMultiplicity(ChannelHandler handler) { //驗證是否是 ChannelHandlerAdapter 類型的,若是不是直接忽略 if (handler instanceof ChannelHandlerAdapter) { ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler; //若是不是可共享的並且是已經添加過的直接報錯 if (!h.isSharable() && h.added) { throw new ChannelPipelineException( h.getClass().getName() + " is not a @Sharable handler, so can't be added or removed multiple times."); } //若是是可共享的或者未添加的,將該handler內的 added屬性設置爲true證實該handler已經被添加 h.added = true; } }
他是如何判斷是否被添加過的呢?
每個Handler中都存在一個 added屬性,當這個屬性爲true的時候,證實這個Handler已經被添加過了,Netty常規狀況下爲了考慮線程安全問題,是不容許一個Handler被重複的使用的!
可是咱們有時候會有這樣一個需求,Handler的功能比較相似,並且咱們經過代碼手段,避免了線程安全問題,因此又想重複添加Handler,Netty提供了一個註解 @Sharable
註解,當存在該註解的時候,證實這個Handler是能夠被複用的,能夠被重複添加!
因此,checkMultiplicity方法經過判斷類是否增長了@Sharable
註解和added
屬性是否爲空來驗證Handle是否違規重複添加了!
當驗證經過以後,將added設置爲true,證實這個Handler已經被添加過了!
將Handler封裝爲包裝對象
newCtx = newContext(group, filterName(name, handler), handler);
這裏比較難理解的就是這個,咱們進入到newContext方法裏面:
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) { return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler); }
進入到 DefaultChannelHandlerContext類的源碼裏面:
DefaultChannelHandlerContext( DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) { //調用父類進行掩碼計算 super(pipeline, executor, name, handler.getClass()); //保存一個handler this.handler = handler; }
這裏除了會保存一個handler還會調用父類,咱們介入到父類裏面:
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, Class<? extends ChannelHandler> handlerClass) { this.name = ObjectUtil.checkNotNull(name, "name"); this.pipeline = pipeline; this.executor = executor; //標識 是in仍是out this.executionMask = mask(handlerClass); // 若是由EventLoop或給定的Executor驅動的驅動程序是OrderedEventExecutor的實例,則其順序爲。 ordered = executor == null || executor instanceof OrderedEventExecutor; }
這裏會保存一些屬性,這些屬性都是咱們前面講過的,你們自行分析下,咱們重點關注掩碼的計算:
this.executionMask = mask(handlerClass);
static int mask(Class<? extends ChannelHandler> clazz) { //直接再緩存中取出 Map<Class<? extends ChannelHandler>, Integer> cache = MASKS.get(); Integer mask = cache.get(clazz); //緩存中不存在 if (mask == null) { mask = mask0(clazz); cache.put(clazz, mask); } return mask; }
先從緩存中取出,若是不存在就調用 mask0(clazz); 方法計算,而後再放進緩存,咱們進入到mask0(clazz);
方法:
private static int mask0(Class<? extends ChannelHandler> handlerType) { int mask = MASK_EXCEPTION_CAUGHT; try { if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) { // 若是是 ChannelInboundHandler 實例,全部 Inbound 事件置爲 1 mask |= MASK_ALL_INBOUND; //判斷是否存在Skip註解 若是催你在這個跳過的註解 就移除這個 if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) { mask &= ~MASK_CHANNEL_REGISTERED; } ..................忽略相似的代碼..................... } if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) { mask |= MASK_ALL_OUTBOUND; if (isSkippable(handlerType, "bind", ChannelHandlerContext.class, SocketAddress.class, ChannelPromise.class)) { mask &= ~MASK_BIND; } ..................忽略相似的代碼..................... } } catch (Exception e) { ..................忽略異常的代碼..................... } return mask; }
這會區分兩種狀況,一種是ChannelInboundHandler
類型的,一種是ChannelOutboundHandler
類型的,兩者邏輯相同,咱們以ChannelInboundHandler爲例:
首先,再ChannelHandlerMask類裏面定義了不少的預設掩碼值:
/** * 如下是方法表明的掩碼值 */ static final int MASK_EXCEPTION_CAUGHT = 1; /** * channelRegistered方法的掩碼 */ static final int MASK_CHANNEL_REGISTERED = 1 << 1; /** * channelUnregistered方法的掩碼 */ static final int MASK_CHANNEL_UNREGISTERED = 1 << 2; /** * 後面的以此類推 */ static final int MASK_CHANNEL_ACTIVE = 1 << 3; static final int MASK_CHANNEL_INACTIVE = 1 << 4; static final int MASK_CHANNEL_READ = 1 << 5; static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6; static final int MASK_USER_EVENT_TRIGGERED = 1 << 7; static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 8; /** * bind方法的掩碼 */ static final int MASK_BIND = 1 << 9; /** * connect方法的掩碼 */ static final int MASK_CONNECT = 1 << 10; /** * 後面的以此類推 */ static final int MASK_DISCONNECT = 1 << 11; static final int MASK_CLOSE = 1 << 12; static final int MASK_DEREGISTER = 1 << 13; static final int MASK_READ = 1 << 14; static final int MASK_WRITE = 1 << 15; static final int MASK_FLUSH = 1 << 16; /** * 包含所有 Inbound方法的掩碼 */ private static final int MASK_ALL_INBOUND = MASK_EXCEPTION_CAUGHT | MASK_CHANNEL_REGISTERED | MASK_CHANNEL_UNREGISTERED | MASK_CHANNEL_ACTIVE | MASK_CHANNEL_INACTIVE | MASK_CHANNEL_READ | MASK_CHANNEL_READ_COMPLETE | MASK_USER_EVENT_TRIGGERED | MASK_CHANNEL_WRITABILITY_CHANGED; /** * 包含所有 outbound方法的掩碼 */ private static final int MASK_ALL_OUTBOUND = MASK_EXCEPTION_CAUGHT | MASK_BIND | MASK_CONNECT | MASK_DISCONNECT | MASK_CLOSE | MASK_DEREGISTER | MASK_READ | MASK_WRITE | MASK_FLUSH;
咱們回到 mask0方法:
mask |= MASK_ALL_INBOUND;
一開始,咱們會直接將一個handler的掩碼計算爲擁有所有方法的掩碼!
if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) { mask &= ~MASK_CHANNEL_REGISTERED; }
判斷該方法是否存在 @Skip
註解,若是存在就排除掉這個掩碼!
整個邏輯執行完畢後,這個掩碼就只會包含handler中沒有被@Sikp註解註解的方法掩碼!
有同窗可能疑問,我在書寫handler的時候並無增長@Sikp
註解呀! 咱們都知道,實現一個Handler就一定須要繼承 ChannelInboundHandlerAdapter
或者ChannelOutboundHandlerAdapter
, 咱們隨便挑一個類進去看:
能夠看到,這些方法其實都是被默認添加了的,只不過咱們重寫以後沒添加!如今咱們明白,handler是如何區分你實現了那些方法的了!
這裏會將handler包裝爲HandlerContext對象,相似於tailContext和HeadContext同樣,此時上下文對象的結構以下:
將HandlerContext添加進pipeline中:
addLast0(newCtx);
private void addLast0(AbstractChannelHandlerContext newCtx) { AbstractChannelHandlerContext prev = tail.prev; newCtx.prev = prev; newCtx.next = tail; prev.next = newCtx; tail.prev = newCtx; }
整個過程如上,無非就是指針指向地址的變換,比較簡單,不作深刻分析!
ch.pipeline().remove("xxxxxx")
@Override public final ChannelPipeline remove(ChannelHandler handler) { remove(getContextOrDie(handler)); return this; }
尋找處理器Handler的上下文
getContextOrDie(handler)
private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) { //尋找handler AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler); if (ctx == null) { throw new NoSuchElementException(handler.getClass().getName()); } else { return ctx; } } //context(handler); @Override public final ChannelHandlerContext context(ChannelHandler handler) { ObjectUtil.checkNotNull(handler, "handler"); AbstractChannelHandlerContext ctx = head.next; for (;;) { if (ctx == null) { return null; } //循環迭代 判斷是否尋找到這個handler if (ctx.handler() == handler) { //返回這個handler的上下文對象 return ctx; } ctx = ctx.next; } }
刪除這個處理器
remove(getContextOrDie(handler));
private <T extends ChannelHandler> T removeIfExists(ChannelHandlerContext ctx) { if (ctx == null) { return null; } return (T) remove((AbstractChannelHandlerContext) ctx).handler(); } //直接進入到 刪除Handler的主要邏輯 //(T) remove((AbstractChannelHandlerContext) ctx).handler(); private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) { //首先刪除的handler不是tail和尾節點 assert ctx != head && ctx != tail; synchronized (this) { //刪除上下文對象 atomicRemoveFromHandlerList(ctx); ................忽略.................... EventExecutor executor = ctx.executor(); if (!executor.inEventLoop()) { executor.execute(new Runnable() { @Override public void run() { //回調handlerRemoved方法 callHandlerRemoved0(ctx); } }); return ctx; } } callHandlerRemoved0(ctx); return ctx; }
首先咱們關注 atomicRemoveFromHandlerList(ctx);
:
private synchronized void atomicRemoveFromHandlerList(AbstractChannelHandlerContext ctx) { //獲取該節點的上級節點 AbstractChannelHandlerContext prev = ctx.prev; //獲取該節點的下級節點 AbstractChannelHandlerContext next = ctx.next; //重建指針位置 prev.next = next; next.prev = prev; }
指針位置重建以後,咱們回調handlerRemoved方法
callHandlerRemoved0(ctx);
至此咱們就完成了pipeline的建立、添加、刪除的源碼解析!
咱們前面見到過不少的事件傳播代碼,咱們以 channelRegistered
方法的事件回調爲例:
io.netty.channel.AbstractChannel.AbstractUnsafe#register0
//通知管道 傳播channelRegistered事件 // 觸發 channelRegistered 事件 pipeline.fireChannelRegistered();
咱們進入到改行代碼的源碼:
@Override public final ChannelPipeline fireChannelRegistered() { //執行註冊方法 從head方法 AbstractChannelHandlerContext.invokeChannelRegistered(head); return this; }
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) { EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRegistered(); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRegistered(); } }); } }
咱們能夠看到,這裏使用了 next.invokeChannelRegistered();
方法 咱們依舊按照同步方法進行分析!
private void invokeChannelRegistered() { if (invokeHandler()) { try { //如今調用的HeadContext的handler ((ChannelInboundHandler) handler()).channelRegistered(this); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRegistered(); } }
咱們如今進入到了headContext,因此咱們進入到: io.netty.channel.DefaultChannelPipeline.HeadContext#channelRegistered
:
@Override public void channelRegistered(ChannelHandlerContext ctx) { invokeHandlerAddedIfNeeded(); //向下傳播事件 ctx.fireChannelRegistered(); }
這一段代碼除了執行Head的invokeHandlerAddedIfNeeded方法以外,還又一次傳播了channelRegistered事件,咱們進入到 ctx.fireChannelRegistered();
:
@Override public ChannelHandlerContext fireChannelRegistered() { invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED)); return this; }
咱們若是想要向下傳播,咱們首先應該找到下一個節點是誰才能傳播,Netty這裏調用了findContextInbound(MASK_CHANNEL_REGISTERED)
查找下一個節點,我咱們先關注如下參數 MASK_CHANNEL_REGISTERED
, 他是channelRegistered方法的掩碼
, 咱們進入到 findContextInbound
方法源碼:
private AbstractChannelHandlerContext findContextInbound(int mask) { AbstractChannelHandlerContext ctx = this; do { //獲取下一個inbun事件 ctx = ctx.next; //只要和掩碼&運算後不爲0的都是 inbunt事件 } while ((ctx.executionMask & mask) == 0); return ctx; }
從當前節點向下尋找,只要 掩碼計算包含這個方法,就證實該context包含channelRegistered方法,就直接返回!
尋找到了handler以後,就開始調用了:
invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) { EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRegistered(); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRegistered(); } }); } }
next.invokeChannelRegistered();
具體邏輯就和上面分析的一致了,調用該handler的ChannelRegistered方法!
傳播某一個事件,就會使用哪一個事件的掩碼,從當前節點向下尋找,知道對應的Handler以後,回調對應的方法!
關於管道的傳播,你明白了嗎?
提一個問題, 觀察如下兩種傳播方式有何不一樣:
ctx.fireChannelRegistered(); ctx.pipeline().fireChannelRegistered();