Core Java 併發:理解併發概念

1. 簡介java


從誕生開始,Java 就支持線程、鎖等關鍵的併發概念。這篇文章旨在爲使用了多線程的 Java 開發者理解 Core Java 中的併發概念以及使用方法。安全


2. 概念數據結構


圖片

表1 併發概念
多線程


2.1 競爭條件併發


多個線程對共享資源執行一系列操做,根據每一個線程的操做順序可能存在幾種結果,這時出現競爭條件。下面的代碼不是線程安全的,並且能夠不止一次地初始化 value,由於 check-then-act(檢查 null,而後初始化),因此延遲初始化的字段不具有原子性:app


class Lazy <T> {
 private volatile T value;
 T get() {
   if (value == null)
     value = initialize();
   return value;
 }
}


2.2 數據競爭異步


兩個或多個線程試圖訪問同一個非 final 變量而且不加上同步機制,這時會發生數據競爭。沒有同步機制可能致使這樣的狀況,線程執行過程當中作出其餘線程沒法看到的更改,於是致使讀到修改前的數據。這樣反過來可能又會致使無限循環、破壞數據結構或獲得錯誤的計算結果。下面這段代碼可能會無限循環,由於讀線程可能永遠不知道寫線程所作的更改:async


class Waiter implements Runnable {
 private boolean shouldFinish;
 void finish() { shouldFinish = true; }
 public void run() {
   long iteration = 0;
   while (!shouldFinish) {
     iteration++;
   }
   System.out.println("Finished after: " + iteration);
 }
}

class DataRace {
 public static void main(String[] args) throws InterruptedException {
   Waiter waiter = new Waiter();
   Thread waiterThread = new Thread(waiter);
   waiterThread.start();
   waiter.finish();
   waiterThread.join();
 }
}


3. Java 內存模型:happens-before 關係


Java 內存模型定義基於一些操做,好比讀寫字段、 Monitor 同步等。這些操做能夠按照 happens-before 關係進行排序。這種關係可用來推斷一個線程什麼時候看到另外一個線程的操做結果,以及構成一個程序同步後的全部信息。ide


happens-before 關係具有如下特性:函數


  • 在線程開始全部操做前調用 Thread#start

  • 在獲取 Monitor 前,釋放該 Monitor

  • 在讀取 volatile 變量前,對該變量執行一次寫操做

  • 在寫入 final 變量前,確保在對象引用已存在

  • 線程中的全部操做應在 Thread#join 返回以前完成


4. 標準同步特性


4.1 synchronized 關鍵字


使用 synchronized 關鍵字能夠防止不一樣線程同時執行相同代碼塊。因爲進入同步執行的代碼塊以前加鎖,受該鎖保護的數據能夠在排他模式下操做,從而讓操做具有原子性。此外,其餘線程在得到相同的鎖後也能看到操做結果。


class AtomicOperation {
 private int counter0;
 private int counter1;
 void increment() {
   synchronized (this) {
     counter0++;
     counter1++;
   }
 }
}


也能夠在方法上加 synchronized 關鍵字。


圖片

表2 當整個方法都標記 synchronized 時使用的 Monitor


鎖是可重入的。若是線程已經持有鎖,它能夠再次成功地得到該鎖。


class Reentrantcy {
 synchronized void doAll() {
   doFirst();
   doSecond();
 }
 synchronized void doFirst() {
   System.out.println("First operation is successful.");
 }
 synchronized void doSecond() {
   System.out.println("Second operation is successful.");
 }
}


競爭的程度對獲取 Monitor 的方式有影響:


圖片

表3: Monitor 狀態


4.2 wait/notify


wait/notify/notifyAll 方法在 Object 類中聲明。若是以前設置了超時,線程進入 WAITING 或 TIMED_WAITING 狀態前保持 wait狀態。要喚醒一個線程,能夠執行下列任何操做:


  • 另外一個線程調用 notify 將喚醒任意一個在 Monitor 上等待的線程。

  • 另外一個線程調用 notifyAll 將喚醒全部在等待 Monitor 上等待的線程。

  • 調用 Thread#interrupt 後會拋出 InterruptedException 異常。


最多見的模式是條件循環:


