Vert.x系列(五)--ContextImpl源碼分析

前言:數組

線程安全是Vert.x的重要特性,但這一特性是由它依賴的netty實現的,Vert.x只是直接拿過來使用。安全

這裏涉及到不少個類。框架

ContextImpl、EventLoopContext、NioEventLoop、和NioEventLoop的父類SingleThreadEventLoop、和NioEventLoop的爺爺類SingleThreadEventExecutor。async

 

原理:ide

Netty定義了EventExecutor事件執行器,用作對任務處理的封裝。執行器內部維護了Queue<Runable>oop

,實現了任務的順序執行。還定義了MultithreadEventExecutorGroup類,維護數組變量EventExecutor[] children,實現了多核CPU的利用; (數組隊列結構,很是像Hashmap的數組鏈表結構)。一個Verticle和一個ContextImpl對應,再有一個ContextImpl和一個EventExecutor對應,使全部對Verticle的操做都在一個Queue<Runable>中依次執行,實現了線程安全。this

 

代碼:線程

代碼1.構造器3d

對於佔了大部分的普通Verticle來講通常來講,會依次由VertxImpl.getOrCreateContext()、createEventLoopContext()、EventLoopContext構造方法、ContextImpl構造方法調用後,進入ContextImpl類代理

 

在建立ContextImpl 時 ,這下面的三個方法(或構造方法),

// 利用next(),從group中取一個。next()也實現了對group的平衡獲取

private static EventLoop getEventLoop(VertxInternal vertx) {
    EventLoopGroup group = vertx.getEventLoopGroup();
    if (group != null) {
        return group.next();
    } else {
        return null;
    }
}

// 須要注意this的第2個參數是getEventLoop(vertx)方法的調用。才

 

protected ContextImpl(VertxInternal vertx, WorkerPool internalBlockingPool, WorkerPool workerPool, String deploymentID, JsonObject config, 
ClassLoader tccl) {
    this(vertx, getEventLoop(vertx), internalBlockingPool, workerPool, deploymentID, config, tccl);
}

 

// 簡單的賦值

protected ContextImpl(VertxInternal vertx, EventLoop eventLoop, WorkerPool internalBlockingPool, WorkerPool workerPool, String deploymentID, JsonObject config,ClassLoader tccl) {
    if (DISABLE_TCCL && !tccl.getClass().getName().equals("sun.misc.Launcher$AppClassLoader")) {
        log.warn("You have disabled TCCL checks but you have a custom TCCL to set.");
    }
    this.deploymentID = deploymentID;
    this.config = config;
    this.eventLoop = eventLoop;
    this.tccl = tccl;
    this.owner = vertx;
    this.workerPool = workerPool;
    this.internalBlockingPool = internalBlockingPool;
    this.orderedTasks = new TaskQueue();
    this.internalOrderedTasks = new TaskQueue();
    this.closeHooks = new CloseHooks(log);
}

完成對屬性private final EventLoop eventLoop;的賦值,即對ContextImpl和EventLoop的1對1綁定。

VertxImpl的構造方法中,會對它的成員變量 eventLoopGroup 賦值

eventLoopGroup = transport.eventLoopGroup(options.getEventLoopPoolSize(), eventLoopThreadFactory, NETTY_IO_RATIO);

在eventLoopGroup()方法爲:

public EventLoopGroup eventLoopGroup(int nThreads, ThreadFactory threadFactory, int ioRatio) {
    NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(nThreads, threadFactory);
    eventLoopGroup.setIoRatio(ioRatio);
    return eventLoopGroup;
}

能夠看到實例化了一個 NioEventLoopGroup 做爲返回值。NioEventLoopGroup 就是若干個NioEventLoop的封裝,主要仍是看NioEventLoop。

用ctrl+alt+U查看下類圖,發現NioEventLoop的繼承結構有點複雜。能夠看到 Executor、SingleThreadEventExecutor。

Executor 定義了 void execute(Runnable command); -- 處理任務的方法

SingleThreadEventExecutor 實現了void execute(Runnable command);

並定義了重要的任務隊列 private final Queue<Runnable> taskQueue;

也看看 NioEventLoopGroup的類圖:

在他的父類MultithreadEventExecutorGroup,定義了private final EventExecutor[] children;

