再探Android線程池

前言

在以前的初Android線程池文章中,因爲時間緣由,只是對Android中的線程池作了一個初步的瞭解。那篇文章中初步瞭解了線程池的做用、爲何使用線程池、ThreadPoolExecutor類及其構造方法中的參數含義,最後還初步總結了一下線程池的運行過程。這篇文章將同過示例和源碼分析的角度對線程池的運行原理進行深刻分析。Let's go!!!html

基礎知識準備

在進行今天的探究以前,咱們仍是先了解幾個小的知識點。java

Runnable和Callable之間的區別

/** * The <code>Runnable</code> interface should be implemented by any * class whose instances are intended to be executed by a thread. The * class must define a method of no arguments called <code>run</code>. * <p> * This interface is designed to provide a common protocol for objects that * wish to execute code while they are active. For example, * <code>Runnable</code> is implemented by class <code>Thread</code>. * Being active simply means that a thread has been started and has not * yet been stopped. * <p> * In addition, <code>Runnable</code> provides the means for a class to be * active while not subclassing <code>Thread</code>. A class that implements * <code>Runnable</code> can run without subclassing <code>Thread</code> * by instantiating a <code>Thread</code> instance and passing itself in * as the target. In most cases, the <code>Runnable</code> interface should * be used if you are only planning to override the <code>run()</code> * method and no other <code>Thread</code> methods. * This is important because classes should not be subclassed * unless the programmer intends on modifying or enhancing the fundamental * behavior of the class. * * @author Arthur van Hoff * @see java.lang.Thread * @see java.util.concurrent.Callable * @since JDK1.0 */
@FunctionalInterface
public interface Runnable {
    /** * When an object implementing interface <code>Runnable</code> is used * to create a thread, starting the thread causes the object's * <code>run</code> method to be called in that separately executing * thread. * <p> * The general contract of the method <code>run</code> is that it may * take any action whatsoever. * * @see java.lang.Thread#run() */
    public abstract void run();
}
複製代碼

從註釋中咱們能夠看到,若是一個實體對象想被線程執行,就必須實現這個接口,同時還要實現一個無參的run方法。編程

/** * A task that returns a result and may throw an exception. * Implementors define a single method with no arguments called * {@code call}. * * <p>The {@code Callable} interface is similar to {@link * java.lang.Runnable}, in that both are designed for classes whose * instances are potentially executed by another thread. A * {@code Runnable}, however, does not return a result and cannot * throw a checked exception. * * <p>The {@link Executors} class contains utility methods to * convert from other common forms to {@code Callable} classes. * * @see Executor * @since 1.5 * @author Doug Lea * @param <V> the result type of method {@code call} */
@FunctionalInterface
public interface Callable<V> {
    /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */
    V call() throws Exception;
}
複製代碼

Callable中有返回值而且能夠拋出異常,它於Runnable類似,都是爲多線程準備的。可是,後者不能有返回值,也不能在其內部自動拋出異常。
好了,看到這裏咱們來總結一下它們的異同。緩存

一、它們兩個都是接口(雖然這是一句廢話)。
二、它們兩個都能編寫多線程(感受又是一句廢話)。
三、Callable中的方法有返回值,Runnable中的則沒有。
四、Callable中的方法中可以自動拋出異常,Runnable中須要手動拋出異常。
多線程

說了這麼多感受仍是有點抽象,咱們來看一下用法吧。併發

public abstract class RunnableImp implements Runnable{}
複製代碼
public class CallableImp implements Callable<String> {
    @Override
    public String call() throws Exception {
        return "我是Callable";
    }
}
複製代碼
Handler mHandler = new Handler() {
        @Override
        public void handleMessage(@NonNull Message msg) {
            switch (msg.what) {
                case 1:
                    tv.setText("");
                    tv.setText(msg.obj.toString());
                    break;
            }
        }
    };
    
