源碼|從串行線程封閉到對象池、線程池

今天講一個牛逼而實用的概念,串行線程封閉對象池是串行線程封閉的典型應用場景;線程池糅合了對象池技術,但核心實現不依賴於對象池,很容易產生誤會。本文從串行線程封閉和對象池入手,最後經過源碼分析線程池的核心原理,釐清對象池與線程池之間的誤會。java

JDK版本:oracle java 1.8.0_102git

線程封閉與串行線程封閉

線程封閉

線程封閉是一種常見的線程安全設計策略:僅在固定的一個線程內訪問對象,不對其餘線程共享github

使用線程封閉技術,對象O始終只對一個線程T1可見,「單線程」中天然不存在線程安全的問題。web

ThreadLocal是經常使用的線程安全工具,見源碼|ThreadLocal的實現原理。線程封閉在Servlet及高層的web框架Spring等中應用很多。面試

串行線程封閉

線程封閉雖然好用,卻限制了對象的共享。串行線程封閉改進了這一點:對象O只能由單個線程T1擁有,但能夠經過安全的發佈對象O來轉移O的全部權;在轉移全部權後,也只有另外一個線程T2能得到這個O的全部權,而且發佈O的T1不會再訪問O數據庫

所謂「全部權」,指修改對象的權利。安全

相對於線程封閉,串行線程封閉使得任意時刻,最多僅有一個線程擁有對象的全部權。固然,這不是絕對的,只要線程T1事實不會再修改對象O,那麼就至關於僅有T2擁有對象的全部權。串行線層封閉讓對象變得能夠共享(雖然只能串行的擁有全部權),靈活性獲得大大提升;相對的,要共享對象就涉及安全發佈的問題,依靠BlockingQueue等同步工具很容易實現這一點。併發

對象池是串行線程封閉的經典應用場景,如數據庫鏈接池等。oracle

對象池

對象池利用了串行封閉:將對象O「借給」一個請求線程T1,T1使用完再交還給對象池,並保證「未擅自發布該對象」且「之後再也不使用」;對象池收回O後,等T2來借的時候再把它借給T2,完成對象全部權的傳遞框架

猴子擼了一個簡化版的線程池,用戶只須要覆寫newObject()方法:

public abstract class AbstractObjectPool<T> {
  protected final int min;
  protected final int max;
  protected final List<T> usings = new LinkedList<>();
  protected final List<T> buffer = new LinkedList<>();
  private volatile boolean inited = false;

  public AbstractObjectPool(int min, int max) {
    this.min = min;
    this.max = max;
    if (this.min < 0 || this.min > this.max) {
      throw new IllegalArgumentException(String.format(
          "need 0 <= min <= max <= Integer.MAX_VALUE, given min: %s, max: %s", this.min, this.max));
    }
  }

  public void init() {
    for (int i = 0; i < min; i++) {
      buffer.add(newObject());
    }
    inited = true;
  }

  protected void checkInited() {
    if (!inited) {
      throw new IllegalStateException("not inited");
    }
  }

  abstract protected T newObject();

  public synchronized T getObject() {
    checkInited();

    if (usings.size() == max) {
      return null;
    }
    if (buffer.size() == 0) {
      T newObj = newObject();
      usings.add(newObj);
      return newObj;
    }
    T oldObj = buffer.remove(0);
    usings.add(oldObj);
    return oldObj;
  }

  public synchronized void freeObject(T obj) {
    checkInited();
    if (!usings.contains(obj)) {
      throw new IllegalArgumentException(String.format("obj not in using queue: %s", obj));
    }

    usings.remove(usings.indexOf(obj));
    buffer.add(obj);
  }
}
複製代碼

AbstractObjectPool具備如下特性:

  • 支持設置最小、最大容量
  • 對象一旦申請就再也不釋放,避免了GC

雖然很簡單,但大能夠用於一些時間敏感、資源充裕的場景。若是時間進一步敏感,可將getObject()、freeObject()改寫爲併發程度更高的版本,但記得保證安全發佈安全回收;若是資源不那麼充裕,能夠適當增長對象回收策略。

能夠看到,一個對象池的基本行爲包括:

  • 建立對象newObject()
  • 借取對象getObject()
  • 歸還對象freeObject()

典型的對象池有各類鏈接池、常量池等,應用很是多,模型也大同小異,不作解析。使人迷惑的是線程池,很容易讓人誤覺得線程池的核心原理也是對象池,下面來追一遍源碼。

