多線程使用注意

命名

來源:https://www.cnblogs.com/guozp/p/10344446.htmlhtml

  • 咱們在建立線程池的時候,必定要給線程池名字,以下這種寫法,線程是默認直接生成的:java

    public static void main(String[] args) {
            ExecutorService executorService = Executors.newFixedThreadPool(3);
            for (int i = 0; i < 10; i++) {
                final int finalI = i;
                executorService.execute(new Runnable() {
                    @Override
                    public void run() {
                        System.out.println(Thread.currentThread().getName() + ":" + finalI);
                    }
                });
            }       
         }

    最後的輸出:less

    pool-1-thread-3:2
    pool-1-thread-2:1
    pool-1-thread-3:4
    pool-1-thread-1:3
    pool-1-thread-3:6
    pool-1-thread-2:5
    pool-1-thread-3:8
    pool-1-thread-1:7
    pool-1-thread-2:9
  • Executors中有默認的線程工廠的實現:ide

    static class DefaultThreadFactory implements ThreadFactory {
            private static final AtomicInteger poolNumber = new AtomicInteger(1);
            private final ThreadGroup group;
            private final AtomicInteger threadNumber = new AtomicInteger(1);
            private final String namePrefix;
    
            DefaultThreadFactory() {
                SecurityManager s = System.getSecurityManager();
                group = (s != null) ? s.getThreadGroup() :
                                      Thread.currentThread().getThreadGroup();
                namePrefix = "pool-" +
                              poolNumber.getAndIncrement() +
                             "-thread-";
            }
    
            public Thread newThread(Runnable r) {
                Thread t = new Thread(group, r,
                                      namePrefix + threadNumber.getAndIncrement(),
                                      0);
                if (t.isDaemon())
                    t.setDaemon(false);
                if (t.getPriority() != Thread.NORM_PRIORITY)
                    t.setPriority(Thread.NORM_PRIORITY);
                return t;
            }
        }
  • 咱們能夠改造一下this

    public class NamedThreadFactory implements ThreadFactory {
        private final AtomicInteger threadNumber;
        private final String name;
        private final boolean isDaemon;
    
        public NamedThreadFactory(String name) {
            this(name, false);
        }
    
        public NamedThreadFactory(String name, boolean daemon) {
            this.threadNumber = new AtomicInteger(1);
            this.isDaemon = daemon;
            this.name = name + "-thread-pool-";
        }
    
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, this.name + this.threadNumber.getAndIncrement());
            t.setDaemon(this.isDaemon);
            if (t.getPriority() != Thread.NORM_PRIORITY){
                t.setPriority(Thread.NORM_PRIORITY);
            }
            return t;
        }
    }

    那咱們看下改造以後的輸出結果:spa

    有名字的線程池-thread-pool-1:0
    有名字的線程池-thread-pool-3:2
    有名字的線程池-thread-pool-1:3
    有名字的線程池-thread-pool-2:1
    有名字的線程池-thread-pool-1:5
    有名字的線程池-thread-pool-1:7
    有名字的線程池-thread-pool-1:8
    有名字的線程池-thread-pool-3:4
    有名字的線程池-thread-pool-1:9
    有名字的線程池-thread-pool-2:6

    這樣的話,當咱們應用線上出現問題,須要經過jstack查看線程堆棧的時候,就能夠知道是哪些線程出現的問題,不然看到的都是統一的命名方式,看到都是清一色的線程,增長排查問題的難度線程

Thread異常處理

Java中線程執行的任務接口java.lang.Runnable 要求不拋出Checked異常,code

public interface Runnable {

    public abstract void run();
}

那麼若是 run() 方法中拋出了RuntimeException,將會怎麼處理了?htm

線程出現未捕獲異常後,JVM將調用Thread中的dispatchUncaughtException方法把異常傳遞給線程的未捕獲異常處理器對象