@Override
protected void onCreate(@Nullable Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_second);
        Button btn_runable = findViewById(R.id.btn_runable);
        Button btn_callable = findViewById(R.id.btn_callable);
        tv = findViewById(R.id.tv);

        btn_runable.setOnClickListener(this);
        btn_callable.setOnClickListener(this);
    }
    
@Override
public void onClick(View v) {
        switch (v.getId()) {
            case R.id.btn_runable:
                executeRunnable();
                break;
            case R.id.btn_callable:
                executeCallable();
                break;
        }
    }
    
public void executeRunnable() {
        RunnableImp runnableImp = new RunnableImp() {
            @Override
            public void run() {
                Message message = Message.obtain();
                message.what = 1;
                message.obj = "我是Runnable";
                mHandler.handleMessage(message);
            }
        };

        Thread thread = new Thread(runnableImp);
        thread.start();
    }
    
public void executeCallable() {
        FutureTask<String> futureTask = new FutureTask<>(new CallableImp());
        new Thread(futureTask).start();
        try {
            Message message = Message.obtain();
            message.what = 1;
            message.obj = futureTask.get();
            mHandler.handleMessage(message);
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
複製代碼

第一部分代碼:建立一個抽象類實現Runnable接口。
第一部分代碼:建立一個抽象類實現Callable接口,同時重寫call()方法,返回一個String類型的值。
第三部分代碼:建立一個Activity,界面中有兩個按鈕,分別使RunnableCallcal的方式執行多線程,最後經過Handler將數據返回到主線程中改變界面。
app

上面的代碼沒有什麼難處,相信都能看得懂,咱們來看一下運行結果。 less

FutureTask

咱們在看上面多線程的例子時,發現線程能夠和FutureTask結合起來,那麼這個類究竟時幹什麼的呢?
dom

/** * A cancellable asynchronous computation. This class provides a base * implementation of {@link Future}, with methods to start and cancel * a computation, query to see if the computation is complete, and * retrieve the result of the computation. The result can only be * retrieved when the computation has completed; the {@code get} * methods will block if the computation has not yet completed. Once * the computation has completed, the computation cannot be restarted * or cancelled (unless the computation is invoked using * {@link #runAndReset}). * * <p>A {@code FutureTask} can be used to wrap a {@link Callable} or * {@link Runnable} object. Because {@code FutureTask} implements * {@code Runnable}, a {@code FutureTask} can be submitted to an * {@link Executor} for execution. * * <p>In addition to serving as a standalone class, this class provides * {@code protected} functionality that may be useful when creating * customized task classes. * * @since 1.5 * @author Doug Lea * @param <V> The result type returned by this FutureTask's {@code get} methods */
public class FutureTask<V> implements RunnableFuture<V> {
    ...
}
複製代碼

我們呢先不着急看裏面的代碼,先瞅兩眼註釋,從註釋中咱們能獲取如下幾個信息。異步

一、它能夠取消的異步運算。
二、它Future的實現類(我擦,Future是什麼東東?無論了,等下再看)。
三、調用方法來讓其運行,也能取消運行。
四、只有當運行結束時才能看到運行的結果,當運行沒有結束時get()方法會被阻塞掉。
五、一旦運行結束,將不能從新啓動或者取消,除非調用runAndReset()方法
六、因爲其實現了Runnable接口,Futuretask能夠被看做Callable或者Runnable的包裝類,能夠在Executor類中執行提交。

RunnableFuture

咱們看到FutureTask實現了RunnableFuture接口,咱們看一下它是幹什麼的。

/** * A {@link Future} that is {@link Runnable}. Successful execution of * the {@code run} method causes completion of the {@code Future} * and allows access to its results. * @see FutureTask * @see Executor * @since 1.6 * @author Doug Lea * @param <V> The result type returned by this Future's {@code get} method */
public interface RunnableFuture<V> extends Runnable, Future<V> {
    /** * Sets this Future to the result of its computation * unless it has been cancelled. */
    void run();
}
複製代碼

從上面咱們能夠看到RunnableFuture也一個接口,同時它繼承自RunnableFuture。執行run方法時可讓Future完成運行,而且還能獲取到運行的結果。
看到這裏,估計仍是不知道這些講的都是啥。彆着急,讓咱們繼續往下看。

Future

Runnable咱們已經看過了,讓咱們看一個Future

/** * A {@code Future} represents the result of an asynchronous * computation. Methods are provided to check if the computation is * complete, to wait for its completion, and to retrieve the result of * the computation. The result can only be retrieved using method * {@code get} when the computation has completed, blocking if * necessary until it is ready. Cancellation is performed by the * {@code cancel} method. Additional methods are provided to * determine if the task completed normally or was cancelled. Once a * computation has completed, the computation cannot be cancelled. * If you would like to use a {@code Future} for the sake * of cancellability but not provide a usable result, you can * declare types of the form {@code Future<?>} and * return {@code null} as a result of the underlying task. * * <p> * <b>Sample Usage</b> (Note that the following classes are all * made-up.) * * <pre> {@code * interface ArchiveSearcher { String search(String target); } * class App { * ExecutorService executor = ... * ArchiveSearcher searcher = ... * void showSearch(final String target) * throws InterruptedException { * Future<String> future * = executor.submit(new Callable<String>() { * public String call() { * return searcher.search(target); * }}); * displayOtherThings(); // do other things while searching * try { * displayText(future.get()); // use future * } catch (ExecutionException ex) { cleanup(); return; } * } * }}</pre> * * The {@link FutureTask} class is an implementation of {@code Future} that * implements {@code Runnable}, and so may be executed by an {@code Executor}. * For example, the above construction with {@code submit} could be replaced by: * <pre> {@code * FutureTask<String> future = * new FutureTask<>(new Callable<String>() { * public String call() { * return searcher.search(target); * }}); * executor.execute(future);}</pre> * * <p>Memory consistency effects: Actions taken by the asynchronous computation * <a href="package-summary.html#MemoryVisibility"> <i>happen-before</i></a> * actions following the corresponding {@code Future.get()} in another thread. * * @see FutureTask * @see Executor * @since 1.5 * @author Doug Lea * @param <V> The result type returned by this Future's {@code get} method */
public interface Future<V> {
    //在任務執行完以前可否取消任務,返回false說明任務已經執行完畢,反之返回true。
    boolean cancel(boolean mayInterruptIfRunning);
    //判斷任務完成以前是否被取消,返回true表示任務完成以前被取消。
    boolean isCancelled();
    //任務完成以後返回true。
    boolean isDone();
    //當任務執行結束以後能夠獲取執行結果。
    V get() throws InterruptedException, ExecutionException;
    //在給定的時間內等待任務的完成,若是等待時任務完成返回執行結果,不然將會拋出TimeoutException異常。
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
複製代碼

從類的註釋咱們能夠看到,Future是一個接口,他提供給了咱們方法來檢測當前的任務是否已經結束,還能夠等待任務結束而且拿到一個結果,經過調用Futureget()方法能夠當任務結束後返回一個結果值,若是工做沒有結束,則會阻塞當前線程,直到任務執行完畢,咱們能夠經過調用cancel()方法來中止一個任務,若是任務已經中止,則cancel()方法會返回true;若是任務已經完成或者已經中止了或者這個任務沒法中止,則cancel()會返回一個false。當一個任務被成功中止後,他沒法再次執行。isDone()isCancel()方法能夠判斷當前工做是否完成和是否取消。

小結

對於FutureFutureTask之間的關係,咱們能夠總結成一張圖。

回想如下咱們在寫 CallableRunnable例子時,咱們也用到了 FutureTask進行多線程操做,而且它於 Runnable方式相比較來看, FutureTask能夠經過 get()方法獲取線程的執行結果,而普通的方式則不能。同時, FuturTask在多線程任務執行的過程當中,能夠對任務作一系列操做,例如:取消、查看任務是否執行完畢等等。

若是想了解更多有關Future和FutureTask等相關知識能夠參考如下幾篇文章:
Java併發編程:Callable、Future和FutureTask原理解析
Java FutureTask 源碼分析 Android上的實現
FutureTask的用法及兩種經常使用的使用場景

線程池使用舉例

說了這麼多,咱們來動手寫幾個例子吧,畢竟光說不練假把式。

execute方式

public class ExecuteTask implements Runnable {
    private int taskNum;

    public ExecuteTask(int taskNum) {
        this.taskNum = taskNum;
    }

    @Override
    public void run() {
        Log.i("ThreadPoolDemoExecute", "當前執行task" + taskNum);
        try {
            Thread.currentThread().sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        Log.i("ThreadPoolDemoExecute", "task:" + taskNum + "執行完畢");
    }
}

public void execute() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2
                , 4
                , 200
                , TimeUnit.SECONDS
                , new ArrayBlockingQueue<Runnable>(5));

        for (int i = 0; i < 8; i++) {
            ExecuteTask task = new ExecuteTask(i);
            executor.execute(task);
            Log.i("ThreadPoolDemoExecute", "線程池中線程數目:" + executor.getPoolSize());
            Log.i("ThreadPoolDemoExecute", "線程池中等待執行的任務數:" + executor.getQueue().size());
            Log.i("ThreadPoolDemoExecute", "已經執行完的任務數:" + executor.getCompletedTaskCount());
        }
        executor.shutdown();
    }
複製代碼

在這個例子中首先建立了一個ExecuteTask類,使其實現Runnable接口。在run方法中讓當前線程sleep2秒,至關因而執行時間。
而後使用ThreadPoolExecutor建立線程池,因爲以前文章已經對這幾個參數作了解釋,這裏就再也不講解。
最後經過循環的方式讓建立ExecuteTask對象,並調用ThreadPoolExecutorexecute方法。讓咱們看一下運行結果。

submit方式

public class Data {
    private String name;

    public Data(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}


public class SubmitTask implements Runnable {
    private Data data;
    public SubmitTask(Data data) {
        this.data = data;
    }

    @Override
    public void run() {
        Log.i("ThreadPoolDemoSubmit", "當前執行" + data.getName());
    }
}

public void submit() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2
                , 4
                , 200
                , TimeUnit.SECONDS
                , new ArrayBlockingQueue<Runnable>(5));

        for (int i = 0; i < 8; i++) {
            Data data = new Data("任務" + i);
            SubmitTask task = new SubmitTask(data);
            Future<Data> future = executor.submit(task, data);
            try {
                Log.i("ThreadPoolDemoSubmit", future.get().getName() + "執行完畢");
            } catch (ExecutionException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        executor.shutdown();
}
複製代碼

在使用這種方式時,仍是先建立了SubmitTask類,同時也讓它實現了Runnable接口,重寫run方法。
其次仍是建立出線程池對象,而後經過循環的方式建立Data對象,同時也建立相應的SubmitTask對象,並將Data對象放入到SubmitTask對象中去。
最後,調用submit方法傳入SubmitTaskData對象,這個方法將會返回一個Future對象,在以前講解Future時知道能夠經過get()方法獲取任務的執行結果,運行結果以下圖。

execute方法解析

在第一個例子中咱們使用的是execute方法向線程池中增長任務,那麼咱們就看一下這方法的源碼。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE – 3; //32-3=29,線程數量所佔位數
private static final int CAPACITY = (1 << COUNT_BITS) – 1;//低29位表示最大線程數,229-1

//五種線程池狀態
private static final int RUNNING = -1 << COUNT_BITS;    //int型變量高3位(含符號位)101表RUNING
private static final int SHUTDOWN = 0 << COUNT_BITS;    //高3位000
private static final int STOP = 1 << COUNT_BITS;    //高3位001
private static final int TIDYING = 2 << COUNT_BITS;    //高3位010
private static final int TERMINATED = 3 << COUNT_BITS;    //高3位011

private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

/** * Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code RejectedExecutionHandler}. * * @param command the task to execute * @throws RejectedExecutionException at discretion of * {@code RejectedExecutionHandler}, if the task * cannot be accepted for execution * @throws NullPointerException if {@code command} is null */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */
        int c = ctl.get(); //1
        if (workerCountOf(c) < corePoolSize) { //2
            if (addWorker(command, true)) //3
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) { //4
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command)) //5
                reject(command); 
            else if (workerCountOf(recheck) == 0) //6
                addWorker(null, false);
        }
        else if (!addWorker(command, false)) //7
            reject(command);
    }
