假設咱們編寫了一個Servlet應用,當用戶經過瀏覽器發起一個請求到達咱們服務器時,傳統的Servlet應用通常針對一個用戶請求建立一個線程去執行請求,等到請求執行完畢後,再銷燬線程。這種設計在用戶量幾百或者幾千的狀況下通常不會有什麼大問題,可是若是咱們的用戶量上達幾萬甚至幾十萬幾百萬,頻繁的建立、銷燬線程,將會給服務器帶來巨大的開銷,甚至會出現OOM(Out Of Memory)異常。所以,爲了節省資源的消耗,提升資源的利用率,引出了線程池化技術。java
import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Random; public class ThreadTest { public static void main(String[] args) throws InterruptedException { Long start = System.currentTimeMillis(); final Random random = new Random(); final List<Integer> list = Collections.synchronizedList(new ArrayList<>()); final List<Thread> threads = new ArrayList<>(); for (int i = 0; i < 10000; i++) { Thread thread = new Thread(() -> list.add(random.nextInt())); thread.start(); threads.add(thread); } for (Thread thread : threads) { thread.join(); } System.out.println("時間:" + (System.currentTimeMillis() - start)); System.out.println("大小:" + list.size()); } }
時間:882 大小:10000
import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class ThreadPoolTest { public static void main(String[] args) throws InterruptedException { Long start = System.currentTimeMillis(); final Random random = new Random(); final List<Integer> list = Collections.synchronizedList(new ArrayList<>()); ExecutorService executorService = Executors.newSingleThreadExecutor(); for (int i = 0; i < 10000; i++) { executorService.execute(() -> list.add(random.nextInt())); } executorService.shutdown(); executorService.awaitTermination(1, TimeUnit.DAYS); System.out.println("時間:" + (System.currentTimeMillis() - start)); System.out.println("大小:" + list.size()); } }
時間:52 大小:10000
public interface Executor { void execute(Runnable command); }
Executor容許咱們提交若干待執行的任務,咱們再也不像之前同樣用new Thread(new RunnableTask()).start()的方式啓動一個線程去執行RunnableTask的run()方法,取而代之的是用Executor的實現類去執行,好比:
Executor executor = anExecutor; executor.execute(new RunnableTask1()); executor.execute(new RunnableTask2()); ...
class DirectExecutor implements Executor { public void execute(Runnable r) { r.run(); } }
class ThreadPerTaskExecutor implements Executor { public void execute(Runnable r) { new Thread(r).start(); } }
class SerialExecutor implements Executor { final Queue<Runnable> tasks = new ArrayDeque<Runnable>(); final Executor executor; Runnable active; SerialExecutor(Executor executor) { this.executor = executor; } public synchronized void execute(final Runnable r) { tasks.offer(new Runnable() { public void run() { try { r.run(); } finally { scheduleNext(); } } }); if (active == null) { scheduleNext(); } } protected synchronized void scheduleNext() { if ((active = tasks.poll()) != null) { executor.execute(active); } } }
ExecutorService擴展了Executor,ExecutorService不但具有Executor執行任務的能力,咱們還能夠關閉ExecutorService,這將使ExecutorService拒絕接受新提交的任務。ExecutorService.submit(...)方法是基於Executor.execute(Runnable command)封裝的,execute方法沒有任何返回,而submit會返回一個Future對象,經過Future對象咱們能夠取消任務或者等待任務將來的執行結果。
public interface ExecutorService extends Executor { /** * 關閉線程池,調用此方法後再也不接受新任務,但會處理線程池內 * 還沒有完成的任務,若是線程池已經關閉,再次調用此方法將無事發生。 * 這個方法不會等待已提交但還沒有完成的任務執行完畢,須要調用awaitTermination(long timeout, TimeUnit unit) * 來等待。 */ void shutdown(); /** * 調用此方法會嘗試中止全部正在運行的線程,好比:調用Thread.interrupt() * 標記線程已中斷,若是任務沒有響應中斷則線程沒法中止。這個方法會返回還沒有 * 執行的任務列表,它不會等待正在執行的任務執行完畢,須要調用awaitTermination(long timeout, TimeUnit unit) * 來等待。 */ List<Runnable> shutdownNow(); /** * 判斷線程池是否已關閉,true爲關閉。 */ boolean isShutdown(); /** * 若是調用shutdown()或者shutdownNow()後,全部任務都已完成,則返回true */ boolean isTerminated(); /** * 使當前調用此方法線程陷入阻塞,直到: * 1.調用中止線程池方法後,完成全部任務。 * 2.阻塞超時。 * 3.調用此方法線程被中斷。 * * @param timeout 最大等待時長 * @param unit 時長單位 * @return 若是任務都執行完畢返回true,若是超時或中斷則返回false */ boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; /** * 提交一個有返回值的任務後,將返回一個Future對象表明任務的運行結果, * 能夠調用Future.get()得到任務的執行結果,若是提交任務後想當即得到結果, * 能夠用:result = exec.submit(aCallable).get();這樣的方式得到, * 調用線程將陷入阻塞,直到線程池執行完任務,執行結果被放入到Future對象。 * * @param task 待執行的任務。 * @param <T> 任務執行結果的類型。 * @return 返回Future對象,線程池執行完畢任務後,執行結果會被放入到Future對象。 */ <T> Future<T> submit(Callable<T> task); /** * 提交一個可執行的任務和給定的執行結果,並返回一個Future對象表明該任務未來的運行結果, * 若是任務成功執行,Future對象將返回咱們給定的執行結果。 * * @param task 待執行的任務 * @param result 任務執行完畢後的返回結果 * @param <T> 返回結果的類型 * @return 返回Future對象,線程池執行完畢任務後,執行結果會被放入到Future對象。 */ <T> Future<T> submit(Runnable task, T result); /** * 提交待執行的任務並返回Future對象表明該任務將來的運行結果,若是任務執行成功, * 調用Future.get()將返回null。 * * @param task 待執行的任務 * @return 返回Future對象,表明該任務將來的運行結果。 */ Future<?> submit(Runnable task); }
class NetworkService implements Runnable { private final ServerSocket serverSocket; private final ExecutorService pool; public NetworkService(int port, int poolSize) throws IOException { serverSocket = new ServerSocket(port); pool = Executors.newFixedThreadPool(poolSize); } public void run() { // run the service try { for (;;) { pool.execute(new Handler(serverSocket.accept())); } } catch (IOException ex) { pool.shutdown(); } } } class Handler implements Runnable { private final Socket socket; Handler(Socket socket) { this.socket = socket; } public void run() { // read and service request on socket } }
void shutdownAndAwaitTermination(ExecutorService pool) { pool.shutdown(); // Disable new tasks from being submitted try { // Wait a while for existing tasks to terminate if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { pool.shutdownNow(); // Cancel currently executing tasks // Wait a while for tasks to respond to being cancelled if (!pool.awaitTermination(60, TimeUnit.SECONDS)) System.err.println("Pool did not terminate"); } } catch (InterruptedException ie) { // (Re-)Cancel if current thread also interrupted pool.shutdownNow(); // Preserve interrupt status Thread.currentThread().interrupt(); } }
public abstract class AbstractExecutorService implements ExecutorService { protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); } protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); } public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } }