java.util.concurrent包學習筆記(一)Executor框架

 

類圖:java


 

其實從類圖咱們能發現concurrent包(除去java.util.concurrent.atomic 和 java.util.concurrent.locks)中的內容並無特別多,大概分爲四類:BlockingQueue阻塞隊列體系、Executor線程組執行框架、Future線程返回值體系、其餘各類單獨的併發工具等。多線程

首先學習的是Executor體系,是咱們處理多線程最常接觸的內容。首先咱們單獨看下繼承體系:併發

Executor是頂級接口,裏面只有一個方法:框架


 

public interface Executor {
    void execute(Runnable command);---執行一個Runnable對象,Runnable在前面的文章裏面已經講到,是線程的頂級接口,裏面有個run方法
}

ExecutorService是咱們常常用到的多線程執行框架的聲明,源碼也不多,咱們逐個方法進行解釋:函數

public interface ExecutorService extends Executor {
    void shutdown();----關閉線程執行框架中的線程,但效果是再也不接受新線程加入,而且等待線程執行結束後關閉Executor

    List<Runnable> shutdownNow();---大致同上的,可是該命令會嘗試關閉正在運行中的線程,可是也僅僅是調用terminate方法而後讓jdk去決定是否結束,同時該方法返回那些awaiting狀態的線程組

    boolean isShutdown();---是否已經被關閉的狀態

    boolean isTerminated();---是否已經被停止的狀態,在shutdown和shutdownnow被調用後,而且線程所有執行結束,該狀態纔是true,不然都是false

    boolean awaitTermination(long timeout, TimeUnit unit) ---若是線程組terminate了,返回true,超時時間到了返回false。
        throws InterruptedException;

    <T> Future<T> submit(Callable<T> task);--提交一個Callable的回調,而後執行完成後將結果放入Future對象。

    <T> Future<T> submit(Runnable task, T result);--提交一個Runnable接口實現,而後result是Future返回值

    Future<?> submit(Runnable task);--提交一個Runnable接口實現,若是執行完成,future.get()能夠返回一個null

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)--執行提交的Callable集合,而後返回各自的執行結果的Future對象列表,每一個元素isDone都是true
        throws InterruptedException;

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)--執行提交的task集合,執行完成或者timeout以後返回結果,isDone爲true
        throws InterruptedException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks)--執行提交的task集合,有一個執行完成就返回結果
        throws InterruptedException, ExecutionException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)--同上,有一個執行完成或者有timeout出現
        throws InterruptedException, ExecutionException, TimeoutException;

 AbstractExecutorService實現了ExecutorService接口,而且聲明爲抽象類,然而其中一個抽象方法都沒有。。.工具


 

public abstract class AbstractExecutorService implements ExecutorService {
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {---爲子類定義了一個建立RunnableFuture對象的快捷方法
return new FutureTask<T>(runnable, value);
}
---其他方法都是接口的實現方法

}

 還有一個繼承了ExecutorService的接口的接口:學習


 

public interface ScheduledExecutorService extends ExecutorService { ---定義延遲執行的動做
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit); ---延遲delay個時間單位後開始執行

    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);---延遲delay個時間單位後執行並返回Future對象

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);在initialDelay、initialDelay+N*period分別出發
  public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                               long initialDelay,
                               long delay,
                               TimeUnit unit); }--initialDelay開始進行,後面每次執行成功後的dealy時間單位開始

 


接下來是重磅的內容ThreadPoolExecutor 也就是Executor框架的核心邏輯所在,ThreadPoolExecutor類算上註釋有2100多行,但裏面有不少一部分是方法的詳細說明,還有大量的變量和private/protected的方法,真正開放出來的public方法不多,咱們只要逐個分析這些public方法就能夠了。this

咱們首先來看構造函數,構造函數有多個重載方法,目前只看參數最全的狀況:atom

public ThreadPoolExecutor(int corePoolSize, ----池子裏面的線程數量,即使idle狀態的線程也會保留這個數量
                              int maximumPoolSize,----池子裏面能盛放最大線程數量
                              long keepAliveTime,----當線程數量大於core核心數量的時候,而且裏有idle狀態的線程,那麼最大能夠被terminate的的等待時間
                                 (就是在cpu借的線程資源若是閒置多久就必須還回去) TimeUnit unit,--timeUnit BlockingQueue
<Runnable> workQueue,---任務的隊列 ThreadFactory threadFactory,---線程工廠 RejectedExecutionHandler handler) {---線程池處理不過來任務隊列的時候的默認處理方法 if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
public void execute(Runnable command) {}---執行一個任務
public void shutdown() {---使用可重入鎖實現,關閉線程池執行框架,再也不接受新的任務,關閉idle狀態的線程,等待正在執行的執行完成。
final ReentrantLock mainLock = this.mainLock;
}
public List<Runnable> shutdownNow() {}--- 同ExecutorService中的接口說明,與shuwdown的區別在於會嘗試關閉正在執行的線程

public boolean isShutdown() {}---是否已經關閉(沒有running狀態的線程了)

-------一堆get和set方法

public int getActiveCount() {}---得到活動的線程數量
public long getCompletedTaskCount() {}---或者完成的任務數量

 其實咱們只要知道ThreadPoolExecutor的構造參數就能夠明白它的工做方式,並且咱們須要作的也是把這些內容構造好。spa

咱們最經常使用的Executors裏面的更加方便的Pool的類型其實都是爲咱們加了一些default參數的ThreadPoolExecutor:

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory));
}
也就是其實最後都是會到ThreadPoolExecutor上面去實現,區別就是一個用着方便,一個你能夠控制的粒度更小,進而可能效率更高。

 

 咱們在類圖的繼承體系裏面還能發現一個叫作ForkJoinPool的類,這個類實際上是不少人口中所謂將來java併發方向的類,因爲涉及的內容較多,後面單獨隨筆進行學習和理解吧。

相關文章
相關標籤/搜索