Executor框架(一)Callable、Future、Executor和ExecutorService

本文已同步至我的博客 liaosi's blog-Executor框架(一)Callable、Future、Executor和ExecutorService

引言

Executor框架是指JDK 1.5中引入的一系列併發庫中與Executor相關的功能類,包括Executor、Executors、ExecutorService、Future、Callable等。java

爲何要引入Executor框架?

若是使用new Thread(...).start()的方法處理多線程,有以下缺點:多線程

  • 開銷大。對於JVM來講,每次新建線程和銷燬線程都會有很大的開銷。
  • 線程缺少管理。沒有一個池來限制線程的數量,若是併發量很高,會建立不少的線程,並且線程之間可能會有相互競爭,這將會過多得佔用系統資源,增長系統資源的消耗量。並且線程數量超過系統負荷,容易致使系統不穩定。

使用線程池的方式,有以下優勢:併發

  • 複用線程。經過複用建立的了的線程,減小了線程的建立、消亡的開銷。
  • 有效控制併發線程數。
  • 提供了更簡單靈活的線程管理。能夠提供定時執行、按期執行、單線程、可變線程數等多種線程使用功能。

Executor框架的UML圖

圖片描述

下面開始分析一下Executor框架中幾個比較重要的接口和類。框架

Callable

Callable 位於java.util.concurrent包下,它是一個接口,只聲明瞭一個叫作call()的方法。ide

@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

Callable 接口相似於Runnable,二者都是爲那些其實例可能被另外一個線程執行的類設計的。和 Runnable 接口中的run()方法相似,Callable 提供一個call()方法做爲線程的執行體。可是call()方法比run()方法更增強大,這要體如今:測試

1)call 方法能夠有返回值。返回值的類型便是 Callable 接口傳遞進來的V類型。
2)call 方法能夠聲明拋出異常。

Future

Future 接口位於java.util.concurrent包下,是Java 1.5中引入的接口.
Future主要用來對具體的Runnable或者Callable任務的執行結果進行取消、查詢是否完成、獲取結果。必要時能夠經過get()方法獲取執行結果,get()方法會阻塞直到任務返回結果。this

public interface Future<V> {


    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

當你提交一個Callable對象給線程池時,將獲得一個Future對象,而且它和你傳入的Callable示例有相同泛型。spa

Future 接口中的5個方法:線程

  • cancel方法:用來取消任務,若是取消任務成功則返回true,若是取消任務失敗則返回false。
    參數mayInterruptIfRunning表示是否容許取消正在執行卻沒有執行完畢的任務,若是設置true,則表示能夠取消正在執行過程當中的任務。
    若是任務已經完成,則不管mayInterruptIfRunning爲true仍是false,此方法確定返回false,即若是取消已經完成的任務會返回false;
    若是任務正在執行,若mayInterruptIfRunning設置爲true,則返回true,若mayInterruptIfRunning設置爲false,則返回false;
    若是任務尚未執行,則不管mayInterruptIfRunning爲true仍是false,確定返回true。
  • isCancelled方法:表示任務是否被取消成功,若是在任務正常完成前被取消成功,則返回 true。
  • isDone方法:表示任務是否已經完成,若任務完成,則返回true;
  • get()方法:用來獲取執行結果,這個方法會產生阻塞,會一直等到任務執行完畢才返回;
  • get(long timeout, TimeUnit unit):用來獲取執行結果,若是在指定時間內,還沒獲取到結果,會拋出TimeoutException異常。

也就是說Future提供了三種功能:設計

1)判斷任務是否完成;
2)可以中斷任務;
3)可以獲取任務執行結果

Executor

Executor是一個接口,它將任務的提交與任務的執行分離開來,定義了一個接收Runnable對象的方法execute。Executor是Executor框架中最基礎的一個接口,相似於集合中的Collection接口。

public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}

ExecutorService

ExecutorService繼承了Executor,是一個比Executor使用更普遍的子類接口。定義了終止任務、提交任務、跟蹤任務返回結果等方法。
一個ExecutorService是能夠關閉的,關閉以後它將不能再接收任何任務。對於再也不使用的ExecutorService,應該將其關閉以釋放資源。

ExecutorService方法介紹

package java.util.concurrent;

import java.util.List;
import java.util.Collection;

public interface ExecutorService extends Executor {

    /**
     * 平滑地關閉線程池,已經提交到線程池中的任務會繼續執行完。
     */
    void shutdown();