複製代碼

註釋上說添加的過程主要分爲三個步驟:

一、若是當前線程數小於核心線程數,則嘗試新開一個線程給這個任務。調用addWorker方法時經過原子性檢查當前線程池的運行狀態和任務個數,經過返回false來避免在不該該添加線程的地方出現錯誤。
二、若是一個任務能夠被成功的添加到隊列中去,但仍然須要第二次檢查是否真的須要添加一個線程(存在上次檢查以後有一個線程死掉的情況,或者進入這個方法時線程池已經關閉)。判斷是否有必要須要回滾或者新建一個線程。
三、若是不能將任務放入任務棧中,仍然會仙劍一個線程。當失敗時咱們就能知道線程池已經關閉或者線程池已經達到上限了,此時將會執行拒接策略。

說了這麼多,其實就是在執行execute()方法時作了多重的判斷操做,而且屢次嘗試爲任務新建線程將其放入到任務棧,若是實在放不進去說明線程池已經滿了或者已經再也不工做了。
上述只是關於添加任務執行過程的大體說明,下面咱們將看一下代碼細節,可是在看代碼細節以前仍是看一下有關線程池的幾種狀態。

RUNNING:能接受新提交的任務,而且也能處理阻塞隊列中的任務

SHUTDOWN:關閉狀態,再也不接受新提交的任務,但卻能夠繼續處理阻塞隊列中已保存的任務。在線程池處於 RUNNING 狀態時,調用 shutdown()方法會使線程池進入到該狀態。(finalize() 方法在執行過程當中也會調用shutdown()方法進入該狀態)

