jdk源代碼concurrent說明及底層執行簡單流程

clipboard.png

一.類圖說明緩存

  1. Executor接口類,執行Runnable接口execute。函數

  2. ExecutorService接口類繼承Executor接口,包含提交執行Runnable和Callable接口submit以及shutdown,invokeAll接口。this

  3. ScheduledExecutorService接口類繼承ExecutorService接口,主要包含計劃執行接口schedule,scheduleAtFixedRate以及scheduleWithFixedDelay。spa

  4. 虛類AbstractExecutorService,實現ExecutorService接口。實現ExecutorService相關接口。線程

  5. ThreadPoolExecutor類,繼承虛類AbstractExecutorService。實現初始化線程池數量、策略並執行線程。code

  6. ScheduledThreadPoolExecutor類,繼承ThreadPoolExecutor類並實現ScheduledExecutorService接口。實現計劃執行相關接口。對象

  7. 執行類,定義ThreadPoolExecutor和ScheduledThreadPoolExecutor類,並使用相關concurrent類方法。繼承

二.簡述下從定義線程池,到其底層執行的簡單流程接口

ExecutorService executorService = Executors.newFixedThreadPool(3);
executorService.submit(new NewTask());

簡單定義瞭如上兩句代碼,JDK自動建立3個固定大小的線程的線程池,submit實現了Runnable接口的NewTask對象之後,JDK自動啓動線程並執行NewTask對象的run方法。流程是如何的呢?隊列

1.Executors的newFixedThreadPool方法new了一個ThreadPoolExecutor對象,且new了一個LinkedBlockingQueue對象。

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

2.調用ThreadPoolExecutor的構造函數,其中ThreadFactory默認使用defaultThreadFactory,defaultHandler爲AbortPolicy。

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

3.初始化之後,調用AbstractExecutorService類的submit方法,執行Runnable對象。

public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

4.RunnableAdapter實現了Callable接口,FutureTask類包含callable類成員對象。
FutureTask實現RunnableFuture接口,RunnableFuture繼承Runnable接口。因此newTaskFor(task, null)方法返回一個FutureTask對象。

public class FutureTask<V> implements RunnableFuture<V>

public interface RunnableFuture<V> extends Runnable, Future<V>

RunnableFuture<Void> ftask = newTaskFor(task, null);

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
        throw new NullPointerException();
    return new RunnableAdapter<T>(task, result);
}

static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
}

5.關鍵的調用ThreadPoolExecutor類execute方法,管理線程池並執行Runnable任務。主要邏輯以下:
1)若是線程數量小於corePoolSize,新來一個任務,建立一個新線程。
2)若是線程數量等於corePoolSize,則將任務緩存到workQueue中。
3)若是線程數量等於corePoolSize而且workQueue隊列已滿,則繼續建立線程直到等於maximumPoolSize。
4)若是線程數量等於maximumPoolSize,且workQueue隊列已滿,則根據defaultHandler策略執行相應措施。默認是AbortPolicy,拋出一個運行時異常RejectedExecutionException。另外還有3種策略:CallerRunsPolicy->若是線程池沒有shutdown,則直接調用Runnable的run方法執行。若是線程池shutdown,則直接丟棄;DiscardPolicy->直接丟棄,沒有任何異常;DiscardOldestPolicy->丟棄最老的任務,並調用線程池的execute方法執行,若是線程池沒有shutdown。

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
相關文章
相關標籤/搜索