二. 線程管理之線程池

不忘初心 砥礪前行, Tomorrow Is Another Day !html

相關文章

本文概要:java

  1. 認識Executor與ExecutorService
  2. 理解Future與FuturTask
  3. ThreadPoolExecutor介紹
  4. Executors工廠類

線程池的優勢:bash

  • 重用線程,避免沒必要要的對象建立和銷燬.
  • 可有效控制最大併發線程數,提升系統資源的使用率.避免因線程過多強佔系統資源致使阻塞.

一. 認識Executor與ExecutorService

  • 認識Executor

Executor做爲線程池的頂級接口. 在Java的設計中,Runnable負責任務的提交. Executor負責任務的執行.將任務進行了解耦.多線程

public interface Executor {

    void execute(Runnable command);//執行已提交的 Runnable 任務對象
}
複製代碼
  • 認識ExecutorService

ExecutorService接口繼承了Executor接口,定義了一些生命週期的方法併發

public interface ExecutorService extends Executor {

   
    void shutdown();//順次地關閉ExecutorService,中止接收新的任務,等待全部已經提交的任務執行完畢以後,關閉ExecutorService


    List<Runnable> shutdownNow();//阻止等待任務啓動並試圖中止當前正在執行的任務,中止接收新的任務,返回處於等待的任務列表


    boolean isShutdown();//判斷線程池是否已經關閉

    boolean isTerminated();//若是關閉後全部任務都已完成,則返回 true。注意,除非首先調用 shutdown 或 shutdownNow,不然 isTerminated 永不爲 true。

    
    boolean awaitTermination(long timeout, TimeUnit unit)//等待(阻塞)直到關閉或最長等待時間或發生中斷,timeout - 最長等待時間 ,unit - timeout 參數的時間單位  若是此執行程序終止,則返回 true;若是終止前超時期滿,則返回 false 

 
    <T> Future<T> submit(Callable<T> task);//提交一個返回值的任務用於執行,返回一個表示任務的未決結果的 Future。該 Future 的 get 方法在成功完成時將會返回該任務的結果。


    <T> Future<T> submit(Runnable task, T result);//提交一個 Runnable 任務用於執行,並返回一個表示該任務的 Future。該 Future 的 get 方法在成功完成時將會返回給定的結果。

 
    Future<?> submit(Runnable task);//提交一個 Runnable 任務用於執行,並返回一個表示該任務的 Future。該 Future 的 get 方法在成功 完成時將會返回 null


    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)//執行給定的任務,當全部任務完成時,返回保持任務狀態和結果的 Future 列表。返回列表的全部元素的 Future.isDone() 爲 true。
        throws InterruptedException;


    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)//執行給定的任務,當全部任務完成時,返回保持任務狀態和結果的 Future 列表。返回列表的全部元素的 Future.isDone() 爲 true。
        throws InterruptedException;


    <T> T invokeAny(Collection<? extends Callable<T>> tasks)//執行給定的任務,若是在給定的超時期滿前某個任務已成功完成(也就是未拋出異常),則返回其結果。一旦正常或異常返回後,則取消還沒有完成的任務。
        throws InterruptedException, ExecutionException;


    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
複製代碼

在ExecutorService中運用到了Future相關知識,下面對Futue作一個簡單瞭解.異步

二. 理解Future與FutureTask

  • 理解Future

Future簡單理解就是對異步任務的統計類,包含進行取消、查詢是否完成、獲取結果等操做.ide

Future類位於java.util.concurrent包下,對應源碼.post

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    //表示若是在任務完成前被取消成功,則返回true
    boolean isCancelled();
    ////表示任務執行結束,不管是正常結束/中斷/發生異常,都返回true
    boolean isDone();
    //用來獲取執行結果,這個方法會產生阻塞,會一直等到任務執行完畢才返回
    V get() throws InterruptedException, ExecutionException;
    //用來獲取執行結果,有超時機制,若是阻塞時間超過了指定時間,會拋出異常
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
複製代碼
  • 理解FutureTask

FutureTask既實現了Future接口,又實現了Runnable接口.學習

對應源碼ui

public class FutureTask<V> implements RunnableFuture<V>

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

//構造方法
public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
}

public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
}

public void run() {
        if (state != NEW ||
            !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    //回調callable的Call方法,獲取異步任務返回值.
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            //...省略部分代碼
        }
    }
複製代碼

因此咱們能夠經過Runnable接口實現線程,又能夠經過Future接口獲取線程執行完後的結果.

使用示例 上一篇文章Thread基礎中已經使用過Future了,這裏直接換成FutureTask的使用.

public class MyCallable implements Callable<String> {
    @Override
    public String call() throws Exception {
        System.out.println("子線程正在幹活");
        Thread.sleep(3000);
        return "實現Callable接口,重寫Call方法";
    }

    public static void main(String[] args) throws Exception {
        useFutureTask();
    }

    private static void  useFutureTask() throws InterruptedException, ExecutionException {
        MyCallable myCallable = new MyCallable();
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        //定義一個Task,再提交任務.
        FutureTask<String> futureTask = new FutureTask<>(myCallable);
        executorService.submit(futureTask);

        executorService.shutdown();

        Thread.sleep(1000);//模擬正在幹活
        System.out.println("主線程正在幹活");
        //阻塞當前線程,等待返回結果.
        System.out.println("等待返回結果:" + futureTask.get());
        System.out.println("主線程全部的活都幹完了");
    }
}