    /**
     * 當即關閉線程池,返回尚未開始執行的任務列表。
     * 會嘗試中斷正在執行的任務(每一個線程調用 interruput方法),但這個行爲不必定會成功。
     */
    List<Runnable> shutdownNow();

    /**
     * 判斷線程池是否已經關閉
     */
    boolean isShutdown();

    /**
     * 判斷線程池的任務是否已經執行完畢。
     * 注意此方法調用以前須要先調用shutdown()方法或者shutdownNow()方法,不然老是會返回false
     */
    boolean isTerminated();

    /**
     * 判斷線程池的任務是否都執行完。
     * 若是沒有任務沒有執行完畢則阻塞,直至任務完成或者達到了指定的timeout時間就會返回
     */
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * 提交帶有一個返回值的任務到線程池中去執行(回調),返回的 Future 表示任務的待定結果。
     * 當任務成功完成後,經過 Future 實例的 get() 方法能夠獲取該任務的結果。
     * Future 的 get() 方法是會阻塞的。
     */
    <T> Future<T> submit(Callable<T> task);

    /**
     *提交一個Runnable的任務,當任務完成後,能夠經過Future.get()獲取的是提交時傳遞的參數T result
     * 
     */
    <T> Future<T> submit(Runnable task, T result);

    /**
     * 提交一個Runnable的人無語,它的Future.get()得不到任何內容,它返回值老是Null。
     * 爲何有這個方法?爲何不直接設計成void submit(Runnable task)這種方式?
     * 這是由於Future除了get這種獲取任務信息外,還能夠控制任務,
     具體體如今 Future的這個方法上:boolean cancel(boolean mayInterruptIfRunning)
     這個方法可以去取消提交的Rannable任務。
     */
    Future<?> submit(Runnable task);

