Netty服務端啓動全流程源碼分析

想要閱讀Netty源碼的同窗,建議從GitHub上把源碼拉下來,方便寫註釋、Debug調試哦~點我去下載!java

先來看一個簡單的Echo服務端程序,監聽本地的9999端口,有客戶端接入時控制檯輸出一句話,接收到客戶端的數據後直接原樣寫回。git

public class EchoServer {
	// 綁定的端口
	private final int port;

	public EchoServer(int port) {
		this.port = port;
	}

	public static void main(String[] args) {
		// 啓動Echo服務
		new EchoServer(9999).start();
	}

	public void start() {
		/* bossGroup負責客戶端的接入 workerGroup負責IO數據的讀寫 */
		NioEventLoopGroup boss = new NioEventLoopGroup(1);
		NioEventLoopGroup worker = new NioEventLoopGroup();
		new ServerBootstrap()
				.group(boss, worker)
				.channel(NioServerSocketChannel.class)
				.childHandler(new ChannelInitializer<SocketChannel>() {
					@Override
					protected void initChannel(SocketChannel sc) throws Exception {
						sc.pipeline().addLast(new ChannelInboundHandlerAdapter(){

							@Override
							public void channelActive(ChannelHandlerContext ctx) throws Exception {
								super.channelActive(ctx);
								System.out.println("有新的客戶端鏈接...");
							}

							@Override
							public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
								/* 原樣寫回給客戶端,由於OutBoundHandler還要使用,所以不能釋放msg。 底層數據寫完後會自動釋放。 */
								ctx.writeAndFlush(msg);
							}

							@Override
							public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
								// 出現異常了
								cause.printStackTrace();
								ctx.channel().close();
							}
						});
					}
				})
				.bind(port);
	}
}
複製代碼

代碼仍是很簡單的,接下來會一步步分析,僅僅幾行代碼,Netty到底作了什麼!github

NioEventLoopGroup源碼分析

Netty程序要想成功運行,須要EventLoopGroup進行驅動,ServerBootstrap.bind()會將ServerSocketChannel綁定到本地端口,這樣服務端就能夠接收客戶端的鏈接了,可是在這以前,必須確保設置了EventLoopGroup,ServerBootstrap調用bind()前會進行檢查,方法是validate(),源碼以下:編程

/** * 驗證必要的參數 */
public B validate() {
    if (group == null) {//EventLoopGroup必須設置,依賴它驅動程序
        throw new IllegalStateException("group not set");
    }
    if (channelFactory == null) {//依賴channelFactory建立ServerSocketChannel對象
        throw new IllegalStateException("channel or channelFactory not set");
    }
    return self();
}
複製代碼

先來看看類的繼承關係: 在這裏插入圖片描述數組

NioEventLoopGroup實現了ScheduledExecutorService,說明它不只能夠執行異步任務,還能夠執行定時任務。實現Iterable接口,是由於EventLoopGroup管理着一組EventLoop,須要對其進行迭代遍歷。MultithreadEventExecutorGroup表明它是一個多線程的事件執行器,而它管理的EventLoop就是個單線程的事件執行器。promise

先來看構造函數,它的構造函數很是多,咱們直接看參數最全的一個:安全

/** * @param nThreads 線程數量,就是NioEventLoop的數量,默認CPU核心數*2 * @param executor NioEventLoop.run()的執行者,默認爲ThreadPerTaskExecutor,NioEventLoop將利用它來啓動一個FastThreadLocalThread並執行 * @param chooserFactory 選擇器工廠,默認DefaultEventExecutorChooserFactory,輪詢選擇NioEventLoop * @param selectorProvider 多路複用器提供者,DefaultSelectorProvider.create() * @param selectStrategyFactory select策略工廠,指示EventLoop應該要作什麼事情 * @param rejectedExecutionHandler 拒絕策略 * @param taskQueueFactory 任務隊列工廠,默認PlatformDependent.newMpscQueue(),Netty實現的高性能無鎖隊列 */
public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory, final RejectedExecutionHandler rejectedExecutionHandler, final EventLoopTaskQueueFactory taskQueueFactory) {
    super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
            rejectedExecutionHandler, taskQueueFactory);
}
複製代碼
  • nThreads:線程數,意味着Group須要建立多少個EventLoop,默認是CPU核心數*2。
  • executor:NioEventLoop.run()的執行者,默認爲ThreadPerTaskExecutor,NioEventLoop將利用它來啓動一個FastThreadLocalThread並執行。
  • chooserFactory:選擇器工廠,默認DefaultEventExecutorChooserFactory,輪詢選擇NioEventLoop。
  • selectorProvider:多路複用器提供者,DefaultSelectorProvider.create(),根據平臺會提供對應實現。
  • selectStrategyFactory:select策略工廠,指示EventLoop應該要作什麼事情。
  • rejectedExecutionHandler:拒絕策略。
  • taskQueueFactory:任務隊列工廠,默認PlatformDependent.newMpscQueue(),Netty實現的高性能無鎖隊列。

NioEventLoopGroup會把參數傳給父類構造器MultithreadEventLoopGroup,這裏會對nThreads進行初始化設置:markdown

/** * 參數太多,之後也可能會改變,後面的參數直接用Object...接收了 */
protected MultithreadEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
    // 若是nThreads=0,則默認爲CPU核心數*2
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, chooserFactory, args);
}
複製代碼

再次調用父類構造器,核心初始化流程在MultithreadEventExecutorGroup中:多線程

