netty源碼解解析(4.0)-3 Channel的抽象實現

AbstractChannel和AbstractUnsafe抽象類
io.netty.channel.AbstractChannel
從本章開始,會有大量的篇幅涉及到代碼分析。爲了可以清晰簡潔的地說明代碼的結構和功能,我會用代碼註釋+獨立段落的方式加以呈現。 因此,爲你能更好地理解代碼,請不要忽略代碼中黑體字註釋。
 
AbstractChannel和AbstractUnsafe之間的關係
AbstractChannel實現了Channel接口,AbstractUnsafe實現了Unsafe。這兩個類是抽象類,他們實現了Channel和Unsafe的絕大部分接口。在AbstractChannel的實現中,每一個方法都會直接或間接調用Unsafe對應的同名方法。全部的inbound和outbound方法都是經過pipeline間接調用,其餘的輔助方法直接使用unsafe實例調用。pipline和unsafe實例在AbstractChannel的構造方法建立:
protected AbstractChannel(Channel parent) {
this.parent = parent;
unsafe = newUnsafe(); // AbstractChannel沒有實現這個方法
pipeline = newChannelPipeline(); // newChannelPipline的實現 return new DefaultChannelPipeline(this);
}
直接調用的例子:
@Override
public SocketAddress localAddres) {
SocketAddress localAddress = this.localAddress;
if (localAddress == null) {
try {
// 這裏直接調用了Unsafe的localAddress()方法
this.localAddress = localAddress = unsafe().localAddress();
} catch (Throwable t) {
// Sometimes fails on a closed socket in Windows.
return null;
}
}
return localAddress;
}
 
間接調用的例子
@Override
public ChannelFuture bind(SocketAddress localAddress) {
return pipeline.bind(localAddress); //經過pipline間接調用Unsafe的bind方法
}
關於pipline是怎樣調用Unsafe方法的,會在後面的Pipline相關章節詳細分析,這裏只需記住。pipeline全部方法調用最終都會(若是沒有改變ChannelContextHandler的默認實現)經過使用newUnsafe建立的Unsafe實例調用Unsafe的同名方法(若是有的話)。
netty給出這一對Abstract實現有兩個目的:
  • 進一步明確接口的語意。
  • 簡化Channel接口的實現。
下面來具體看一下AbstractUnsafe的主要方法實現。
 
 
AbstractUnsafe的重要實現
 