class ConditionLoop {
 private boolean condition;
 synchronized void waitForCondition() throws InterruptedException {
   while (!condition) {
     wait();
   }
 }
 synchronized void satisfyCondition() {
   condition = true;
   notifyAll();
 }
}


  • 請記住,在對象上調用 wait/notify/notifyAll,須要首先得到該對象的鎖

  • 在檢查等待條件的循環中保持等待:這解決了另外一個線程在等待開始以前即知足條件時的計時問題。 此外,這樣作還可讓你的代碼免受可能(也的確會)發生的虛假喚醒

  • 在調用 notify/notifyAll 前,要確保知足等待條件。若是不這樣作會引起通知,然而沒有線程可以避免等待循環


4.3 volatile 關鍵字


volatile 解決了可見性問題,讓修改爲爲原子操做。因爲存在 happens-before 關係,在接下來讀取 volatile 變量前,先對 volatile 變量進行寫操做。 從而保證了對該字段的任何讀操做都能督讀到最近一次修改後的值。


class VolatileFlag implements Runnable {
 private volatile boolean shouldStop;
 public void run() {
   while (!shouldStop) {
     // 執行操做
   }
   System.out.println("Stopped.");
 }
 void stop() {
   shouldStop = true;
 }
 public static void main(String[] args) throws InterruptedException {
   VolatileFlag flag = new VolatileFlag();
   Thread thread = new Thread(flag);
   thread.start();
   flag.stop();
   thread.join();
 }
}


4.4 Atomic


java.util.concurrent.atomic package 包含了一組類,它們用相似 volatile 的無鎖方式支持單個值的原子複合操做。


使用 AtomicXXX 類,能夠實現 check-then-act 原子操做:


class CheckThenAct {
 private final AtomicReference<String> value = new AtomicReference<>();
 void initialize() {
   if (value.compareAndSet(null, "Initialized value")) {
     System.out.println("Initialized only once.");
   }
 }
}


AtomicInteger 和 AtomicLong 都提供原子 increment/decrement 操做:


class Increment {
 private final AtomicInteger state = new AtomicInteger();
 void advance() {
   int oldState = state.getAndIncrement();
   System.out.println("Advanced: '" + oldState + "' -> '" + (oldState + 1) + "'.");
 }
}


若是你但願有這樣一個計數器,不須要在獲取計數的時候具有原子性,能夠考慮用 LongAdder 取代 AtomicLong/AtomicInteger。 LongAdder 能在多個單元中存值並在須要時增長計數,所以在競爭激烈的狀況下表現更好。


4.5 ThreadLocal


一種在線程中包含數據但不用鎖的方法是使用 ThreadLocal 存儲。從概念上講,ThreadLocal 能夠看作每一個 Thread 存有一份本身的變量。Threadlocal 一般用於保存每一個線程的值,好比「當前事務」或其餘資源。 此外,還能夠用於維護每一個線程的計數器、統計信息或 ID 生成器。


class TransactionManager {
 private final ThreadLocal<Transaction> currentTransaction
     = ThreadLocal.withInitial(NullTransaction::new);
 Transaction currentTransaction() {
   Transaction current = currentTransaction.get();
   if (current.isNull()) {
     current = new TransactionImpl();
     currentTransaction.set(current);
   }
   return current;
 }
}


5. 安全地發佈對象


想讓一個對象在當前做用域外使用能夠發佈對象,例如從 getter 返回該對象的引用。 要確保安全地發佈對象,僅在對象徹底構造好後發佈,可能須要同步。 能夠經過如下方式安全地發佈:


  • 靜態初始化器。只有一個線程能夠初始化靜態變量,由於類的初始化在獲取排他鎖條件下完成。

class StaticInitializer {
 // 無需額外初始化條件,發佈一個不可變對象
 public static final Year year = Year.of(2017);
 public static final Set<String> keywords;
 // 使用靜態初始化器構造複雜對象
 static {
   // 建立可變集合
   Set<String> keywordsSet = new HashSet<>();
   // 初始化狀態
   keywordsSet.add("java");
   keywordsSet.add("concurrency");
   // 設置 set 不可修改
   keywords = Collections.unmodifiableSet(keywordsSet);
 }
}


  • volatile 字段。因爲寫入 volatile 變量發生在讀操做以前,所以讀線程總能讀到最新的值。


class Volatile {
 private volatile String state;
 void setState(String state) {
   this.state = state;
 }
 String getState() {
   return state;
 }
}


  • Atomic。例如 AtomicInteger 將值存儲在 volatile 字段中,因此 volatile 變量的規則在這裏也適用。


class Atomics {
 private final AtomicInteger state = new AtomicInteger();
 void initializeState(int state) {
   this.state.compareAndSet(0, state);
 }
 int getState() {
   return state.get();
 }
}


  • final 字段

    

