多線程併發修改一個數據結構,很容易破壞這個數據結構,如散列表。鎖可以保護共享數據結構,但選擇線程安全的實現更好更容易,如阻塞隊列就是線程安全的集合。java
Vector
和HashTable
類提供了線程安全的動態數組和散列表,而ArrayList
和HashMap
卻不是線程安全的。git
java.util.concurrent
包提供了映射表、有序集、隊列的高效實現,如:github
ConcurrentLinkedQueue
:多線程安全訪問,無邊界,非阻塞,隊列;數組
ConcurrentHashMap
:多線程安全訪問,散列映射表,初始容量默認16,調整因子默認0.75。安全
併發的散列映射表ConcurrentHashMap
提供原子性的關聯插入putIfAbsent(key, value)
和關聯刪除removeIfPresent(key, value)
。寫數組的拷貝CopyOnWriteArrayList
和CopyOnWriteArraySet
是線程安全的集合,全部的修改線程會對底層數組進行復制。對於常常被修改的數據列表,使用同步的ArrayList
性能賽過CopyOnWriteArrayList
。數據結構
對於線程安全的集合,返回的是弱一致性的迭代器:多線程
迭代器不必定能反映出構造後的全部修改;併發
迭代器不會將同一個值返回兩次;框架
迭代器不會拋出ConcurrentModificationException
異常。dom
一般線程安全的集合可以高效的支持大量的讀者和必定數量的寫者,當寫者線程數目大於設定值時,後來的寫者線程會被暫時阻塞。而對於大多數線程安全的集合,size()
方法通常沒法在常量時間完成,通常須要遍歷整個集合才能肯定大小。
任何集合類使用同步包裝器都會變成線程安全的,會將集合的方法使用鎖加以保護,保證線程的安全訪問。使用同步包裝器時要確保沒有任何線程經過原始的非同步方法訪問數據結構,也能夠說確保不存在任何指向原始對象的引用,能夠採用下面構造一個集合並當即傳遞給包裝器的方法定義。
List<E> synchArrayList = Collections.synchronizedList(new ArrayList<E>()); Map<K, V> synchHashMap = Collections.synchronizedMap(new HashMap<K, V>());
固然最好使用java.util.concurrent
包中定義的集合,同步包裝器並無太多安全和性能上的優點。
Callable
與Runnable
相似,均可以封裝一個異步執行的任務,可是Callable
有返回值。Callabele<T>
接口是一個參數化的類型,只有一個方法call()
,類型參數就是返回值的類型。Future
用來保存異步計算的結果,用get()
方法獲取結果。get()
方法的調用會被阻塞,直到計算完成。有超時參數的get()
方法超時時會拋出TimeoutException
異常。
FutureTask
可將Callable
轉換成Future
和Runnable
,實現了二者的接口。
Callable<Integer> myComputation = new MyComputationCallable(); FutureTask<Integer> task = new FutureTask<Integer>(myComputation); Thread t = new Thread(task); // it's a Runnable t.start(); Integer result = task.get(); // it's a Future
這裏有一個計算指定目錄及其子目錄下與關鍵字匹配的文件數目的例子,涉及到Callable
、FutureTask
、Future
的使用。
public Integer call() { count = 0; try { File [] files = directory.listFiles(); List<Future<Integer>> results = new ArrayList<>(); for (File file : files) { if (file.isDirectory()) { MatchCounter counter = new MatchCounter(file, keyword); FutureTask<Integer> task = new FutureTask<>(counter); results.add(task); Thread t = new Thread(task); t.start(); } else { if (search(file)) { count++; } } } for (Future<Integer> result : results) { try { count += result.get(); } catch (ExecutionException e) { e.printStackTrace(); } } } catch (InterruptedException e) { ; } return count; }
構建一個新的線程是有代價的,涉及到與操做系統的交互。對於程序中須要建立大量生命期很短的線程,應該使用線程池。線程池中的線程執行完畢並不會立刻死亡,而是在池中準備爲下一個請求提供服務。固然使用線程池還能夠限制併發線程的數目。
須要調用執行器Executors
的靜態工廠方法來構建線程池,下面的方法返回的是ExecutorService
接口的ThreadPoolExecutor
類的對象。
Executors.newCachedThreadPool
:線程空閒60秒後終止,如有空閒線程當即執行任務,若無則建立新線程。
Executors.newFixedThreadPool
:池中線程數由參數指定,固定大小,剩餘任務放置在隊列。
使用submit()
方法,將Runnable
對象或Callable
對象提交給線程池ExecutorService
,任務什麼時候執行由線程池決定。調用submit()
方法,會返回一個Future
對象,用來查詢任務狀態或結果。當用完線程池時,要記得調用shutdown()
關閉,會在全部任務執行完後完全關閉。相似的調用shutdownNow
,可取消還沒有開始的任務並試圖終端正在運行的線程。
線程池的使用步驟大體以下:
調用Executors
類的靜態方法newCachedThreadPool()
或newFixedThreadPool()
;
調用submit()
提交Runnable
或Callable
對象;
若是提交Callable
對象,就要保存好返回的Future
對象;
線程池用完時,調用shutdown()
。
對於以前提到的計算文件匹配數的例子,須要產生大量生命期不少的線程,可使用一個線程池來運行任務,完整代碼在這裏。
public Integer call() { count = 0; try { File [] files = directory.listFiles(); List<Future<Integer>> results = new ArrayList<>(); for (File file : files) { if (file.isDirectory()) { MatchCounter counter = new MatchCounter(file, keyword, pool); Future<Integer> result = pool.submit(counter); results.add(result); } else { if (search(file)) { count++; } } } for (Future<Integer> result : results) { try { count += result.get(); } catch (ExecutionException e) { e.printStackTrace(); } } } catch (InterruptedException e) { ; } return count; }
對於多線程程序,有些應用使用了大量線程,但其中大多數都是空閒的。還有些應用須要完成計算密集型任務,Fork-Join框架專門用來支持這類任務。使用Fork-Join框架解決思路大體是分治的思想,採用遞歸計算再合併結果。只需繼承RecursiveTask<T>
類,並覆蓋compute()
方法。invokeAll()
方法接收不少任務並阻塞,直到這些任務完成,join()
方法將生成結果。
對於問題,統計數組中知足某特性的元素個數,使用Fork-Join框架是很合適的。
import java.util.concurrent.*; public class ForkJoinTest { public static void main(String [] args) { final int SIZE = 10000000; double [] numbers = new double[SIZE]; for (int i = 0; i < SIZE; i++) { numbers[i] = Math.random(); } Counter counter = new Counter(numbers, 0, numbers.length, new Filter() { public boolean accept(double x) { return x > 0.5; } }); ForkJoinPool pool = new ForkJoinPool(); pool.invoke(counter); System.out.println(counter.join()); } } interface Filter { boolean accept(double t); } class Counter extends RecursiveTask<Integer> { private final int THRESHOLD = 1000; private double [] values; private int from; private int to; private Filter filter; public Counter(double [] values, int from, int to, Filter filter) { this.values = values; this.from = from; this.to = to; this.filter = filter; } public Integer compute() { if (to - from < THRESHOLD) { int count = 0; for (int i = from; i < to; i++) { if (filter.accept(values[i])) { count++; } } return count; } else { int mid = (from + to) / 2; Counter first = new Counter(values, from, mid, filter); Counter second = new Counter(values, mid, to, filter); invokeAll(first, second); return first.join() + second.join(); } } }
另外,Fork-Join框架使用工做密取來平衡可用線程的工做負載,比手工多線程強多了。