線程處理二三事(一)

【強制】線程池不容許使用Executors去建立,而是經過ThreadPoolExecutor的方式,這樣的處理方式讓寫的同窗更加明確線程池的運行規則,規避資源耗盡的風險。以上是《阿里巴巴》java開發手冊中關於併發的強制規定,爲何作這個規定?一下是我我的解惑和作出的相關延伸,此篇做爲我我的學習筆記的開篇。java

一.爲何要手動建立:
1.單例線程池web

Executor executor = Executors.newSingleThreadExecutor();

2.可擴容線程池設計模式

Executor executor1 = Executors.newCachedThreadPool();

3.定長線程池安全

Executor executor2 = Executors.newFixedThreadPool(10);

線程池的工做原理以下:
image併發

以上是三種平時常見的線程池,下面看建立源碼:jvm

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

都是和ThreadPoolExecutor相關,ThreadPoolExecutor構造函數:函數

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

採用Executor建立線程池會出現資源耗盡即OOM錯誤,主要出問題的地方是阻塞隊列的問題即workQueue。
1.看newSingleThreadExecutor建立源碼,線程池核心線程數和最大線程數都是1,剩下多餘的線程任務全都塞到LinkedBlockingQueue<Runnable>()中,如下是LinkedBlockingQueue<Runnable>()無參構造函數源碼,高併發

public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

Integer.MAX_VALUE=2147483647,根據線程池的工做原理,若是自動建立線程池即表示不限制LinkedBlockingQueue的長度,這個阻塞隊列在高併發的狀況下可能會撐爆線程池。
2.看newFixedThreadPool建立源碼,和1同理。
3.看newCachedThreadPool源碼,此線程池容許最多Integer.MAX_VALUE個線程的建立,工做隊列是SynchronousQueue,如下是SynchronousQueue建立源碼:學習

/**
     * Creates a {@code SynchronousQueue} with nonfair access policy.
     */
    public SynchronousQueue() {
        this(false);
    }

    /**
     * Creates a {@code SynchronousQueue} with the specified fairness policy.
     *
     * @param fair if true, waiting threads contend in FIFO order for
     *        access; otherwise the order is unspecified.
     */
    public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }

看註釋,SynchronousQueue在這裏是一個非公平的棧,這裏SynchronousQueue的問題不深究,往後會專門學習jdk中阻塞隊列
newCachedThreadPool線程池的大小由線程池最大線程數控制,自動建立的線程池極可能建立很是多的線程撐爆內存。
以上簡單從源碼角度回答問題緣由。
自動建立線程池就是直接使用Executors建立,弊端以上是簡單分析,可是看源碼,手動建立線程池就是脫了一層殼直接new ThreadPoolExecutor 而已,本質同樣的。ui

二.ThreadFactory
看上面ThreadPoolExecutor的構造函數源碼,我發現了ThreadFactory。~~~~
咱們建立一個task,交付給Executor,Executor會幫咱們建立一個線程去執行這個任務。ThreadFactory就是一個建立線程的工廠。
先看源碼

public interface ThreadFactory {

    /**
     * Constructs a new {@code Thread}.  Implementations may also initialize
     * priority, name, daemon status, {@code ThreadGroup}, etc.
     *
     * @param r a runnable to be executed by new thread instance
     * @return constructed thread, or {@code null} if the request to
     *         create a thread is rejected
     */
    Thread newThread(Runnable r);
}

/**
     * The default thread factory
     */
    static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

ThreadFactory是個接口,DefaultThreadFactory是JUC包中對此接口的實現。
先簡單看下這個工廠中的newThread方法,入參實現了Runnable接口,方法裏有Thread的構造函數,兩個if判斷,默認狀況下是非守護線程,線程優先級是5。線程的名字就是1的數據累加,在根據jvm是否開啓了安全管理器(jvm默認關閉)來決定線程組。若是咱們不自定一個ThreadFactory,那麼不管是採用自動建立線程池Executor,仍是手動建立線程池ThreadPoolExecutor,都是由DefaultThreadFactory來建立線程。
手動建立線程和自動建立線程有什麼不同的嗎?最粗略簡單的不同就是線程名字的不同,不一樣業務須要的線程池的線程取不一樣的名字,這樣在須要用到線程池的複雜業務狀況下,日誌會清爽不少,在實際開發中,這個優點很重要。
guava提供一個ThreadFactoryBuilder能夠幫助咱們實現本身的ThreadFactory。
ThreadFactoryBuilder中核心功能源碼