class Final {
 private final String state;
 Final(String state) {
   this.state = state;
 }
 String getState() {
   return state;
 }
}


確保在對象構造期間不會修改此引用。


class ThisEscapes {
private final String name;
ThisEscapes(String name) {
  Cache.putIntoCache(this);
  this.name = name;
}
String getName() { return name; }
}
class Cache {
private static final Map<String, ThisEscapes> CACHE = new ConcurrentHashMap<>();
static void putIntoCache(ThisEscapes thisEscapes) {
  // 'this' 引用在對象徹底構造以前發生了改變
  CACHE.putIfAbsent(thisEscapes.getName(), thisEscapes);
}
}


  • 正確同步字段


class Synchronization {
 private String state;
 synchronized String getState() {
   if (state == null)
     state = "Initial";
   return state;
 }
}


6. 不可變對象


不可變對象的一個重要特徵是線程安全,所以不須要同步。要成爲不可變對象:


  • 全部字段都標記 final

  • 全部字段必須是可變或不可變的對象,注意不要改變對象做用域,不然構造後不能改變對象狀態

  • this 引用在構造對象時不要泄露

  • 類標記 final,子類沒法重載改變類的行爲

  • 不可變對象示例:


// 標記爲 final,禁止繼承
public final class Artist {
 // 不可變變量,字段標記 final
 private final String name;
 // 不可變變量集合, 字段標記 final
 private final List<Track> tracks;
 public Artist(String name, List<Track> tracks) {
   this.name = name;
   // 防護性拷貝
   List<Track> copy = new ArrayList<>(tracks);
   // 使可變集合不可修改
   this.tracks = Collections.unmodifiableList(copy);
   // 構造對象期間,'this' 不傳遞到任何其餘地方
 }
 // getter、equals、hashCode、toString 方法
}
// 標記爲 final,禁止繼承
public final class Track {
 // 不可變變量,字段標記 final
 private final String title;
 public Track(String title) {
   this.title = title;
 }
 // getter、equals、hashCode、toString 方法
}


7. 線程


java.lang.Thread 類用於表示應用程序線程或 JVM 線程。 代碼始終在某個 Thread 類的上下文中執行,使用 Thread#currentThread() 可返回本身的當前線程。


圖片

表4 線程狀態


圖片

表5 線程協調方法


7.1 如何處理 InterruptedException?


  • 清理全部資源,並在當前運行級別儘量能完成線程執行

  • 當前方法聲明拋出 InterruptedException。

  • 若是方法沒有聲明拋出 InterruptedException,那麼應該經過調用 Thread.currentThread().interrupt() 將中斷標誌恢復爲 true。 而且在這個級別上拋出更合適的異常。爲了能在更高調用級別上處理中斷,把中斷標誌設置爲 true 很是重要


7.2 處理意料以外的異常


線程能夠指定一個 UncaughtExceptionHandler 接收因爲發生未捕獲異常致使線程忽然終止的通知。


Thread thread = new Thread(runnable);
thread.setUncaughtExceptionHandler((failedThread, exception) -> {
 logger.error("Caught unexpected exception in thread '{}'.",
     failedThread.getName(), exception);
});
thread.start();


8. 活躍度


8.1 死鎖


有多個線程,每一個線程都在等待另外一個線程持有的資源,造成一個獲取資源的線程循環,這時會發生死鎖。最典型的資源是對象 Monitor ,但也多是任何可能致使阻塞的資源,例如 wait/notify。


下面的代碼可能產生死鎖:


class Account {
 private long amount;
 void plus(long amount) { this.amount += amount; }
 void minus(long amount) {
   if (this.amount < amount)
     throw new IllegalArgumentException();
   else
     this.amount -= amount;
 }
 static void transferWithDeadlock(long amount, Account first, Account second){
   synchronized (first) {
     synchronized (second) {
       first.minus(amount);
       second.plus(amount);
     }
   }
 }
}


若是同時出現如下狀況,就會發生死鎖:


  • 一個線程正試圖從第一個賬戶切換到第二個賬戶,並已得到了第一個賬戶的鎖

  • 另外一個線程正試圖從第二個賬戶切換到第一個賬戶,並已得到第二個賬戶的鎖


避免死鎖的方法:


  • 按順序加鎖:老是以相同的順序獲取鎖


