Java 線程池的理論與實踐

Doug Lea

前段時間公司裏有個項目須要進行重構,目標是提升吞吐量和可用性,在這個過程當中對原有的線程模型和處理邏輯進行了修改,須要 發現有不少基礎的多線程的知識已經模糊不清,如底層線程的運行狀況、現有的線程池的策略和邏輯、池中線程的健康情況的監控等,此次從新回顧了一下,其中涉及大量 java.util.concurrent包中的類。本文將會包含如下內容:

  1. Java中的Thread與操做系統中的線程的關係
  2. 線程切換的各類開銷
  3. ThreadGroup存在的意義
  4. 使用線程池減小線程開銷
  5. Executor的概念
  6. ThreadPoolExecutor中的一些具體實現
  7. 如何監控線程的健康
  8. 參考ThreadPoolExecutor來設計適合本身的線程模型

1、問題描述

這個項目所在系統的軟件架構(從開發到運維)基本上採用的是微服務架構,微服務很好地解決了咱們系統的複雜性問題,可是隨之也帶來了一些問題,好比在此架構中大部分的服務都擁有本身單獨的數據庫,而有些(很重要的)業務須要作跨庫查詢。相信這種「跨庫查詢」的問題不少實踐微服務的公司都碰到過,一般這類問題有如下幾種解決方案(固然,還有更多其餘的方案,這裏就不一一敘述了):javascript

  1. 嚴格經過服務提供的API查詢。java

    這樣作的好處是將服務徹底當作黑盒,能夠最大限度得減小服務間的依賴與耦合關係,其次還能根據實際需求服務之間使用不一樣的數據庫類型;缺點是則代價太大。面試

  2. 將關心的信息冗餘到本身的庫中,並提供API讓其餘服務來主動修改。算法

    優勢是信息更新十分實時,缺點是增長了服務間的依賴。數據庫

  3. 指令與查詢分離(CQRS)。將可能被其餘服務關心的數據放入數據倉庫(或者作成相似於物化視圖、搜索引擎等),數據倉庫只提供讀的功能。編程

    優勢是對主庫不會有壓力,服務只要關心實現本身的業務就好,缺點是數據的實時性會受到了挑戰。數組

指令與查詢分離

結合實際狀況,咱們使用的是第3種方案。然而隨着愈來愈多的業務依賴讀庫,甚至依賴其中一些狀態的變化,因此讀庫的數據同步若是出現高延時,則會直接影響業務的進行。出了幾回這種事情後,因而下決心要改善這種狀況。首先想到的就是使用線程池來進行消息的消費(寫入讀庫),JDK自從1.5開始提供了實用而強大的線程池工具——Executor框架。

2、Executor框架

Executor框架在Java1.5中引入,大部分的類都在包java.util.concurrent中,由大神Doug Lea寫成,其中經常使用到的有如下幾個類和接口:緩存

  1. java.util.concurrent.Executor多線程

    一個只包含一個方法的接口,它的抽象含義是:用來執行一個Runnable任務的執行器。架構

  2. java.util.concurrent.ExecutorService

    對Executor的一個擴展,增長了不少對於任務和執行器的生命週期進行管理的接口,也是一般進行多線程開發最常使用的接口。

  3. java.util.concurrent.ThreadFactory

    一個生成新線程的接口。用戶能夠經過實現這個接口管理對線程池中生成線程的邏輯

  4. java.util.concurrent.Executors

    提供了不少不一樣的生成執行器的實用方法,好比基於線程池的執行器的實現。

3、爲何要用線程池

Java從最開始就是基於線程的,線程在Java裏被封裝成一個類java.lang.Thread。在面試中不少面試官都會問一個很基礎的關於線程問題:

Java中有幾種方法新建一個線程?

全部人都知道,標準答案是兩種:繼承Thread或者實現Runnable,在JDK源代碼中Thread類的註釋中也是這麼寫的。

然而在我看來這兩種方法根本就是一種,全部想要開啓線程的操做,都必須生成了一個Thread類(或其子類)的實例,執行其中的native方法start0()

Java中的線程

Java中將線程抽象爲一個普通的類,這樣帶來了不少好處,譬如能夠很簡單的使用面向對象的方法實現多線程的編程,然而這種程序寫多了容易會忘記,這個對象在底層是實實在在地對應了一個OS中的線程。

操做系統中的線程和進程

上圖中的進程(Process)能夠看作一個JVM,能夠看出,全部的進程有本身的私有內存,這塊內存會在主存中有一段映射,而全部的線程共享JVM中的內存。在現代的操做系統中,線程的調度一般都是集成在操做系統中的,操做系統能經過分析更多的信息來決定如何更高效地進行線程的調度,這也是爲何Java中會一直強調,線程的執行順序是不會獲得保證的,由於JVM本身管不了這個,因此只能認爲它是徹底無序的。