/* 建立一個多線程的事件執行器組 */
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
    // 確保線程數大於0
    checkPositive(nThreads, "nThreads");

    /* 若是沒提供Executor,則建立默認的ThreadPerTaskExecutor。 ThreadPerTaskExecutor依賴於一個ThreadFactory,靠它建立線程來執行任務。 默認的ThreadFactory會使用FastThreadLocalThread來提高FastThreadLocal的性能。 */
    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    // 建立子EventExecutor
    children = new EventExecutor[nThreads];
    for (int i = 0; i < nThreads; i++) {
        boolean success = false;
        try {
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                // EventExecutor建立失敗,停機釋放資源
                for (int j = 0; j < i; j++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        // Let the caller handle the interruption.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

    /* 建立選擇器:簡單輪詢 PowerOfTwoEventExecutorChooser:2的冪次方,位運算 GenericEventExecutorChooser:不然,取餘 有事件/任務要執行時,取出一個EventExecutor */
    chooser = chooserFactory.newChooser(children);

    // 全部children中止時收到一個通知,優雅停機時用到
    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };
    for (EventExecutor e : children) {
        e.terminationFuture().addListener(terminationListener);
    }

    // 返回一個只讀的children,iterator()迭代時使用
    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
複製代碼

EventLoopGroup會管理EventLoop,EventLoop執行任務須要依賴Executor,Executor執行任務須要依賴ThreadFactory建立新的線程,咱們看下Netty默認的Executor實現。併發

默認的ThreadFactory,會建立FastThreadLocalThread線程,來優化FastThreadLocal的性能,關於FastThreadLocal後面會有專門的文章介紹。

// 建立一個默認的線程工廠
protected ThreadFactory newDefaultThreadFactory() {
    return new DefaultThreadFactory(getClass());
}

/* 默認的線程工廠 */
public class DefaultThreadFactory implements ThreadFactory {

    private static final AtomicInteger poolId = new AtomicInteger();

    // 生成線程名稱時用到:prefix+nextId自增
    private final AtomicInteger nextId = new AtomicInteger();
    private final String prefix;//前綴
    private final boolean daemon;//是否守護線程,默認false
    private final int priority;//優先級 默認5
    protected final ThreadGroup threadGroup;//所屬線程組
	
    // 省略部分代碼......
    
    @Override
    public Thread newThread(Runnable r) {
        // 建立一個FastThreadLocalThread線程,優化FastThreadLocal的性能
        Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
        try {
            if (t.isDaemon() != daemon) {
                t.setDaemon(daemon);
            }

            if (t.getPriority() != priority) {
                t.setPriority(priority);
            }
        } catch (Exception ignored) {
            // Doesn't matter even if failed to set.
        }
        return t;
    }

    protected Thread newThread(Runnable r, String name) {
        return new FastThreadLocalThread(threadGroup, r, name);
    }
}
複製代碼

有了ThreadFactory,Executor的實現就很簡單了,當要執行任務的時候,建立一個新線程去跑就行了。EventLoop會在第一次execute()時調用該方法,整個生命週期只會調用一次,即每一個EventLoop只會建立一個線程,後續全部的任務,都是在run()方法裏無限輪詢去執行。

public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
    }

    /* 執行任務時,利用ThreadFactory建立一個新線程去跑。 EventLoop會在第一次execute()時調用該方法,整個生命週期只會調用一次, 即每一個EventLoop只會建立一個線程,後續全部的任務,都是在run()方法裏無限輪詢去執行。 */
    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}
複製代碼

有了Executor,接下來就會調用newChild()進行children的初始化,對於NioEventLoopGroup來講,它管理的孩子是NioEventLoop,因此newChild()會建立NioEventLoop:

@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    // EventLoop須要一個TaskQueue來存放待執行的任務,這裏判斷是否有指定QueueFactory,沒有則使用默認的
    EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
    
    // 建立NioEventLoop
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
                            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}
複製代碼

EventLoopGroup自己不幹活,向它提交任務,它只會交給它的孩子EventLoop執行,因此它依賴一個EventExecutorChooser,當有任務來臨時,從衆多的孩子中挑選出一個,默認的選擇策略就是簡單輪詢。 Netty這裏作了一個小小的優化,若是孩子數量是2的冪次方數會使用位運算,不然取模。源碼以下:

public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {

    public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();

    // 單例模式,經過INSTANCE提供一個單例對象
    private DefaultEventExecutorChooserFactory() { }

    /* 建立一個選擇器,從一組EventExecutor中挑選出一個。 Netty默認的選擇策略就是:簡單輪詢。 */
    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        // 兩種Chooser實現都有一個AtomicLong計數器,每次next()先自增再取餘

        // 若是數量是2的冪次方數,則採用位運算
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            // 不然,對長度進行取餘
            return new GenericEventExecutorChooser(executors);
        }
    }

    // 是不是2的冪次方數
    private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
        /* 二進制中,最高位是符號位,0正數、1負數。剩下的就是這個數的絕對值部分。 原碼:設置符號位,其餘0填充 反碼:正數的反碼與原碼相同,負數的反碼:除符號位外,其餘位取反 補碼:正數的補碼與原碼相同,負數的補碼:除符號位外,其餘位取反,而後在最後一位加1(計算機使用補碼) 以下舉例: 5:00000000 00000000 00000000 00000101(原碼) 5:00000000 00000000 00000000 00000101(反碼) 5:00000000 00000000 00000000 00000101(補碼) -5:10000000 00000000 00000000 00000101(原碼) -5:11111111 11111111 11111111 11111010(反碼) -5:11111111 11111111 11111111 11111011(補碼) 5 & -5 = 00000000 00000000 00000000 00000001 = 1 不是2的冪次方數 8 & -8 = 00000000 00000000 00000000 00001000 & 11111111 11111111 11111111 11111000 = 00000000 00000000 00000000 00001000 = 8 是2的冪次方數 */
    }

    // 2的冪次方數的選擇器,位運算
    private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            // 計數器自增 & 長度-1,和HashMap同樣
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }

    // 普通的選擇器,取餘
    private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        private final AtomicLong idx = new AtomicLong();
        private final EventExecutor[] executors;

        GenericEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
        }
    }
}
複製代碼

有了選擇器後,你向EventLoopGroup提交的任務,包括註冊Channel,它都會輪詢出一個EventLoop轉交任務,源碼以下:

@Override
public ChannelFuture register(Channel channel) {
    // 選出一個孩子,讓它去執行
    return next().register(channel);
}
複製代碼

EventLoopGroup還有一個方法特別有用,那就是shutdownGracefully()優雅停機,調用後它會中止接受新的任務,並把隊列中等待執行的任務(包括定時任務)處理完(Netty不保證100%處理完),而後釋放資源。因爲EventLoopGroup自己不幹活,所以它依然中止全部的EventLoop,,源碼以下:

public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
    for (EventExecutor l: children) {
        // 將孩子按個停機
        l.shutdownGracefully(quietPeriod, timeout, unit);
    }
    // 返回一個終止Future,停機後收到通知
    return terminationFuture();
}
複製代碼

NioEventLoopGroup差很少就這樣,比較簡單,它只是負責管理EventLoop,核心仍是在EventLoop上。

NioEventLoop源碼分析

NioEventLoopGroup在建立時,會根據線程數初始化NioEventLoop。NioEventLoop能夠看做是一個單線程的線程池,也是真正幹活的角色,它的繼承關係以下: 在這裏插入圖片描述

NioEventLoop的主要職責是負責處理註冊到其上的Channel的IO事件,除此以外它還能夠執行用戶提交的系統任務和定時任務,例如:你能夠每隔一段時間檢查一下鏈接是否斷開,若是斷開,客戶端能夠重連,服務端須要及時釋放資源。

一個Channel只能被註冊到一個EventLoop上,一個EventLoop能夠註冊多個Channel。一旦Channel註冊到EventLoop,該EventLoop就要負責處理它整個生命週期的全部事件。事件以回調的方式被觸發,全部的回調任務會被封裝成一個Runnable放入taskQueue,由EventLoop線程串行化處理。雖然看似「串行化處理」效率低下,可是這避免了線程切換的開銷和數據同步的問題,並且你能夠開啓多個EventLoop,並行處理,充分利用CPU資源。

先看屬性,以下:

private static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.

// 是否禁用SelectionKey優化?默認爲false
private static final boolean DISABLE_KEY_SET_OPTIMIZATION =
    SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);

private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;

// Selector重建的閾值,默認512,目的是解決JDK Selector空輪詢Bug
private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;

// 當前有幾個準備就緒的Channel?selectStrategy會用到,大於0表明有Channel事件須要處理
private final IntSupplier selectNowSupplier = new IntSupplier() {
    @Override
    public int get() throws Exception {
        return selectNow();
    }
};
複製代碼

再看構造函數,源碼以下:

/** * 建立一個NioEventLoop實例,用來執行註冊在其上的Channel事件 * @param parent 所屬Group * @param executor * @param selectorProvider 多路複用器提供者,不一樣平臺會使用不一樣實現 * @param strategy Selector.select()的策略 * @param rejectedExecutionHandler 拒絕策略 * @param queueFactory 任務隊列工廠 */
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
             EventLoopTaskQueueFactory queueFactory) {
    super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
            rejectedExecutionHandler);
    this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
    this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
    /* 每一個EventLoop都會有一個Selector,用來監聽註冊在其上的Channel事件。 對於BossGroup,處理的是Accept。 對於WorkerGroup,處理的是read、write... SelectorTuple:Selector元組,Netty提供了一個Selector包裝,用來優化select()性能 */
    final SelectorTuple selectorTuple = openSelector();
    this.selector = selectorTuple.selector;
    this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
複製代碼
  • parent:EventLoop隸屬的EventLoopGroup,由Group來管理和調度。
  • executor:EventLoop須要executor開啓新線程跑自身的run()方法。
  • selectorProvider:多路複用器提供者,不一樣平臺會使用不一樣實現。
  • strategy:select策略工廠,指示EventLoop應該要作什麼事情。
  • rejectedExecutionHandler:拒絕策略。
  • queueFactory:任務隊列工廠,負責建立taskQueue。

NioEventLoop首先建立了兩個TaskQueue來存放待執行的任務,run()方法會不斷消費任務。雖然能夠多線程併發的往taskQueue中提交任務,可是因爲EventLoop是單線程的,全部taskQueue的生產消費模型是:**多生產者單消費者。**針對這種消費場景,Netty實現了高性能的無鎖隊列「MpscQueue」,Queue的建立源碼以下:

// 建立TaskQueue,存放待執行的任務
private static Queue<Runnable> newTaskQueue( EventLoopTaskQueueFactory queueFactory) {
    if (queueFactory == null) {
        // 默認建立Netty實現MpscQueue:Netty實現的高性能無鎖隊列,適用於多個生產者,單個消費者。
        return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
    }
    return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
}

/* 根據最大隊列數建立Queue。 MpscQueue:Netty實現的高性能無鎖隊列,適用於多個生產者,單個消費者。 多個線程能夠併發往EventLoop提交任務,可是EventLoop自己是單線程消費的。 */
private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
    // This event loop never calls takeTask()
    return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
        : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}
複製代碼

關於MpscQueue,後面會專門寫文章介紹。

建立完taskQueue就是調用父類構造器進行相應的賦值操做了,這裏略過,下面主要看openSelector()。 每一個NioEventLoop被建立時,都會同時建立一個Selector多路複用器,這是JDK提供的,不熟悉的同窗去看看Java Nio編程。EventLoopGroup會將Channel註冊到NioEventLoop上,實際上就是註冊到Selector上了。這樣NioEventLoop就能夠經過Selector來監聽準備就緒的Channel,而後根據事件類型去觸發相應的回調,因此Selector是NioEventLoop的核心。

openSelector()會作一個優化,將JDK的SelectorImpl的selectedKeys、publicSelectedKeys屬性由HashSet替換成Netty的SelectedSelectionKeySet,內部是一個數組。當Selector監聽到有準備就緒的Channel時,會往HashSet裏添加SelectionKey,當SelectionKey比較多時,就容易發生哈希衝突,時間複雜度會增長,而SelectedSelectionKeySet內部使用數組來保存,避免了哈希衝突,性能會有必定的提高。

