分析源碼,學會正確使用 Java 線程池

本文做者:oschina_2020javascript

在平常的開發工做當中,線程池每每承載着一個應用中最重要的業務邏輯,所以咱們有必要更多地去關注線程池的執行狀況,包括異常的處理和分析等。本文主要聚焦在如何正確使用線程池上,以及提供一些實用的建議。文中會稍微涉及到一些線程池實現原理方面的知識,可是不會過多展開。java

線程池的異常處理

UncaughtExceptionHandler數據庫

咱們都知道Runnable接口中的run方法是不容許拋出異常的,所以派生出這個線程的主線程可能沒法直接得到該線程在執行過程當中的異常信息。以下例:數組

public static void main(String[] args) throws Exception {
        Thread thread = new Thread(() -> {
            Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
            System.out.println(1 / 0); // 這行會致使報錯!
        });
        thread.setUncaughtExceptionHandler((t, e) -> {
            e.printStackTrace(); //若是你把這一行註釋掉,這個程序將不會拋出任何異常.
        });
        thread.start();
    }

爲何會這樣呢?其實咱們看一下Thread中的源碼就會發現,Thread在執行過程當中若是遇到了異常,會先判斷當前線程是否有設置UncaughtExceptionHandler,若是沒有,則會從線程所在的ThreadGroup中獲取。多線程

注意:每一個線程都有本身的ThreadGroup,即便你沒有指定,而且它實現了UncaughtExceptionHandler接口。併發

咱們看下ThreadGroup中默認的對UncaughtExceptionHandler接口的實現:ide

public void uncaughtException(Thread t, Throwable e) {
        if (parent != null) {
            parent.uncaughtException(t, e);
        } else {
            Thread.UncaughtExceptionHandler ueh =
                Thread.getDefaultUncaughtExceptionHandler();
            if (ueh != null) {
                ueh.uncaughtException(t, e);
            } else if (!(e instanceof ThreadDeath)) {
                System.err.print("Exception in thread \""
                                 + t.getName() + "\" ");
                e.printStackTrace(System.err);
            }
        }
    }

這個ThreadGroup若是有父ThreadGroup,則調用父ThreadGroup的uncaughtException,不然調用全局默認的Thread.DefaultUncaughtExceptionHandler,若是全局的handler也沒有設置,則只是簡單地將異常信息定位到System.err中,這就是爲何咱們應當在建立線程的時候,去實現它的UncaughtExceptionHandler接口的緣由,這麼作可讓你更方便地去排查問題。高併發

經過execute提交任務給線程池oop

回到線程池這個話題,若是咱們向線程池提交的任務中,沒有對異常進行try...catch處理,而且運行的時候出現了異常,那會對線程池形成什麼影響呢?答案是沒有影響,線程池依舊能夠正常工做,可是異常卻被吞掉了。這一般來講不是一個好事情,由於咱們須要拿到原始的異常對象去分析問題。性能

那麼怎樣才能拿到原始的異常對象呢?咱們從線程池的源碼着手開始研究這個問題。固然網上關於線程池的源碼解析文章有不少,這裏限於篇幅,直接給出最相關的部分代碼:

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

這個方法就是真正去執行提交給線程池的任務的代碼。

這裏咱們略去其中不相關的邏輯,重點關注第19行到第32行的邏輯,其中第23行是真正開始執行提交給線程池的任務,那麼第20行是幹什麼的呢?其實就是在執行提交給線程池的任務以前能夠作一些前置工做,一樣的,咱們看到第31行,這個是在執行完提交的任務以後,能夠作一些後置工做。

beforeExecute這個咱們暫且無論,重點關注下afterExecute這個方法。咱們能夠看到,在執行任務過程當中,一旦拋出任何類型的異常,都會提交給afterExecute這個方法,然而查看線程池的源代碼咱們能夠發現,默認的afterExecute是個空實現,所以,咱們有必要繼承ThreadPoolExecutor去實現這個afterExecute方法。

看源碼咱們能夠發現這個afterExecute方法是protected類型的,從官方註釋上也能夠看到,這個方法就是推薦子類去實現的。

固然,這個方法不能隨意去實現,須要遵循必定的步驟,具體的官方註釋也有講,這裏摘抄以下:

*  <pre> {@code
	 * class ExtendedExecutor extends ThreadPoolExecutor {
	 *   // ...
	 *   protected void afterExecute(Runnable r, Throwable t) {
	 *     super.afterExecute(r, t);
	 *     if (t == null && r instanceof Future<?>) {
	 *       try {
	 *         Object result = ((Future<?>) r).get();
	 *       } catch (CancellationException ce) {
	 *           t = ce;
	 *       } catch (ExecutionException ee) {
	 *           t = ee.getCause();
	 *       } catch (InterruptedException ie) {
	 *           Thread.currentThread().interrupt(); // ignore/reset
	 *       }
	 *     }
	 *     if (t != null)
	 *       System.out.println(t);
	 *   }
	 * }}</pre>

那麼經過這種方式,就能夠將原先可能被線程池吞掉的異常成功捕獲到,從而便於排查問題。

可是這裏還有個小問題,咱們注意到在runWorker方法中,執行task.run();語句以後,各類類型的異常都被拋出了,那這些被拋出的異常去了哪裏?事實上這裏的異常對象最終會被傳入到Thread的dispatchUncaughtException方法中,源碼以下:

private void dispatchUncaughtException(Throwable e) {
        getUncaughtExceptionHandler().uncaughtException(this, e);
    }

能夠看到它會去獲取UncaughtExceptionHandler的實現類,而後調用其中的uncaughtException方法,這也就回到了咱們上一小節所分析的UncaughtExceptionHandler實現的具體邏輯。那麼爲了拿到最原始的異常對象,除了實現UncaughtExceptionHandler接口以外,也能夠考慮實現afterExecute方法。

經過submit提交任務到線程池

這個一樣很簡單,咱們仍是先回到submit方法的源碼:

public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

這裏的execute方法調用的是ThreadPoolExecutor中的execute方法,執行邏輯跟經過execute提交任務到線程池是同樣的。咱們先重點關注這裏的newTaskFor方法,其源碼以下:

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

能夠看到提交的Callable對象用FutureTask封裝起來了。咱們知道最終會執行到上述runWorker這個方法中,而且最核心的執行邏輯就是task.run();這行代碼。咱們知道這裏的task實際上是FutureTask類型,所以咱們有必要看一下FutureTask中的run方法的實現:

public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

能夠看到這其中跟異常相關的最關鍵的代碼就在第17行,也就是setException(ex);這個地方。咱們看一下這個地方的實現:

protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }

這裏最關鍵的地方就是將異常對象賦值給了outcome,outcome是FutureTask中的成員變量,咱們經過調用submit方法,拿到一個Future對象以後,再調用它的get方法,其中最核心的方法就是report方法,下面給出每一個方法的源碼:

首先是get方法:

public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

能夠看到最終調用了report方法,其源碼以下:

private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

上面是一些狀態判斷,若是當前任務不是正常執行完畢,或者被取消的話,那麼這裏的x其實就是原始的異常對象,能夠看到會被ExecutionException包裝。所以在你調用get方法時,可能會拋出ExecutionException異常,那麼調用它的getCause方法就能夠拿到最原始的異常對象了。

綜上所述,針對提交給線程池的任務可能會拋出異常這一問題,主要有如下兩種處理思路:

  1. 在提交的任務當中自行try...catch,但這裏有個很差的地方就是若是你會提交多種類型的任務到線程池中,每種類型的任務都須要自行將異常try...catch住,比較繁瑣。並且若是你只是catch(Exception e),可能依然會漏掉一些包括Error類型的異常,那爲了保險起見,能夠考慮catch(Throwable t)。
  2. 自行實現線程池的afterExecute方法,或者實現Thread的UncaughtExceptionHandler接口。

下面給出我我的建立線程池的一個示例,供你們參考:

BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(DEFAULT_QUEUE_SIZE);
    statisticsThreadPool = new ThreadPoolExecutor(DEFAULT_CORE_POOL_SIZE, DEFAULT_MAX_POOL_SIZE,
            60, TimeUnit.SECONDS, queue, new ThreadFactoryBuilder()
            .setThreadFactory(new ThreadFactory() {
                private int count = 0;
                private String prefix = "StatisticsTask";

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, prefix + "-" + count++);
                }
            }).setUncaughtExceptionHandler((t, e) -> {
                String threadName = t.getName();
                logger.error("statisticsThreadPool error occurred! threadName: {}, error msg: {}", threadName, e.getMessage(), e);
            }).build(), (r, executor) -> {
        if (!executor.isShutdown()) {
            logger.warn("statisticsThreadPool is too busy! waiting to insert task to queue! ");
            Uninterruptibles.putUninterruptibly(executor.getQueue(), r);
        }
    }) {
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            if (t == null && r instanceof Future<?>) {
                try {
                    Future<?> future = (Future<?>) r;
                    future.get();
                } catch (CancellationException ce) {
                    t = ce;
                } catch (ExecutionException ee) {
                    t = ee.getCause();
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // ignore/reset
                }
            }
            if (t != null) {
                logger.error("statisticsThreadPool error msg: {}", t.getMessage(), t);
            }
        }
    };
    statisticsThreadPool.prestartAllCoreThreads();

線程數的設置

咱們知道任務通常有兩種:CPU密集型和IO密集型。那麼面對CPU密集型的任務,線程數不宜過多,通常選擇CPU核心數+1或者核心數的2倍是比較合理的一個值。所以咱們能夠考慮將corePoolSize設置爲CPU核心數+1,maxPoolSize設置爲核心數的2倍。

一樣的,面對IO密集型任務時,咱們能夠考慮以核心數乘以4倍做爲核心線程數,而後核心數乘以5倍做爲最大線程數的方式去設置線程數,這樣的設置會比直接拍腦殼設置一個值會更合理一些。

固然總的線程數不宜過多,控制在100個線程之內比較合理,不然線程數過多可能會致使頻繁地上下文切換,致使系統性能反不如前。

如何正確關閉一個線程池

說到如何正確去關閉一個線程池,這裏面也有點講究。爲了實現優雅停機的目標,咱們應當先調用shutdown方法,調用這個方法也就意味着,這個線程池不會再接收任何新的任務,可是已經提交的任務還會繼續執行,包括隊列中的。因此,以後你還應當調用awaitTermination方法,這個方法能夠設定線程池在關閉以前的最大超時時間,若是在超時時間結束以前線程池可以正常關閉,這個方法會返回true,不然,一旦超時,就會返回false。一般來講咱們不可能無限制地等待下去,所以須要咱們事先預估一個合理的超時時間,而後去使用這個方法。

若是awaitTermination方法返回false,你又但願儘量在線程池關閉以後再作其餘資源回收工做,能夠考慮再調用一下shutdownNow方法,此時隊列中全部還沒有被處理的任務都會被丟棄,同時會設置線程池中每一個線程的中斷標誌位。shutdownNow並不保證必定可讓正在運行的線程中止工做,除非提交給線程的任務可以正確響應中斷。到了這一步,能夠考慮繼續調用awaitTermination方法,或者直接放棄,去作接下來要作的事情。

線程池中的其餘有用方法

你們可能有留意到,我在建立線程池的時候,還調用了這個方法:prestartAllCoreThreads。這個方法有什麼做用呢?咱們知道一個線程池建立出來以後,在沒有給它提交任何任務以前,這個線程池中的線程數爲0。有時候咱們事先知道會有不少任務會提交給這個線程池,可是等它一個個去建立新線程開銷太大,影響系統性能,所以能夠考慮在建立線程池的時候就將全部的核心線程所有一次性建立完畢,這樣系統起來以後就能夠直接使用了。

其實線程池中還提供了其餘一些比較有意思的方法。好比咱們如今設想一個場景,當一個線程池負載很高,快要撐爆致使觸發拒絕策略時,有沒有什麼辦法能夠緩解這一問題?實際上是有的,由於線程池提供了設置核心線程數和最大線程數的方法,它們分別是setCorePoolSize方法setMaximumPoolSize方法。是的,線程池建立完畢以後也是能夠更改其線程數的!所以,面對線程池高負荷運行的狀況,咱們能夠這麼處理:

  1. 起一個定時輪詢線程(守護類型),定時檢測線程池中的線程數,具體來講就是調用getActiveCount方法。
  2. 當發現線程數超過了核心線程數大小時,能夠考慮將CorePoolSize和MaximumPoolSize的數值同時乘以2,固然這裏不建議設置很大的線程數,由於並非線程越多越好的,能夠考慮設置一個上限值,好比50、100之類的。
  3. 同時,去獲取隊列中的任務數,具體來講是調用getQueue方法再調用size方法。當隊列中的任務數少於隊列大小的二分之一時,咱們能夠認爲如今線程池的負載沒有那麼高了,所以能夠考慮在線程池先前有擴容過的狀況下,將CorePoolSize和MaximumPoolSize還原回去,也就是除以2。

