Java線程池Executor&ThreadPool

  java自1.5版本以後,提供線程池,供開發人員快捷方便的建立本身的多線程任務。下面簡單的線程池的方法及說明。java

  一、Executor 緩存

    線程池的頂級接口。定義了方法execute(Runnable),該方法接收一個Runnable實例,用來執行一個任務,該任務便是一個實現Runnable接口的類。多線程

    此服務方法無返回值,緣由是由於實現Runnable接口的類的run方法是無返回(void)的。框架

    經常使用方法 : void execute(execute)jvm

    做用 : 啓動並執行線程任務ide

  二、ExecutorService工具

    繼承自Executor接口,提供了更多的方法調用,例如優雅關閉方法shutdown,有返回值的submit。測試

    2.一、ExecutorService生命週期this

      運行 - Running 、關閉 - shuttingdown、終止 - terminatedspa

      Running : 線程池正在執行中,活動狀態。建立後即進入此狀態

      shuttingdown : 優雅關閉,線程池正在關閉中。再也不接收新的線程任務,已有的任務(正在處理的 + 隊列中阻塞的),處理完畢後,關閉線程池。

              調用shutdown()方法,即進入此狀態

      terminated : 線程池已關閉。

    2.二、submit方法

      有返回值,Future類型。重載了方法,submit(Runnable)不須要提供返回值。submit(Callable)、submit(Runnable,T)能夠提供線程執行後的結果返回值。

    2.三、Future

      線程執行完畢結果。獲取線程執行結果是經過get()方法獲取。get()無參,阻塞等待線程執行結束。

      get(long timeout, TimeUnit unit)有參,阻塞等待固定時長,超時未獲取,則拋出異常。

    2.四、Callable

      相似Runnable的一個線程接口。其中的對應run的方法是call方法。此接口提供了線程執行完畢返回值。

package com.cn.cfang.executor;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Test {
    public static void main(String[] args) throws Exception{
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        Data data = new Data();
//        Future<Data> future = executorService.submit(new Task(data), data); //runnable
        Future<Data> future = executorService.submit(new Task1(data)); //callable
        System.out.println(future.get().getName());
        executorService.shutdown();
    }
}

class Data {
    String name;
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
}

class Task implements Runnable{
    Data data;
    public Task(Data data) {
        this.data = data;
    }
    @Override
    public void run() {
         data.setName("hello world");
    }
}

class Task1 implements Callable<Data>{
    Data data;
    public Task1(Data data) {
        this.data = data;
    }
    @Override
    public Data call() throws Exception {
        data.setName("hello world");
        return data;
    }
    
}

   三、Executors工具類

    提供了不少的工廠方法用於建立線程池,返回的線程池都實現了ExecutorService接口。

線程池屬於進程級的重量級資源,默認的生命週期同JVM一致,當開啓線程池後,直到jvm關閉,是線程池的默認的生命週期。
若是手動調用shutdown方法,可優雅關閉線程池,在當前全部任務執行結束後,關閉線程池。

  四、幾種經常使用的線程池

    4.一、FixedThreadPool

     容量固定的線程池。使用LinkedBlockingQueue做爲任務隊列,當任務數量大於線程池容量的時候,未執行的任務進入任務等待隊列LinkedBlockingQueue中,

     當線程有空閒的時候,自動從隊列中取出任務執行。

        使用場景: 大多數狀況下,推薦使用的線程池。由於os系統和硬件是有線程上限限制的,不可能去無限的提供線程池操做。

    4.二、CachedThreadPool

      緩存線程池。容量 0-Integer.MAX_VALUE,自動根據任務數擴容:若是線程池中的線程數不知足任務執行需求,則建立新的線程並添加到池中。

      生命週期默認60s,當線程空閒時長到60s的時候,自動終止銷燬釋放線程,移除線程池。

      使用場景 : 可用於測試最高負載量,用於對FixedThreadPool容量的參考。

      注意,放入CachedThreadPool的線程沒必要擔憂其結束,超過TIMEOUT(默認60s)不活動,其會自動被終止。 

    4.三、ScheduledThreadPool

      定時及週期性的任務執行的線程池,多數狀況下可用來替代Timer類。