register實現
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) { // 檢查是否已經註冊, 避免重複只需註冊動做。
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {// 檢查eventloop是否知足Channel的要求,由子類實現
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
 
// 設置Channel的EventLoop實例
AbstractChannel.this.eventLoop = eventLoop;
 
if (eventLoop.inEventLoop()) { // 檢查是否在當前線程中,若是是,直接調用
register0(promise);
} else {
// 若是不是,把register0包裝到runnable中放到eventloop中調用。
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
這個方法的實現爲咱們展現了netty使用I/O線程的通常套路
if(eventLoop.inEventLoop()){
doSomething();
}else{
eventLoop.execute(new Runnable(){
@Override
public void run() {
doSomething();
}
});
}
對於某個須要放到I/O線性中執行的方法,先檢查當前線程是否是I/O線程,是就直接執行,不是就把它包裝到Ruannable中放到eventLoop中執行。
register的功能總結一句話就是調用register0, 下面看看register0的實現。
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
// 確保promise沒有被取消同時Channel沒有被關閉才能執行後面的動做
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister(); // 執行真正的register操做,留改子類實現
neverRegistered = false;
registered = true; // 設置Channel已經處於registed狀態
 
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
// 觸發handlerAdded事件
pipeline.invokeHandlerAddedIfNeeded();
 
safeSetSuccess(promise);
pipeline.fireChannelRegistered(); // 觸發channelRegistered事件
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {// 確保Channel只有在第一次register 的時候被觸發
  pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// 對於設置了autoRead的Channel執行beginRead();
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
register語義:
  1. 把channel和eventLoop綁定,eventLoop線程就是I/O線程。
  2. 確保真正的register操做在I/O線程中執行。
  3. 確保每一個channel的register操做只執行一次。
  4. 真正的register操做執行成功後, 觸發channelRegistered事件,若是channel此時仍處於active狀態,觸發channelActive事件,並確保這些事件只觸發一次。
  5. 真正的register操做執行成功後, 若是channel此時仍處於active狀態,而且channel的配置支持autoRead, 則執行beginRead操做,讓eventLoop能夠自動觸發channel的read事件。
 
 
bind實現
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
 
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
 
// See: https://github.com/netty/netty/issues/576
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
// 先保存是否active的狀態
boolean wasActive = isActive();
try {
doBind(localAddress); / /調用doBind, 須要子類實現這個方法完成真正的bind操做
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
 
if (!wasActive && isActive()) {
// 若是執行完doBind後從非active狀態變成active裝,則觸發channelActive事件
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
 
safeSetSuccess(promise);
}
bind語義:
  • 調用抽象方法doBind, 它須要子類實現。
  • 若是channel的狀態從非active變成active狀態,則觸發channelActive事件
 
disconnect實現
disconnect和bind的實現類型,不一樣的是他調用的是doDisconnect方法,這個方法一樣是抽象方法須要子類實現。當channel的狀態從非active變成active狀態時,調用pipeline.fireChannelInactive()觸發channelInactive事件。
 
close實現
@Override
public final void close(final ChannelPromise promise) {
assertEventLoop();
 
close(promise, CLOSE_CLOSED_CHANNEL_EXCEPTION, CLOSE_CLOSED_CHANNEL_EXCEPTION, false);
}
 
private void close(final ChannelPromise promise, final Throwable cause,
final ClosedChannelException closeCause, final boolean notify)
{
if (!promise.setUncancellable()) {
return;
}
 
if (closeInitiated) { // 這段代碼的做用就是防止屢次執行close操做
if (closeFuture.isDone()) {
// Closed already.
safeSetSuccess(promise);
} else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise.
// This means close() was called before so we just register a listener and return
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
  promise.setSuccess();
  }
  });
  }
  return;
  }
 
closeInitiated = true;
 
final boolean wasActive = isActive();
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
// 把outboundBuffer置空,在這以後沒法進行write或flush操做
this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
Executor closeExecutor = prepareToClose(); // 這個方法默認實現是return null. 若是有些能夠在子類中覆蓋這個方法添加關閉前的準備代
// 下面的if..else執行的是相同的操做,不一樣的是若是closeExecutor能夠用,就在這個executor中執行,不然在當前線程總執行
if (closeExecutor != null) {
closeExecutor.execute(new Runnable() {
@Override
public void run() {
try {
// Execute the close.
doClose0(promise); // 執行close操做
} finally {
// Call invokeLater so closeAndDeregister is executed in the EventLoop again!
// close完成以後的操做, 在eventLoop中執行
invokeLater(new Runnable() {
@Override
public void run() {
if (outboundBuffer != null) {
// Fail all the queued messages
// 對outboundBuffer中的數據進行錯誤處理
outboundBuffer.failFlushed(cause, notify);
outboundBuffer.close(closeCause);
}
// 執行deregister操做, 若是channel由active變成非active狀態就觸發channelInactive事件
fireChannelInactiveAndDeregister(wasActive);
}
});
}
}
});
} else {
try {
// Close the channel and fail the queued messages in all cases.
doClose0(promise);
} finally {
if (outboundBuffer != null) {
// Fail all the queued messages.
outboundBuffer.failFlushed(cause, notify);
outboundBuffer.close(closeCause);
}
}
if (inFlush0) {
// 若是正在執行flush操做,把deregister操做放在eventLoop中執行
invokeLater(new Runnable() {
@Override
public void run() {
  fireChannelInactiveAndDeregister(wasActive);
}
});
} else {
  fireChannelInactiveAndDeregister(wasActive);
}
}
}
 
private void doClose0(ChannelPromise promise) {
try {
doClose(); // 調用doClose執行真正的close操做,它是一個抽象方法,須要在子類中實現。
closeFuture.setClosed();
safeSetSuccess(promise);
} catch (Throwable t) {
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
close實現的代碼雖然比較多,但作的事情比較簡單:首先執行close操做,而後實現deregister操做,觸發channelInactive事件。
在close的實現中,先調用assertEventLoop方法確保當前方法是在eventLoop中執行,而後屢次使用invokeLater方法吧一系列操做放在放在Runnable中執行,這樣作的目的是事爲了保證接下來的操做必定在當前操做完成以後纔會執行,這一點是有eventLoop來保證的,eventLoop執行Runnable的順序和調用execute的順序一致,相關實現會在後面eventLoop章節具體討論。
 
deregister實現
@Override
public final void deregister(final ChannelPromise promise) {
assertEventLoop();
 
deregister(promise, false);
}
 
private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
if (!promise.setUncancellable()) {
  return;
}
 
if (!registered) { // 避免屢次執行deregister操做
  safeSetSuccess(promise);
  return;
}
 
// As a user may call deregister() from within any method while doing processing in the ChannelPipeline,
// we need to ensure we do the actual deregister operation later. This is needed as for example,
// we may be in the ByteToMessageDecoder.callDecode(...) method and so still try to do processing in
// the old EventLoop while the user already registered the Channel to a new EventLoop. Without delay,
// the deregister operation this could lead to have a handler invoked by different EventLoop and so
// threads.
//
// See:
// https://github.com/netty/netty/issues/4435
invokeLater(new Runnable() {
  @Override
  public void run() {
  try {
    doDeregister(); // 執行真正的deregister操做,這方法默認沒作任何事情,子類能夠根據須要覆蓋實現
  } catch (Throwable t) {
    logger.warn("Unexpected exception occurred while deregistering a channel.", t);
  } finally {
    if (fireChannelInactive) {
      pipeline.fireChannelInactive(); // 觸發channelInactive事件
    }
// Some transports like local and AIO does not allow the deregistration of
// an open channel. Their doDeregister() calls close(). Consequently,
// close() calls deregister() again - no need to fire channelUnregistered, so check
// if it was registered.
    if (registered) {
      registered = false;
      pipeline.fireChannelUnregistered(); // 觸發channelUnregistered事件
    }
    safeSetSuccess(promise);
  }
  }
});
}
語義:
  • 調用doDeregister執行真正的deregister操做
  • 根據參數可能須要觸發channelInactive事件
  • 觸發channelUnregistered事件
 
write實現
@Override
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
 
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
// 若是outboundBuffer是null, 意味着這個channel已經被close掉了,須要使用promise返回錯誤,而後釋放掉msg
// If the outboundBuffer is null we know the channel was closed and so
// need to fail the future right away. If it is not null the handling of the rest
// will be done in flush0()
// See https://github.com/netty/netty/issues/2362
safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
// release message now to prevent resource-leak
ReferenceCountUtil.release(msg);
return;
}
 
int size;
try {
  msg = filterOutboundMessage(msg); // 過濾msg, 默認實現中沒有作任何操做,把msg原樣返回, 資料能夠根據須要覆蓋實現
  size = pipeline.estimatorHandle().size(msg); // 計算msg序列化以後的長度
  if (size < 0) {
    size = 0;
  }
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
 
outboundBuffer.addMessage(msg, size, promise); // 把msg放入outboundBuffer中
}
write的操做比較簡單,他只是把消息放到outboundBuffer中,並無作實際的寫操做。
 
flush實現
@Override
public final void flush() {
assertEventLoop(); // 確保在eventLoop中執行
 
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
  return;
}
outboundBuffer.addFlush();
// 若是outboundBuffer不是null才能夠進入真正的write階段
flush0();
}
 
