Java ExecutorService線程池中的小坑——關於線程池中拋出的異常處理

開發本身的項目有一段時間了,由於是個長時間跑的服務器端程序,因此異常處理顯得尤其重要。
對於異常的抓取和日誌(狹義上的日誌)的分析一點都不能落下。java

咱們使用了Java自帶的Executor模塊,我只是稍微看了下Executors當中三個線程池的實現(策略爲:Fixed, Cached, Schedule),其實光看名字就能夠了解各自的一些策略信息。OK,這一次我須要一種策略合併Fixed和Cached的兩種特色的自定義Executor。其實很簡單,給Cached設置一個上線就是了。注意他們的同步隊列使用的不一樣,用LinkedBlockingQueue是個不錯的選擇,至於BlockingQueue的實現能夠自行谷歌(之後再記吧)。sql

先看寫的簡略的代碼apache

package com.zjseek.recharge.core;

import com.zjseek.recharge.exception.SKErrorCode;
import com.zjseek.recharge.exception.SKOrderState;
import com.zjseek.recharge.model.OrderModel;
import com.zjseek.recharge.service.OrderService;
import org.apache.log4j.Logger;

import java.sql.Timestamp;
import java.util.concurrent.*;

/**
 * Created by geminiwen on 14-6-28.
 */
public class OrderExceptionThreadExecutor extends ThreadPoolExecutor {
    private Logger logger = Logger.getLogger(OrderExceptionThreadExecutor.class);
    private OrderService orderService;

    public OrderExceptionThreadExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        init();
    }

    public OrderExceptionThreadExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        init();
    }

    public OrderExceptionThreadExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        init();
    }

    public OrderExceptionThreadExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        init();
    }

    private void init() {
        this.orderService = new OrderService();
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        Future<?> f = (Future<?>) r;
        try {
            f.get();
        } catch (InterruptedException e) {
            logger.error("線程池中發現異常,被中斷", e);
        } catch (ExecutionException e) {
            logger.error("線程池中發現異常,被中斷", e);
        }

    }
}

我這是一個訂單處理流程,主要用到了一個protected方法,就是afterExecute。一看這個函數的樣子,想固然的覺得若是線程池中出了問題,異常天然回在第二個參數t中傳過來。
也許的確是這樣的,可是這裏有一個區別。
咱們知道ExecutorServcie中執行一個Runnable有兩個方法,兩個分別是服務器

public void execute(Runnable command);
public <T> Future<T> submit(Runnable task, T result);

別看接受的參數差很少,其實submit最後是調用的execute的,並且在調用execute前,對task進行了一次封裝,變成了RunnableFuture(它是接口,繼承了RunnableFuture實際是一個實現類FutureTask)。ide

OK,對於實際操做Runnable的不一樣,暫時說到這,看下execute方法作了什麼事
execute方法對進來的Runnable又包裝成了worker而後進入runWorker
runWorker方法中有這麼幾行函數

try {
    beforeExecute(wt, task);
    Throwable thrown = null;
    try {
       task.run();
    } catch (RuntimeException x) {
       thrown = x; throw x;
    } catch (Error x) {
       thrown = x; throw x;
    } catch (Throwable x) {
       thrown = x; throw new Error(x);
    } finally {
       afterExecute(task, thrown);
    }
} finally {
    task = null;
    w.completedTasks++;
    w.unlock();
}

好了,到了最關鍵的afterExecute這個步驟,我滿心覺得這裏全部的異常都會經過thrown傳遞進來,看來我仍是太年輕了,以前咱們分析過,這個Runnable已經被submit封裝成了FutureTask,那麼這個task.run()除了咱們本身定義的run任務以外,到底還幹了啥呢?this

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

OK,這段源碼摘自FutureTask中的run方法,實際咱們本身定義的任務已經變成了Callable:線程

public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

從它的構造函數就能夠看出來。日誌

而後咱們在上面實際運行task的地方實際上是c.call()這一句。code

result = c.call();

咱們寫的任務所有在這句代碼裏面執行完畢了,看看外面都wrap了啥? OK 咱們全部的Throwable所有已經被setException吃掉了,怎麼還會拋出到外面那層的execute中呢?
因此我以前實驗的時候,在submit中提交任務不管任務怎麼拋異常,在afterExecute中的第二個參數是取不到的,緣由就在這。

再回頭看看針對submit改造的函數

protected void afterExecute(Runnable r, Throwable t) {
    super.afterExecute(r, t);
    Future<?> f = (Future<?>) r;
    try {
        f.get();
    } catch (InterruptedException e) {
        logger.error("線程池中發現異常,被中斷", e);
    } catch (ExecutionException e) {
        logger.error("線程池中發現異常,被中斷", e);
    }

}

固然,這裏已經默認r是實現Future接口了。經過FutureTask的get方法,能把剛剛setException中的異常給拋出來,這樣咱們就能真的拿到這些異常了。

結論

若是咱們關心線程池執行的結果,則須要使用submit來提交task,那麼在afterExecute中對異常的處理也須要經過Future接口調用get方法去取結果,才能拿到異常,若是咱們不關心這個任務的結果,能夠直接使用ExecutorService中的execute方法(實際是繼承Executor接口)來直接去執行任務,這樣的話,咱們的Runnable沒有通過多餘的封裝,在runWorker中獲得的異常也直接能在afterExecute中捕捉。

好了,以上就是對線程池異常捕捉的一個記錄。想一想應該不難,今天也是偶然機會看到的。今天在開發中碰到PHP鎖的問題,頭疼死了。

相關文章
相關標籤/搜索