public static void main(String[] args) {
        ScheduledExecutorService service = Executors.newScheduledThreadPool(3);
        System.out.println(service);
        
        // 定時完成任務。 scheduleAtFixedRate(Runnable, start_limit, limit, timeunit)
        // runnable - 要執行的任務。
     // start_limit - 第一次執行任務的時間間隔
     // limit - 屢次任務執行的時間間隔
// timeunit - 時間單位 service.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); } }, 0, 300, TimeUnit.MILLISECONDS); }

    4.四、SingleThreadExceutor 單一容量線程池。

    4.五、自定義線程池

      自定義線程池,可使用ThreadPoolExecutor類來進行建立管理。線程池中,除了ForkJoinPool外,其餘經常使用的線程池底層,都是使用ThreadPoolExecutor實現的。

      參數說明:

        corePoolSize:核心線程數,也是最少線程數。在建立線程池時,默認狀況下,是不會建立線程池的,也即此時的線程池中線程數爲0,直到有任務來臨時,纔會去建立線程。固然,手動調用prestartCoreThread()或者prestartAllCoreThreads()方法,能夠初始化建立線程池中的線程。默認狀況下,當有任務來臨時,就會建立新的線程去處理執行,即便此時線程池中有空閒的線程。當線程數達到corePoolSize時,線程數不增長,此時任務會放入等待隊列BlockingQueue中。

        workQueue:阻塞隊列,用來存儲等待執行的任務資源。

          maximumPoolSize:最大線程數。當阻塞隊列滿了,開始擴充線程池中的線程數。直到達到此最大值的時候。

        handler:當線程池中的線程數等於maximumPoolSize的時候,此時再來任務的話,交由此拒絕策略執行。

        keepAliveTime:表示的線程在空閒多長時間後會被終止。默認是在線程數大於corePoolSize才生效,也能夠手動設置allowCoreThreadTimeOut()方法讓線程數在不大於  corePoolSize也生效。

 public ThreadPoolExecutor(
             int corePoolSize, //核心容量,建立線程池的時候,默認有多少的線程數。也是最少線程數
                   int maximumPoolSize, //最大線程數
                   long keepAliveTime,  //線程生命週期,0爲永久。當線程空閒多長時間,自動回收。
                   TimeUnit unit,  //生命週期時間單位。
                  BlockingQueue<Runnable> workQueue,  //任務阻塞隊列。
           RejectedExecutionHandler handler
      ) {     
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler);       }

      簡單例子:

package com.cn.cfang.executor;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Test2 {

    public static void main(String[] args){ 
        //建立等待隊列 
        BlockingQueue<Runnable> bqueue = new ArrayBlockingQueue<Runnable>(20); 
        //建立線程池,池中保存的線程數爲3,容許的最大線程數爲5
        ThreadPoolExecutor pool = new ThreadPoolExecutor(3,5,50,TimeUnit.MILLISECONDS,bqueue); 
        //建立七個任務 
        Runnable t1 = new MyThread(); 
        Runnable t2 = new MyThread(); 
        Runnable t3 = new MyThread(); 
        Runnable t4 = new MyThread(); 
        Runnable t5 = new MyThread(); 
        Runnable t6 = new MyThread(); 
        Runnable t7 = new MyThread(); 
        //每一個任務會在一個線程上執行
        pool.execute(t1); 
        pool.execute(t2); 
        pool.execute(t3); 
        pool.execute(t4); 
        pool.execute(t5); 
        pool.execute(t6); 
        pool.execute(t7); 
        //關閉線程池 
        pool.shutdown(); 
    } 
}