    /**
     * 執行一組給定的Callable任務,返回對應的Future列表。列表中每個Future都將持有該任務的結果和狀態。
     * 當全部任務執行完畢後,方法返回,此時而且每個Future的isDone()方法都是true。
     * 完成的任務多是正常結束,也能夠是異常結束
     * 若是當任務執行過程當中,tasks集合被修改了,那麼方法的返回結果將是不肯定的,
       即不能肯定執行的是修改前的任務,仍是修改後的任務
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    /**
     * 執行一組給定的Callable任務,返回對應的Future列表。列表中每個Future都將持有該任務的結果和狀態。
     * 當全部任務執行完畢後或者超時後,方法將返回,此時而且每個Future的isDone()方法都是true。
     * 一旦方法返回,未執行完成的任務被取消,而完成的任務可能正常結束或者異常結束, 
     * 完成的任務能夠是正常結束,也能夠是異常結束
     * 若是當任務執行過程當中,tasks集合被修改了,那麼方法的返回結果將是不肯定的
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * 執行一組給定的Callable任務,當成功執行完(沒拋異常)一個任務後此方法便返回,返回的是該任務的結果
     * 一旦此正常返回或者異常結束,未執行的任務都會被取消。 
     * 若是當任務執行過程當中,tasks集合被修改了,那麼方法的返回結果將是不肯定的
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    /**
     * 執行一組給定的Callable任務,當在timeout(超時)以前成功執行完(沒拋異常)一個任務後此方法便返回,返回的是該任務的結果
     * 一旦此正常返回或者異常結束,未執行的任務都會被取消。 
     * 若是當任務執行過程當中,tasks集合被修改了,那麼方法的返回結果將是不肯定的
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

shutdown() 和 shutdownNow() 是用來關閉鏈接池的兩個方法,並且這兩個方法都是在當前線程當即返回,不會阻塞至線程池中的方法執行結束。調用這兩個方法以後,鏈接池將不能再接受任務。
下面給寫幾個示例來加深ExecutorService的方法的理解。
先寫兩個任務類:ShortTask和LongTask,這兩個類都繼承了Runnable接口,ShortTask的run()方法執行很快,LongTask的run()方法執行時間爲10s。

public class LongTask implements Runnable {


    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(10L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("complete a long task");
    }

}
public class ShortTask implements Runnable {

    @Override
    public void run() {
        System.out.println("complete a short task...");
    }
    
}

測試shutdown()方法

@Test
    public void testShutdown() throws InterruptedException {
        ExecutorService threadpool = Executors.newFixedThreadPool(4);
        //將4個任務提交到有4個線程的線程池
        threadpool.submit(new ShortTask());
        threadpool.submit(new ShortTask());
        threadpool.submit(new LongTask());
        threadpool.submit(new ShortTask());

        //關閉線程池
        threadpool.shutdown();

        boolean isShutdown = threadpool.isShutdown();
        System.out.println("線程池是否已經關閉:" + isShutdown);

        final SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
        while (!threadpool.awaitTermination(1L, TimeUnit.SECONDS)) {
            System.out.println("線程池中還有任務在執行,當前時間:" + sdf.format(new Date()));
        }

        System.out.println("線程池中已經沒有在執行的任務,線程池已徹底關閉!");
    }

控制檯輸出的結果:

complete a short task...
線程池是否已經關閉:true
complete a short task...
complete a short task...
線程池中還有任務在執行,當前時間:19:53:08
線程池中還有任務在執行,當前時間:19:53:09
線程池中還有任務在執行,當前時間:19:53:10
線程池中還有任務在執行,當前時間:19:53:11
線程池中還有任務在執行,當前時間:19:53:12
線程池中還有任務在執行,當前時間:19:53:13
線程池中還有任務在執行,當前時間:19:53:14
線程池中還有任務在執行,當前時間:19:53:15
線程池中還有任務在執行,當前時間:19:53:16
complete a long task
線程池中已經沒有在執行的任務,線程池已徹底關閉!

測試shutdownNow()方法

@Test
    public void testShutdownNow() throws InterruptedException {
        ExecutorService threadpool = Executors.newFixedThreadPool(3);
        //將5個任務提交到有3個線程的線程池
        threadpool.submit(new LongTask());
        threadpool.submit(new LongTask());
        threadpool.submit(new LongTask());
        threadpool.submit(new LongTask());
        threadpool.submit(new LongTask());

        //主線程睡眠2秒鐘,讓3個線程池的任務都開始執行
        TimeUnit.SECONDS.sleep(1L);

        //關閉線程池
        List<Runnable> waiteRunnables = threadpool.shutdownNow();
        System.out.println("尚未執行的任務數:" + waiteRunnables.size());

        boolean isShutdown = threadpool.isShutdown();
        System.out.println("線程池是否已經關閉:" + isShutdown);

        final SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
        while (!threadpool.awaitTermination(1L, TimeUnit.SECONDS)) {
            System.out.println("線程池中還有任務在執行,當前時間:" + sdf.format(new Date()));
        }

        System.out.println("線程池中已經沒有在執行的任務,線程池已徹底關閉!");
    }

控制檯輸出:

java.lang.InterruptedException: sleep interrupted
    at java.lang.Thread.sleep(Native Method)
    at java.lang.Thread.sleep(Thread.java:340)
    at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
    at com.lzumetal.multithread.threadpooltest.LongTask.run(LongTask.java:11)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
尚未執行的任務數:2
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
線程池是否已經關閉:true
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
complete a long task
    at java.lang.Thread.run(Thread.java:748)
complete a long task
java.lang.InterruptedException: sleep interrupted
complete a long task
    at java.lang.Thread.sleep(Native Method)
    at java.lang.Thread.sleep(Thread.java:340)
    at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
    at com.lzumetal.multithread.threadpooltest.LongTask.run(LongTask.java:11)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
java.lang.InterruptedException: sleep interrupted
    at java.lang.Thread.sleep(Native Method)
    at java.lang.Thread.sleep(Thread.java:340)
    at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
    at com.lzumetal.multithread.threadpooltest.LongTask.run(LongTask.java:11)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
線程池中已經沒有在執行的任務,線程池已徹底關閉!

從上面咱們看到當調用了試圖shutdownNow()後,那三個執行的任務都被interrupt了。並且awaitTermination(long timeout, TimeUnit unit)方法返回的是true。

測試submit(Callable<T> task)方法

Callabel任務類:

package com.lzumetal.multithread.threadpooltest;

import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

public class CallableTask implements Callable<String> {

    @Override
    public String call() throws Exception {
        TimeUnit.SECONDS.sleep(5L);
        return "success";
    }


}

測試:

@Test
    public void testSubmitCallable() {

        ExecutorService threadpool = null;
        try {

            threadpool = Executors.newFixedThreadPool(3);

            final SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
            System.out.println("提交一個callable任務到線程池,如今時間是:" + sdf.format(new Date()));

            Future<String> future = threadpool.submit(new CallableTask());

            System.out.println("獲取callable任務的結果:" + future.get() + ",如今時間是:" + sdf.format(new Date()));
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            if (threadpool != null) {
                threadpool.shutdown();
            }
        }

    }

控制檯輸出:

提交一個callable任務到線程池,如今時間是:20:25:27
獲取callable任務的結果:success,如今時間是:20:25:32
相關文章
相關標籤/搜索