使用 Executors,ThreadPoolExecutor,建立線程池,源碼分析理解

以前建立線程的時候都是用的 newCachedThreadPoo,newFixedThreadPool,newScheduledThreadPool,newSingleThreadExecutor 這四個方法。
固然 Executors 也是用不一樣的參數去 new ThreadPoolExecutor 實現的,本文先分析前四種線程建立方式,後在分析 new ThreadPoolExecutor 建立方式java

使用 Executors 建立線程池

1.newFixedThreadPool()

因爲使用了LinkedBlockingQueue因此maximumPoolSize沒用,當corePoolSize滿了以後就加入到LinkedBlockingQueue隊列中。 每當某個線程執行完成以後就從LinkedBlockingQueue隊列中取一個。 因此這個是建立固定大小的線程池。git

源碼分析github

public static ExecutorService newFixedThreadPool(int nThreads) {
	return new ThreadPoolExecutor(
			nThreads,
			nThreads,
			0L,
			TimeUnit.MILLISECONDS,
			new LinkedBlockingQueue<Runnable>());
}
複製代碼

2.newSingleThreadPool()

建立線程數爲1的線程池,因爲使用了LinkedBlockingQueue因此maximumPoolSize 沒用,corePoolSize爲1表示線程數大小爲1,滿了就放入隊列中,執行完了就從隊列取一個。緩存

源碼分析函數

public static ExecutorService newSingleThreadExecutor() {
	return new Executors.FinalizableDelegatedExecutorService
			(
					new ThreadPoolExecutor(
							1,
							1,
							0L,
							TimeUnit.MILLISECONDS,
							new LinkedBlockingQueue<Runnable>())
			);
}

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
	this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
			Executors.defaultThreadFactory(), defaultHandler);
}
複製代碼

3.newCachedThreadPool()

建立可緩衝的線程池。沒有大小限制。因爲corePoolSize爲0因此任務會放入SynchronousQueue隊列中,SynchronousQueue只能存放大小爲1,因此會馬上新起線程,因爲maxumumPoolSizeInteger.MAX_VALUE因此能夠認爲大小爲2147483647。受內存大小限制。oop

源碼分析源碼分析

public static ExecutorService newCachedThreadPool() {
	return new ThreadPoolExecutor(
			0,
			Integer.MAX_VALUE,
			60L,
			TimeUnit.SECONDS,
			new SynchronousQueue<Runnable>());
}

public ThreadPoolExecutor(int corePoolSize,
						  int maximumPoolSize,
						  long keepAliveTime,
						  TimeUnit unit,
						  BlockingQueue<Runnable> workQueue) {
	this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
			Executors.defaultThreadFactory(), defaultHandler);
}
複製代碼

使用 ThreadPoolExecutor 建立線程池

源碼分析 ,ThreadPoolExecutor 的構造函數測試

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
	if (corePoolSize < 0 ||
			maximumPoolSize <= 0 ||
			maximumPoolSize < corePoolSize ||
			keepAliveTime < 0)
		throw new IllegalArgumentException();
	if (workQueue == null || threadFactory == null || handler == null)
		throw new NullPointerException();
	this.corePoolSize = corePoolSize;
	this.maximumPoolSize = maximumPoolSize;
	this.workQueue = workQueue;
	this.keepAliveTime = unit.toNanos(keepAliveTime);
	this.threadFactory = threadFactory;
	this.handler = handler;
}
複製代碼

構造函數參數

一、corePoolSize 核心線程數大小,當線程數 < corePoolSize ,會建立線程執行 runnableui

二、maximumPoolSize 最大線程數, 當線程數 >= corePoolSize的時候,會把 runnable 放入 workQueue中this

三、keepAliveTime 保持存活時間,當線程數大於corePoolSize的空閒線程能保持的最大時間。

四、unit 時間單位

五、workQueue 保存任務的阻塞隊列

六、threadFactory 建立線程的工廠

七、handler 拒絕策略

任務執行順序

一、當線程數小於 corePoolSize時,建立線程執行任務。

二、當線程數大於等於 corePoolSize而且 workQueue 沒有滿時,放入workQueue

三、線程數大於等於 corePoolSize而且當 workQueue 滿時,新任務新建線程運行,線程總數要小於 maximumPoolSize

四、當線程總數等於 maximumPoolSize 而且 workQueue 滿了的時候執行 handlerrejectedExecution。也就是拒絕策略。

四個拒絕策略

ThreadPoolExecutor默認有四個拒絕策略:

一、ThreadPoolExecutor.AbortPolicy() 直接拋出異常RejectedExecutionException

