Java併發之ThreadPoolExecutor源碼解析(一)

線程池

假設咱們編寫了一個Servlet應用,當用戶經過瀏覽器發起一個請求到達咱們服務器時,傳統的Servlet應用通常針對一個用戶請求建立一個線程去執行請求,等到請求執行完畢後,再銷燬線程。這種設計在用戶量幾百或者幾千的狀況下通常不會有什麼大問題,可是若是咱們的用戶量上達幾萬甚至幾十萬幾百萬,頻繁的建立、銷燬線程,將會給服務器帶來巨大的開銷,甚至會出現OOM(Out Of Memory)異常。所以,爲了節省資源的消耗,提升資源的利用率,引出了線程池化技術。java

線程池會維護若干線程,等待任務的到來,避免重複建立、銷燬線程形成的消耗,提升任務的響應速度,不須要建立線程就能夠當即執行任務,使用線程池能夠進行統一的分配、調優和監控,避免無節制的建立線程,提升系統的穩定性。瀏覽器

固然,線程池也並不是十全十美,它也有力不從心的場景,例如:線程池適合生命週期較短的任務,不適合耗時較長的任務;線程池沒法設置任務的優先級,也沒法單獨啓動或者終止某個線程。緩存

如今,咱們對比一下線程池執行任務和建立線程執行任務的優點。在ThreadTest中,咱們聲明瞭一個線程安全的list,並建立10000個線程併發往list添加隨機值,最後咱們等待全部線程執行完畢,打印程序的執行時間和list的大小。安全

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

public class ThreadTest {

    public static void main(String[] args) throws InterruptedException {
        Long start = System.currentTimeMillis();
        final Random random = new Random();
        final List<Integer> list = Collections.synchronizedList(new ArrayList<>());
        final List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < 10000; i++) {
            Thread thread = new Thread(() -> list.add(random.nextInt()));
            thread.start();
            threads.add(thread);
        }
        for (Thread thread : threads) {
            thread.join();
        }
        System.out.println("時間:" + (System.currentTimeMillis() - start));
        System.out.println("大小:" + list.size());
    }
}

    

執行結果:bash

時間:882
大小:10000

  

能夠看到list的長度爲10000,程序執行了882毫秒。服務器

下面,咱們用線程池的方式來執行相同的邏輯,咱們聲明一個線程池executorService,並往線程池中提交10000個任務,每一個任務都向list添加一個隨機值:網絡

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThreadPoolTest {

    public static void main(String[] args) throws InterruptedException {
        Long start = System.currentTimeMillis();
        final Random random = new Random();
        final List<Integer> list = Collections.synchronizedList(new ArrayList<>());
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 10000; i++) {
            executorService.execute(() -> list.add(random.nextInt()));
        }
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.DAYS);
        System.out.println("時間:" + (System.currentTimeMillis() - start));
        System.out.println("大小:" + list.size());
    }
}

  

執行結果:併發

時間:52
大小:10000

  

能夠看到線程池的執行時間相比建立線程,大大縮短。dom

下面,咱們就從源碼的角度,來剖析線程池的工做原理。ThreadPoolExecutor是java.util.concurrent包下提供的線程池實現類,下圖是ThreadPoolExecutor類的繼承關係,咱們將從上至下逐個分析ThreadPoolExecutor的父類:Executor、ExecutorService、AbstractExecutorService。異步

Executor

咱們先來看下Executor接口定義:

public interface Executor {
    void execute(Runnable command);
}

  

Executor容許咱們提交若干待執行的任務,咱們再也不像之前同樣用new Thread(new RunnableTask()).start()的方式啓動一個線程去執行RunnableTask的run()方法,取而代之的是用Executor的實現類去執行,好比:

 Executor executor = anExecutor;
 executor.execute(new RunnableTask1());
 executor.execute(new RunnableTask2());
 ...

  

Executor提供了一種新的方式,咱們只需提交任務,Executor自身負責如何調度線程來執行任務。Executor並不要求任務的執行必須是異步的,也能夠在提交完任務後,同步執行任務:

 class DirectExecutor implements Executor {
     public void execute(Runnable r) {
         r.run();
     }
 }

  

一般狀況下,Executor會將提交過來的任務放在另外一個線程執行,而不是經過調用線程來執行:

 class ThreadPerTaskExecutor implements Executor {
     public void execute(Runnable r) {
         new Thread(r).start();
     }
 }

  

一些Executor接口的實如今調度線程執行任務時會添加一些限制,好比咱們能夠以代理模式的思想來封裝Executor:

 class SerialExecutor implements Executor {
   final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
   final Executor executor;
   Runnable active;

   SerialExecutor(Executor executor) { 
     this.executor = executor;
   }

   public synchronized void execute(final Runnable r) {
     tasks.offer(new Runnable() {
       public void run() {
         try {
           r.run();
         } finally {
           scheduleNext();
         }
       }
     });
     if (active == null) { 
       scheduleNext();
     }
   }

   protected synchronized void scheduleNext() {
     if ((active = tasks.poll()) != null) { 
       executor.execute(active);
     }
   }
 }

  

在建立SerialExecutor對象時,會要求傳入一個executor對象,執行任務並非SerialExecutor對象自己,SerialExecutor並不執行任務,只是將任務緩存到隊列tasks,而executor纔是真正負責執行任務。

ExecutorService

