Java 併發編程 - 3

 

JDK 1.5 以前的同步容器

JDK 1.5 以前, 主要包括:數組

  • 同步容器 (Vector 和 Hashtable)
  • 同步包裝類 (Collections.synchronizedXxx)

這些類的共同特徵是, 公共方法都是由 synchronized 來修飾的, 以限制一次只能有一個線程能訪問容器.安全

同步容器中出現的問題

複合操做

老的容器自身並不支持複合操做, 包括:併發

  1. 迭代(反覆獲取元素, 直到得到容器中的最後一個元素)
  2. 導航(navigation, 根據必定的順序尋找下一個元素)
  3. 條件運算(check-then-act)

好在老的容器類遵循一個支持 客戶端加鎖 的同步策略. 來解決複合運算的問題:函數

  • 解決迭代和導航:ui

    synchronized(list) { // 確保調用 size() 後, list 大小不會改變
        for (int i = 0; i < list.size(); ++i) {
            doSomething(list[i]);
        }
    }
  • 解決條件運算:this

    synchronized(list) { // 確保調用 size() 後, list 大小不會改變
        int lastIndex = list.size() - 1;
        list.remove(lastIndex);
    }

這樣作的弊端是:線程

作任何操做都要鎖住整個容器, 效率低, 容易出錯.設計

迭代器 和 ConcurrentModificationException

Collection進行迭代的標準方法是使用 Iterator, 不管是顯式使用仍是 經過 JDK 1.5 以後的 for-each 語法. code

在 迭代 的時候, 仍有其餘線程在併發修改容器的可能性, 使用迭代器仍不可避免地須要在迭代期間對容器加鎖.xml

迭代器在併發修改的時候, 策略是 及時失敗(fail-fast) 的: 當發現迭代器被修改後(如: add 和 remove), 會拋出一個未檢查的 ConcurrentModificationException

以 ArrayList 爲例子, 其父類 AbstractList 內部有一個字段名爲 modCount 的計數器. 任何改變 List 大小的操做都須要改變 modCount 這個值. 

這個值會被用來在迭代或者時, 檢查有沒有修改容器, 套路是這樣的:

修改時:

if (modCount != expectedModCount) 
        throw new ConcurrentModificationException();
    }
    // Add or Remove
    // .......
    expectedModCount = modCount;

迭代:

public E prev/next() {
        if (modCount != expectedModCount) 
            throw new ConcurrentModificationException();
        }
        // Other.....
    }

Note: ConcurrentModificationException 也能夠出現單線程的代碼中, 好比當在迭代期間調用 remove 方法

隱藏的迭代器

有時候, 一些操做會隱含的調用迭代器, 好比:

  1. 調用 toString() 方法, 尤爲是寫 log 時, 有 

    log("Set:" + set);

    這樣的語句.

  2. hashCode 和 equals 方法, 如下是 HashTable 的 hashCode 和 equals 方法:

    public synchronized boolean equals(Object o) {
        if (o == this)
        return true;
    
        if (!(o instanceof Map))
            return false;
        Map<?,?> t = (Map<?,?>) o;
        if (t.size() != size())
            return false;
    
        try {
            Iterator<Map.Entry<K,V>> i = entrySet().iterator();
            while (i.hasNext()) {
                Map.Entry<K,V> e = i.next();
                K key = e.getKey();
                V value = e.getValue();
                if (value == null) {
                    if (!(t.get(key)==null && t.containsKey(key)))
                        return false;
                } else {
                    if (!value.equals(t.get(key)))
                        return false;
                }
            }
        } catch (ClassCastException unused)   {
            return false;
        } catch (NullPointerException unused) {
            return false;
        }
    
        return true;
    }
    
    public synchronized int hashCode() {
        int h = 0;
        if (count == 0 || loadFactor < 0)
            return h;  // Returns zero
    
        loadFactor = -loadFactor;  // Mark hashCode computation in progress
        Entry<?,?>[] tab = table;
        for (Entry<?,?> entry : tab) {
            while (entry != null) {
                h += entry.hashCode();
                entry = entry.next;
            }
        }
    
        loadFactor = -loadFactor;  // Mark hashCode computation complete
    
        return h;
    }
  3. 另外 containAllremoveAll 和 retainAll 也會產生迭代.

JDK 1.5 以後的容器

JDK 1.5 後, 新增長了:

  • ConcurrentHashMap, 來替代同步的 Map 實現, 增長了 put-if-absent, 替換和條件刪除
  • CopyOnWriteArrayList, 是 List 相應的同步實現
  • Queue, 用來臨時保存正在等待進一步處理的一系列元素, 實現包括
    • ConcurrentLinkedQueue, 一個傳統的 FIFO 隊列
    • PriorityQueue, 一個(非併發)居右優先級順序的隊列
  • BlockingQueue, 拓展自 Queue, 增長了可阻塞的插入和獲取操做. 
    • 若是隊列是空的, 那麼獲取操做會被阻塞直到有元素存在; 
    • 若是隊列是滿的, 那麼插入操做會被阻塞直到有有元素被取出.