另外,類java.lang.Thread中的不少屬性也會直接映射爲操做系統中線程的一些屬性。Java的Thread中提供的一些方法如sleep和yield其實依賴於操做系統中線程的調度算法。

關於線程的調度算法能夠去讀操做系統相關的書籍,這裏就不作太多敘述了。

線程的開銷

一般來講,操做系統中線程之間的上下文切換大約要消耗1到10微秒

從上圖中能夠看出線程中包含了一些上下文信息:

  • CPU棧指針(Stack)、
  • 一組寄存器的值(Registers),
  • 指令計數器的值(PC)等,

它們都保存在此線程所在的進程所映射的主存中,而對於Java來講,這個進程就是JVM所在的那個進程,JVM的運行時內存能夠簡單的分爲以下幾部分:

  1. 若干個棧(Stack)。每一個線程有本身的棧,JVM中的棧是不能存儲對象的,只能存儲基礎變量和對象引用。
  2. 堆(Heap)。一個JVM只有一個堆,全部的對象都在堆上分配。
  3. 方法區(Method Area)。一個JVM只有一個方法區,包含了全部載入的類的字節碼和靜態變量。

其中#1中的棧能夠認爲是這個線程的上下文,建立線程要申請相應的棧空間,而棧空間的大小是必定的,因此當棧空間不夠用時,會致使線程申請不成功。在Thread的源代碼中能夠看到,啓動線程的最後一步是執行一個本地方法private native void start0(),代碼1是OpenJDK中start0最終調用的方法:

//代碼1
JVM_ENTRY(void, JVM_StartThread(JNIEnv* env, jobject jthread))
  JVMWrapper("JVM_StartThread");
  JavaThread *native_thread = NULL;
  bool throw_illegal_thread_state = false;

  // We must release the Threads_lock before we can post a jvmti event
  // in Thread::start.
  {
    MutexLocker mu(Threads_lock);

    //省略一些代碼

      jlong size =
             java_lang_Thread::stackSize(JNIHandles::resolve_non_null(jthread));
      size_t sz = size > 0 ? (size_t) size : 0;
      native_thread = new JavaThread(&thread_entry, sz);
  }

  if (native_thread->osthread() == NULL) {
    THROW_MSG(vmSymbols::java_lang_OutOfMemoryError(),
              "unable to create new native thread");
  }

  Thread::start(native_thread);

JVM_END複製代碼

從代碼1中能夠看到,線程的建立首先須要棧空間,因此過多的線程建立可能會致使OOM。

同時,線程的切換會有如下開銷:

  1. CPU中執行上下文的切換,致使CPU中的「指令流水線(Instruction Pipeline)」的中斷和CPU緩存的失效。
  2. 若是線程太多,線程切換的時間會比線程執行的時間要長,嚴重浪費了CPU資源。
  3. 對於共享資源的競爭(鎖)會致使線程切換開銷急劇增長。

根據以上的描述,因此一般建議儘量建立較少的線程,減小鎖的使用(尤爲是synchronized),儘可能使用JDK提供的同步工具。而爲了減小線程上下文切換帶來的開銷,一般使用線程池是一個有效的方法。

Java中的線程池

Executor框架中最經常使用的大概就是java.util.concurrent.ThreadPoolExecutor了,對於它的描述,簡單的說就是「它維護了一個線程池,對於提交到此Executor中的任務,它不是建立新的線程而是使用池內的線程進行執行」。對於「數量巨大但執行時間很小」的任務,能夠顯著地減小對於任務執行的開銷。java.util.concurrent.ThreadPoolExecutor中包含了不少屬性,經過這些屬性開發者能夠定製不一樣的線程池行爲,大體以下:

1. 線程池的大小:corePoolSizemaximumPoolSize

ThreadPoolExecutor中線程池的大小由這兩個屬性決定,前者指當線程池正常運行起來後的最小(核心)線程數,當一個任務到來時,若當前池中線程數小於corePoolSize,則會生成新的線程;後者指當等待隊列滿了以後可生成的最大的線程數。在例1中返回的對象中這兩個值相等,均等於用戶傳入的值。

2. 用戶能夠經過調用java.util.concurrent.ThreadPoolExecutor上的實例方法來啓動核心線程(core pool)
3. 可定製化的線程生成方式:threadFactory

默認線程由方法Executors.defaultThreadFactory()返回的ThreadFactory進行建立,默認建立的線程都不是daemon,開發者能夠傳入自定義的ThreadFactory進行對線程的定製化。

5. 非核心線程的空閒等待時間:keepAliveTime
6. 任務等待隊列:workQueue