STOP:不能接受新任務,也不處理隊列中的任務,會中斷正在處理任務的線程。在線程池處於 RUNNINGSHUTDOWN狀態時,調用 shutdownNow()方法會使線程池進入到該狀態

TIDYING:若是全部的任務都已終止了,workerCount (有效線程數) 爲0,線程池進入該狀態後會調用 terminated() 方法進入TERMINATED 狀態

TERMINATED:在terminated() 方法執行完後進入該狀態,默認terminated()方法中什麼也沒有作

代碼細節對應註釋

註釋1:經過ctl.get()方法獲取一個值,這個值中包含了當前線程池的狀態和有效線程數。
註釋2:看看當前線程數是否小於核心線程數。
註釋3:若是當前線程數小於核心線程數,會調用addWork方法將任務傳入,同時傳入一個true值表示建立核心線程。
註釋4:若是當前線程數不小於核心線程數時,這時要判斷當線程池是否在運行,同時還要判斷任務棧是否能成功插入任務。
註釋5:在知足註釋4的條件後,還會從新檢查當前線程池的運行狀態,若是不是就將任務從任務棧中移除,同時執行拒絕策略。
註釋6:若是當先線程池處於運行狀態,而且當前線程池的線程數爲0,這時將會調用addWork方法,但此時傳入的firstTask爲空,而且也不是核心線程。
註釋7:再次執行了addWorker方法而且將任務傳入,同時建立的不是核心線程,若是此次添加任務仍是失敗,將會執行拒絕策略。