JDK 1.6 後, 新增長了 

  1. Deque 和 BlockingDeque, 分別擴展了 Queue 和 BlockingQueue:

    • Deque 接口, 實現類是 ArrayDeque, 不阻塞
    • BlockingDeque 接口, 實現類是 LinkedBlockingDeque, 阻塞.
  2. ConcurrentSkipListMap 和 ConcurrentSkipListSet, 做爲 SortedMap 和 SortedSet 的併發替代品

Note: 從一個空的Queue中取元素, 並不會阻塞, 而是返回 null

ConcurrentHashMap

在 ConcurrentHashMap 以前, HashTable 和 SynchronizedMap 都是經過給整個方法加 synchronized 來達到同步的, 這樣限制某一時刻只有一個線程能夠訪問容器.

ConcurrentHashMap 使用一個更加細化的鎖機制, 名叫分離鎖. 這個機制容許更深層次的共享訪問: 

  • 任意數量的讀線程能夠併發訪問 Map.
  • 讀者和寫者能夠併發訪問 Map.
  • 有限數量的寫線程能夠併發修改 Map.

因爲併發環境中, Map 的大小一般是動態的, size 和 isEmpty 返回的只是個估算值(可能返回後接着過時).

支持的複合操做:

  1. put-if-absent
  2. remove-if-equal
  3. replace-if-equal

CopyOnWriteArrayList

寫入時複製(COW)容器的線程安全原理:

只要不可邊對象被正確發佈, 那麼訪問它將不須要更多的同步.

所以, 每次添加/修改一個元素, 容器內就會新建立一個新的數組, 容器底層的數組會指向這個新數組. 舊數組仍然被使用, 直到沒有引用後被 GC 回收.

因爲 COW 複製數組有開銷, 因此 COW 適用於容器迭代操做遠遠高於對容器修改的頻率.

FAQ: Arrays.copyOf 和 System.arraycopy 區別?

Arrays.copyOf 不只會複製元素, 還會建立新的數組. System.arrayCopy 拷貝到一個現有數組, Arrays.copyOf 實現中用了 System.arrayCopy;

阻塞隊列和生產者-消費者模式

生產者-消費者設計分離了 "識別須要完成的工做" 和 "執行工做". 該模式不會發現一個工做便當即處理, 而是把工做置入一個任務清單中:

  • 生產者不須要知道消費者的身份或者數量, 甚至根本沒有消費者.
  • 消費者也不須要知道生產者是誰, 以及是誰給它們安排的工做.
  • 生產者和消費者的關係是相對的, 消費者能夠成爲下一個任務隊列的生產者

最多見的生產者-消費者設計是: 線程池和工做隊列的結合

在設計初期就使用阻塞隊列創建對資源的管理, 提前作這件事情會比往後再修復容易的多.

Blocking queue 提供了可阻塞的 put 和 take 方法. 常見的實現有:

  1. LinkedBlockedQueue, FIFO, 鏈表實現, 隊列首 take, 隊列尾 put.
  2. ArrayBlockingQueue, FIFO, 數組實現, 能夠在 putIndex(隊列尾) 插入, 從 takeIndex(隊列首) 取出.
  3. PriorityBlockingQueue, 根據 Comparator 排序順序取出
  4. SynchronousQueue, 生產線程直接和消費線程對接, 若是生產線程找不到消費者或反之, 則, put 和 take 會一直阻止. 只有在消費者充足的時候比較適合, 他們總能爲下一個任務作好準備.

雙端隊列和竊取工做

雙端隊列用來實現 竊取工做(work stealing) 模式. 

在傳統的 生產者-消費者 設計中, 全部的消費者只共享一個工做隊列.

而在 竊取工做 設計中, 每個消費者都有一個本身的雙端隊列. 若是一個消費者完成了本身雙端隊列中的所有工做, 它能夠偷取其餘消費者的雙端隊列的 末尾 任務(其餘消費者仍然從隊列  取任務). 

由於工做者線程並不會競爭一個共享的任務隊列, 因此 竊取工做 模式比傳統的 生產者-消費者 設計有更好的伸縮性.

阻塞和可中斷的方法

阻塞: 線程被掛起, 狀態變爲BLOCKEDWAITING 或是 TIMED_WAITING等待直到一個事件發生才能繼續進行.

BlockingQueue 的 put 和 take 方法會拋出一個受檢查的 InterruptedException, 這個異常說明這是個阻塞方法, 能夠被中斷來提早結束阻塞.

