java多線程總結-線程池

1 準備知識

介紹線程池以前先簡要了解一下Executor,ExecutorService,Future,Callable,Executors是什麼,和線程池又有什麼關係緩存

1.1 Executor

它是線程池頂級接口。它定義了一個方法void execute(Runnable)
dom

這個方法是用於處理任務的一個服務方法,調用者提供Runnable接口的實現,線程池經過線程執行這個Runnable,該服務方法是無返回值的ide

1.2 ExecutorService

ExecutorService是Executor接口的子接口,它提供了一個新的服務方法submit,是有返回值的,返回值類型爲Future類型(關於Future見1.3),它提供返回值主要是由Callable的call方法提供返回值(Callable見1.4) ,全部的線程池類型都實現這個接口工具

1.3 Future

顧名思義,Future->將來,表明線程任務執行結束後的結果。
獲取線程執行結果的方式是經過get方法獲取的,get有兩種方式,有參和無參
測試

無參T get()->阻塞等待線程執行結束,並獲得結果。
有參T get(long, TimeUnit)->阻塞固定時長,等待線程執行結束後的結果,若是在阻塞時長範圍內,線程未執行結束,拋出異常。this

1.4 Callable

Callable相似Runnable接口,它有一個call方法,它的做用和Runnable中的run方法徹底一致,但也有區別
Callable的call->有返回值,能夠拋出任意異常
Runnable的run-> 無返回值,不能拋出未檢查的異常
線程

call方法的返回值就是Future中get方法的返回值設計

1.5 Executors

Executors是一個工具類,相似Collection和Collections的關係,能夠更簡單的建立若干種線程池,經過Executors能夠直接獲得想要的線程池3d

2 線程池

線程池狀態: Running, ShuttingDown, Termitnaed

  • Running - 線程池正在執行中。活動狀態。
  • ShuttingDown - 線程池正在關閉過程當中。優雅關閉。一旦進入這個狀態,線程池再也不接收新的任務,處理全部已接收的任務,處理完畢後,關閉線程池。
  • Terminated - 線程池已經關閉。

2.1 固定容量線程池FixedThreadPool

FixedThreadPool是固定容量線程池,建立線程池的時候容量固定,使用的是BlockingQueue 做爲任務的載體,線程池默認的容量上限是Integer.MAX_VALUE

  • 特色:當任務數量大於線程池容量的時候,沒有運行的任務保存在任務隊列中,當線程有空閒的,自動從隊列中取出任務執行
  • 使用場景: 大多數狀況下,使用的線程池,首選推薦FixedThreadPool。OS系統和硬件是有線程支持上限。不能隨意的無限制提供線程池。

下面是一個無返回值的小案例:
案例中建立了一個線程池,容量爲5,執行6個任務,分析調用shutdown方法後,分析任務的執行狀況

/**
 * 線程池
 * 固定容量線程池
 */
package com.bernardlowe.concurrent.t08;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Test_02_FixedThreadPool {
    
    public static void main(String[] args) {
        ExecutorService service = 
                Executors.newFixedThreadPool(5);
        for(int i = 0; i < 6; i++){
            service.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        TimeUnit.MILLISECONDS.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + " - test executor");
                }
            });
        }
        
        System.out.println("初始狀態:" + service);

        System.out.println("開始調用shutdown方法=====");
        service.shutdown();
        // 是否已經結束, 至關於回收了資源。
        System.out.println("是否terminated:" + service.isTerminated());
        // 是否已經關閉, 是否調用過shutdown方法
        System.out.println("是否shutdown:" + service.isShutdown());
        System.out.println("shutdown後的狀態:" + service);
        
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        // service.shutdown();
        System.out.println("2秒事後任務所有執行完====");
        System.out.println("是否terminated:" + service.isTerminated());
        System.out.println("是否shutdown:" + service.isShutdown());
        System.out.println("任務所有執行完事後狀態:" + service);
    }

}

結果:

從圖中能夠分析出如下幾個過程
在初始狀態:五個執行線程,1個任務在等待隊列,0個完成任務

調用shutdown方法後:線程池未關閉(terminated爲false),調用了shutdown(再也不接收新任務),0個完成任務

