Java多線程-線程池的建立使用與源碼拓展

多線程的設計方法確實能夠最大限度的發揮多核處理器的計算能力,提升吞吐量和性能。可是若是不加控制隨意使用線程,對系統的性能反而會產生不利。java

和進程相比,線程雖然是一種輕量級的,可是建立和關閉依然須要花費時間,若是每個小的任務都建立一個線程,則會頗有可能出現建立和銷燬線程佔用的時間大於該線程任務所消耗的時間。其次線程自己也是須要佔用內存空間的,大量的線程會搶佔寶貴的內存資源。編程

所以線程的使用須要掌握一個度,再有限的範圍內增長線程的數量能夠提升系統的性能,一旦超過這個範圍,大量的線程只會拖垮整個系統。bash

1 什麼是線程池

爲了不繫統頻繁的建立和銷燬線程,咱們可讓建立的線程複用。咱們可使用一個線程池維護一些線程,當你須要使用線程的時候,能夠從池子中隨便拿一個空閒線程,當完成工做時,並不急着關閉線程,而是將這些線程退回到線程池中,方便下次使用。多線程

簡而言之,再使用線程池後,建立線程編程了從線程池中得到空閒線程,關閉線程變爲想線程池歸還線程。併發

2 線程池的建立

線程池的成員都在java.util.concurrent包中,是JDK併發包的核心。其中ThreadPoolExecutor表示一個線程池。Executors類則是一個線程工廠的角色,經過Executors能夠取得一個擁有特定功能的線程池,經過Executors能夠取得一個特定功能的線程池。ide

2.1 newFixedThreadPool()方法

該方法返回一個固定線程數量的線程池。該線程池中的線程數量始終不變。當有一個新的任務提交時,線程池中如有空閒線程,則當即執行。若沒有則新的任務會暫存在一個任務隊列中,待有線程空閒時,便處理任務隊列中的隊列。函數

2.2 newSingleThreadExecutor()方法

該方法返回一個只有一個線程的線程池。如有多餘的任務被提交到線程池,任務會被保存在一個任務隊列中,待線程空閒,按先入先出的順序執行隊列中的任務。性能

2.3 newCachedThreadPool()方法

該方法返回一個可根據實際狀況調整線程數量的線程池。線程池的線程數量不肯定,但如有空閒線程能夠複用,則會優先使用可複用的線程。若全部的線程都在工做,又有新的任務提交,則會建立新的線程處理任務。全部線程在當前任務執行完畢後,將返回線程池進行復用。ui

2.4 newSingleThreadScheduledExecutor()方法

該方法返回一個ScheduledExecutorService對象,線程池大小爲1。ScheduledExecutorService接口在ExecutorService接口上擴展了在給定時間執行某任務的功能,如在某個固定的延時後執行,或者週期性執行某個任務。this

2.5 newScheduledThreadPool()方法

該方法會返回一個ScheduledExecutorService對象,但該線程池能夠執行線程數量。

建立固定大小的線程池

public class ThreadPoolThread {

    public static class MyTask implements Runnable{

        @Override
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID: " + Thread.currentThread().getId());
            try{
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        public static void main(String[] args) {
            MyTask myTask = new MyTask();
            ExecutorService executorService = Executors.newFixedThreadPool(5);
            for (int i = 0;i< 10 ;i++){
                executorService.submit(myTask);
            }
        }
    }

}
複製代碼
1562554721820:Thread ID: 12
1562554721820:Thread ID: 15
1562554721820:Thread ID: 16
1562554721820:Thread ID: 13
1562554721820:Thread ID: 14
1562554722821:Thread ID: 15
1562554722821:Thread ID: 16
1562554722821:Thread ID: 12
1562554722821:Thread ID: 13
1562554722821:Thread ID: 14
複製代碼

計劃執行任務

newScheduledThreadPool()方法返回一個ScheduledExecutorService對象,能夠根據時間須要對線程進行調度。主要方法以下

public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,         long period,TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);
複製代碼

和其餘線程不一樣,ScheduledExecutorService不必定會當即安排執行任務。他實際上是起到了計劃任務的做用,會在指定的時間對任務進行調度。