addWorker(Runnable firstTask, boolean core)

execute方法的執行流程咱們已經看完了,其中向線程池中插入線程的核心方法是addWorker(Runnable firstTask, boolean core)方法,咱們再來看一下這個方法。

/** * Checks if a new worker can be added with respect to current * pool state and the given bound (either core or maximum). If so, * the worker count is adjusted accordingly, and, if possible, a * new worker is created and started, running firstTask as its * first task. This method returns false if the pool is stopped or * eligible to shut down. It also returns false if the thread * factory fails to create a thread when asked. If the thread * creation fails, either due to the thread factory returning * null, or due to an exception (typically OutOfMemoryError in * Thread.start()), we roll back cleanly. * * @param firstTask the task the new thread should run first (or * null if none). Workers are created with an initial first task * (in method execute()) to bypass queuing when there are fewer * than corePoolSize threads (in which case we always start one), * or when the queue is full (in which case we must bypass queue). * Initially idle threads are usually created via * prestartCoreThread or to replace other dying workers. * * @param core if true use corePoolSize as bound, else * maximumPoolSize. (A boolean indicator is used here rather than a * value to ensure reads of fresh values after checking other pool * state). * @return true if successful */
     private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // 1 查看線程池是否能夠接收新的任務
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            for (;;) {
                int wc = workerCountOf(c);
                // 2 看一下當前線程的數量是否超過了容量
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 3 調用compareAndIncrementWorkerCount方法使用CAS方法增長線程
                // 若是增長失敗,須要跳出從新執行
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 4 相間一個Worker對象,並將任務封裝到該對象中去
            // 經過線程工廠爲Worker建立一個獨有的線程
            w = new Worker(firstTask);
            // 5 獲取這個線程
            final Thread t = w.thread;
            if (t != null) {
                // 7 獲取重入鎖,並鎖住
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                    // 8 判斷線程池處於運行狀態,或者線程池關閉且任務線程爲空
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // 9 若是線程已經啓動,須要拋出異常
                        if (t.isAlive())
                            throw new IllegalThreadStateException();
                        // 10 private final HashSet<Worker> wokers = new HashSet<Worker>();
                        // 包含線程池中全部的工做線程,只有在獲取了全局的時候才能訪問它。
                        // 將新構造的工做線程加入到工做線程集合中。
                        workers.add(w);
                        // 11 設置largestPoolSize和workerAdded
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    // 12 解鎖
                    mainLock.unlock();
                }
                // 13 添加成功執行線程
                if (workerAdded) {
                     //在被構造爲Worker工做線程,且被加入到工做線程集合中後,執行線程任務,
                     //注意這裏的start實際上執行Worker中run方法,因此接下來分析Worker的run方法
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    
     private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
        ...
        Worker(Runnable firstTask) {
            setState(-1); 
            this.firstTask = firstTask;
            // 經過線程工廠建立線程
            this.thread = getThreadFactory().newThread(this);
        }
        
        //執行任務
        public void run() {
            runWorker(this);
        }
        
        ...
    }
    
    private void addWorkerFailed(Worker w) {
        // 獲取鎖對象
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 將任務移除
            if (w != null)
                workers.remove(w);
            // 減小任務個數
            decrementWorkerCount();
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }
複製代碼

對這個方法的分析都在代碼中的註釋,同時在這裏跟你們提一下關於CAS的相關知識,你們能夠參考Java CAS 原理剖析這篇文章。

小結

execute方法已經分析完畢了,仍是用那副經典圖來做爲總結。

submit方法解析

咱們以前也使用了submit方法舉例,因此接下來咱們來看一下ThreadPoolExecutorsubmit方法。

public abstract class AbstractExecutorService implements ExecutorService {
    ...
    /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }
    /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
    ...
}
複製代碼