ExecutorService擴展了Executor,ExecutorService不但具有Executor執行任務的能力,咱們還能夠關閉ExecutorService,這將使ExecutorService拒絕接受新提交的任務。ExecutorService.submit(...)方法是基於Executor.execute(Runnable command)封裝的,execute方法沒有任何返回,而submit會返回一個Future對象,經過Future對象咱們能夠取消任務或者等待任務將來的執行結果。

public interface ExecutorService extends Executor {

    /**
     * 關閉線程池,調用此方法後再也不接受新任務,但會處理線程池內
     * 還沒有完成的任務,若是線程池已經關閉,再次調用此方法將無事發生。
     * 這個方法不會等待已提交但還沒有完成的任務執行完畢,須要調用awaitTermination(long timeout, TimeUnit unit)
     * 來等待。
     */
    void shutdown();

    /**
     * 調用此方法會嘗試中止全部正在運行的線程,好比:調用Thread.interrupt()
     * 標記線程已中斷,若是任務沒有響應中斷則線程沒法中止。這個方法會返回還沒有
     * 執行的任務列表,它不會等待正在執行的任務執行完畢,須要調用awaitTermination(long timeout, TimeUnit unit)
     * 來等待。
     */
    List<Runnable> shutdownNow();

    /**
     * 判斷線程池是否已關閉,true爲關閉。
     */
    boolean isShutdown();

    /**
     * 若是調用shutdown()或者shutdownNow()後,全部任務都已完成,則返回true
     */
    boolean isTerminated();

    /**
     * 使當前調用此方法線程陷入阻塞,直到:
     *      1.調用中止線程池方法後,完成全部任務。
     *      2.阻塞超時。
     *      3.調用此方法線程被中斷。
     *
     * @param timeout 最大等待時長
     * @param unit    時長單位
     * @return 若是任務都執行完畢返回true,若是超時或中斷則返回false
     */
    boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException;

    /**
     * 提交一個有返回值的任務後,將返回一個Future對象表明任務的運行結果,
     * 能夠調用Future.get()得到任務的執行結果,若是提交任務後想當即得到結果,
     * 能夠用:result = exec.submit(aCallable).get();這樣的方式得到,
     * 調用線程將陷入阻塞,直到線程池執行完任務,執行結果被放入到Future對象。
     *
     * @param task 待執行的任務。
     * @param <T>  任務執行結果的類型。
     * @return 返回Future對象,線程池執行完畢任務後,執行結果會被放入到Future對象。
     */
    <T> Future<T> submit(Callable<T> task);

    /**
     * 提交一個可執行的任務和給定的執行結果,並返回一個Future對象表明該任務未來的運行結果,
     * 若是任務成功執行,Future對象將返回咱們給定的執行結果。
     *
     * @param task   待執行的任務
     * @param result 任務執行完畢後的返回結果
     * @param <T>    返回結果的類型
     * @return 返回Future對象,線程池執行完畢任務後,執行結果會被放入到Future對象。
     */
    <T> Future<T> submit(Runnable task, T result);

    /**
     * 提交待執行的任務並返回Future對象表明該任務將來的運行結果,若是任務執行成功,
     * 調用Future.get()將返回null。
     *
     * @param task 待執行的任務
     * @return 返回Future對象,表明該任務將來的運行結果。
     */
    Future<?> submit(Runnable task);
}

  

咱們能夠用ExecutorService來模擬一個網絡服務,用Executors.newFixedThreadPool(int)工廠方法生成的線程池中的線程來處理傳入的網絡請求:

 class NetworkService implements Runnable {
   private final ServerSocket serverSocket;
   private final ExecutorService pool;

   public NetworkService(int port, int poolSize)
       throws IOException {
     serverSocket = new ServerSocket(port);
     pool = Executors.newFixedThreadPool(poolSize);
   }

   public void run() { // run the service
     try {
       for (;;) {
         pool.execute(new Handler(serverSocket.accept()));
       }
     } catch (IOException ex) {
       pool.shutdown();
     }
   }
 }

 class Handler implements Runnable {
   private final Socket socket;
   Handler(Socket socket) { this.socket = socket; }
   public void run() {
     // read and service request on socket
   }
 }

  

關閉ExecutorService分兩個階段,首先調用shutdown()拒絕再有新的任務提交,而後在必要的時候調用shutdownNow(),嘗試中斷正在執行的任務。

 void shutdownAndAwaitTermination(ExecutorService pool) {
   pool.shutdown(); // Disable new tasks from being submitted
   try {
     // Wait a while for existing tasks to terminate
     if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
       pool.shutdownNow(); // Cancel currently executing tasks
       // Wait a while for tasks to respond to being cancelled
       if (!pool.awaitTermination(60, TimeUnit.SECONDS))
           System.err.println("Pool did not terminate");
     }
   } catch (InterruptedException ie) {
     // (Re-)Cancel if current thread also interrupted
     pool.shutdownNow();
     // Preserve interrupt status
     Thread.currentThread().interrupt();
   }
 }

  

AbstractExecutorService

AbstractExecutorService是juc(java.util.concurrent)包下提供的ExecutorService接口的默認實現,在newTaskFor方法中返回FutureTask做爲RunnableFuture接口的實現。能夠看到不管是Runnable仍是Callable類型的任務,都會被封裝成RunnableFuture類型的任務來執行。

public abstract class AbstractExecutorService implements ExecutorService {

    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }
	
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
	
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
	
}
相關文章
相關標籤/搜索