/* 打開一個Selector多路複用器 */
private SelectorTuple openSelector() {
    final Selector unwrappedSelector;//未包裝的原生Selector
    try {
        // 基於SelectorProvider打開一個原生的Selector,這是JDK提供的。
        unwrappedSelector = provider.openSelector();
    } catch (IOException e) {
        throw new ChannelException("failed to open a new selector", e);
    }

    // 若是禁用了SelectionKey優化,則unwrappedSelector和selector都指向原生Selector
    if (DISABLE_KEY_SET_OPTIMIZATION) {
        return new SelectorTuple(unwrappedSelector);
    }

    // 不然,使用SelectedSelectionKeySet來優化SelectionKey

    Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
        @Override
        public Object run() {
            try {
                return Class.forName(
                        "sun.nio.ch.SelectorImpl",
                        false,
                        PlatformDependent.getSystemClassLoader());
            } catch (Throwable cause) {
                return cause;
            }
        }
    });

    if (!(maybeSelectorImplClass instanceof Class) ||
        // ensure the current selector implementation is what we can instrument.
        !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
        if (maybeSelectorImplClass instanceof Throwable) {
            Throwable t = (Throwable) maybeSelectorImplClass;
            logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
        }
        return new SelectorTuple(unwrappedSelector);
    }

    final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
    final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

    Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
        @Override
        public Object run() {
            try {
                // 反射獲取SelectorImpl的selectedKeys、publicSelectedKeys屬性
                Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

                if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
                    // Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
                    // This allows us to also do this in Java9+ without any extra flags.
                    long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
                    long publicSelectedKeysFieldOffset =
                            PlatformDependent.objectFieldOffset(publicSelectedKeysField);

                    if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
                        PlatformDependent.putObject(
                                unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
                        PlatformDependent.putObject(
                                unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
                        return null;
                    }
                    // We could not retrieve the offset, lets try reflection as last-resort.
                }

                Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
                if (cause != null) {
                    return cause;
                }
                cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
                if (cause != null) {
                    return cause;
                }

                /* 經過反射將SelectorImpl的selectedKeys、publicSelectedKeys替換爲selectedKeySet來提高性能。 */
                selectedKeysField.set(unwrappedSelector, selectedKeySet);
                publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                return null;
            } catch (NoSuchFieldException e) {
                return e;
            } catch (IllegalAccessException e) {
                return e;
            }
        }
    });

    if (maybeException instanceof Exception) {
        selectedKeys = null;
        Exception e = (Exception) maybeException;
        logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
        return new SelectorTuple(unwrappedSelector);
    }
    selectedKeys = selectedKeySet;
    logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
    // 建立一個SelectorTuple,包含一個原生的Selector,和優化過的Selector。
    return new SelectorTuple(unwrappedSelector,
                             new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}
複製代碼

SelectedSelectionKeySet部分源碼以下:

/** * Selector的 Set<SelectionKey> selectedKeys * 默認用HashSet存儲,當有Channel準備就緒時,會添加到HashSet中,但若是發生衝突,HashSet的時間複雜度是O(n)鏈表/O(log n)紅黑樹 * Netty經過反射將selectedKeys、publicSelectedKeys替換成SelectedSelectionKeySet * 使用數組來避免哈希衝突 */
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
    // 使用數組來保存,默認長度1024
    SelectionKey[] keys;
    int size;//keys大小

    SelectedSelectionKeySet() {
        keys = new SelectionKey[1024];
    }
    // 省略部分代碼.......
}
複製代碼

建立完Selector,NioEventLoop的初始化就完成了,但此時線程並未啓動,Netty這裏作了懶加載處理,只有當EventLoop第一次被調用execute()執行任務時纔會經過executor去建立線程跑run()方法。

用戶不主動提交任務的前提下,對於BossGroup的EventLoop來講,線程是在調用bind()方法將ServerSocketChannel註冊到EventLoop時被啓動的。對於WorkerGroup的EventLoop來講,線程是在BossGroup接收到客戶端鏈接時,將SocketChannel註冊到WorkerGroup時被啓動的。

不論是ServerSocketChannel.bind()仍是接收到客戶端鏈接,都是要將Channel註冊到EventLoop,再由EventLoop去輪詢處理事件。register()源碼以下:

// 註冊Channel
@Override
public ChannelFuture register(Channel channel) {
    // 建立一個DefaultChannelPromise,再註冊,目的是讓用戶能夠在註冊完成時收到通知
    return register(new DefaultChannelPromise(channel, this));
}

@Override
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    // 轉交給Channel.Unsafe完成
    promise.channel().unsafe().register(this, promise);
    return promise;
}
複製代碼

這裏須要說下Channel.Unsafe接口,對於bind()write()read()等這類方法,因爲須要和底層API交互,Netty對開發者屏蔽了底層實現,不但願由開發者調用這類方法,因而將它們封裝到Channel.Unsafe中,從名字中也能看出來,這些操做是不安全的,開發者儘可能不要去本身調用。

register()操做的目的其實就是將JDK的SocketChannel註冊到Selector多路複用器上,因爲須要和底層API交互,因而轉交給Channel.Unsafe處理,源碼在io.netty.channel.AbstractChannel.AbstractUnsafe#register(),以下所示:

/* 將Channel註冊到EventLoop,其實就是調用JDK底層的:SocketChannel.register(selector)。 將Channel註冊到多路複用器。 */
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    ObjectUtil.checkNotNull(eventLoop, "eventLoop");
    // 重複註冊校驗
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    // 檢查是否兼容,Channel和EventLoop模式不能混用,例如Oio和Nio不兼容
    if (!isCompatible(eventLoop)) {
        promise.setFailure(
                new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }

    AbstractChannel.this.eventLoop = eventLoop;
    /* 當前線程是不是EventLoop線程? 若是是就直接執行,不然提交一個任務,後面串行化執行。 */
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}
複製代碼

register()作了一些檢查,而後確保由EventLoop來執行註冊操做,前面說過了,EventLoop會負責處理Channel的全部事件。register0()完成註冊,並觸發相應的事件回調,經過Pipeline傳播出去。