這個隊列是java.util.concurrent.BlockingQueue<E>的一個實例。當池中當前沒有空閒的線程來執行任務,就會將此任務放入等待隊列,根據其具體實現類的不一樣,又可分爲3種不一樣的隊列策略:

  1. 容量爲0。如:java.util.concurrent.SynchronousQueue

    等待隊列容量爲0,全部須要阻塞的任務必須等待池內的某個線程有空閒,才能繼續執行,不然阻塞。調用Executors.newCachedThreadPool的兩個函數生成的線程池是這個策略。

  2. 不限容量。如:不指定容量的java.util.concurrent.LinkedBlockingQueue

    等待隊列的長度無窮大,根據上文中的敘述,在這種策略下,不會有多於corePoolSize的線程被建立,因此maximumPoolSize也就沒有任何意義了。調用Executors.newFixedThreadPool生成的線程池是這個策略。

  3. 限制容量。如:指定容量的任何java.util.concurrent.BlockingQueue<E>

    在某些場景下(本文中將描述這種場景),須要指定等待隊列的容量,以防止過多的資源消耗,好比若是使用不限容量的等待隊列,當有大量的任務到來而池內又無空閒線程執行任務時,會有大量的任務堆積,這些任務都是某個類的對象,是要消耗內存的,就可能致使OOM。如何去平衡等待隊列和線程池的大小要根據實際場景去判定,若是配置不當,可能會致使資源耗盡、線程上下文切換消耗、或者線程調度消耗。這些都會直接影響系統的吞吐。

7. 任務拒絕處理器:defaultHandler

若是任務被拒絕執行,則會調用這個對象上的RejectedExecutionHandler.rejectedExecution()方法,JDK定義了4種處理策略,用戶能夠自定義本身的任務處理策略。

8. 容許核心線程過時:allowCoreThreadTimeOut

上面說的全部狀況都是基於這個變量爲false(默認值)來講的,若是你的線程池已經不使用了(不被引用),可是其中還有活着的線程時,這個線程池是不會被回收的,這種狀況就形成了內存泄漏——一塊永遠不會被訪問到的內存卻沒法被GC回收。
用戶能夠經過在拋棄線程池引用的時候顯式地調用shutdown()來釋放它,或者將allowCoreThreadTimeOut設置爲true,則在過時時間後,核心線程會被釋放,則其會被GC回收。

4、若是線程死掉了怎麼辦

幾乎全部Executors中生成線程池的方法的註釋上,都有表明相贊成思的一句話,表示若是線程池中的某個線程死掉了,線程池會生成一個新的線程代替它。下面是方法java.util.concurrent.Executors.newFixedThreadPool(int)上的註釋。

If any thread terminates due to a failure during execution prior to shutdown, a new one will take its place if needed to execute subsequent tasks.

線程死亡的緣由

咱們都知道守護線程(daemon)會在全部的非守護線程都死掉以後也死掉,除此以外致使一個非守護線程死掉有如下幾種可能:

  1. 天然死亡,Runnable.run()方法執行完後返回。
  2. 執行過程當中有未捕獲異常,被拋到了Runnable.run()以外,致使線程死亡。
  3. 其宿主死亡,進程關閉或者機器死機。在Java中一般是System.exit()方法被調用
  4. 其餘硬件問題。

線程池要保證其高可用性,就必須保證線程的可用。如一個固定容量的線程池,其中一個線程死掉了,它必需要能監控到線程的死亡並生成一個新的線程來代替它。ThreadPoolExecutor中與線程相關的有這樣幾個概念:

  1. java.util.concurrent.ThreadFactory,在Executors中有兩種ThreadFactory,但其提供的線程池只使用了一種java.util.concurrent.Executors.DefaultThreadFactory,它是簡單的使用ThreadGroup來實現。

  2. java.lang.ThreadGroup,從Java1開始就存在的類,用來創建一個線程的樹形結構,能夠用它來組織線程間的關係,但其並無對其包含的子線程的監控。

  3. java.util.concurrent.ThreadPoolExecutor.Worker,ThreadPoolExecutor對線程的封裝,其中還包含了一些統計功能。

ThreadPoolExecutor中如何保障線程的可用

在ThreadPoolExecutor中使用了一個很巧妙的方法實現了對線程池中線程健康情況的監控,代碼2是從ThreadPoolExecutor類源碼中截取的一段代碼,它們在一塊兒說明了其對線程的監控。

能夠看到,在ThreadPoolExecutor中的線程被封裝成一個對象Worker,而將其中的run()代理到ThreadPoolExecutor中的runWorker(),在runWorker()方法中是一個獲取任務並執行的死循環。若是任務的運行出了什麼問題(如拋出未捕獲異常),processWorkerExit()方法會被執行,同時傳入的completedAbruptly參數爲true,會從新添加一個初始任務爲null的Worker,並隨之啓動一個新的線程。

