// 客戶端引導對象 Bootstrap bootstrap = new Bootstrap(); // 設置各類參數 .channel(socketChannelClass) // Disable Nagle's Algorithm since we don't want packets to wait // 關閉Nagle算法 .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs()) .option(ChannelOption.ALLOCATOR, pooledAllocator); // socket接收緩衝區 if (conf.receiveBuf() > 0) { bootstrap.option(ChannelOption.SO_RCVBUF, conf.receiveBuf()); } // socket發送緩衝區 // 對於接收和發送緩衝區的設置應該用以下的公式計算: // 延遲 *帶寬 // 例如延遲是1ms,帶寬是10Gbps,那麼緩衝區大小應該設爲1.25MB if (conf.sendBuf() > 0) { bootstrap.option(ChannelOption.SO_SNDBUF, conf.sendBuf()); } final AtomicReference<TransportClient> clientRef = new AtomicReference<>(); final AtomicReference<Channel> channelRef = new AtomicReference<>(); // 設置handler(處理器對象) bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { TransportChannelHandler clientHandler = context.initializePipeline(ch); clientRef.set(clientHandler.getClient()); channelRef.set(ch); } }); // Connect to the remote server long preConnect = System.nanoTime(); // 與服務端創建鏈接,啓動方法 ChannelFuture cf = bootstrap.connect(address);
public ChannelFuture connect(SocketAddress remoteAddress) { // 檢查非空 ObjectUtil.checkNotNull(remoteAddress, "remoteAddress"); // 一樣是對一些成員變量檢查非空,主要檢查EventLoopGroup,ChannelFactory,handler對象 validate(); return doResolveAndConnect(remoteAddress, config.localAddress()); }
public B channel(Class<? extends C> channelClass) { return channelFactory(new ReflectiveChannelFactory<C>( ObjectUtil.checkNotNull(channelClass, "channelClass") )); }
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) { // 初始化一個Channel對象並註冊到EventLoop中 final ChannelFuture regFuture = initAndRegister(); final Channel channel =; if (regFuture.isDone()) { // 若是註冊失敗,世界返回失敗的future對象 if (!regFuture.isSuccess()) { return regFuture; } return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise()); } else {// 若是註冊還在進行中,須要向future對象添加一個監聽器,以便在註冊成功的時候作一些工做,監聽器實際上就是一個回調對象 // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { // Directly obtain the cause and do a null check so we only need one volatile read in case of a // failure. Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See promise.registered(); // 註冊成功後仍然調用doResolveAndConnect0方法完成鏈接創建的過程 doResolveAndConnect0(channel, remoteAddress, localAddress, promise); } } }); return promise; }
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 經過channel工廠類建立一個channel對象
channel = channelFactory.newChannel();
// 調用init方法對channel進行一些初始化的設置
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
// 註冊到EventLoopGroup中 ChannelFuture regFuture = config().group().register(channel); // 若是發生異常,須要關閉已經創建的鏈接 if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } // If we are here and the promise is not failed, it's one of the following cases: // 1) If we attempted registration from the event loop, the registration has been completed at this point. // i.e. It's safe to attempt bind() or connect() now because the channel has been registered. // 2) If we attempted registration from the other thread, the registration request has been successfully // added to the event loop's task queue for later execution. // i.e. It's safe to attempt bind() or connect() now: // because bind() or connect() will be executed *after* the scheduled registration task is executed // because register(), bind(), and connect() are all bound to the same thread. return regFuture; }
public NioSocketChannel() { this(DEFAULT_SELECTOR_PROVIDER); }
public NioSocketChannel(SelectorProvider provider) {
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { // 父類構造方法 super(parent); = ch; this.readInterestOp = readInterestOp; try { // 設置非阻塞 ch.configureBlocking(false); } catch (IOException e) { try { // 若是發生異常,關閉該channel ch.close(); } catch (IOException e2) { if (logger.isWarnEnabled()) { logger.warn( "Failed to close a partially initialized socket.", e2); } } throw new ChannelException("Failed to enter non-blocking mode.", e); } }
protected AbstractChannel(Channel parent) { this.parent = parent; // 建立一個ChannelId對象,惟一標識該channel id = newId(); // Unsafe對象,封裝了jdk底層的api調用 unsafe = newUnsafe(); // 建立一個DefaultChannelPipeline對象 pipeline = newChannelPipeline(); }
void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
// 向ChannelPipeline中添加一個處理器,這個處理器就是咱們以前設置的處理器
final Map<ChannelOption<?>, Object> options = options0(); // 設置參數,最終是經過調用SocketChannelConfig的一些參數設置接口設置參數 synchronized (options) { setChannelOptions(channel, options, logger); } final Map<AttributeKey<?>, Object> attrs = attrs0(); // 設置屬性 synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } }
在完成channel的建立和初始化以後,咱們就要將這個channel註冊到一個EventLoop中,NioNioEventLoop繼承自MultithreadEventLoopGroup, 經過調用SingleThreadEventLoop的register方法完成註冊
public ChannelFuture register(Channel channel) { return next().register(channel); }
public ChannelFuture register(Channel channel) { return register(new DefaultChannelPromise(channel, this)); }
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");, promise);
return promise;
public final void register(EventLoop eventLoop, final ChannelPromise promise) { // 作一些非空檢查 if (eventLoop == null) { throw new NullPointerException("eventLoop"); } // 若是重複註冊,經過future對象拋出一個異常 // 一個channel只能註冊到一個EventLoopGroup對象上 if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } // 檢查channel類型和EventLoopGroup類型是否匹配 if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } // 將channel內部的eventLoop成員設置爲相應的對象 // 也就是將這個channel綁定到指定頂eventLoop上 AbstractChannel.this.eventLoop = eventLoop; // 這裏作了一個判斷,若是當前處於eventLoop對應的線程內,那麼直接執行代碼 // 若是當前運行的線程與eventLoop不是同一個,那麼將這個註冊的任務添加到eventLoop的任務隊列中 if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }
private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop // 將ChannelPromise設置爲不可取消,並檢查channel是否還存活,經過內部的jdk的channel檢查是否存活 if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } // 是否第一次註冊, // TODO 說明狀況下會註冊屢次?? boolean firstRegistration = neverRegistered; // 完成實際的註冊,即底層api的調用 // 若是對於jdk Nio的通道的註冊就是調用SelectableChannel.register(Selector sel, int ops, Object att) doRegister(); // 更新標誌變量 neverRegistered = false; registered = true; // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the // user may already fire events through the pipeline in the ChannelFutureListener. // 調用全部的已添加的處理器節點的ChannelHandler.handlerAdded方法 pipeline.invokeHandlerAddedIfNeeded(); // 經過future對象已經註冊成功了 safeSetSuccess(promise); // 觸發一個channel註冊成功的事件,這個事件會在pipeline中傳播, // 全部註冊的handler會依次接收到該事件並做出相應的處理 pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (isActive()) { if (firstRegistration) { // 若是是第一次註冊,還須要觸發一個channel存活的事件,讓全部的handler做出相應的處理 pipeline.fireChannelActive(); } else if (config().isAutoRead()) { // This channel was registered before and autoRead() is set. This means we need to begin read // again so that we process inbound data. // // See // 開始接收讀事件 // 對於Nio類型的channel, 經過調用jdk的相關api註冊讀事件爲感興趣的事件 beginRead(); } } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { try { final EventLoop eventLoop = channel.eventLoop(); // 獲取一個地址解析器 final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop); // 若是解析器不支持該地址或者改地址已經被解析過了,那麼直接開始建立鏈接 if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) { // Resolver has no idea about what to do with the specified remote address or it's resolved already. doConnect(remoteAddress, localAddress, promise); return promise; } // 對遠程地址進行解析 final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress); if (resolveFuture.isDone()) { final Throwable resolveFailureCause = resolveFuture.cause(); if (resolveFailureCause != null) { // Failed to resolve immediately channel.close(); promise.setFailure(resolveFailureCause); } else { // Succeeded to resolve immediately; cached? (or did a blocking lookup) // 解析成功後進行鏈接 doConnect(resolveFuture.getNow(), localAddress, promise); } return promise; } // Wait until the name resolution is finished. // 給future對象添加一個回調,採用異步方法進行鏈接, resolveFuture.addListener(new FutureListener<SocketAddress>() { @Override public void operationComplete(Future<SocketAddress> future) throws Exception { if (future.cause() != null) { channel.close(); promise.setFailure(future.cause()); } else { doConnect(future.getNow(), localAddress, promise); } } }); } catch (Throwable cause) { promise.tryFailure(cause); } return promise; }
private static void doConnect( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. final Channel channel =; channel.eventLoop().execute(new Runnable() { @Override public void run() { if (localAddress == null) { // 調用 channel.connect方法完成鏈接 channel.connect(remoteAddress, connectPromise); } else { channel.connect(remoteAddress, localAddress, connectPromise); } connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } }); }
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) { return pipeline.connect(remoteAddress, promise); }
public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) { return tail.connect(remoteAddress, promise); }
public ChannelFuture connect( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { if (remoteAddress == null) { throw new NullPointerException("remoteAddress"); } if (isNotValidPromise(promise, false)) { // cancelled return promise; } // 找到下一個可以進行connect操做的,這裏用比特位來標記各類不一樣類型的操做, final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { // 調用AbstractChannelHandlerContext.invokeConnect next.invokeConnect(remoteAddress, localAddress, promise); } else { safeExecute(executor, new Runnable() { @Override public void run() { next.invokeConnect(remoteAddress, localAddress, promise); } }, promise, null); } return promise; }
public void connect( ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { unsafe.connect(remoteAddress, localAddress, promise); }
HeadContext(DefaultChannelPipeline pipeline) { super(pipeline, null, HEAD_NAME, HeadContext.class); unsafe =; setAddComplete(); }
public final void connect( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { // 檢查promise狀態,channel存活狀態 if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } try { // 防止重複鏈接 if (connectPromise != null) { // Already a connect in process. throw new ConnectionPendingException(); } boolean wasActive = isActive(); // 調用doConnect方法進行鏈接 if (doConnect(remoteAddress, localAddress)) { // 若是當即就鏈接成功了,那麼將future對象設置爲成功 fulfillConnectPromise(promise, wasActive); } else { connectPromise = promise; requestedRemoteAddress = remoteAddress; // Schedule connect timeout. int connectTimeoutMillis = config().getConnectTimeoutMillis(); // 若是超時大於0,那麼會在超時到達後檢查是否鏈接成功 if (connectTimeoutMillis > 0) { connectTimeoutFuture = eventLoop().schedule(new Runnable() { @Override public void run() { ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise; ConnectTimeoutException cause = new ConnectTimeoutException("connection timed out: " + remoteAddress); // 若是connectPromise可以標記爲失敗,說明此時尚未鏈接成功,也就是鏈接超時了 // 此時須要關閉該通道 if (connectPromise != null && connectPromise.tryFailure(cause)) { close(voidPromise()); } } }, connectTimeoutMillis, TimeUnit.MILLISECONDS); } // 向future對象添加一個回調,在future被外部調用者取消時將通道關閉 promise.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isCancelled()) { if (connectTimeoutFuture != null) { connectTimeoutFuture.cancel(false); } connectPromise = null; close(voidPromise()); } } }); } } catch (Throwable t) { promise.tryFailure(annotateConnectException(t, remoteAddress)); closeIfClosed(); } }
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
// 綁定指定的本地地址
if (localAddress != null) {
// 這個變量標記創建鏈接的動做是否發起成功 // 成功發起創建鏈接的工做並不表示鏈接已經成功創建 boolean success = false; try { // 實際創建鏈接的語句 boolean connected = SocketUtils.connect(javaChannel(), remoteAddress); if (!connected) { selectionKey().interestOps(SelectionKey.OP_CONNECT); } success = true; // 返回鏈接是否已經成功創建 return connected; } finally { if (!success) { doClose(); } }
public static boolean connect(final SocketChannel socketChannel, final SocketAddress remoteAddress) throws IOException { try { return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() { @Override public Boolean run() throws IOException { // 調用jdk api創建鏈接,SocketChannel.connect return socketChannel.connect(remoteAddress); } }); } catch (PrivilegedActionException e) { throw (IOException) e.getCause(); } }
一句話,這代碼是真的很深! 很是不直接,初次看的話,若是沒有一個代碼框架圖在旁邊參考,很容易迷失在層層的繼承結構中,不少代碼層層調用,真正有用的邏輯隱藏的很深,因此看這中代碼必需要有耐心,有毅力,要有打破砂鍋問到底的決心。不過這樣的複雜的代碼結構好處也是顯而易見的,那就是良好的擴展性,你能夠在任意層級進行擴展。