可是當咱們點進去看時,這裏又到了AbstractExecutorService類中。到這裏咱們從前面的線程池類的繼承結構中知道Executor接口中是沒有submit方法的,它只是定義了一個execute方法,而是在ExecutorService接口中定義了submit方法,AbstractExecutorService又實現了ExecutorService接口,同時實現了submit方法。最後ThreadPoolExecutor又繼承了AbstractExecutorService類。
最終在執行AbstractExecutorService.submit方法時又調用了子類ThreadPoolExecutor````中的execute方法,這種寫法實際上是[模板方法模式](https://www.cnblogs.com/yulinfeng/p/5902164.html):AbstractExecutorService.submit方法只是一個架子,具體的實現交由子類ThreadPoolExecutor.execute```方法來實現。

這個方法有三個重載,其中咱們在舉例時是用的第二種。

第一種方法:單獨傳入一個Runnable可是沒有傳入返回值類型,在返回的RunnableFuture對象的泛型時傳入的是一個Void,也就是說這種方法沒法獲取線程執行的返回值。

第二種方法:傳入一個Runnable對象,同時傳入了一個返回值的載體,經過載體間接獲取線程執行的結果。

第三種方法:傳入一個Callable方法,同時定義了返回值的類型。以前也講過Callable的實現對象中須要重寫call方法,在call方法中有返回值,這個返回值最後被定義給了RunnableFuture對象,經過get方法能夠獲取到。

Executors

上面講的都是咱們手動建立線程池,下面咱們來介紹一個類,它能夠幫咱們自動建立一個線程池,它就是Executors

/** * Factory and utility methods for {@link Executor}, {@link * ExecutorService}, {@link ScheduledExecutorService}, {@link * ThreadFactory}, and {@link Callable} classes defined in this * package. This class supports the following kinds of methods: * * <ul> * <li>Methods that create and return an {@link ExecutorService} * set up with commonly useful configuration settings. * <li>Methods that create and return a {@link ScheduledExecutorService} * set up with commonly useful configuration settings. * <li>Methods that create and return a "wrapped" ExecutorService, that * disables reconfiguration by making implementation-specific methods * inaccessible. * <li>Methods that create and return a {@link ThreadFactory} * that sets newly created threads to a known state. * <li>Methods that create and return a {@link Callable} * out of other closure-like forms, so they can be used * in execution methods requiring {@code Callable}. * </ul> * * @since 1.5 * @author Doug Lea */
public class Executors {
    ...
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    
    public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }
    ...
}
複製代碼