兩秒後任務執行完畢:線程池已關閉(terminated爲true),調用了shutdown(再也不接收新任務),6個完成任務

下面是一個有返回值的小案例:
案例中建立了一個線程池,容量爲1,submit方法傳了一個Callable,future經過get獲取線程的返回值

/**
 * 線程池
 * 固定容量線程池(有返回值)
 */
package com.bernardlowe.concurrent.t08;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

public class Test_03_Future {
    
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        
        ExecutorService service = Executors.newFixedThreadPool(1);

        Future<String> future = service.submit(new Callable<String>() {
            @Override
            public String call() {
                try {
                    TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return Thread.currentThread().getName() + " - test executor";
            }
        });
        System.out.println("線程是否結束: " + future.isDone()); // 查看線程是否結束, 任務是否完成。 call方法是否執行結束

        System.out.println("call方法的返回值: " + future.get()); // 獲取call方法的返回值。
        System.out.println("線程是否結束: " + future.isDone());

        // 關閉線程池
        service.shutdown();
    }

}

結果:

2.2 CachedThreadPool

緩存的線程池, 容量不限(Integer.MAX_VALUE),自動擴容
容量管理策略:若是線程池中的線程數量不知足任務執行,建立新的線程。每次有新任務沒法即時處理的時候,都會建立新的線程。當線程池中的線程空閒時長達到必定的臨界值(默認60秒),自動釋放線程,這裏經過Executors.newCachedThreadPool()方法獲得的線程池沒法修改空閒時間,具體緣由見下圖,但能夠經過自定義線程池ThreadPoolExecutor修改,具體方法見2.5,這裏就不解釋了

應用場景: 內部應用或測試應用。

  • 內部應用,有條件的內部數據瞬間處理時應用,如:電信平臺夜間執行數據整理(有把握在短期內處理完全部工做,且對硬件和軟件有足夠的信心)。
  • 測試應用,在測試的時候,嘗試獲得硬件或軟件的最高負載量,用於提供FixedThreadPool容量的指導

案例演示:

/**
 * 線程池
 * 無容量限制的線程池(最大容量默認爲Integer.MAX_VALUE)
 */
package com.bernardlowe.concurrent.t08;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Test_05_CachedThreadPool {
    
    public static void main(String[] args) {
        ExecutorService service = Executors.newCachedThreadPool();
        
        System.out.println(service);
        
        for(int i = 0; i < 5; i++){
            service.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        TimeUnit.MILLISECONDS.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + " - test executor");
                }
            });
        }
        
        System.out.println(service);
        
        try {
            TimeUnit.SECONDS.sleep(65);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        System.out.println(service);
    }

}

2.3 計劃任務線程池ScheduledThreadPool

ScheduledThreadPool是計劃任務線程池,能夠根據計劃自動執行任務的線程池,底層實現是一個DelayedWorkQueue,它的一個主要方法scheduleAtFixedRate

有如下幾個參數:

  • command - 要執行的任務
  • initialDelay - 第一次任務執行的間隔。
  • period - 屢次任務執行的間隔。
  • unit - 屢次任務執行間隔的時間單位。

案例:

/**
 * 線程池
 * 計劃任務線程池。
 */
package com.bernardlowe.concurrent.t08;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class Test_07_ScheduledThreadPool {
    
    public static void main(String[] args) {
        ScheduledExecutorService service = Executors.newScheduledThreadPool(3);
        System.out.println(service);
        
        // 定時完成任務。 scheduleAtFixedRate(Runnable, start_limit, limit, timeunit)
        // runnable - 要執行的任務。
        service.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
            }
        }, 0, 300, TimeUnit.MILLISECONDS);
        
    }

}

2.4 單一容量的線程池SingleThreadExecutor

單一容量的線程池,用法和FixedThreadPool相似,但和newFixedThreadPool不同的是newSingleThreadExecutor建立的線程池又被一個FinalizableDelegatedExecutorService包裝了一下
總結一下SingleThreadExecutor:

  • 單線任務處理的線程池
  • shutdown方法必然會被調用
  • 不具有ThreadPoolExecutor全部功能的線程池
    具體能夠看看這篇文章:https://www.jianshu.com/p/2b7d853322bb