二、ThreadPoolExecutor.CallerRunsPolicy() 直接調用run方法而且阻塞執行

三、ThreadPoolExecutor.DiscardPolicy() 直接丟棄後來的任務

四、ThreadPoolExecutor.DiscardOldestPolicy() 丟棄在隊列中隊首的任務

固然能夠本身繼承RejectedExecutionHandler來寫拒絕策略.

TestThreadPoolExecutor 示例

TestThreadPoolExecutor.java

package io.ymq.thread.TestThreadPoolExecutor;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** * 描述: * * @author yanpenglei * @create 2017-10-12 15:39 **/
public class TestThreadPoolExecutor {
    public static void main(String[] args) {

        long currentTimeMillis = System.currentTimeMillis();

        // 構造一個線程池
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 6, 3,
                TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3)
        );

        for (int i = 1; i <= 10; i++) {
            try {
                String task = "task=" + i;
                System.out.println("建立任務並提交到線程池中:" + task);
                threadPool.execute(new ThreadPoolTask(task));

                Thread.sleep(100);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        try {
            //等待全部線程執行完畢當前任務。
            threadPool.shutdown();

            boolean loop = true;
            do {
                //等待全部線程執行完畢當前任務結束
                loop = !threadPool.awaitTermination(2, TimeUnit.SECONDS);//等待2秒
            } while (loop);

            if (loop != true) {
                System.out.println("全部線程執行完畢");
            }

        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            System.out.println("耗時:" + (System.currentTimeMillis() - currentTimeMillis));
        }


    }
}

複製代碼

ThreadPoolTask.java

package io.ymq.thread.TestThreadPoolExecutor;

import java.io.Serializable;

/** * 描述: * * @author yanpenglei * @create 2017-10-12 15:40 **/
public class ThreadPoolTask implements Runnable, Serializable {

    private Object attachData;

    ThreadPoolTask(Object tasks) {
        this.attachData = tasks;
    }

    public void run() {

        try {

            System.out.println("開始執行任務:" + attachData + "任務,使用的線程池,線程名稱:" + Thread.currentThread().getName());

            System.out.println();

        } catch (Exception e) {
            e.printStackTrace();
        }
        attachData = null;
    }

}
複製代碼

遇到java.util.concurrent.RejectedExecutionException

第一

你的線程池 ThreadPoolExecutor 顯示的 shutdown() 以後,再向線程池提交任務的時候。 若是你配置的拒絕策略是 AbortPolicy 的話,這個異常就會拋出來。

第二

當你設置的任務緩存隊列太小的時候,或者說, 你的線程池裏面全部的線程都在幹活(線程數== maxPoolSize),而且你的任務緩存隊列也已經充滿了等待的隊列, 這個時候,你再向它提交任務,則會拋出這個異常。

響應

能夠看到線程 pool-1-thread-1 到5 循環使用

建立任務並提交到線程池中:task=1
開始執行任務:task=1任務,使用的線程池,線程名稱:pool-1-thread-1

建立任務並提交到線程池中:task=2
開始執行任務:task=2任務,使用的線程池,線程名稱:pool-1-thread-2

建立任務並提交到線程池中:task=3
開始執行任務:task=3任務,使用的線程池,線程名稱:pool-1-thread-3

建立任務並提交到線程池中:task=4
開始執行任務:task=4任務,使用的線程池,線程名稱:pool-1-thread-4

建立任務並提交到線程池中:task=5
開始執行任務:task=5任務,使用的線程池,線程名稱:pool-1-thread-5

建立任務並提交到線程池中:task=6
開始執行任務:task=6任務,使用的線程池,線程名稱:pool-1-thread-1

建立任務並提交到線程池中:task=7
開始執行任務:task=7任務,使用的線程池,線程名稱:pool-1-thread-2

建立任務並提交到線程池中:task=8
開始執行任務:task=8任務,使用的線程池,線程名稱:pool-1-thread-3

建立任務並提交到線程池中:task=9
開始執行任務:task=9任務,使用的線程池,線程名稱:pool-1-thread-4

建立任務並提交到線程池中:task=10
開始執行任務:task=10任務,使用的線程池,線程名稱:pool-1-thread-5

全部線程執行完畢
耗時:1015
複製代碼

測試代碼

github github.com/souyunku/ym…

Contact

  • 做者:鵬磊
  • 出處:www.ymq.io
  • Email:admin@souyunku.com
  • 版權歸做者全部,轉載請註明出處
  • Wechat:關注公衆號,搜雲庫,專一於開發技術的研究與知識分享

關注公衆號-搜雲庫
搜雲庫
相關文章
相關標籤/搜索