schedule()會在給定時間對任務進行一次調度。scheduleAtFixedRate()和scheduleWithFixedDelay()方法會對任務進行週期性調度,可是兩者仍是有區別的。scheduleAtFixedRate()方法的任務調度頻率是必定的,它是以上一個任務開始執行的時間爲起點,再在規定的時間調度下一次任務。而scheduleWithFixedDelay()方法是以上一個任務的結束後再通過規定時間進行任務調度。

public class ScheduleExecutorServiceDemo {

    public static void main(String[] args) {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
        scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                    System.out.println(System.currentTimeMillis());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },0,2, TimeUnit.SECONDS);
    }
}
複製代碼
1562555518798
1562555520798
1562555522798
1562555524799
1562555526800
複製代碼

能夠看出任務每兩秒被調度一次。

若是任務的執行時間大於調度時間,則任務就會在上一個任務結束後當即被調用。

將代碼修改成8秒

Thread.sleep(8000);
複製代碼
1562555680333
1562555688333
1562555696333
1562555704333
複製代碼

調度程序實際上並不保證任務會無限期的持續調用,若是任務自己拋出異常,那麼後續全部執行都會中斷。

3 線程池的內部實現

對於幾個核心的線程池,雖然看着建立的線程池有着不一樣的功能特色,可是其內部都是使用了ThreadPoolExecutor類。

看幾個線程池的建立源碼:

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
    }
    
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
    }
複製代碼

能夠看出他們都是ThreadPoolExecutor類的封裝,來看一下ThreadPoolExecutor類的構造:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
複製代碼
  • corePoolSize:指定了線程池中的線程數量。
  • maximumPoolSize:指定了線程池中的最大線程數。
  • keepAliveTime:當線程池中的線程數量超過corePoolSize,多餘的空閒線程的存活時間,即超過corePoolSize的空閒線程在多長時間內被銷燬。
  • unit:keepAliveTime的單位。
  • workQueue:任務隊列,被提交但還沒有被執行的任務的存放隊列。
  • threadFacotry:線程工廠,用於建立線程。
  • handler:拒絕策略。當任務太多來不及處理時如何拒絕任務。

3.1 workQueue-任務隊列

參數workQueue指被提交但未執行的任務隊列,他是一個BlockingQueue接口的對象,僅用於存放Runnable對象。在ThreadPoolExecutor構造中可使用一下幾種BlockingQueue接口:

  • 直接提交隊列:有SynchronousQueue對象提供。SynchronousQueue沒有容量,每插入一個操做都要等待一個相應的刪除操做,反之每個刪除操做都要等待對應的插入操做。使用SynchronousQueue若是總有新的任務提交給線程執行,若是沒有空閒進程會嘗試建立新的線程,若是線程數已經到達最大值則執行拒絕策略。使用SynchronousQueue一般要設置很大的maximumPoolSize值,不然很容易執行拒絕策略。
  • 有界的任務隊列:有界的任務隊列使用ArrayBlockingQueue類實現,ArrayBlockingQueue類的構造函數必須帶一個容量參數,表示該隊列的最大容量。使用有界的任務隊列時,如有新的任務要執行,當線程池的實際線程小於corePoolSize則優先建立線程,若大於corePoolSize則將任務加入等待隊列。若等待隊列已滿,則在總線程不大於maximumPoolSize的狀況下建立新的線程執行任務,若大於maximumPoolSize則執行拒絕策略。
  • 無界的任務隊列:無界的任務隊列使用LinkedBlockingQueue類實現。與有界的任務隊列相比,無界的任務隊列不會出現入隊失敗的狀況。使用LinkedBlockingQueue當有新的任務須要線程執行時,若線程數小於corePoolSize,則會建立新的線程,但當線程數達到corePoolSize後就不會繼續增加了。後續如有新的任務加入有沒有空閒線程則直接進入隊列等待。
  • 優先任務隊列:優先任務隊列時帶有執行優先級的隊列。經過PriorityBlockingQueue類實現,能夠控制任務的執行前後順序。它是一個特殊的無界隊列。PriorityBlockingQueue類能夠根據任務自身的優先級順序進行執行。

4 拒絕策略

拒絕策略是當任務數量超過系統實際承載能力時執行的策略。拒絕策略能夠說時系統超負荷運行時的補救措施。