private void register0(ChannelPromise promise) {
    try {
        // 確保Channel是打開狀態
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        // JDK原生channel.register(selector)
        doRegister();
        neverRegistered = false;
        registered = true;

        // 觸發 ChannelHandler.handlerAdded()回調
        pipeline.invokeHandlerAddedIfNeeded();

        // 通知promise操做成功了,觸發回調
        safeSetSuccess(promise);

        // 註冊完成,觸發ChannelRegistered回調,經過pipeline傳播出去
        pipeline.fireChannelRegistered();

        // 若是鏈接激活了,則觸發active事件,只在首次註冊時會觸發
        if (isActive()) {
            if (firstRegistration) {
                // 觸發ChannelRegistered回調,經過pipeline傳播出去
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                beginRead();
            }
        }
    } catch (Throwable t) {
        // 異常了,關閉資源,觸發失敗通知
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}
複製代碼

doRegister()會調用JDK底層的註冊,源碼以下:

// 真正調用JDK底層API完成註冊
@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            // 獲取Java原生SocketChannel註冊到未包裝的原生Selector上
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                eventLoop().selectNow();
                selected = true;
            } else {
                throw e;
            }
        }
    }
}
複製代碼

完成SocketChannel的註冊後,EventLoop就能夠經過輪詢Selector來監聽準備就緒的Channel了,後面就是一系列的事件處理了。

在調用io.netty.channel.AbstractChannel.AbstractUnsafe#register()時,EventLoop線程已經啓動並執行run()方法,在run()方法裏,EventLoop線程會執行一個死循環,直到線程被中止。

在死循環裏,EventLoop線程會不斷輪詢Selector是否有準備就緒的Channel須要處理?taskQueue是否有任務在等待執行?scheduledTaskQueue是否有定時任務須要執行?NioEventLoop.run()是任務處理的關鍵。

@Override
protected void run() {
    /* 無效空輪詢的次數 JDK的Selector存在Bug,會致使空輪詢,CPU飆升。 Netty會檢測Selector.select()空輪詢次數,超過SELECTOR_AUTO_REBUILD_THRESHOLD則重建Selector。 有效輪詢:要麼有IO事件到達、要麼執行了Task。 */
    int selectCnt = 0;
    for (;;) {
        try {
            int strategy;
            try {
                /* NioEventLoop的執行策略: 有任務待執行嗎? 沒有:Selector.select()阻塞,等待IO事件到達(定時任務判斷) 有:非阻塞調用Selector.selectNow(), */
                strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                switch (strategy) {
                case SelectStrategy.CONTINUE:// 重試IO循環
                    continue;

                case SelectStrategy.BUSY_WAIT:// NIO不支持忙等,走SELECT

                case SelectStrategy.SELECT: // 隊列中沒有任務要執行
                    // 下一個要執行的定時任務截止時間
                    long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                    if (curDeadlineNanos == -1L) {
                        curDeadlineNanos = NONE;//沒有定時任務
                    }
                    nextWakeupNanos.set(curDeadlineNanos);
                    try {
                        /* 若是沒有任務要執行,則在下一個任務要執行前,阻塞等待IO事件。 沒有定時任務,則等待超時爲Long.MAX_VALUE,無限等待 */
                        if (!hasTasks()) {
                            strategy = select(curDeadlineNanos);
                        }
                    } finally {
                        // This update is just to help block unnecessary selector wakeups
                        // so use of lazySet is ok (no race condition)
                        nextWakeupNanos.lazySet(AWAKE);
                    }
                    // fall through
                default:
                }
            } catch (IOException e) {
                // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                // the selector and retry. https://github.com/netty/netty/issues/8566
                rebuildSelector0();
                selectCnt = 0;
                handleLoopException(e);
                continue;
            }

            selectCnt++;//無效輪詢次數+1,後面會判斷是否重置
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            boolean ranTasks;
            if (ioRatio == 100) {
                // 優先處理全部的IO事件後再去處理Task
                try {
                    if (strategy > 0) {// 表明有準備就緒的Channel待處理
                        processSelectedKeys();
                    }
                } finally {
                    // 處理完IO事件後,執行全部Task
                    ranTasks = runAllTasks();
                }
            } else if (strategy > 0) {
                // 先處理IO事件,並記錄所花的時間
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    // 根據ioTime和ioRatio,計算處理Task能分配的時間
                    final long ioTime = System.nanoTime() - ioStartTime;
                    ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            } else {
                /* 有待執行的任務,且Selector.selectNow()返回0,沒有IO事件須要處理,那就先執行少許的Task。 每64個任務檢查一次超時,若是有足夠的任務,那麼最少執行64個。 因此,不該該提交耗時任務,阻塞IO線程!!! */
                ranTasks = runAllTasks(0); // This will run the minimum number of tasks
            }

            if (ranTasks || strategy > 0) {
                // 若是執行了任務或者有IO事件,說明此次輪詢是有效的,重置selectCnt
                if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                            selectCnt - 1, selector);
                }
                selectCnt = 0;
            } else if (unexpectedSelectorWakeup(selectCnt)) { // 意外喚醒時,是否須要重置selectCnt,解決Selector空輪詢Bug
                selectCnt = 0;
            }
        } catch (CancelledKeyException e) {
            // Harmless exception - log anyway
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                        selector, e);
            }
        } catch (Error e) {
            throw (Error) e;
        } catch (Throwable t) {
            handleLoopException(t);
        } finally {
            // 無論正常/異常中止,都要關閉,釋放資源。
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Error e) {
                throw (Error) e;
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
}
複製代碼

SelectStrategy是一個選擇策略,其實就是告訴EventLoop線程須要作什麼事。

  • SELECT:表示當前沒有任務要執行,應該阻塞在Selector.select()上等待就緒的Channel。
  • CONTINUE:重試IO循環。
  • BUSY_WAIT:忙等,Nio不支持,會走SELECT邏輯。
  • 大於0:表明有準備就緒的Channel須要處理。

NioEventLoop在沒有Channel事件,又沒有taskQueue任務時,會調用nextScheduledTaskDeadlineNanos()計算距離下一次要執行的定時任務還有多長時間,在這以前,它會調用Selector.select(curDeadlineNanos)阻塞等待Channel事件(5微妙內不會阻塞),源碼以下:

// 在下一個定時任務要執行前,等待IO事件
private int select(long deadlineNanos) throws IOException {
    if (deadlineNanos == NONE) {
        // 沒有定時任務,直接阻塞
        return selector.select();
    }
    // 若是截止時間在5微秒內,超時將爲0
    long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
    return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}
複製代碼