具體來講以下圖:

以上是我我的建議的一種使用線程池的方式。

線程池必定是最佳方案嗎?

線程池並不是在任何狀況下都是性能最優的方案。若是是一個追求極致性能的場景,能夠考慮使用Disruptor,這是一個高性能隊列。排除Disruptor不談,單純基於JDK的話會不會有更好的方案?答案是有的。

咱們知道在一個線程池中,多個線程是共用一個隊列的,所以在任務不少的狀況下,須要對這個隊列進行頻繁讀寫,爲了防止衝突所以須要加鎖。事實上在閱讀線程池源代碼的時候就能夠發現,裏面充斥着各類加鎖的代碼,那有沒有更好的實現方式呢?

其實咱們能夠考慮建立一個由單線程線程池構成的列表,每一個線程池都使用有界隊列這種方式去實現多線程。這麼作的好處是,每一個線程池中的隊列都只會被一個線程去操做,這樣就沒有競爭的問題。

其實這種用空間換時間的思路借鑑了Netty中EventLoop的實現機制。試想,若是線程池的性能真的有那麼好,爲何Netty不用呢?

其餘須要注意的地方

  1. 任何狀況下都不該該使用可伸縮線程池(線程的建立和銷燬開銷是很大的)。
  2. 任何狀況下都不該該使用無界隊列,單測除外。有界隊列經常使用的有ArrayBlockingQueue和LinkedBlockingQueue,前者基於數組實現,後者基於鏈表。從性能表現上來看,LinkedBlockingQueue的吞吐量更高可是性能並不穩定,實際狀況下應當使用哪種建議自行測試以後決定。順便說一句,Executors的newFixedThreadPool採用的是LinkedBlockingQueue。
  3. 推薦自行實現RejectedExecutionHandler,JDK自帶的都不是很好用,你能夠在裏面實現本身的邏輯。若是須要一些特定的上下文信息,能夠在Runnable實現類中添加一些本身的東西,這樣在RejectedExecutionHandler中就能夠直接使用了。

怎樣作到不丟任務

這裏其實指的是一種特殊狀況,就是好比忽然遇到了一股流量尖峯,致使線程池負載已經很是高了,即快要觸發拒絕策略的時候,咱們能夠怎麼作來儘可能防止提交的任務丟失。通常來講當遇到這種狀況的時候,應當儘快觸發報警通知研發人員來處理。以後無論是限流也好,仍是增長機器也好,甚至是上Kafka、Redis甚至是數據庫用來暫存任務數據也是能夠的,但畢竟遠水救不了近火,若是咱們但願在正式解決這個問題以前,先儘量地緩解,能夠考慮怎麼作呢?

首先能夠考慮的就是我前面提到的動態增大線程池中的線程數,可是假如已經擴容過了,此時不該繼續擴容,不然可能致使系統的吞吐量更低。在這種狀況下,應當自行實現RejectedExecutionHandler,具體來講就是在實現類中,單獨開一個單線程的線程池,而後調用原線程池的getQueue方法的put方法,將塞不進去的任務再次嘗試塞進去。固然在隊列滿的時候是塞不進去的,但那至少也只是阻塞了這個單獨的線程而已,並不影響主流程。

固然,這種方案是治標不治本的,面對流量激增這種場景其實業界有不少成熟的作法,只是單純從線程池的角度來看的話,這種方式不失爲一種臨時有效的解決方案。

做者簡介

呂亞東,某風控領域互聯網公司技術專家,主要關注高性能,高併發以及中間件底層原理和調優等領域。email:523144643@qq.com

原文連接地址:https://developer.baidu.com/topic/show/290668

相關文章
相關標籤/搜索