JDK內置了四種拒絕策略:

  • AbortPolicy策略:該策略會直接拋出異常,阻止正常工做。
  • CallerRunsPolicy策略:只要線程池未關閉,該策略直接在當前調用者線程中運行當前被丟棄的任務。這樣不會真的丟棄線程,可是會使任務提交線程性能降低。
  • DiscardOldestPolicy策略:該策略將丟棄最老的一個請求,也就是即將被執行的一個任務,並嘗試再次提交當前任務。
  • DiscardPolicy策略:該策略丟棄沒法處理的任務,不進行任何處理。

上面的策略都實現了RejectedExecutionHandler接口,若是以上策略沒法知足實際開發,能夠本身擴展。

RejectedExecutionHandler接口構造:

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
複製代碼

自定義拒絕策略:

//拒絕策略demo
public class RejectThreadPoolDemo {

    public static class MyTask implements Runnable{

        @Override
        public void run() {
            System.out.println(System.currentTimeMillis() + ": Thread ID : " + Thread.currentThread().getId());
            try{
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MyTask myTask = new MyTask();
        ThreadPoolExecutor es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10), Executors.privilegedThreadFactory(), new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.out.println(r.toString() + "被拒絕");
            }
        });
        for (int i = 0;i<Integer.MAX_VALUE;i++){
           es.submit(myTask);
           Thread.sleep(10);
        }
    }

}
複製代碼
1562575292467: Thread ID : 14
1562575292478: Thread ID : 15
1562575292489: Thread ID : 16
java.util.concurrent.FutureTask@b4c966a被拒絕
java.util.concurrent.FutureTask@2f4d3709被拒絕
java.util.concurrent.FutureTask@4e50df2e被拒絕
複製代碼

5 自定義線程建立:ThreadFactory

ThreadFactory是一個接口,他只有一個用來建立線程的方法。

Thread newThread(Runnable r);
複製代碼

經過自定義線程建立咱們能夠跟蹤線程池在什麼時候建立了多少線程,自定義線程名等。

public class ThreadFactoryDemo {

    static volatile int i = 0;

    public static class TestTask implements Runnable{

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName());
        }
    }

    public static void main(String[] args) {
        TestTask testTask = new TestTask();
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r,"test--" + i);
                i++;
                return thread;
            }
        });
        for (int i = 0;i<5;i++){
            threadPoolExecutor.submit(testTask);
        }
    }

}
複製代碼
test--0
test--1
test--4
test--2
test--3
複製代碼

6 擴展線程池

雖然JDK已經幫咱們實現了穩定的線程池,但若是咱們想要對線程池進行一些擴展,好比監控任務執行的開始和結束時間怎麼辦呢。

ThreadPoolExecutor是一個能夠擴展的線程池,它提供了beforExecutor(),afterExecutor()和terminated()三個接口來對其進行擴展。

public class ThreadFactoryDemo {

    static volatile int i = 0;

    public static class TestTask implements Runnable{

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName());
        }
    }

    public static void main(String[] args) {
        TestTask testTask = new TestTask();
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r,"test--" + i);
                i++;
                return thread;
            }
        }){
            @Override
            protected void beforeExecute(Thread t,Runnable r){
                System.out.println("task-----準備執行");
            }
        };
        for (int i = 0;i<5;i++){
            threadPoolExecutor.submit(testTask);
        }
    }

}
複製代碼
task-----準備執行
task-----準備執行
test--2
task-----準備執行
test--1
task-----準備執行
test--4
task-----準備執行
test--3
test--0
複製代碼

7 submit和execute的區別

7.1 execute()方法

execute提交的方式只能提交一個Runnable的對象,且該方法的返回值是void,也便是提交後若是線程運行後,和主線程就脫離了關係了,固然能夠設置一些變量來獲取到線程的運行結果。而且當線程的執行過程當中拋出了異常一般來講主線程也沒法獲取到異常的信息的,只有經過ThreadFactory主動設置線程的異常處理類才能感知到提交的線程中的異常。

7.2 sumbit()方法

sumbit()方法有三種形式:

public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
複製代碼

sumbit方法會返回一個Future對象,這個Future對象表明這線程的執行結果,當主線程調用Future的get方法的時候會獲取到從線程中返回的結果數據。若是在線程的執行過程當中發生了異常,get會獲取到異常的信息。

相關文章
相關標籤/搜索