//調用輸出
子線程正在幹活
主線程正在幹活
等待返回結果:實現Callable接口,重寫Call方法
主線程全部的活都幹完了
複製代碼

說到這裏,可能有些人和我同樣迷糊,ExecutorService能夠經過submit和execute進行執行任務.那麼這二者區別是啥.咱們來簡單的總結一下.

對應源碼

//submit方法
<T> Future<T> submit(Callable<T> task);

<T> Future<T> submit(Runnable task, T result);

Future<?> submit(Runnable task);

//execute方法
void execute(Runnable command);
複製代碼

從源碼對比能夠知道.

  • 接收的參數不同.execute僅能接收Runnable類型.
  • submit()有返回值,而execute()沒有
  • submit()能夠進行Exception處理.

經過總結會發現這其實就與前面介紹的實現Runable與Callable接口的特色相似.

三. ThreadPoolExecutor介紹

全部線程池都是經過ThreadPoolExecutor來建立.

對應源碼

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
複製代碼

參數解析

  • corePoolSize : 核心線程數
  • maximumPoolSize : 最大線程數
  • keepAliveTime : 非核心線程閒置時的超時時間.若是設置了allowCoreThreadTimeOut爲true,那麼也將做用於核心線程.
  • unit : 時間單位
  • workQueue : 任務隊列.存放Runnable任務.

另外還有2個參數,通常咱們不須要去手動設定.

  • ThreadFactory : 線程工廠,建立線程用.
  • RejectedExecutionHandler : 任務隊列已滿或者沒法成功執行任務時,會調用此handler的rejectedExecution方法拋出異常通知調用者,俗稱飽和策略.

理解ThreadPoolExecutor處理的流程


線程池工做流程
  1. 在提交任務時,檢查是否達到核心線程數量.
    • 未達到,則啓動核心線程去執行任務.
    • 已達到,進行下一步.
  2. 檢查任務隊列是否已滿.
    • 隊列未滿,加入任務隊列.
    • 隊列已滿,進行下一步.
  3. 檢查是否到達最大線程數.
    • 未達到,啓動非核心線程去執行任務.
    • 已達到,執行飽和策略.
  • 一旦有線程處於閒置時,就會去隊列中獲取任務執行.

四. Executors工廠類

經過Executors提供四種線程池,newFixedThreadPool、newSingleThreadExecutor、newScheduledThreadPool、ewCachedThreadPool.

  • FixedThreadPool

可重用固定線程數的線程池.

對應源碼

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

複製代碼
  • 只有核心線程.數量固定
  • 無超時機制,隊列大小無限制.

使用示例

private static void executeFixedThreadPool() {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 20; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("線程:" + Thread.currentThread().getName());
                }
            });
        }
    }
複製代碼
  • SingleThreadExecutor

單線程的線程池

對應源碼

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
複製代碼

同FixedThreadPool相似.

  • 固定只有一個核心線程. 所以它能確保全部任務在一個線程中按順序執行.

使用示例

private static void executeSingleThreadExecutor() {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 20; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("線程:" + Thread.currentThread().getName());
                }
            });
        }
    }
複製代碼
  • ScheduledThreadPool

支持定時和週期性任務的線程池

對應源碼

ScheduledExecutorService.java
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

ScheduledThreadPoolExecutor.java    
public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue());
    }
複製代碼

同FixedThreadPool相似.

  • 核心線程數量固定,非核心線程不定.

使用示例

private static void executeScheduledThreadPool(int flag) {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                System.out.println("後"+System.currentTimeMillis());
//                try {
//                    Thread.sleep(10000);
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                }
                System.out.println("線程:" + Thread.currentThread().getName());

            }
        };
        System.out.println("前"+System.currentTimeMillis());
        switch (flag) {
            case 0:
                //延遲1000毫秒後開始執行
                executorService.schedule(runnable, 1000, TimeUnit.MILLISECONDS);
                break;
            case 1:
                //延遲1000毫秒後開始執行,後面每隔2000毫秒執行一次.強調任務的執行頻率,不受任務執行時間影響,過期不候.
                executorService.scheduleAtFixedRate(runnable, 1000, 2000, TimeUnit.MILLISECONDS);
                break;
            case 2:
                //延遲1000毫秒後開始執行,後面每次延遲3000毫秒執行一次.強調任務執行的間隔.
                executorService.scheduleWithFixedDelay(runnable, 1000, 3000, TimeUnit.MILLISECONDS);

                break;
            default:
                break;
        }
    }

複製代碼
  • CachedThreadPool

對應源碼

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
複製代碼
  • 只有非核心線程.
  • 有超時機制,隊列沒法存儲,被當即執行.

所以適合大量的耗時較少的任務.

使用示例

private static void executeCachedThreadPool() {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 20; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("線程:" + Thread.currentThread().getName());
                }
            });
        }
    }
複製代碼

關於線程池相關就介紹到這裏了.

因爲本人技術有限,若有錯誤的地方,麻煩你們給我提出來,本人不勝感激,你們一塊兒學習進步.

參考連接:

相關文章
相關標籤/搜索