那麼,對前面的eventLoopGroup()方法裏的

NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(nThreads, threadFactory);

這句話概括,就是擁有 EventExecutor[] children; 的對象,而EventExecutor實現了對Queue<Runnable> taskQueue;的操做。就是「原理」裏說的數組隊列結構。

 

代碼2. runOnContext

對Verticle的操做,最後都會統一到 ContextImpl.runOnContext()方法處理,好比EventBusImpl.deliverToHandler()

runOnContext做爲入口方法很簡單:

// Run the task asynchronously on this same context

@Override
public void runOnContext(Handler<Void> task) {
    try {
        executeAsync(task);
    } catch (RejectedExecutionException ignore) {
    // Pool is already shut down
    }
}

executeAsync 有 abstract 關鍵字修飾,須要查看 ContextImpl 的子類EventLoopContext ,看看它是怎麼實現的

public void executeAsync(Handler<Void> task) {
// No metrics, we are on the event loop.
nettyEventLoop().execute(wrapTask(null, task, true, null));
}

這個wrapTask(代碼略)方法把屬於Vertx的Handler封裝成JDK的Runable,傳給netty框架處理。再使用execute()執行。下面的邏輯就是netty如何處理Runnable.

 

代碼3 SingleThreadEventExecutor.execute()

execute() 最上層的接口Executor定義的。NioEventLoop的父類SingleThreadEventExecutor 重寫了此方法.SingleThreadEventExecutor去執行execute() ,本身仍然仍是一個代理,會把真正執行運行線程的邏輯(相似方法名doExecute作的事情)的邏輯交給 private final Executor executor;執行

@Override
public void execute(Runnable task) {
    if (task == null) {
    throw new NullPointerException("task");
    }
    boolean inEventLoop = inEventLoop();
    // 對Queue添加 addTask(Runnable task)--offerTask(Runnable task) --taskQueue.offer(task); 這一系列操做, // 完成了對 Queue<Runnable>的添加操做。
    addTask(task);
    if (!inEventLoop) {
    // 執行 
    //SingleThreadEventExecutor.execute--> SingleThreadEventExecutor.startThread-->
    // SingleThreadEventExecutor.doStartThread. -->成員 Executor executor的execute(),實現是ThreadPerTaskExecutor的execute()
    startThread();
    // 對Queue減小
    if (isShutdown() && removeTask(task)) {
    reject();
    }
    }
    if (!addTaskWakesUp && wakesUpForTask(task)) {
    wakeup(inEventLoop);
    }
}

 

這個Executor executor 是在SingleThreadEventExecutor的構造方法中實例化的ThreadPerTaskExecutor,是屬於Netty框架的。可是,ThreadPerTaskExecutor包含一個接口屬性ThreadFactory threadFactory。針對Vertx框架的場景,new ThreadPerTaskExecutor(threadFactory) 中的 threadFactory是屬於Vertx框架的VertxThreadFactory。

protected SingleThreadEventExecutor(EventExecutorGroup parent, ThreadFactory threadFactory,boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
// 粗體代碼
this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, maxPendingTasks, rejectedHandler);
}
public final class ThreadPerTaskExecutor implements Executor {

    private final ThreadFactory threadFactory; // 這個實際是Vertx框架下的VertxThreadFactory
    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
           }
        this.threadFactory = threadFactory;
    }
    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start(); // 最最最底層的Thread.start()方法。
    }
}

這個變量的源頭,很早很早前,由VertxImpl在調用時傳入的

eventLoopGroup = transport.eventLoopGroup(options.getEventLoopPoolSize(), eventLoopThreadFactory, NETTY_IO_RATIO);

因此,弄到如今,各類Factory包裹,N層邏輯。才最終仍是使用抽象工廠模式,調用了Vertx實現的工廠。

 

須要注意的是 , NioEventLoop重寫了newTaskQueue()方法,

@Override
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
// This event loop never calls takeTask()
return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue(): PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}

因此Queue<Runnable> taskQueue 擁有的不是在SingleThreadEventExecutor.newTaskQueue()裏的 LinkedBlockingQueue , 而是 MpscUnboundedArrayQueue。

public final class ThreadPerTaskExecutor implements Executor {

private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}
相關文章
相關標籤/搜索