被喚醒,要麼是由於有Channel事件了,要麼是超時了須要執行定時任務了,開始走下面的邏輯。

ioRatio表明EventLoop執行IO事件和Task的時間比例,100表明優先執行完全部的IO事件再執行系統任務,不然會根據這個比例去調整執行Task所消耗的時間。

processSelectedKeys()會挨個處理準備就緒的Channel事件,前面說過,Netty默認會使用數組代替HashSet優化SelectionKey,這裏會進行判斷:

/* 處理SelectionKey,分爲優化後的處理,和普通處理 優化:HashSet<SelectionKey> --> SelectionKey[] */
private void processSelectedKeys() {
    if (selectedKeys != null) {
        // 說明Netty將HashSet優化爲數組了,能夠高效處理
        processSelectedKeysOptimized();
    } else {
        // 沒優化過,普通處理
        processSelectedKeysPlain(selector.selectedKeys());
    }
}
複製代碼

不論如何,最終都會遍歷selectedKeys,挨個處理,源碼以下:

// 處理SelectionKey事件
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {//有效性檢查,Channel、Selector可能已經被關閉
        final EventLoop eventLoop;
        try {
            eventLoop = ch.eventLoop();
        } catch (Throwable ignored) {
            return;
        }
        if (eventLoop == this) {
            unsafe.close(unsafe.voidPromise());
        }
        return;
    }

    try {
        // 準備就緒的事件標誌位
        int readyOps = k.readyOps();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);
            // 鏈接就緒
            unsafe.finishConnect();
        }

        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // 數據可寫
            ch.unsafe().forceFlush();
        }

        // 數據可讀、有新的鏈接接入
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            // 對於ServerSocketChannel只關心OP_ACCEPT事件
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}
複製代碼

針對不一樣的就緒事件,會調用Channel.Unsafe對應的方法。

對於OP_CONNECT事件,會調用unsafe.finishConnect()方法,它主要就是判斷鏈接是否激活,若是激活則觸發ChannelActive回調,並經過Pipeline傳播出去。

對於OP_WRITE事件,會調用ch.unsafe().forceFlush()方法,這裏的ch是指客戶端Channel,它會將ChannelOutboundBuffer緩衝的數據轉換成JDK的ByteBuffer並調用底層API經過SocketChannel響應給客戶端。

對於OP_ACCEPT事件,ServerSocketChannel會調用io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read()方法來接收客戶端鏈接:

/* NioEventLoop.processSelectedKey() 當Channel有 OP_READ | OP_ACCEPT 事件時調用該方法。 對於服務端Channel來講,就是 OP_ACCEPT. */
@Override
public void read() {
    assert eventLoop().inEventLoop();
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    // 接收對端數據時,ByteBuf的分配策略,基於歷史數據動態調整初始化大小,避免太大浪費空間,過小又會頻繁擴容
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.reset(config);

    boolean closed = false;
    Throwable exception = null;
    try {
        try {
            do {
                /* 對於ServerSocketChannel來講,就是接收一個客戶端Channel,添加到readBuf */
                int localRead = doReadMessages(readBuf);
                if (localRead == 0) {
                    break;
                }
                if (localRead < 0) {
                    closed = true;
                    break;
                }
                // 遞增已讀取的消息數量
                allocHandle.incMessagesRead(localRead);
            } while (continueReading(allocHandle));
        } catch (Throwable t) {
            exception = t;
        }

        int size = readBuf.size();
        for (int i = 0; i < size; i ++) {
            readPending = false;
            // 經過pipeline傳播ChannelRead事件
            pipeline.fireChannelRead(readBuf.get(i));
        }
        readBuf.clear();
        // 讀取完畢的回調,有的Handle會根據本次讀取的總字節數,自適應調整下次應該分配的緩衝區大小
        allocHandle.readComplete();
        // 經過pipeline傳播ChannelReadComplete事件
        pipeline.fireChannelReadComplete();

        if (exception != null) {// 事件處理異常了
            // 是否須要關閉鏈接
            closed = closeOnReadError(exception);

            // 經過pipeline傳播異常事件
            pipeline.fireExceptionCaught(exception);
        }

        if (closed) {//若是須要關閉,那就關閉
            inputShutdown = true;
            if (isOpen()) {
                close(voidPromise());
            }
        }
    } finally {
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}
複製代碼

主要是看doReadMessages()方法,Netty會調用accept()獲取到一個JDK原生SocketChannel,並把它包裝成Netty的NioSocketChannel

/* 對於服務端Channel來講,處理 OP_ACCEPT 事件就是從Channel中接收一個客戶端Channel。 */
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    // 獲取客戶端Channel,調用的就是JDK原生方法:serverSocketChannel.accept()
    SocketChannel ch = SocketUtils.accept(javaChannel());
    try {
        if (ch != null) {
            // 將原生SocketChannel包裝成Netty的NioSocketChannel
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);
        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }
    return 0;
}
複製代碼

接收到客戶端的鏈接,並把它封裝成NioSocketChannel,隨後會觸發channelRead回調,在ServerBootstrapAcceptor.ServerBootstrapAcceptor中,會把客戶端Channel註冊到WorkerGroup中,由WorkerGroup去完成後續的IO讀寫事件,BossGroup只負責鏈接的創建,這就是經典的Reactor線程模型。

一樣對於OP_ACCEPT事件,SocketChannel會調用io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read()來接收對端發送的數據:

/* 客戶端發送數據時觸發。 見 io.netty.channel.nio.NioEventLoop.processSelectedKey */
@Override
public final void read() {
    final ChannelConfig config = config();
    if (shouldBreakReadReady(config)) {
        clearReadPending();
        return;
    }
    final ChannelPipeline pipeline = pipeline();
    final ByteBufAllocator allocator = config.getAllocator();
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    allocHandle.reset(config);

    ByteBuf byteBuf = null;
    boolean close = false;
    try {
        do {
            // 分配一個ByteBuf,大小能容納可讀數據,又不過於浪費空間。
            byteBuf = allocHandle.allocate(allocator);
            /* doReadBytes(byteBuf):ByteBuf內部有ByteBuffer,底層仍是調用了SocketChannel.read(ByteBuffer) allocHandle.lastBytesRead()根據讀取到的實際字節數,自適應調整下次分配的緩衝區大小。 */
            allocHandle.lastBytesRead(doReadBytes(byteBuf));
            if (allocHandle.lastBytesRead() <= 0) {
                // 沒數據可讀了.
                byteBuf.release();
                byteBuf = null;
                close = allocHandle.lastBytesRead() < 0;
                if (close) {
                    // There is nothing left to read as we received an EOF.
                    readPending = false;
                }
                break;
            }

            // 遞增已經讀取的消息數量
            allocHandle.incMessagesRead(1);
            readPending = false;
            // 經過pipeline傳播ChannelRead事件
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;
        } while (allocHandle.continueReading());//判斷是否須要繼續讀

        // 讀取完畢,pipeline傳播ChannelReadComplete事件
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();

        if (close) {
            closeOnRead(pipeline);
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close, allocHandle);
    } finally {
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}
}
複製代碼

大致邏輯和ServerSocketChannel相似,只是接收到的數據再也不是SocketChannel,而是ByteBuf。底層仍是調用了JDK原生的SocketChannel.read(ByteBuffer),再將ByteBuffer轉換成Netty的ByteBuf。

數據接收到後,繼續經過Pipeline傳播ChannelReadChannelReadComplete回調。

到這裏,基本就把EventLoop說的差很少了,總體工做流程已經瞭解了。細節的地方如:ByteBuf是如何動態分配的,ByteBuf是如何寫出到SocketChannel的等等,這些後面專門寫文章講吧,否則這篇文章太長了。 ​

ServerBootstrap源碼分析

前面分別講了NioEventLoopGroup和NioEventLoop單獨的工做流程,尚未把整個完整的流程給串起來。做爲服務端啓動的引導類,ServerBootstrap是服務端整個啓動流程的入口,核心方法 bind() 會調用initAndRegister()建立一個ServerSocketChannel,並把它註冊到BossGroup的EventLoop的 Selector 上,這樣BossGroup就能夠處理鏈接事件了。但此時是不會有鏈接事件的,由於尚未綁定到本地端口,客戶端沒法創建鏈接。 註冊完後,ServerBootstrap隨後會調用doBind0()將ServerSocketChannel綁定到本地端口,至此服務端啓動完成,耐心等待Channel事件便可。

/* 建立一個ServerSocketChannel,並綁定到本地端口 */
public ChannelFuture bind(SocketAddress localAddress) {
    // 數據驗證,group/channelFactory不能爲null
    validate();
    return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}

private ChannelFuture doBind(final SocketAddress localAddress) {
    /* 1.反射建立ServerSocketChannel 2.ServerSocketChannel的初始化,建立Pipeline、設置Options、Attrs。 3.將ServerSocketChannel註冊到EventLoop 此時,EventLoop能夠開始輪詢Accept事件了,可是因爲還未bind本地端口,因此不會有事件發生。 */
    final ChannelFuture regFuture = initAndRegister();

    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        // 若是異常了,直接返回
        return regFuture;
    }

    if (regFuture.isDone()) {
        // Register成功了,則開始綁定端口
        ChannelPromise promise = channel.newPromise();
        /* 將Channel綁定到本地端口,底層仍是調用了JDK原生的channel.bind()。 因爲bind()是一個出站事件,須要經過Pipeline傳播,因此會轉交給Pipeline執行:pipeline.bind(localAddress, promise)。 最終會傳播到DefaultChannelPipeline的HeadContext.bind(),它又會轉交給Channel.Unsafe.bind()。 Channel.Unsafe.bind()最終會調用JDK原生的javaChannel().bind(),詳見:io.netty.channel.socket.nio.NioServerSocketChannel.doBind() 綁定成功後,會觸發promise的回調 */
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // 由於是異步的,防止Register還沒完成,經過註冊回調來綁定。
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    promise.setFailure(cause);
                } else {
                    promise.registered();
                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}
複製代碼

先看initAndRegister(),初始化ServerSocketChannel並註冊到BossGroup:

// 初始化和註冊
final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        /* channelFactory根據Channel.class反射建立實例 服務端:ServerSocketChannel 客戶端:SocketChannel */
        channel = channelFactory.newChannel();

        /* 初始化Channel:服務端和客戶端 1.設置ChannelPipeline 2.設置options 3.設置attrs */
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            channel.unsafe().closeForcibly();
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    // 將Channel註冊到EventLoop,從Group中輪詢出一個EventLoop
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}
複製代碼

init()會初始化Channel,分爲服務端和客戶端兩種。客戶端Channel初始化很簡單,就是設置Pipeline、Options、Attrs,這裏就不貼代碼了。服務端複雜一些,除了設置自身的Pipeline、Options、Attrs,還要負責初始化客戶端接入的Channel,並把它註冊到WorkerGroup:

// 服務端Channel初始化
@Override
void init(Channel channel) {// 這裏的channel是ServerSocketChannel
    // 設置options
    setChannelOptions(channel, newOptionsArray(), logger);
    // 設置attrs
    setAttributes(channel, newAttributesArray());

    // 初始化ServerSocketChannel的ChannelPipeline
    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    // 和ServerSocketChannel創建鏈接的客戶端SocketChannel須要設置的options和attrs
    final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);

    /* 往服務端Channel添加Handler: 1.封裝HandlerAdded回調任務,保存在PendingHandlerCallback 2.後續的register()操做會觸發回調:pipeline.invokeHandlerAddedIfNeeded(); */
    p.addLast(new ChannelInitializer<Channel>() {
        /* initChannel()什麼時候被調用? ChannelHandler被添加到Pipeline有一個對應的回調:handlerAdded() addLast()會提交一個任務,讓EventLoop來觸發這個回調 ChannelInitializer在handlerAdded()回調裏會執行該初始化方法。 */
        @Override
        public void initChannel(final Channel ch) {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();//ServerBootstrap.handler()設置的
            if (handler != null) {
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    // ServerBootstrapAcceptor是服務端接收客戶端鏈接的核心
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}
複製代碼

服務端Channel在初始化Pipeline的時候會添加一個ServerBootstrapAcceptor,它是服務端接收客戶端鏈接的核心。 ​

先看屬性,它保留了客戶端鏈接時建立Channel的必要信息:

private final EventLoopGroup childGroup;// Reactor模型中的WorkerGroup
private final ChannelHandler childHandler;// 客戶端Channel的ChannelHandler
private final Entry<ChannelOption<?>, Object>[] childOptions;// 客戶端Channel的Options
private final Entry<AttributeKey<?>, Object>[] childAttrs;// 客戶端Channel的Attrs
private final Runnable enableAutoReadTask; // 啓用自動讀取的任務
複製代碼

構造函數就不貼代碼了,都是屬性賦值操做。 ​

須要重點關注的方法是channelRead(),前面已經分析過了,BossGroup監聽到有客戶端接入時會觸發該回調:

/* 有客戶端鏈接時,觸發. 見 io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe.read() */
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;// 這裏的Channel是SocketChannel

    // 設置客戶端Channel的Pipeline、Options、Attrs
    child.pipeline().addLast(childHandler);
    setChannelOptions(child, childOptions, logger);
    setAttributes(child, childAttrs);

    try {
        /* 將客戶端Channel註冊到WorkerGroup: 1.next()輪詢出一個EventLoop.register() 2.Channel.Unsafe.register(),Channel註冊到Selector 3.觸發各類回調 Channel一旦註冊到EventLoop,就由該EventLoop負責處理它整個生命週期的全部事件。 */
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                // 若是註冊失敗,強制關閉鏈接
                if (!future.isSuccess()) {
                    // 底層就是調用原生JDK的關閉方法:javaChannel().close();
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}
複製代碼

這裏處理的是客戶端的接入,設置Options、Attrs、Pipeline,並註冊到WorkerGroup,後續的全部讀寫事件交給WorkerGroup處理。 ​

doBind0()沒調用以前,全部的這一切都不會發生,因此最後只要看一下Netty是如何將ServerSocketChannel綁定到本地端口的,整個流程就所有分析結束了。

private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) {

    // 往Channel綁定的EventLoop提交一個綁定任務,轉交給Channel去執行
   	channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}
複製代碼

因爲bind()是一個出站事件,因此會轉交給Pipeline執行,須要它把事件傳播出去。

@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return pipeline.bind(localAddress, promise);
}
複製代碼

Pipeline會從TailContext開始傳播,TailContext會日後尋找能處理bind事件的ChannelHandler:

@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
    ObjectUtil.checkNotNull(localAddress, "localAddress");
    if (isNotValidPromise(promise, false)) {
        // cancelled
        return promise;
    }

    /* TailContext會日後尋找能處理bind事件的ChannelHandler。 由於是出站事件,因此調用findContextOutbound() */
    final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {// 讓EventLoop線程串行化處理
        next.invokeBind(localAddress, promise);
    } else {
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                next.invokeBind(localAddress, promise);
            }
        }, promise, null, false);
    }
    return promise;
}
複製代碼

若是用戶沒有重寫bind()回調的話,TailContext會把事件傳播給HeadContext,因爲bind操做須要和底層API交互,HeadContext會將操做轉交給Channel.Unsafe執行,因此最終會調用io.netty.channel.AbstractChannel.AbstractUnsafe#bind(),源碼以下:

/* 將ServerSocketChannel綁定到本地端口,如何被觸發的? 1.Bootstrap.bind()會往Channel註冊的EventLoop提交一個任務:Channel.bind() 2.因爲bind()是一個出站事件,須要被Pipeline傳播出去,因而會被轉交給Pipeline執行:Pipeline.bind() 3.bind()事件從TailContext開始傳播,不出意外會傳播到HeadContext。 4.HeadContext會再將bind()任務轉交給Channel.Unsafe執行,因而被觸發。 總結:Channel.bind()會將事件經過Pipeline進行傳播,從TailContext到HeadContext。 */
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    assertEventLoop();//確保是EventLoop線程執行

    // promise標記爲不可取消 確保Channel是Open狀態,若是close了就沒法bind了
    if (!promise.setUncancellable() || !ensureOpen(promise)) {
        return;
    }

    if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
        localAddress instanceof InetSocketAddress &&
        !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
        !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
        logger.warn(
                "A non-root user can't receive a broadcast packet if the socket " +
                "is not bound to a wildcard address; binding to a non-wildcard " +
                "address (" + localAddress + ") anyway as requested.");
    }

    boolean wasActive = isActive();//鏈接是否活躍
    try {
        /* 真正的綁定操做,子類實現。 看NioServerSocketChannel實現,就是調用了JDK原生的javaChannel().bind(); */
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }

    if (!wasActive && isActive()) {
        // 鏈接處於活躍狀態,觸發Active回調,往EventLoop提交一個任務,經過Pipeline傳播出去。
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }

    safeSetSuccess(promise);
}
複製代碼

doBind()又會調回到NioServerSocketChannel.doBind(),其實就是調用JDK原生的ServerSocketChannel.bind(localAddress , backlog),源碼以下:

@SuppressJava6Requirement(reason = "Usage guarded by java version check")
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        // 獲取JDK的ServerSocketChannel.bind()
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}
複製代碼

綁定成功後就能夠正常處理客戶端的接入了,以後客戶端Channel都會由WorkerGroup驅動IO的讀寫。 ​

總結

這篇文章分析了Netty服務端啓動的全流程,從ServerSocketChannel的建立到綁定端口,再到BossGroup驅動客戶端鏈接的接入和WorkerGroup驅動數據的讀寫。 還重點分析了NioEventLoopGroup和NioEventLoop的工做模式,認真讀完,相信你會對Netty總體的工做機制有所瞭解。 ​

數據接收ByteBuf的分配,數據write的底層細節沒有介紹到,包括Netty對高性能所做的努力也尚未過多介紹,考慮到篇幅緣由,後面會專門再開一篇文章。 ​

寫到這裏就結束了,此時此刻,個人電腦編輯器已經很是卡了,艱難的敲下這段文字後,是時候說再見了!!!

相關文章
相關標籤/搜索