protected void flush0() {
if (inFlush0) { // 確保不被多個線程同時執行
// Avoid re-entrance
return;
}
 
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null || outboundBuffer.isEmpty()) { // 確保outboundBuffer有數據是才執行下面的步驟
  return;
}
 
inFlush0 = true;
 
// Mark all pending write requests as failure if the channel is inactive.
if (!isActive()) { // 若是channel不是active狀態,返回錯誤
try {
  if (isOpen()) {
    outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
  } else {
    // Do not trigger channelWritabilityChanged because the channel is closed already.
    outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
  }
} finally {
  inFlush0 = false;
}
return;
}
 
try {
  doWrite(outboundBuffer); // 執行真正的寫操做,這是一個抽象方法,須要子類實現。
} catch (Throwable t) {
if (t instanceof IOException && config().isAutoClose()) {
/**
* Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
* failing all flushed messages and also ensure the actual close of the underlying transport
* will happen before the promises are notified.
*
* This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
* may still return {@code true} even if the channel should be closed as result of the exception.
*/
// 如是I/O異常,而且channel配置容許自動關閉,則關閉channel
close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
} else {
try {
  shutdownOutput(voidPromise(), t); // 關閉output通道,不容許執行write操做。
} catch (Throwable t2) {
  close(voidPromise(), t2, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
}
}
} finally {
  inFlush0 = false;
}
}
語義:
  • 調用doWrite方法執行真正的寫操做
  • 若是寫操做失敗,調用close或者shutdownOutput進行善後。
 
 