private static ThreadFactory build(ThreadFactoryBuilder builder) {
        final String nameFormat = builder.nameFormat;
        final Boolean daemon = builder.daemon;
        final Integer priority = builder.priority;
        final UncaughtExceptionHandler uncaughtExceptionHandler = builder.uncaughtExceptionHandler;
        final ThreadFactory backingThreadFactory = builder.backingThreadFactory != null ? builder.backingThreadFactory : Executors.defaultThreadFactory();
        final AtomicLong count = nameFormat != null ? new AtomicLong(0L) : null;
        return new ThreadFactory() {
            public Thread newThread(Runnable runnable) {
                Thread thread = backingThreadFactory.newThread(runnable);
                if (nameFormat != null) {
                    thread.setName(ThreadFactoryBuilder.format(nameFormat, count.getAndIncrement()));
                }

                if (daemon != null) {
                    thread.setDaemon(daemon);
                }

                if (priority != null) {
                    thread.setPriority(priority);
                }

                if (uncaughtExceptionHandler != null) {
                    thread.setUncaughtExceptionHandler(uncaughtExceptionHandler);
                }

                return thread;
            }
        };
    }

build設計模式,咱們能夠本身去定義線程名字,是否爲守護線程,線程優先級,線程的異常處理器。若是要選擇自定義ThreadFactory,強烈推薦ThreadFactoryBuilder。

三.submit和execute
task提交到線程池有兩種方式
1.submit 2.execute
demo:

public static void testSubmitAndExecute() throws ExecutionException, InterruptedException {
        ThreadFactoryBuilder orderThreadFactoryBuilder = new ThreadFactoryBuilder();
        ThreadFactory orderThread = orderThreadFactoryBuilder.setNameFormat("---order---").build();
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 30, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10),
                orderThread);
        for (int i = 0; i < 5; i++) {
            int fina = i;
            Runnable runnable = () -> {
                if (fina == 3) {
                    throw new RuntimeException("demo exception");
                } else {
                    System.out.println("runnable---" + fina + "---" + Thread.currentThread().getName());
                    System.out.println();
                }
            };
//            threadPoolExecutor.submit(runnable);
            threadPoolExecutor.execute(runnable);
//            Future<?> submit = threadPoolExecutor.submit(runnable);
//            System.out.println(submit.get());
        }
    }

若是採用了execute執行,那麼結果就是execute執行結果
發現當某個線程出現異常的時候,其實並不會影響到其餘線程的執行。這裏是直接拋出了一個異常。也能夠經過ThreadFactory來定義Thread本身的UncaughtExceptionHandler屬性,即實現UncaughtExceptionHandler接口的類。

改爲submit形式處理:

Future<?> submit = threadPoolExecutor.submit(runnable);
            System.out.println(submit.get());

WX20191113-submit2.png
發現get會拿到一個返回結果,這裏是null,這表示線程執行成功沒有異常,並且會卡住,即當第i=3的線程異常以後,程序就拋出異常中止了。這是和execute不同的地方。

改爲submit形式處理2:

threadPoolExecutor.submit(runnable);

WX20191113-1510531.png
看日誌徹底看不到異常,明明是有異常拋出的。沒有異常打印,可是不影響其餘線程的運行。

這裏簡單記錄下實際線程池提交的不一樣狀況。


寫到目前爲止,從手冊中的一條規則開始思考,看源碼,簡單瞭解了爲何,主要是跟線程池的工做隊列有關,線程池中的線程怎麼來的,找到了ThreadFactory,

1.newCachedThreadPool的SynchronousQueue這個須要學習2.ThreadGroup須要學習,System.getSecurityManager()java安全器須要去了解。3.線程池submit和execute兩種方式在源碼層析的學習。4.Thread,runnable,callable這三種的本質不一樣和聯繫。

相關文章
相關標籤/搜索