class Account {
 private long id;
 private long amount;
 // 此處略去了一些方法
 static void transferWithLockOrdering(long amount, Account first, Account second){
   boolean lockOnFirstAccountFirst = first.id < second.id;
   Account firstLock = lockOnFirstAccountFirst  ? first  : second;
   Account secondLock = lockOnFirstAccountFirst ? second : first;
   synchronized (firstLock) {
     synchronized (secondLock) {
       first.minus(amount);
       second.plus(amount);
     }
   }
 }
}


  • 鎖定超時:獲取鎖時不要無限期阻塞,而是釋放全部鎖並重試


class Account {
 private long amount;
 // 此處略去了一些方法
 static void transferWithTimeout(
     long amount, Account first, Account second, int retries, long timeoutMillis
 )
throws InterruptedException
{
   for (int attempt = 0; attempt < retries; attempt++) {
     if (first.lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS))
     {
       try {
         if (second.lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS))
         {
           try {
             first.minus(amount);
             second.plus(amount);
           }
           finally {
             second.lock.unlock();
           }
         }
       }
       finally {
         first.lock.unlock();
       }
     }
   }
 }
}


Jvm 可以檢測 Monitor 死鎖,並以線程轉儲的形式打印死鎖信息。


8.2 活鎖與線程飢餓


當線程將全部時間用於協商資源訪問或者檢測避免死鎖,以致於沒有線程可以訪問資源時,會形成活鎖(Livelock)。 線程飢餓發生在線程長時間持鎖,致使一些線程沒法繼續執行被「餓死」。


9. java.util.concurrent


9.1 線程池


線程池的核心接口是 ExecutorService。 java.util.concurrent 還提供了一個靜態工廠類 Executors,其中包含了新建線程池的工廠方法,新建的線程池參數採用最多見的配置。


圖片

表6 靜態工廠方法


譯註:在並行計算中,work-stealing 是一種針對多線程計算機程序的調度策略。 它解決了在具備固定數量處理器或內核的靜態多線程計算機上執行動態多線程計算的問題,這種計算能夠「產生」新的執行線程。 在執行時間、內存使用和處理器間通訊方面都可以高效地完成任務。


在調整線程池的大小時,一般須要根據運行應用程序的計算機中的邏輯核心數量來肯定線程池的大小。 在 Java 中,能夠經過調用 Runtime.getRuntime().availableProcessors() 讀取。


圖片

表7 線程池實現


可經過 ExecutorService#submit、ExecutorService#invokeAll 或 ExecutorService#invokeAny 提交任務,可根據不一樣任務進行屢次重載。


圖片

表8 任務的功能接口


9.2 Future


Future 是對異步計算的一種抽象,表明計算結果。計算結果多是某個計算值或異常。ExecutorService 的大多數方法都使用 Future 做爲返回類型。使用 Future 時,可經過提供的接口檢查當前狀態,或者一直阻塞直到結果計算完成。


ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<String> future = executorService.submit(() -> "result");
try {
 String result = future.get(1L, TimeUnit.SECONDS);
 System.out.println("Result is '" + result + "'.");
}
catch (InterruptedException e) {
 Thread.currentThread().interrupt();
 throw new RuntimeException(e);
}
catch (ExecutionException e) {
 throw new RuntimeException(e.getCause());
}
catch (TimeoutException e) {
 throw new RuntimeException(e);
}
assert future.isDone();


9.3 鎖


9.3.1 Lock


java.util.concurrent.locks package 提供了標準 Lock 接口。ReentrantLock 在實現 synchronized 關鍵字功能的同時還包含了其餘功能,例如獲取鎖的狀態信息、非阻塞 tryLock() 和可中斷鎖定。直接使用 ReentrantLock 示例以下:


class Counter {
 private final Lock lock = new ReentrantLock();
 private int value;
 int increment() {
   lock.lock();
   try {
     return ++value;
   } finally {
     lock.unlock();
   }
 }
}



9.3.2 ReadWriteLock


java.util.concurrent.locks package 還包含 ReadWriteLock 接口(以及 Reentrantreadelock 實現)。該接口定義了一對鎖進行讀寫操做,一般支持多個併發讀取,但只容許一個寫入。


class Statistic {
 private final ReadWriteLock lock = new ReentrantReadWriteLock();
 private int value;
 void increment() {
   lock.writeLock().lock();
   try {
     value++;
   } finally {
     lock.writeLock().unlock();
   }
 }
 int current() {
   lock.readLock().lock();
   try {
     return value;
   } finally {
     lock.readLock().unlock();
   }
 }
}



9.3.3 CountDownLatch