2.5 分支合併線程池ForkJoinPool

分支合併線程池(mapduce相似的設計思想),能夠遞歸完成複雜任務,適合用於處理複雜任務
要求可分支合併的任務必須是ForkJoinTask類型的子類型
ForkJoinTask類型提供了兩個抽象子類型:
RecursiveTask有返回結果的分支合併任務
RecursiveAction無返回結果的分支合併任務

案例:
這個案例作了一個以ForkJoinPool實現的數據累加,當計算數字區間大於MAX_SIZE=50000時,開啓新的線程任務的計算,最後合併統計結果

/**
 * 線程池
 * 分支合併線程池。
 */
package com.bernardlowe.concurrent.t08;

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

public class Test_08_ForkJoinPool {
    
    final static int[] numbers = new int[1000000];
    final static int MAX_SIZE = 500000;
    final static Random r = new Random();
    
    
    static{
        for(int i = 0; i < numbers.length; i++){
            numbers[i] = r.nextInt(1000);
        }
    }
    
    static class AddTask extends RecursiveTask<Long>{ // RecursiveAction
        int begin, end;
        public AddTask(int begin, int end){
            this.begin = begin;
            this.end = end;
        }
        
        // 
        protected Long compute(){
            if((end - begin) < MAX_SIZE){
                long sum = 0L;
                for(int i = begin; i < end; i++){
                    sum += numbers[i];
                }
                // System.out.println("form " + begin + " to " + end + " sum is : " + sum);
                return sum;
            }else{
                int middle = begin + (end - begin)/2;
                AddTask task1 = new AddTask(begin, middle);
                AddTask task2 = new AddTask(middle, end);
                task1.fork();// 就是用於開啓新的任務的。 就是分支工做的。 就是開啓一個新的線程任務。
                task2.fork();
                // join - 合併。將任務的結果獲取。 這是一個阻塞方法。必定會獲得結果數據。
                return task1.join() + task2.join();
            }
        }
    }
    
    public static void main(String[] args) throws InterruptedException, ExecutionException, IOException {
        long result = 0L;
        for(int i = 0; i < numbers.length; i++){
            result += numbers[i];
        }
        System.out.println(result);
        
        ForkJoinPool pool = new ForkJoinPool();
        AddTask task = new AddTask(0, numbers.length);
        
        Future<Long> future = pool.submit(task);
        System.out.println(future.get());
        
    }

}

結果:該任務分類四個線程任務進行計算,最後彙總

2.5 ThreadPoolExecutor

ThreadPoolExecutor線程池的底層實現,除ForkJoinPool外,其餘經常使用線程池底層都是使用ThreadPoolExecutor實現的,其中有一個構造方法以下:

  • corePoolSize:核心容量,建立線程池的時候,默認有多少線程。也是線程池保持的最少線程數
  • maximumPoolSize: 最大容量,線程池最多有多少線程
  • keepAliveTime: 生命週期,0爲永久。當線程空閒多久後,自動回收
  • unit: 生命週期單位,爲生命週期提供單位,如:秒,毫秒
  • workQueue 任務隊列,阻塞隊列。注意,泛型必須是Runnable

案例:

/**
 * 線程池
 * 固定容量線程池
 */
package com.bernardlowe.concurrent.t08;

import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Test_09_ThreadPoolExecutor {
    
    public static void main(String[] args) {
        // 模擬fixedThreadPool, 核心線程5個,最大容量5個,線程的生命週期無限。
        ExecutorService service = 
                new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, 
                        new LinkedBlockingQueue<Runnable>());
        
        for(int i = 0; i < 6; i++){
            service.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        TimeUnit.MILLISECONDS.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + " - test executor");
                }
            });
        }
        
        System.out.println(service);
        
        service.shutdown();
        System.out.println(service.isTerminated());
        System.out.println(service.isShutdown());
        System.out.println(service);
        
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        service.shutdown();
        System.out.println(service.isTerminated());
        System.out.println(service.isShutdown());
        System.out.println(service);
        
    }
}
相關文章
相關標籤/搜索