至此,已經分析完了AbstractChannel和AbstractUnsafe的全部重要的實現,回頭總結一下,這個類主要作了這麼幾件事:
1. 明確了AbstractChannel和AbstractUnsafe方法之間的調用關係,或經過unsafe實例直接調用,或經過pipleline間接調用。
2. 規定了Unsafe方法的執行線程,有些必須在eventLoop中執行,這樣的方法第一行就調用assertEventLoop來確保當前方法是在eventLoop線性中,有些不須要必定在eventLoop中執行的則沒有這個調用
3. 確保多線程多線程環境下的執行順序,這一點經過把一系列操做包裝成Runnable放入eventLoop中來保證,invokeLater方法就是一個典型的例子。
4. 定義了事件的觸發條件,在前面的代碼分析中,頻繁地出現pipeline.fireXXX()的調用,這些調用就是在觸發特定的事件,大部分狀況下用戶不要本身去觸發事件。
5. 優化多線程環境下的數據同步性能,使用volatile減小synchronized和Lock的使用, 典型的用法以下所示:
private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
......
return;
}
....
doWrite(outboundBuffer);
 
AbstractUnsafe的擴展點
前面說過,AbstractUnsafe作了不少事,但把臨門一腳的工做交給子類完成,這樣讓子類的實現變得簡單不少。AbstractUsafe把這些工做定義成形如doXXX的抽象方法或是沒有幹任何事的空方法。下面是這些方法的列表:
方法
說明
protected abstract SocketAddress localAddress0()
被localAddress調用,執行真正的獲取本地地址的操做。
protected abstract SocketAddress remoteAddress0()
被remoteAddress調用,是真正的獲取遠程地址的操做。
protected abstract boolean isCompatible(EventLoop loop)
檢查eventLoop是是否和這個Channel兼容。
protected void doRegister()
調用鏈register->register0->doRegister, 真正的註冊操做。
protected abstract void doBind(SocketAddress localAddress)
被bind調用,執行真正綁定本地地址的操做。
protected abstract void doDisconnect()
被disconnect調用,執行真正的斷開鏈接操做。
protected abstract void doClose()
被close掉,執行真正的關閉channel操做。
protected void doShutdownOutput()
被shutdownOutput調用,用來關閉output通道,使Channel不能write。它的的默認實現是調用doClose
protected void doDeregister()
被deregister調用,是真正的註銷操做,雖然不是抽象方法,然而只有一個{}, 仍是要等你來搞定。
protected abstract void doBeginRead()
調用鏈register->register0->beginRead->doBeginRead, 實現讓eventLoop能夠自動觸發read事件。
protected abstract void doWrite(ChannelOutboundBuffer in)
調用鏈flush->flush0->doWrite, 執行真正的寫操做。
protected Object filterOutboundMessage(Object msg)
被write調用,在消息被放到outboundBuffer以前對消息進行處理,默認啥事都沒幹,就是把你傳進去的msg還給你。
相關文章
相關標籤/搜索