處理中斷的方法:

  • 傳遞 InterruptedException. 傳遞給調用者, 能夠對其中特定活動進行簡潔地清理後, 再拋出.
  • 恢復中斷. 當代碼是 Runnable的一部分時, 必須捕獲 InterruptedException. 而且, 在當前線程中調用 interrupt 從新設置中斷狀態(拋出異常會清理中斷標誌位), 這樣調用棧中更高層代碼能夠發現中斷已經發生. 

    try {
        processTask(queue.take());
    } catch (InterruptedException e) {
        // 恢復中斷狀態
        Thread.currentThread().interrupt();
    }

Synchronizer

Synchronizer 是一個對象, 它根據自己的狀態調節線程的控制流. 主要類型有:

  1. 信號量(semaphore)
  2. 關卡(barrier)
  3. 閉鎖(latch)

他們的特性: 封裝狀態, 這些狀態絕對着線程執行到某一點時是經過仍是被迫等待.

閉鎖 latch

直到 閉鎖 到達 終點狀態 以前, 門一直是關閉的, 沒有線程可以經過, 在 終點狀態 到來的時候, 門開了, 容許全部線程經過. 一旦到了終點狀態, 他就 不能 再改變狀態了.

用例:

  1. 確保一個計算不會執行, 直到它須要的資源初始化.
  2. 確保一個服務不會開始, 直到它依賴的其餘服務都已經開始.
  3. 全部玩家等待就緒, 再開始.

FutureTask

FutureTask 描述了一個抽象的可攜帶結果的計算. FutureTask的計算經過 Callable 實現.

Callable 等價於一個可攜帶結果的 RunnableCallable 有三種狀態:

  1. 等待
  2. 運行
  3. 完成(包括正常結束, 取消 和 異常)

要獲取 FutureTask 的結果, 能夠調用 get() 方法. 調用 get() 時, 有兩種狀況:

  1. 若已經完成, 則直接獲取結果
  2. 若還未完成, 則阻塞, 直至任務完成返回結果或者拋出異常.

FutureTask 保證了計算結果將計算線程安全的傳遞到當前線程. 

假如FutureTask執行的任務有異常拋出, 則異常會被封裝在 ExecutionException 裏. 如下代碼能夠從 ExecutionException 中取出異常:

try {
        futureTask.get();
    } catch (ExecutionException e) {
        Throwable cause = e.getCause(); 
        if (cause instanceOf XXXException) {
            // 本身想要捕獲的異常
        } else {
            throw launderThrowable(cause);
        }
    }

    public static RuntimeException launderThrowable(Throwable cause) {
        if (t instanceOf RuntimeException) {
            return (RuntimeException)t;
        } else if (t instanceOf Error) {

        } else {
            throw new IllegalStateException("Not unchecked", t);
        }
    }

信號量 (Semaphore)

計數信號量用來控制可以同時訪問某種資源的活動的數量, 或者同時執行某一操做的數量.

使用計數信號量以前須要先構造一個, 構造時能夠將許可集(permit)總數傳遞進去. 在使用計數信號量時, 要先嚐試獲取(acquire)一個許可, 假如此時有剩餘許可則繼續執行, 若沒有, 則 阻塞. 使用完以後, 要手動釋放(release)一個許可. 

用處:

  1. 構造一個定長的池.
  2. 構建有界阻塞容器.

關卡 (CyclicBarrier)

關卡用來阻塞一組線程, 直到 全部線程 達到一個條件. 就像一些家庭成員指定商場的一個集合地點:"咱們每一個人6:00在麥當勞見, 到了之後不見不散, 以後咱們再決定接下來作什麼". 

關卡 與 閉鎖 的不一樣:

關卡: 等待的是其餘線程, 能夠重複被使用 閉鎖: 等待的是事件, 只能使用一次

當一個線程到達關卡點時, 調用 awaitawait 會被阻塞, 直到全部線程都到達關卡點.

  • 若是全部線程都到達了關卡點, 關卡就被成功地突破, 全部線程會被釋放.
  • 若是對 await 的調用超時, 或者阻塞中的線程被中斷, 那麼關卡就被認爲是 失敗 的. 

    • 若某一個線程調用有時限的 await, 那麼當這個線程 await 超時, 這個線程會拋出 TimeoutException 異常, 其餘調用 barrior.await() 的線程會拋出 BrokenBarrierException;

若是成功地經過關卡, await 爲每個線程返回一個惟一的到達索引號, 能夠用來 "選舉" 產生一個領導, 在下一次迭代中承擔一些特殊工做.

CyclicBarrier 也容許你向構造函數傳遞一個 關卡行爲(Barrier action), 這是一個 Runnable, 當成功經過關卡的時候, 會(在 某一個 子任務線程中) 執行, 可是在阻塞線程被釋放以前是不能執行的.

Exchanger

Exchanger 是關卡的另外一種形式, 它是一種兩步關卡, 在關卡點會交換數據.

相關文章
相關標籤/搜索