class MyThread implements Runnable{ 
    @Override
    public void run(){ 
        System.out.println(Thread.currentThread().getName() + "正在執行。。。"); 
        try{ 
            Thread.sleep(100); 
        }catch(InterruptedException e){ 
            e.printStackTrace(); 
        } 
    } 
}

 五、forkjoin框架

    拆分合並,將一個大的任務,拆分紅若干子任務,並最終彙總子任務的執行結果,獲得大任務的執行結果。並行執行,採用工做竊取機制,更加有效的利用cpu資源。

    5.一、主要類

      ForkJoinPool : 用於執行Task。任務分割出的子任務會添加到當前工做線程所維護的雙端隊列中,進入隊列的頭部。

              當一個工做線程的隊列裏暫時沒有任務時,它會隨機從其餘未完成工做線程的隊列的尾部獲取一個任務。

      ForkJoinTask:ForkJoin任務,提供在任務中執行fork()和join()操做的機制(二叉分邏輯),一般不直接繼承ForkJoinTask類,

             而是繼承抽象子類RecursiveTask(有返回結果) 或者 RecursiveAction (無返回結果)。

      ForkJoinWorkerThread:ForkJoinPool 內部的worker thread,用來具體執行ForkJoinTask。內部有 ForkJoinPool.WorkQueue,來保存要執行的 ForkJoinTask。

      ForkJoinPool.WorkQueue:保存要執行的ForkJoinTask。

   5.二、工做竊取機制

      一、大任務分割成N個子任務,爲避免線程競爭,因而分開幾個隊列去保存這些子任務,併爲每一個隊列提供一個工做線程去處理其中的任務。工做線程與任務隊列一一對應。

      二、若是A線程執行完本身隊列中的全部任務,若是此時其餘隊列中還有未執行的任務,則A線程會去竊取一個其餘隊列的任務來執行。可是,此時兩個線程同時訪問,

        可能會產生競爭問題,因此,任務隊列設計成了雙向隊列。A線程竊取的時候,從另外一端開始執行,儘量的去避免線程競爭問題。

      三、工做竊取機制,充分的利用線程資源,並儘量的去避免線程間的競爭問題。可是,只能是儘量避免,並不能規避。例如,雙向隊列只有一個任務。

     5.三、簡單使用

      例:求和 0 - 10000000000L。

package com.cn.cfang.executor;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

/**
 * 步驟:
 *  一、創建任務類Task,繼承RecursiveTask或者RecursiveAction。須要返回值則選用RecursiveTask,無需返回值選用RecursiveAction
 *  二、任務類Task,知足必定的閾值,則對子任務進行計算,不知足,則二叉分後,遞歸調用自身
 *  三、調用中,新建ForkJoinPool對象,新建任務類對象Task,將任務類對象Task放入ForkJoinPool中執行。
 *     若是須要返回值,則能夠invoke或者Future-submit。
 * @author cfang
 * 2018年5月15日 上午10:51:03
 */
public class Test3 {
    
    public static void main(String[] args) throws Exception{
        ForkJoinPool pool = new ForkJoinPool();
        ForkJoinWorkTask task = new ForkJoinWorkTask(0l, 10000000l);
//        Long result = pool.invoke(task);
//        System.out.println(result);
        Future<Long> future = pool.submit(task);
        System.out.println(future.get());
    }
    
}

class ForkJoinWorkTask extends RecursiveTask<Long>{

    private static final long serialVersionUID = 1L;
    
    private Long start;    //起始
    private Long end;    //終止
    private static final Long THRESHOLD = 10000L; //子任務分割閾值
    
    public ForkJoinWorkTask(Long start, Long end){
        this.start = start;
        this.end = end;
    }
    
    @Override
    protected Long compute() {
        Long sum = 0l;
        if(end - start <= THRESHOLD){ //足夠小的子任務,進行計算求和
            for(Long i = start; i < end; i++){
                sum += i;
            }
        }else{ //任務拆分不知足,繼續拆分(二叉分邏輯)
            Long middle = (start + end) / 2;
            ForkJoinWorkTask rightTask = new ForkJoinWorkTask(start, middle);
            rightTask.fork();
            ForkJoinWorkTask leftTask = new ForkJoinWorkTask(middle + 1, end);
            leftTask.fork();
            sum = rightTask.join() + leftTask.join();
        }
        return sum;
    }
    
}
相關文章
相關標籤/搜索