CountDownLatch 用一個計數器初始化。線程能夠調用 await() 等待計數歸零。其餘線程(或同一線程)可能會調用 countDown() 來減少計數。一旦計數歸零即不可重用。CountDownLatch 用於發生某些操做時觸發一組未知的線程。


9.3.4 CompletableFuture


CompletableFuture 是對異步計算的一種抽象。 與普通 Future 不一樣,CompletableFuture 僅支持阻塞方式得到結果。當結果產生或發生異常時,執行由已註冊的回調函數建立的任務管道。不管是建立過程當中(經過 CompletableFuture#supplyAsync/runAsync),仍是在加入回調過程當中(*async 系列方法),若是沒有指定標準的全局 ForkJoinPool#commonPool 均可以設置執行計算的執行器。


考慮 CompletableFuture 已執行完畢,那麼經過非 *async 方法註冊的回調將在調用者的線程中執行。


若是程序中有幾個 future,可使用 CompletableFuture#allOf 得到一個 future,這個 future 在全部 future 完成時結束。也能夠調用 CompletableFuture#anyOf 得到一個 future,這個 future 在其中任何一個 future 完成時結束。


ExecutorService executor0 = Executors.newWorkStealingPool();
ExecutorService executor1 = Executors.newWorkStealingPool();
// 當這兩個 future 完成時結束
CompletableFuture<String> waitingForAll = CompletableFuture
   .allOf(
       CompletableFuture.supplyAsync(() -> "first"),
       CompletableFuture.supplyAsync(() -> "second", executor1)
   )
   .thenApply(ignored -> " is completed.");
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Concurrency Refcard", executor0)
   // 使用同一個 executor
   .thenApply(result -> "Java " + result)
   // 使用不一樣的 executor
   .thenApplyAsync(result -> "Dzone " + result, executor1)
   // 當前與其餘 future 完成後結束
   .thenCombine(waitingForAll, (first, second) -> first + second)
   // 默認使用 ForkJoinPool#commonPool 做爲 executor
   .thenAcceptAsync(result -> {
     System.out.println("Result is '" + result + "'.");
   })
   // 通用處理
   .whenComplete((ignored, exception) -> {
     if (exception != null)
       exception.printStackTrace();
   });
// 第一個阻塞調用:在 future 完成前保持阻塞
future.join();
future
   // 在當前線程(main)中執行
   .thenRun(() -> System.out.println("Current thread is '" + Thread.currentThread().getName() + "'."))
   // 默認使用 ForkJoinPool#commonPool 做爲 executor
   .thenRunAsync(() -> System.out.println("Current thread is '" + Thread.currentThread().getName() + "'."))



9.4 併發集合


使集合線程安全最簡單方法是使用 Collections#synchronized* 系列方法。 因爲這種解決方案在競爭激烈的狀況下性能不好,因此 java.util.concurrent 提供了多種針對併發優化的數據結構。


9.4.1 List


圖片

表9:java.util.concurrent 中的 Lists


譯註:copy-on-write(寫入時複製)是一種計算機程序設計領域的優化策略。其核心思想是,若是有多個調用者同時請求相同資源(如內存或磁盤上的數據存儲),他們會共同獲取相同的指針指向相同的資源,直到某個調用者試圖修改資源的內容時,系統纔會真正複製一份專用副本給該調用者,而其餘調用者所見到的最初的資源仍然保持不變。這個過程對其餘的調用者透明。這種作法的主要優勢是若是調用者沒有修改該資源,就不會新建副本,所以多個調用者只是讀取操做能夠共享同一份資源。


9.4.2 Map


圖片

表10 java.util.concurrent 中的 Map


9.4.3 Set


圖片

表11 java.util.concurrent 中的 Set


封裝 concurrent map 進而建立 concurrent set 的另外一種方法:


Set<T> concurrentSet = Collections.newSetFromMap(new ConcurrentHashMap<T, Boolean>());



9.4.4 Queue


隊列就像是「生產者」和「消費者」之間的管道。按照「先進先出(FIFO)」順序,將對象從管道的一端加入,從管道的另外一端取出。BlockingQueue 接口繼承了 Queue接口,而且增長了(生產者添加對象時)隊列滿或(消費者讀取或移除對象時)隊列空的處理。 在這些狀況下,BlockingQueue 提供的方法能夠一直保持或在一段時間內保持阻塞狀態,直到等待的條件由另外一個線程的操做改變。


圖片

表12 java.util.concurrent 中的 Queue

相關文章
相關標籤/搜索