前言:數組
線程安全是Vert.x的重要特性,但這一特性是由它依賴的netty實現的,Vert.x只是直接拿過來使用。安全
這裏涉及到不少個類。框架
ContextImpl、EventLoopContext、NioEventLoop、和NioEventLoop的父類SingleThreadEventLoop、和NioEventLoop的爺爺類SingleThreadEventExecutor。async
原理:ide
Netty定義了EventExecutor事件執行器,用作對任務處理的封裝。執行器內部維護了Queue<Runable>oop
,實現了任務的順序執行。還定義了MultithreadEventExecutorGroup類,維護數組變量EventExecutor[] children,實現了多核CPU的利用; (數組隊列結構,很是像Hashmap的數組鏈表結構)。一個Verticle和一個ContextImpl對應,再有一個ContextImpl和一個EventExecutor對應,使全部對Verticle的操做都在一個Queue<Runable>中依次執行,實現了線程安全。this
代碼:線程
代碼1.構造器3d
對於佔了大部分的普通Verticle來講通常來講,會依次由VertxImpl.getOrCreateContext()、createEventLoopContext()、EventLoopContext構造方法、ContextImpl構造方法調用後,進入ContextImpl類代理
在建立ContextImpl 時 ,這下面的三個方法(或構造方法),
// 利用next(),從group中取一個。next()也實現了對group的平衡獲取
private static EventLoop getEventLoop(VertxInternal vertx) { EventLoopGroup group = vertx.getEventLoopGroup(); if (group != null) { return group.next(); } else { return null; } }
// 須要注意this的第2個參數是getEventLoop(vertx)方法的調用。才
protected ContextImpl(VertxInternal vertx, WorkerPool internalBlockingPool, WorkerPool workerPool, String deploymentID, JsonObject config, ClassLoader tccl) { this(vertx, getEventLoop(vertx), internalBlockingPool, workerPool, deploymentID, config, tccl); }
// 簡單的賦值
protected ContextImpl(VertxInternal vertx, EventLoop eventLoop, WorkerPool internalBlockingPool, WorkerPool workerPool, String deploymentID, JsonObject config,ClassLoader tccl) { if (DISABLE_TCCL && !tccl.getClass().getName().equals("sun.misc.Launcher$AppClassLoader")) { log.warn("You have disabled TCCL checks but you have a custom TCCL to set."); } this.deploymentID = deploymentID; this.config = config; this.eventLoop = eventLoop; this.tccl = tccl; this.owner = vertx; this.workerPool = workerPool; this.internalBlockingPool = internalBlockingPool; this.orderedTasks = new TaskQueue(); this.internalOrderedTasks = new TaskQueue(); this.closeHooks = new CloseHooks(log); }
完成對屬性private final EventLoop eventLoop;的賦值,即對ContextImpl和EventLoop的1對1綁定。
VertxImpl的構造方法中,會對它的成員變量 eventLoopGroup 賦值
eventLoopGroup = transport.eventLoopGroup(options.getEventLoopPoolSize(), eventLoopThreadFactory, NETTY_IO_RATIO);
在eventLoopGroup()方法爲:
public EventLoopGroup eventLoopGroup(int nThreads, ThreadFactory threadFactory, int ioRatio) { NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(nThreads, threadFactory); eventLoopGroup.setIoRatio(ioRatio); return eventLoopGroup; }
能夠看到實例化了一個 NioEventLoopGroup 做爲返回值。NioEventLoopGroup 就是若干個NioEventLoop的封裝,主要仍是看NioEventLoop。
用ctrl+alt+U查看下類圖,發現NioEventLoop的繼承結構有點複雜。能夠看到 Executor、SingleThreadEventExecutor。
Executor 定義了 void execute(Runnable command); -- 處理任務的方法
SingleThreadEventExecutor 實現了void execute(Runnable command);
並定義了重要的任務隊列 private final Queue<Runnable> taskQueue;
也看看 NioEventLoopGroup的類圖:
在他的父類MultithreadEventExecutorGroup,定義了private final EventExecutor[] children;
那麼,對前面的eventLoopGroup()方法裏的
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(nThreads, threadFactory);
這句話概括,就是擁有 EventExecutor[] children; 的對象,而EventExecutor實現了對Queue<Runnable> taskQueue;的操做。就是「原理」裏說的數組隊列結構。
代碼2. runOnContext
對Verticle的操做,最後都會統一到 ContextImpl.runOnContext()方法處理,好比EventBusImpl.deliverToHandler()
runOnContext做爲入口方法很簡單:
// Run the task asynchronously on this same context @Override public void runOnContext(Handler<Void> task) { try { executeAsync(task); } catch (RejectedExecutionException ignore) { // Pool is already shut down } }
executeAsync 有 abstract 關鍵字修飾,須要查看 ContextImpl 的子類EventLoopContext ,看看它是怎麼實現的
public void executeAsync(Handler<Void> task) { // No metrics, we are on the event loop. nettyEventLoop().execute(wrapTask(null, task, true, null)); }
這個wrapTask(代碼略)方法把屬於Vertx的Handler封裝成JDK的Runable,傳給netty框架處理。再使用execute()執行。下面的邏輯就是netty如何處理Runnable.
代碼3 SingleThreadEventExecutor.execute()
execute() 最上層的接口Executor定義的。NioEventLoop的父類SingleThreadEventExecutor 重寫了此方法.SingleThreadEventExecutor去執行execute() ,本身仍然仍是一個代理,會把真正執行運行線程的邏輯(相似方法名doExecute作的事情)的邏輯交給 private final Executor executor;執行
@Override public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } boolean inEventLoop = inEventLoop(); // 對Queue添加 addTask(Runnable task)--offerTask(Runnable task) --taskQueue.offer(task); 這一系列操做, // 完成了對 Queue<Runnable>的添加操做。 addTask(task); if (!inEventLoop) { // 執行 //SingleThreadEventExecutor.execute--> SingleThreadEventExecutor.startThread--> // SingleThreadEventExecutor.doStartThread. -->成員 Executor executor的execute(),實現是ThreadPerTaskExecutor的execute() startThread(); // 對Queue減小 if (isShutdown() && removeTask(task)) { reject(); } } if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } }
這個Executor executor 是在SingleThreadEventExecutor的構造方法中實例化的ThreadPerTaskExecutor,是屬於Netty框架的。可是,ThreadPerTaskExecutor包含一個接口屬性ThreadFactory threadFactory。針對Vertx框架的場景,new ThreadPerTaskExecutor(threadFactory) 中的 threadFactory是屬於Vertx框架的VertxThreadFactory。
protected SingleThreadEventExecutor(EventExecutorGroup parent, ThreadFactory threadFactory,boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) { // 粗體代碼 this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, maxPendingTasks, rejectedHandler); }
public final class ThreadPerTaskExecutor implements Executor { private final ThreadFactory threadFactory; // 這個實際是Vertx框架下的VertxThreadFactory public ThreadPerTaskExecutor(ThreadFactory threadFactory) { if (threadFactory == null) { throw new NullPointerException("threadFactory"); } this.threadFactory = threadFactory; } @Override public void execute(Runnable command) { threadFactory.newThread(command).start(); // 最最最底層的Thread.start()方法。 } }
這個變量的源頭,很早很早前,由VertxImpl在調用時傳入的
eventLoopGroup = transport.eventLoopGroup(options.getEventLoopPoolSize(), eventLoopThreadFactory, NETTY_IO_RATIO);
因此,弄到如今,各類Factory包裹,N層邏輯。才最終仍是使用抽象工廠模式,調用了Vertx實現的工廠。
須要注意的是 , NioEventLoop重寫了newTaskQueue()方法,
@Override protected Queue<Runnable> newTaskQueue(int maxPendingTasks) { // This event loop never calls takeTask() return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue(): PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks); }
因此Queue<Runnable> taskQueue 擁有的不是在SingleThreadEventExecutor.newTaskQueue()裏的 LinkedBlockingQueue , 而是 MpscUnboundedArrayQueue。
public final class ThreadPerTaskExecutor implements Executor { private final ThreadFactory threadFactory; public ThreadPerTaskExecutor(ThreadFactory threadFactory) { if (threadFactory == null) { throw new NullPointerException("threadFactory"); } this.threadFactory = threadFactory; } @Override public void execute(Runnable command) { threadFactory.newThread(command).start(); } }