-java
一、Netty這個詞,對於熟悉併發的童鞋一點都不陌生,它是一個異步事件驅動型的網絡通訊框架; 二、使用Netty不須要咱們關注過多NIO的API操做,簡簡單單的使用便可,很是方便,開發門檻較低; 三、並且Netty也經歷了各大著名框架的「摧殘」,足以證實其性能高,穩定性高; 四、那麼本章節就來和你們分享分析一下Netty的服務端啓動流程,分析Netty的源碼版本爲:netty-netty-4.1.22.Final;
一、是一個基於NIO的客戶端、服務器端的網絡通訊框架; 二、是一個以提供異步的、事件驅動型的網絡應用工具; 三、能夠供咱們快速開發高性能的、高可靠性的網絡服務器與客戶端;
一、開箱即用,簡單操做,開發門檻低,API簡單,只需關注業務實現便可,不用關心如何編寫NIO; 二、自帶多種協議棧且預置多種編解碼功能,且定製化能力強; 三、綜合性能高,已歷經各大著名框架(RPC框架、消息中間件)等普遍驗證,健壯性很是強大; 四、相對於JDK的NIO來講,netty在底層作了不少優化,將reactor線程的併發處理提到了極致; 五、社區相對較活躍,遇到問題能夠隨時提問溝通並修復;
一、建立兩個線程管理組,一個是bossGroup,一個是workerGroup,每一個Group下都有一個線程組children[i]來執行任務; 二、bossGroup專門用來攬客的,就是接收客戶端的請求連接,而workerGroup專門用來幹事的,bossGroup攬客完了就交給workerGroup去幹活了; 三、經過bind輕鬆的一句代碼綁定註冊,其實裏面一點都不簡單,一堆堆的操做; 四、建立NioServerSocketChannel,而且將此註冊到bossGroup的子線程中的多路複用器上; 五、最後一步就是將NioServerSocketChannel綁定到指定ip、port便可,由此完成服務端的整個啓動過程;
/** * Netty服務端啓動代碼。 * * @author hmilyylimh * * @version 0.0.1 * * @date 2018/3/25 * */ public class NettyServer { public static final int TCP_PORT = 20000; private final int port; public NettyServer(int port) { this.port = port; } public void start() throws Exception { EventLoopGroup bossGroup = null; EventLoopGroup workerGroup = null; try { // Server 端引導類 ServerBootstrap serverBootstrap = new ServerBootstrap(); // Boss 線程管理組 bossGroup = new NioEventLoopGroup(1); // Worker 線程管理組 workerGroup = new NioEventLoopGroup(); // 將 Boss、Worker 設置到 ServerBootstrap 服務端引導類中 serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) // 指定通道類型爲NioServerSocketChannel,一種異步模式,OIO阻塞模式爲OioServerSocketChannel .localAddress("localhost", port)//設置InetSocketAddress讓服務器監聽某個端口已等待客戶端鏈接。 .childHandler(new ChannelInitializer<Channel>() {//設置childHandler執行全部的鏈接請求 @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new PacketHeadDecoder()); ch.pipeline().addLast(new PacketBodyDecoder()); ch.pipeline().addLast(new PacketHeadEncoder()); ch.pipeline().addLast(new PacketBodyEncoder()); ch.pipeline().addLast(new PacketHandler()); } }); // 最後綁定服務器等待直到綁定完成,調用sync()方法會阻塞直到服務器完成綁定,而後服務器等待通道關閉,由於使用sync(),因此關閉操做也會被阻塞。 ChannelFuture channelFuture = serverBootstrap.bind().sync(); System.out.println("Server started,port:" + channelFuture.channel().localAddress()); channelFuture.channel().closeFuture().sync(); } finally { // Shut down all event loops to terminate all threads. bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { new NettyServer(TCP_PORT).start(); } }
一、源碼: // NettyServer.java, Boss 線程管理組, 上面NettyServer.java中的示例代碼 bossGroup = new NioEventLoopGroup(1); // NioEventLoopGroup.java /** * Create a new instance using the specified number of threads, {@link ThreadFactory} and the * {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}. */ public NioEventLoopGroup(int nThreads) { this(nThreads, (Executor) null); } // NioEventLoopGroup.java public NioEventLoopGroup(int nThreads, Executor executor) { this(nThreads, executor, SelectorProvider.provider()); } // NioEventLoopGroup.java public NioEventLoopGroup( int nThreads, Executor executor, final SelectorProvider selectorProvider) { this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE); } // NioEventLoopGroup.java public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) { super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()); } // MultithreadEventLoopGroup.java /** * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...) */ protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { // DEFAULT_EVENT_LOOP_THREADS 默認爲CPU核數的2倍 super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); } // MultithreadEventExecutorGroup.java /** * Create a new instance. * * @param nThreads the number of threads that will be used by this instance. * @param executor the Executor to use, or {@code null} if the default should be used. * @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call */ protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) { this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args); } // MultithreadEventExecutorGroup.java /** * Create a new instance. * * @param nThreads the number of threads that will be used by this instance. * @param executor the Executor to use, or {@code null} if the default should be used. * @param chooserFactory the {@link EventExecutorChooserFactory} to use. * @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call */ protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { if (nThreads <= 0) { // 小於或等於零都會直接拋異常,因而可知,要想使用netty,還得必須至少得有1個線程跑起來才能使用 throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); } if (executor == null) { // 若是調用方不想本身定製線程池的話,那麼則用netty本身默認的線程池 executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } children = new EventExecutor[nThreads]; // 構建孩子結點數組,也就是構建NioEventLoopGroup持有的線程數組 for (int i = 0; i < nThreads; i ++) { // 循環線程數,依次建立實例化線程封裝的對象NioEventLoop boolean success = false; try { children[i] = newChild(executor, args); // 最終調用到了NioEventLoopGroup類中的newChild方法 success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { if (!success) { for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); } for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } } // 實例化選擇線程器,也就是說咱們要想執行任務,對於nThreads個線程,咱們得靠一個規則來如何選取哪一個具體線程來執行任務; // 那麼chooser就是來幹這個事情的,它主要是幫咱們選出須要執行任務的線程封裝對象NioEventLoop chooser = chooserFactory.newChooser(children); final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } }; for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); } Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet); } 二、主要講述了NioEventLoopGroup對象的實例化過程,這僅僅只是講了一半,由於還有一半是實例化children[i]子線程組; 三、每一個NioEventLoopGroup都配備了一個默認的線程池executor對象,並且同時也配備了一個選擇線程器chooser對象; 四、每一個NioEventLoopGroup都一個子線程組children[i],根據上層傳入的參數來決定子線程數量,默認數量爲CPU核數的2倍;
一、源碼: // MultithreadEventExecutorGroup.java, 最終調用到了NioEventLoopGroup類中的newChild方法 children[i] = newChild(executor, args); // NioEventLoopGroup.java @Override protected EventLoop newChild(Executor executor, Object... args) throws Exception { return new NioEventLoop(this, executor, (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]); } // NioEventLoop.java NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) { // 調用父類的構造方法 // DEFAULT_MAX_PENDING_TASKS 任務隊列初始化容量值,默認值爲:Integer.MAX_VALUE // 若不想使用默認值的話,那麼就得本身配置 io.netty.eventLoop.maxPendingTasks 屬性值爲本身想要的值 super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler); if (selectorProvider == null) { throw new NullPointerException("selectorProvider"); } if (strategy == null) { throw new NullPointerException("selectStrategy"); } // 這個對象在NioEventLoopGroup的構造函數中經過SelectorProvider.provider()得到,而後一路傳參到此類 provider = selectorProvider; // 經過調用JDK底層類庫,爲每一個NioEventLoop配備一個多路複用器 final SelectorTuple selectorTuple = openSelector(); selector = selectorTuple.selector; unwrappedSelector = selectorTuple.unwrappedSelector; selectStrategy = strategy; } // SingleThreadEventLoop.java protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedExecutionHandler) { // 調用父類的構造方法 super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler); // 構造任務隊列,最終會調用NioEventLoop的newTaskQueue(int maxPendingTasks)方法 tailTasks = newTaskQueue(maxPendingTasks); } // SingleThreadEventExecutor.java /** * Create a new instance * * @param parent the {@link EventExecutorGroup} which is the parent of this instance and belongs to it * @param executor the {@link Executor} which will be used for executing * @param addTaskWakesUp {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the * executor thread * @param maxPendingTasks the maximum number of pending tasks before new tasks will be rejected. * @param rejectedHandler the {@link RejectedExecutionHandler} to use. */ protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) { // 調用父類的構造方法 super(parent); this.addTaskWakesUp = addTaskWakesUp; // 添加任務時是否須要喚醒多路複用器的阻塞狀態 this.maxPendingTasks = Math.max(16, maxPendingTasks); this.executor = ObjectUtil.checkNotNull(executor, "executor"); taskQueue = newTaskQueue(this.maxPendingTasks); rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler"); } // AbstractScheduledEventExecutor.java protected AbstractScheduledEventExecutor(EventExecutorGroup parent) { // 調用父類的構造方法 super(parent); } // AbstractEventExecutor.java protected AbstractEventExecutor(EventExecutorGroup parent) { this.parent = parent; } 二、該流程主要實例化線程管理組的孩子結點children[i],孩子結點的類型爲NioEventLoop類型; 三、仔細一看,netty的開發者對命名也很講究,線程管理組的類名爲NioEventLoopGroup,線程管理組的子線程類名爲NioEventLoop, 有沒有發現有什麼不同的地方?其實就是差了個Group幾個字母,線程管理組天然以Group結尾,不是組的就天然沒有Group字母; 四、每一個NioEventLoop都持有組的線程池executor對象,方便添加task到任務隊列中; 五、每一個NioEventLoop都有一個selector多路複用器,而那些Channel就是註冊到這個玩意上面的; 六、每一個NioEventLoop都有一個任務隊列,並且這個隊列的初始化容器大小爲1024;
一、源碼: // SingleThreadEventLoop.java, 構造任務隊列,最終會調用NioEventLoop的newTaskQueue(int maxPendingTasks)方法 tailTasks = newTaskQueue(maxPendingTasks); // NioEventLoop.java @Override protected Queue<Runnable> newTaskQueue(int maxPendingTasks) { // This event loop never calls takeTask() // 因爲默認是沒有配置io.netty.eventLoop.maxPendingTasks屬性值的,因此maxPendingTasks默認值爲Integer.MAX_VALUE; // 那麼最後配備的任務隊列的大小也就天然使用無參構造隊列方法 return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue() : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks); } // PlatformDependent.java /** * Create a new {@link Queue} which is safe to use for multiple producers (different threads) and a single * consumer (one thread!). * @return A MPSC queue which may be unbounded. */ public static <T> Queue<T> newMpscQueue() { return Mpsc.newMpscQueue(); } // Mpsc.java static <T> Queue<T> newMpscQueue() { // 默認值 MPSC_CHUNK_SIZE = 1024; return USE_MPSC_CHUNKED_ARRAY_QUEUE ? new MpscUnboundedArrayQueue<T>(MPSC_CHUNK_SIZE) : new MpscUnboundedAtomicArrayQueue<T>(MPSC_CHUNK_SIZE); } 二、這裏主要看看NioEventLoop是如何構建任務隊列的,並且還構建了一個給定初始化容量值大小的隊列;
一、源碼: // NioEventLoop.java, 經過調用JDK底層類庫,爲每一個NioEventLoop配備一個多路複用器 final SelectorTuple selectorTuple = openSelector(); selector = selectorTuple.selector; unwrappedSelector = selectorTuple.unwrappedSelector; selectStrategy = strategy; // NioEventLoop.java private SelectorTuple openSelector() { final Selector unwrappedSelector; try { // 經過 provider 調用底層獲取一個多路複用器對象 unwrappedSelector = provider.openSelector(); } catch (IOException e) { throw new ChannelException("failed to open a new selector", e); } // DISABLE_KEYSET_OPTIMIZATION: 是否優化選擇器key集合,默認爲不優化 if (DISABLE_KEYSET_OPTIMIZATION) { return new SelectorTuple(unwrappedSelector); } // 執行到此,說明須要優化選擇器集合,首先建立一個選擇器集合 final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet(); // 而後經過反射找到SelectorImpl對象 Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() { @Override public Object run() { try { // 經過反射獲取SelectorImpl實現類對象 return Class.forName( "sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader()); } catch (Throwable cause) { return cause; } } }); if (!(maybeSelectorImplClass instanceof Class) || // ensure the current selector implementation is what we can instrument. !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) { if (maybeSelectorImplClass instanceof Throwable) { Throwable t = (Throwable) maybeSelectorImplClass; logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t); } return new SelectorTuple(unwrappedSelector); } final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass; // 如下run方法的主要目的就是將咱們本身建立的selectedKeySet選擇器集合經過反射替換底層自帶的選擇器集合 Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() { @Override public Object run() { try { Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys"); Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys"); Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true); if (cause != null) { return cause; } cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true); if (cause != null) { return cause; } selectedKeysField.set(unwrappedSelector, selectedKeySet); publicSelectedKeysField.set(unwrappedSelector, selectedKeySet); return null; } catch (NoSuchFieldException e) { return e; } catch (IllegalAccessException e) { return e; } } }); if (maybeException instanceof Exception) { selectedKeys = null; Exception e = (Exception) maybeException; logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e); return new SelectorTuple(unwrappedSelector); } // 反射執行完後,則將建立的selectedKeySet賦值爲當成員變量 selectedKeys = selectedKeySet; logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector); return new SelectorTuple(unwrappedSelector, new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet)); } 二、其實說得到多路複用器,倒不如說多路複用器從何而來,是經過provider調用provider.openSelector()方法而得到的; 三、而這個provider所產生的地方其內部是一個靜態變量,細心的童鞋會發現SelectorProvider.provider()這個裏面還真有一個靜態provider; 四、而這裏給用戶作了一個選擇是否須要優化選擇器,若是須要優化則用本身建立的選擇器經過反射塞到底層的多路複用器對象中;
一、源碼: // MultithreadEventExecutorGroup.java // 實例化選擇線程器,也就是說咱們要想執行任務,對於nThreads個線程,咱們得靠一個規則來如何選取哪一個具體線程來執行任務; // 那麼chooser就是來幹這個事情的,它主要是幫咱們選出須要執行任務的線程封裝對象NioEventLoop chooser = chooserFactory.newChooser(children); // DefaultEventExecutorChooserFactory.java @SuppressWarnings("unchecked") @Override public EventExecutorChooser newChooser(EventExecutor[] executors) { if (isPowerOfTwo(executors.length)) { return new PowerOfTwoEventExecutorChooser(executors); } else { return new GenericEventExecutorChooser(executors); } } // PowerOfTwoEventExecutorChooser.java private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser { private final AtomicInteger idx = new AtomicInteger(); private final EventExecutor[] executors; PowerOfTwoEventExecutorChooser(EventExecutor[] executors) { this.executors = executors; } @Override public EventExecutor next() { return executors[idx.getAndIncrement() & executors.length - 1]; } } // GenericEventExecutorChooser.java private static final class GenericEventExecutorChooser implements EventExecutorChooser { private final AtomicInteger idx = new AtomicInteger(); private final EventExecutor[] executors; GenericEventExecutorChooser(EventExecutor[] executors) { this.executors = executors; } @Override public EventExecutor next() { return executors[Math.abs(idx.getAndIncrement() % executors.length)]; } } 二、記得在前面說過,在實例化線程組Group的時候,會實例化一個線程選擇器,而這個選擇器的實現方式也正是由經過線程數量來決定的; 三、PowerOfTwoEventExecutorChooser與GenericEventExecutorChooser的主要區別就是,當線程個數爲2的n次方的話,那麼則用PowerOfTwoEventExecutorChooser實例化的選擇器; 四、由於EventExecutorChooser的next()方法,一個是與操做,一個是求餘操做,而與操做的效率稍微高些,因此在選擇線程這個細小的差異,netty的開發人員也真實一絲不苟的處理;
因爲篇幅過長難以發佈,因此接下來的請看【原理剖析(第 011 篇)Netty之服務端啓動工做原理分析(下)】
詳見 原理剖析(第 011 篇)Netty之服務端啓動工做原理分析(下)react
https://gitee.com/ylimhhmily/SpringCloudTutorial.gitgit
SpringCloudTutorial交流QQ羣: 235322432segmentfault
SpringCloudTutorial交流微信羣: 微信溝通羣二維碼圖片連接數組
歡迎關注,您的確定是對我最大的支持!!!服務器