構建一個新的現成是有必定代價的,由於涉及和操做系統的交互。若是程序中建立了大量的生命期很短的現成,應該使用線程池(Thread Pool)。一個線程池中包含許多準備運行的空閒線程。將Runnable對象交給線程池,就會有一個線程調用run方法。java
package ch14; import java.io.File; import java.util.ArrayList; import java.util.List; import java.util.Scanner; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; public class MatchCounter implements Callable<Integer> { private File fileDirectory; private String keyWord; // Thread pool private ExecutorService pool; private int count; public MatchCounter(File fileDirectory, String keyWord, ExecutorService pool) { this.fileDirectory = fileDirectory; this.keyWord = keyWord; this.pool = pool; } @Override public Integer call() throws Exception { count = 0; try{ File[] files = fileDirectory.listFiles(); List<Future<Integer>> results = new ArrayList<>(); for (File file : files) { // 若是是文件夾,繼續調用 if(file.isDirectory()){ MatchCounter matchCounter = new MatchCounter(file, keyWord, pool); // 提交任務 Future<Integer> result = pool.submit(matchCounter); results.add(result); }else{ // search this file content is has keyword. then count ++. if(search(file)){ count ++; } } } // 統計結果 for (Future<Integer> result : results) { try{ count += result.get(); }catch (Exception e){ System.out.println(e.getMessage()); } } }catch (Exception e){ } // 返回計算結果 return count; } // 搜索文件中是否包含關鍵字 public boolean search(File file){ try(Scanner in = new Scanner(file,"UTF-8")){ boolean founded = false; while(!founded && in.hasNextLine()){ String line = in.nextLine(); if(line.contains(keyWord)){ founded = true; } } return founded; }catch (Exception e){ System.out.println(e.getMessage()); return false; } } }
package ch14; import java.io.File; import java.util.Scanner; import java.util.concurrent.*; public class TheadPoolTest { public static void main(String[] args) { try (Scanner scanner = new Scanner(System.in)) { System.out.println("輸入查找目錄:"); String dir = scanner.nextLine(); System.out.println("輸入查找關鍵字:"); String keyword = scanner.nextLine(); // 申請線程池 ExecutorService pool = Executors.newCachedThreadPool(); // 建立對象 MatchCounter matchCounter = new MatchCounter(new File(dir), keyword, pool); // 獲取結果 Future<Integer> result = pool.submit(matchCounter); try { System.out.println(result.get() + " files matching the keywords: " + keyword); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } // 關閉線程池 pool.shutdown(); // 獲取程序運行過程當中線程池的最大線程數 int largestPoolSize = ((ThreadPoolExecutor) pool).getLargestPoolSize(); System.out.println("Largest pool size = " + largestPoolSize); /** * 輸入查找目錄: * C:\Users\Administrator\Desktop\test * 輸入查找關鍵字: * xx * * 3 files matching the keywords: xx * Largest pool size = 2 */ } } }
ExecutorService new CachedThreadPool 返回一個帶緩存的線程池,該線程池在必要的時候建立線程,在線程空閒60秒以後終止線程;框架
ExecutorService newFixedThreadPool(int threads) 返回一個線程池,該池子中的線程數由參數指定;dom
ExecutorService newSginleThreadExecutor() 返回一個執行器,他在一個單個的線程中一次執行各個任務。ide
Future<T> submit(Callable<T> task)測試
Future<T> submit(Runnable task, T result)this
Future<T> submit(Runnable task)
void shutdown() 關閉服務,會先完成已經提交的任務而再也不接受新的任務
int getLargestPoolSize() 返回線程池在該執行器生命週期中的最大尺寸。換句話說,就是得到該線程池建立一個歷史線程數的最大值。
有些應用使用了大量的線程,可是不少都是空閒的。例如,一個web服務器可能會爲每隔鏈接分別使用一個線程。另一些應用可能對每一個處理器內核分別使用一個線程,來完成計算密集型任務,如圖像處理或者視頻處理。JAVA SE 7中新引入了fork-join框架,專門用來支持後一類應用。假設有一個處理任務,他能夠很天然的分解爲子任務,以下所示
if(問題規格 < 設定的閾值){ 直接處理任務 }else{ 分解該任務 遞歸的解決每個子任務 組合結果 }
work stealing(工做覓取).
package ch14; import java.util.concurrent.RecursiveTask; import java.util.function.DoublePredicate; public class ForkCounter extends RecursiveTask<Integer> { public static final int THRESHOLD = 1000; private double[] values; private int from; private int to; private DoublePredicate filter; public ForkCounter(double[] values, int from, int to, DoublePredicate filter) { this.values = values; this.from = from; this.to = to; this.filter = filter; } @Override protected Integer compute() { if(to - from < THRESHOLD){ int count = 0; for (int i = from; i < to; i++) { if(filter.test(values[i])){ count ++; } } return count; }else{ // 分解任務爲2段 int mid = (from + to) / 2; ForkCounter first = new ForkCounter(values,from,mid,filter); ForkCounter second = new ForkCounter(values,mid,to,filter); // 執行兩個任務,返回全部任務的結果(Any則是其中一個就行) invokeAll(first,second); // 返回結果 return first.join() + second.join(); } } }
package ch14; import java.util.concurrent.ForkJoinPool; public class ForkCounterTest { public static void main(String[] args) { final int SIZE = 10000; double[] numbers = new double[SIZE]; for (int i = 0; i < SIZE; i++) { numbers[i] = Math.random(); } // 找出大於0.5的數 ForkCounter counter = new ForkCounter(numbers,0,numbers.length,x -> x>0.5); ForkJoinPool forkJoinPool = new ForkJoinPool(); forkJoinPool.invoke(counter); System.out.println(counter.join()); // 5026 } }