這個類至關於一個工廠,只須要咱們調用其靜態方法,就能爲咱們建立相應的線程池。這個類中有不少用於建立線程池的靜態方法,這裏只是找了5個比較具備表明性的方法,咱們來分析一下。

newFixedThreadPool:建立一個特定長度的線程池,能夠控制併發的數量,超過長度的線程在隊列中等待。
newSingleThreadExecutor:建立一個單線程的線程池,而且只用這一個線程執行任務。
newCachedThreadPool:建立一個可緩存的線程池,這個線程池能夠是無限大。
newScheduledThreadPool:建立一個定長的線程池,支持定時或週期性的執行任務。
newWorkStealingPool:建立一個多任務的線程池,減小鏈接數,適用於耗時的操做。

這個類幫助咱們自動建立的確很方便,可是也存在一些問題:

newCachedThreadPool方法中傳入的最大線程數爲Integer.MAX_VALUE,可是核心線程數爲0,因此短期內大量任務進來後,會建立有不少線程池對象,在資源有限的狀況下容易致使OOM
newFixedThreadPoolnewSingleThreadExecutor使用的任務隊列爲LinkedBlockingQueue,去看它的源碼能夠知道,它的隊列長度是Integer.MAX_VALUE,因此這兩個方法也存在同newCachedThreadPool同樣的問題。

總結

有關Android線程池相關的分析到這裏就告一段落了,若是有哪裏寫的不對的地方,還請各位大佬批評指正,本人將不勝感激。

參考資料

Java併發編程:Callable、Future和FutureTask原理解析
Java FutureTask 源碼分析 Android上的實現
FutureTask的用法及兩種經常使用的使用場景
Java CAS 原理剖析
模板方法模式

相關文章
相關標籤/搜索