//代碼2
//ThreadPoolExecutor的動態內部類
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {

    /** 對象中封裝的線程 */
    final Thread thread;
    /** 第一個要運行的任務,可能爲null. */
    Runnable firstTask;
    /** 任務計數器 */
    volatile long completedTasks;

    //省略其餘代碼

    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
    /** Delegates main run loop to outer runWorker */
    public void run() {
        runWorker(this);
    }
}


final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            try {
                beforeExecute(wt, task);
                try {
                    task.run();
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}複製代碼

5、回到個人問題

因爲各類各樣的緣由,咱們並無使用數據庫自帶的主從機制來作數據的複製,而是將主庫的全部DML語句做爲消息發送到讀庫(DTS),同時本身實現了數據的重放。初版的數據同步服務十分簡單,對於主庫的DML消息處理和消費(寫入讀庫)都是在一個線程內完成的.這麼實現的優勢是簡單,但缺點是直接致使了表與表之間的數據同步會受到影響,若是有一個表A突然來了不少的消息(每每是批量修改數據形成的),則會佔住消息處理通道,影響其餘業務數據的及時同步,同時單線程寫庫吞吐過小。

上文說到,首先想到的是使用線程池來作消息的消費,可是不能直接套用上邊說的Executor框架,因爲如下幾個緣由:

  1. ThreadPoolExecutor中默認全部的任務之間是不互相影響的,然而對於數據庫的DML來講,消息的順序不能被打亂,至少單表的消息順序必須有序,否則會影響最終的數據一致。
  2. ThreadPoolExecutor中全部的線程共享一個等待隊列,然而爲了防止表與表之間的影響,每一個線程應該有本身的任務等待隊列。
  3. 寫庫操做的吞吐直接受到提交事務數的影響,因此此多線程框架要能夠支持任務的合併。

重複造輪子是沒有意義的,可是在咱們這種場景下JDK中現有的Executor框架不符合要求,只能本身造輪子。

個人實現

首先把線程抽象成「DML語句的執行器(Executor)」。其中包含了一個Thread的實例,維護了本身的等待隊列(限定容量的阻塞隊列),和對應的消息執行邏輯。

除此以外還包含了一些簡單的統計、線程健康監控、合併事務等處理。

Executor的對象實現了Thread.UncaughtExceptionHandler接口,並綁定到其工做線程上。同時ExecutorGroup也會再生成一個守護線程專門來守護池內全部線程,做爲額外的保險措施。

把線程池的概念抽象成執行器組(ExecutorGroup),其中維護了執行器的數組,並維護了目標表到特定執行器的映射關係,並對外提供執行消息的接口,其主要代碼以下:

//代碼3
public class ExecutorGroup {

    Executor[] group = new Executor[NUM];
    Thread boss = null;
    Map<String, Integer> registeredTables = new HashMap<>(32);
// AtomicInteger cursor = new AtomicInteger();
    volatile int cursor = 0;

    public ExecutorGroup(String name) {
        //init group
        for(int i = 0; i < NUM; i++) {
            logger.debug("啓動線程{},{}", name, i);
            group[i] = new Executor(this, String.format("sync-executor-%s-%d", name, i), i / NUM_OF_FIRST_CLASS);

        }
        startDaemonBoss(String.format("sync-executor-%s-boss", name));
    }

    //額外的保險
    private void startDaemonBoss(String name) {
        if (boss != null) {
            boss.interrupt();
        }
        boss = new Thread(() -> {
            while(true) {
                //休息一分鐘。。。

                if (this.group != null) {
                    for (int i = 0; i < group.length; i++) {
                        Executor executor = group[i];
                        if (executor != null) {
                            executor.checkThread();
                        }
                    }
                }
            }

        });
        boss.setName(name);
        boss.setDaemon(true);
        boss.start();
    }
    public void execute(Message message){
        logger.debug("執行消息");

        //省略消息合法性驗證

        if (!registeredTables.containsKey(taskKey)) {
            //已註冊
// registeredTables.put(taskKey, cursor.getAndIncrement());
            registeredTables.put(taskKey, cursor++ % NUM);
        }
        int index = registeredTables.get(taskKey);
        logger.debug("執行消息{},註冊索引{}", taskKey, index);
        try {
            group[index].schedule(message);
        } catch (InterruptedException e) {
            logger.error("準備消息出錯", e);
        }

    }

}複製代碼

完成後總體的線程模型以下圖所示:

新的線程模型

Java1.7新加入的TransferQueue

Java1.7中提供了新的隊列類型TransferQueue,但只提供了一個它的實現java.util.concurrent.LinkedTransferQueue<E>,它有更好的性能表現,可它是一個無容量限制的隊列,而在咱們的這個場景下必需要限制隊列的容量,因此要本身實現一個有容量限制的隊列。

相關文章
相關標籤/搜索