private void dispatchUncaughtException(Throwable e) {
    getUncaughtExceptionHandler().uncaughtException(this, e);
}

public UncaughtExceptionHandler getUncaughtExceptionHandler() {
    return uncaughtExceptionHandler != null ?
        uncaughtExceptionHandler : group;
}

Thread中存在兩個UncaughtExceptionHandler。一個是靜態的defaultUncaughtExceptionHandler,另外一個是非靜態uncaughtExceptionHandler。

// null unless explicitly set
private volatile UncaughtExceptionHandler uncaughtExceptionHandler;

// null unless explicitly set
private static volatile UncaughtExceptionHandler defaultUncaughtExceptionHandler;
  • defaultUncaughtExceptionHandler:設置一個靜態的默認的UncaughtExceptionHandler。來自全部線程中的Exception在拋出而且未捕獲的狀況下,都會今後路過。進程fork的時候設置的就是這個靜態的defaultUncaughtExceptionHandler,管轄範圍爲整個進程
  • uncaughtExceptionHandler:爲單個線程設置一個屬於線程本身的uncaughtExceptionHandler,轄範圍比較小。

若是沒有設置uncaughtExceptionHandler,將使用線程所在的線程組來處理這個未捕獲異常。線程組ThreadGroup實現了UncaughtExceptionHandler,因此能夠用來處理未捕獲異常。ThreadGroup類定義:

private ThreadGroup group;

class ThreadGroup implements Thread.UncaughtExceptionHandler{}

ThreadGroup實現的uncaughtException以下:

public void uncaughtException(Thread t, Throwable e) {
    if (parent != null) {
        parent.uncaughtException(t, e);
    } else {
        Thread.UncaughtExceptionHandler ueh =
            Thread.getDefaultUncaughtExceptionHandler();
        if (ueh != null) {
            ueh.uncaughtException(t, e);
        } else if (!(e instanceof ThreadDeath)) {
            System.err.print("Exception in thread \""
                             + t.getName() + "\" ");
            e.printStackTrace(System.err);
        }
    }
}

默認狀況下,線程組處理未捕獲異常的邏輯是,首先將異常消息通知給父線程組,而後嘗試利用一個默認的defaultUncaughtExceptionHandler來處理異常,若是沒有默認的異常處理器則將錯誤信息輸出到System.err。也就是JVM提供給咱們設置每一個線程的具體的未捕獲異常處理器,也提供了設置默認異常處理器的方法,一般java.lang.Thread對象運行設置一個默認的異常處理方法:

public static void setDefaultUncaughtExceptionHandler(UncaughtExceptionHandler eh) {
        SecurityManager sm = System.getSecurityManager();
        if (sm != null) {
            sm.checkPermission(
                new RuntimePermission("setDefaultUncaughtExceptionHandler")
                    );
        }
    
         defaultUncaughtExceptionHandler = eh;
     }

而這個默認的靜態全局的異常捕獲方法是直接輸出異常堆棧。
固然,咱們能夠覆蓋此默認實現,只須要實現java.lang.Thread.UncaughtExceptionHandler接口便可

public interface UncaughtExceptionHandler {

    void uncaughtException(Thread t, Throwable e);
}

submit異常吞併

  • 咱們平時都是經過submit來提交一個Callable,那若是提交的是Runnable呢,爲方便起見咱們核心的代碼都放在一塊兒了
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}
public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
        throw new NullPointerException();
    return new RunnableAdapter<T>(task, result);
}
//最終FutureTask中的callable指向的是一個RunnableAdapter,而RunnableAdapter的call方法也是調用了咱們傳進來的task的run方法,返回的是null
static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
}
  • 那從這裏咱們就知道,咱們經過submit傳遞進去的Runnale,最後在FutureTask的run方法裏面調用的callable.call()實質上仍是咱們傳遞進去的runnable的run方法,在源碼FutureTask的run方法的時候發現,FutureTask中執行任務若是出現異常,是不會拋出來的,必須經過get方法才能夠獲取到,固然也能夠重寫afterExecute()這個回調方法,在這個裏面來調用get獲取異常信息,
    仍是要重點強調下,咱們在經過submit執行任務的時候,必定要調用get()方法
  • 這裏咱們重寫afterExecute()方法,來獲取submit(Runnable task)的執行異常:
protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        //執行的Callable,對應的t必定是Null
        if (t == null && r instanceof Future) {
            try {
                Future future = (Future) r;
                if (future.isDone()){
                    // 判斷任務是否執行完成
                    future.get();
                }
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {
                t = ee.getCause();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt(); 
            }
        }
    }

CountDownLatch 丟失事件

  • 咱們在處理一批任務的時候,每每會把任務進行partition,而後再交給每一個線程去處理,那主線程須要等待全部的線程處理完,來統計本次處理的時間,以及其餘統計的數據,差很少和下面這段代碼相似:
public void execute3(){
    List<Integer> data = new ArrayList<Integer>(100);
    for (int i = 0; i < 100; i++) {
        data.add(i + 10);
    }

    List<List<Integer>> partition = Lists.partition(data, 20);
    final CountDownLatch countDownLatch = new CountDownLatch(partition.size());
    for (final List<Integer> dataToHandle : partition) {
        threadPoolExecutor.execute(new Runnable() {
            @Override
            public void run() {
                try{
                    for (Integer i : dataToHandle) {
                        doSomeThing(i);
                    }
                }catch (Exception e){
                   logger.error(e.getMessage(), e);
                }finally {  
                    countDownLatch.countDown();
                }
            }
        });
    }

    try {
        countDownLatch.await();
    } catch (InterruptedException e) {
        logger.error(e.getMessage(), e);
    }

    logger.info("任務執行結束...");
}
  • 以前這麼寫代碼沒有出現過問題,直到最近出現問題才發現這麼寫會致使主線程沒法結束的問題。咱們看下,雖然在每一個任務的finally中進行處理
  • countDownLatch.countDown();可是有一點忽視了,咱們在異常那塊其實有提到過,若是線程池滿了,拋出RejectExecuteException的話,那此次任務的countDownLatch就會被忽視,固然咱們這是在主線程裏執行,直接會拋出異常致使主線程結束,可是若是和上面提到的在單獨的子線程裏面去執行這個線程池,那這樣的話因爲主線程沒法捕獲到子線程的異常,就會出現主線程沒法結束的問題,因此咱們在子線程中執行線程池必定要避免這點 即若是在子線程中執行,須要改成下面這樣:
public void execute3(){
    List<Integer> data = new ArrayList<Integer>(100);
    for (int i = 0; i < 100; i++) {
        data.add(i + 10);
    }

    final List<List<Integer>> partition = Lists.partition(data, 20);
    final CountDownLatch countDownLatch = new CountDownLatch(partition.size());
    new Thread(new Runnable() {
        @Override
        public void run() {
            for (final List<Integer> dataToHandle : partition) {
                try {
                    threadPoolExecutor.execute(new Runnable() {
                        @Override
                        public void run() {
                            try{
                                for (Integer i : dataToHandle) {
                                    doSomeThing(i);
                                }
                            }catch (Exception e){
                                logger.error(e.getMessage(), e);
                            }finally {

                                countDownLatch.countDown();
                            }
                        }
                    });
                } catch (RejectedExecutionException e) {
                    logger.error(e.getMessage(), e);
                    //處理完異常以後須要補充countDownLatch事件
                    countDownLatch.countDown();
                }
            }

        }
    }).start();

    try {
        countDownLatch.await();
    } catch (InterruptedException e) {
        logger.error(e.getMessage(), e);
    }

    logger.info("任務執行結束...");
}

來源:https://www.cnblogs.com/guozp/p/10344446.html

相關文章
相關標籤/搜索