線程池

首先擺出結論:線程池糅合了對象池模型,但核心原理是生產者-消費者模型

繼承結構以下:

image.png

用戶能夠將Runnable(或Callables)實例提交給線程池,線程池會異步執行該任務,返回響應的結果(完成/返回值)。

猴子最喜歡的是submit(Callable<T> task)方法。咱們從該方法入手,逐步深刻函數棧,探究線程池的實現原理。

submit()

submit()方法在ExecutorService接口中定義,AbstractExecutorService實現,ThreadPoolExecutor直接繼承。

public abstract class AbstractExecutorService implements ExecutorService {
...
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
...
}
複製代碼

AbstractExecutorService#newTaskFor()建立一個RunnableFuture類型的FutureTask。

核心是execute()方法。

execute()

execute()方法在Executor接口中定義,ThreadPoolExecutor實現。

public class ThreadPoolExecutor extends AbstractExecutorService {
...
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
...
}
複製代碼

咱們暫且忽略線程池的池化策略。關注一個最簡單的場景,看能不能先回答一個問題:線程池中的任務如何執行?

核心是addWorker()方法。以8行的參數爲例,此時,線程池中的線程數未達到最小線程池大小corePoolSize,一般能夠直接在9行返回。

addWorker()

簡化以下:

public class ThreadPoolExecutor extends AbstractExecutorService {
...
    private boolean addWorker(Runnable firstTask, boolean core) {
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN) {
                        workers.add(w);
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
...
}
複製代碼

我去掉了不少用於管理線程池、維護線程安全的代碼。假設線程池未關閉,worker(即w,下同)添加成功,則必然可以將worker添加至workers中。workers是一個HashSet:

private final HashSet<Worker> workers = new HashSet<Worker>();
複製代碼

哪裏是對象池?

若是說與對象池有關,那麼workers即至關於示例代碼中的using,應用了對象池模型;只不過這裏的using是一直增加的,直到達到最大線程池大小maximumPoolSize。

可是很明顯,線程池並無將線程發佈出去,workers也僅僅完成using「保存線程」的功能。那麼,線程池中的任務如何執行呢?跟線程池有沒有關係?

哪裏又不是?

注意九、1七、24行:

  • 9行將咱們提交到線程池的firstTask封裝入一個worker。
  • 17行將worker加入workers,維護起來
  • 24行則啓動了worker中的線程t

核心在與這三行,但線程池並無直接在addWorker()中啓動任務firstTask,代之以啓動一個worker。最終任務必然被啓動,那麼咱們繼續看Worker如何啓動這個任務。

Worker

Worker實現了Runnable接口:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
...
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker */
    public void run() {
        runWorker(this);
    }
...
}
複製代碼

爲何要將構造Worker時的參數命名爲firstTask?由於當且僅當須要創建新的Worker以執行任務task時,纔會調用構造函數。所以,任務task對於新Worker而言,是第一個任務firstTask。

Worker的實現很是簡單:將本身做爲Runable實例,構造時在內部建立並持有一個線程thread。Thread和Runable的使用你們很熟悉了,核心是Worker的run方法,它直接調用了runWorker()方法。

runWorker()

敲黑板!!!

重頭戲來了。簡化以下:

public class ThreadPoolExecutor extends AbstractExecutorService {
...
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
...
}
複製代碼

咱們在前面將要執行的任務賦值給firstTask,5-6行首先取出任務task,並將firstTask置爲null。由於接下來要執行task,firstTask字段就沒有用了。

重點是10-31行的while循環。下面分狀況討論。

case1:第一次進入循環,task不爲null

case1對應前面做出的諸多假設。

第一次進入循環時,task==firstTask,不爲null,使10行布爾短路直接進入循環;從而16行執行的是firstTask的run()方法;異常處理不表;最後,finally代碼塊中,task會被置爲null,致使下一輪循環會進入case2。

case2:非第一次進入循環,task爲null

case2是更廣泛的狀況,也就是線程池的核心。

case1中,task被置爲了null,使10行布爾表達式執行第二部分(task = getTask()) != null(getTask()稍後再講,它返回一個用戶已提交的任務)。假設task獲得了一個已提交的任務,從而16行執行的是新得到的任務task的run()方法。後同case1,最後task仍然會被置爲null,之後循環都將進入case2。

getTask()

任務從哪來呢?簡化以下:

public class ThreadPoolExecutor extends AbstractExecutorService {
...
    private Runnable getTask() {
        boolean timedOut = false;

        for (;;) {
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
...
}
複製代碼

咱們先看最簡單的,19-28行。

首先,workQueue是一個線程安全的BlockingQueue,大部分時候使用的實現類是LinkedBlockingQueue,見源碼|併發一枝花之BlockingQueue

private final BlockingQueue<Runnable> workQueue;
複製代碼

假設timed爲false,則調用阻塞的take()方法,返回的r必定不是null,從而12行退出,將任務交給了某個worker線程。

一個小細節有點意思:前面每一個worker線程runWorker()方法時,在循環中加鎖粒度在worker級別,直接使用的lock同步;但由於每個woker都會調用getTask(),考慮到性能因素,源碼中getTask()中使用樂觀的CAS+SPIN實現無鎖同步。關於樂觀鎖和CAS,能夠參考個人另外一篇文章源碼|併發一枝花之ConcurrentLinkedQueue【僞】

workQueue中的元素從哪來呢?這就要回顧execute()方法了。

execute()

public class ThreadPoolExecutor extends AbstractExecutorService {
...
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
...
}
複製代碼

前面以8行的參數爲例,此時,線程池中的線程數未達到最小線程池大小corePoolSize,一般能夠直接在9行返回。進入8行的條件是「當前worker數小於最小線程池大小corePoolSize」

若是不知足,會繼續執行到12行。isRunning(c)判斷線程池是否未關閉,咱們關注未關閉的狀況;則會繼續執行布爾表達式的第二部分workQueue.offer(command),嘗試將任務command放入隊列workQueue。

workQueue.offer()的行爲取決於線程池持有的BlockingQueue實例。Executors.newFixedThreadPool()、Executors.newSingleThreadExecutor()建立的線程池使用LinkedBlockingQueue,而Executors.newCachedThreadPool()建立的線程池則使用SynchronousQueue。以LinkedBlockingQueue爲例,建立時不配置容量,即建立爲無界隊列,則LinkedBlockingQueue#offer()永遠返回true,從而進入12-18行。

更細節的內容沒必要關心了,當workQueue.offer()返回true時,已經將任務command放入了隊列workQueue。當將來的某個時刻,某worker執行完某一個任務以後,會從workQueue中再取出一個任務繼續執行,直到線程池關閉,直到海枯石爛。

CachedThreadPool是一種無界線程池,使用SynchronousQueue能進一步提高性能,簡化代碼結構。留給讀者分析。

case2小結

能夠看到,實際上,線程池的核心原理與對象池模型無關,而是生產者-消費者模型

image.png

  • 生產者(調用submit()或execute()方法)將任務task放入隊列
  • 消費者(worker線程)循環從隊列中取出任務處理任務(執行task.run())。

鉤子方法

回到runWorker()方法,在執行任務的過程當中,線程池保留了一些鉤子方法,如beforeExecute()、afterExecute()。用戶能夠在實現本身的線程池時,能夠經過覆寫鉤子方法爲線程池添加功能。

猴子不認爲鉤子方法是一種好的設計。由於鉤子方法大多依賴於源碼實現,那麼除非瞭解源碼或API聲明絕對的嚴謹正確,不然很難正確使用鉤子方法。等發生錯誤時再去了解實現,可能就太晚了。說到底,仍是不要使用相似extends這種表達「擴展」語義的語法來實現繼承,詳見Java中如何恰當的表達「繼承」與「擴展」的語義?

固然,鉤子方法也是極其方便的。權衡看待。

總結

相對於線程封閉,串行線程封閉離用戶的距離更近一些,簡單靈活,實用性強,很容易掌握。而線程封閉更多淪爲單純的設計策略,單純使用線程封閉的場景很少。

線程池與串行線程封閉、對象池的關係不大,但常常被混爲一談;沒看過源碼的很難想到其實現方案,面試時也能立分高下。

線程池的實現頗有意思。在追源碼以前,猴子一直覺得線程池就是把線程存起來,用的時候取出來執行任務;看了源碼才知道實現如此之妙,簡潔優雅效率高。源碼纔是最好的老師。


本文連接:源碼|從串行線程封閉到對象池、線程池
做者:猴子007
出處:monkeysayhi.github.io
本文基於 知識共享署名-相同方式共享 4.0 國際許可協議發佈,歡迎轉載,演繹或用於商業目的,可是必須保留本文的署名及連接。

相關文章
相關標籤/搜索