第五章 基礎構建模塊

5.1 同步容器類

  實現方式 : 將他們的狀態封裝起來,並對每一個公有方法都進行同步, 使得每次只有一個線程能夠訪問. java

5.1.1 存在的問題

  複合操做 並不是線程安全. 好比 迭代, 條件運算等.數組

  在對同步容器類的複合操做加鎖時必定要以容器對象爲鎖對象, 保證複合操做的鎖對象容器使用的鎖對象一致.才能實現一個線程安全的複合操做安全

 public static void getLast(Vector<?> list) {
        // 此處的鎖對象必須和 Vector 內部的鎖對象一致
        synchronized (list) {
            int lastIndex = list.size()-1;
            list.remove(lastIndex);
        }
    }

 

5.1.2 迭代器與ConcurrentModificationException

  在容器的迭代過程中被修改(結構上被改變)時會拋出  ConcurrentModificationException併發

  解決方法 : 在迭代過程當中持有容器的鎖. 並在全部對共享容器進行迭代的地方加鎖框架

5.1.3 隱藏迭代器

  如下操做也會間接的進行容器的迭代操做異步

  toString() , hashCode() , equals() 等不少方法都出觸發容器的迭代操做. ide

5.2 併發容器

5.2.1 ConcurrentHashMap

  •   加鎖策略 : 分段鎖(粒度更細的加鎖機制), 同步方法塊
  •   支持多個線程併發的訪問  ConcurrentHashMap , 實現更高的吞吐量 .  
  •   ConcurrentHashMap 返回的迭代器具備弱一致性 , 能夠(可是不保證)將修改操做當即反映給容器
  •   迭代過程當中不會加鎖 , 也不會拋出 ConcurrentModificationException , 
  •   將一些複合操做(putIfAbsent() 若沒有則添加) 實現爲原子操做

5.2.3 CopyOnWriteArrayList

  • 每次修改容器時先複製數組, 引用依舊指向原來的數組 , 而後修改新的數組, 最後將引用指向新的數組.
  • 寫入時複製 也可理解爲 修改時複製
  • 返回的迭代器不會拋出  ConcurrentModificationException 
  • 使用場景 : 迭代操做遠遠多於修改操做. 複製數組的操做有必定的開銷.
  • 修改操做使用 ReentrantLock 進行加鎖

5.3 阻塞隊列

  • 提供阻塞的 put 和 take 方法
  • put 方法將阻塞到直到有空間可用 , take 方法將阻塞到直到有元素可用
  • 隊列能夠有界, 也能夠無界
  • 修改容器時統一使用建立隊列實例時建立的 ReentrantLock 對象

  BlockingQueue(阻塞隊列接口)工具

    LinkedBlockingQueue 相似與 LinkedListspa

    ArrayBlockingQueue 相似與 ArrayList線程

    PriorityBlockingQueue   按優先級排序的隊列

    SynchronousQueue 每一個插入操做必須等待另外一個線程的對應移除操做 ,反之亦然。很是適合作交換工做,生產者的線程和消費者的線程同步以傳遞某些信息、事件或者任務。

 5.3.1 生產者與消費者的特色

  • 生產者和消費者只須要完成各自的任務
  • 阻塞隊列將負責全部的控制流
  • 每一個功能的代碼和邏輯更加清楚
  • 他們總體的並行度取決於二者中較低的並行度

  生產者和消費者設計也可使用 Executor 任務執行框架來實現, 其自己也使用 生產者--消費者模式

5.3.2 串行線程封閉 

  • 線程封閉對象只能由單個對象擁有,能夠經過安全的發該對象來轉移全部權.而且發佈對象的線程不會再訪問它
  • 轉移全部權以後,只有另一個線程得到這個對象的訪問權限, 它能夠對它作任意的修改,由於它有獨佔訪問權.

5.3.3 雙端隊列和工做密取

  雙端隊列

  • ArrayDeque 和 LinkedBlockingDeque
  • 實如今隊列頭和隊列尾的高效插入和移除.

  工做密取

  • 每一個消費者有本身的雙端隊列 , 當一個消費者完成本身隊列的全部任務後 , 那麼它能夠從其餘消費者的雙端隊列祕密的獲取任務 .

5.4 阻塞方法和中斷方法

  處理 InterruptedException 

  • 將 InterruptedException  傳遞給方法的調用者
  • 捕獲這個異常, 並恢復中斷狀態

5.5 同步工具類

5.5.1 閉鎖(CountDownLatch)

  做用 : 用來確保某些活動直到其餘活動都完成後才執行.

  • 計數器 : 表示須要等待的事件數量
  • await() : 阻塞調用此方法的線程直到計數器爲0
  • countDown() : 計數器遞減

用法一 : 建立必定數量的線程,多個線程併發的執行任務

  • 使用兩個閉鎖, 分別表示起始門和結束門. 起始門初始值爲1 , 結束門初始值爲等待的事件數量. 
  • 每一個工做線程在起始門等待,
  • 全部線程就緒後起始門調用 countDown()直到起始門的值爲0, 以後全部線程同時開始執行(由於線程以前都在起始門等待)
  • 主線程調用結束門的await(), 主線程阻塞直到全部工做線程結束
  • 工做線程所有執行完畢, 主線程運行
package com.pinnet.test;

import java.util.concurrent.CountDownLatch;

public class CountLatchTest {

    public void timeTask(int threadNumbers, Runnable task) throws InterruptedException {
        CountDownLatch start = new CountDownLatch(1);
        CountDownLatch end = new CountDownLatch(threadNumbers);

        for (int i = 0; i < threadNumbers; i++) {
            new Thread() {
                public void run() {
                    try {
                        // 全部線程在起始門等待
                        start.await();
                        // 執行任務
                        task.run();
                        // 結束門遞減
                        end.countDown();
                    } catch (InterruptedException e) {

                    }
                }
            }.start();
        }
        // 全部工做線程開始執行
        start.countDown();
        // 全部工做線程啓動後主線程當即登待
        end.await();
        System.out.println("開始主線程");

    }

}

用法二: 建立必定數量的線程,多個線程依次的執行任務

  • 使用一個閉鎖
  • 線程依次啓動, 執行完成後countDown() 
  • 主線程 await() , 直到計數器爲0 ,主線程執行
public void timeTask2(int threadNumbers, Runnable task) throws InterruptedException {
        CountDownLatch start = new CountDownLatch(threadNumbers);
        // 任務一次執行
        for (int i = 0; i < threadNumbers; i++) {
            new Thread() {
                public void run() {
                        // 任務開始
                        task.run();
                        // 遞減
                        start.countDown();
                }
            }.start();
        }
        // 主線程阻塞直到計數器爲0
        start.await();
        System.out.println("開始主線程");

    }

5.5.2 FutureTask

  一種可生成結果,可異步取消的計算.

  • 計算經過 Callable 實現
  • 能夠將計算結果從執行任務的線程 傳遞 到獲取這個結果的線程 , 並保證其安全性
  • get() 獲取結果可阻塞到任務完成
public class FutureTaskTest {
    // 建立任務
    private final FutureTask<Integer> future = new FutureTask<>(new Callable<Integer>() {
        public Integer call() {
            return 123;
        }
    });
    // 建立線程
    private final Thread thread = new Thread(future);
    // 對外提供方法啓動線程
    public void start() {
        thread.start();
    }
    // 獲取計算結果
    public Integer get() {
        try {
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
            return null;
        }
    }
}

5.5.3 信號量(Semaphore)

  • 控制同時訪問某個資源或執行某個操做的數量.
  • Semaphore 管理必定數量的許可, 執行操做時先獲取許可,執行完操做後釋放許可.
  • 獲取許可時可阻塞.

5.5.4 柵欄

  閉鎖和柵欄的區別 :

  • 閉鎖是一次性對象 , 一旦進入終止狀態 , 就不能被重置
  • 柵欄是全部線程必須同時到達柵欄位置 , 才能繼續執行.
  • 閉鎖用於等待事件 , 柵欄用於等待線程.

  基本使用 : 

  • 指定數量線程到達柵欄位置後,全部線程被釋放 . 柵欄將被重置 
  • 成功經過柵欄,await()將會返回一個到達索引號
  • await() 超時 或 await()阻塞的線程被中斷 , 則柵欄被打破.全部阻塞的await() 調用被終止 , 並拋出 BrokenBarrierException.

  用法一 : CyclicBarrier(int number)   

      await() 調用 number 次後全部調用 await() 的線程繼續執行 , 不然線程在await() 阻塞

public void barrier() {
        int number = 6;
        // 參數表示屏障攔截的線程數量 
        // barrier.await() 調用 number 次後全部線程的阻塞狀態解除
        CyclicBarrier barrier = new CyclicBarrier(number);
        
        for (int i = 0; i < number; i++) {
            new Thread(new Runnable() {
                
                @Override
                public void run() {
                    System.out.println("此線程任務已經完成");
                    try {
                        // 調用await方法告訴CyclicBarrier我已經到達了屏障,而後當前線程被阻塞。
                        barrier.await();
                        System.out.println("全部線程執行完成");
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }

用法二 : CyclicBarrier(int parties, Runnable runnable) 指定數量的線程到達屏障點後 執行  runnable

 public void barrier2() {
        int number = 6;
        // 參數表示屏障攔截的線程數量 
        // barrier.await() 調用 number 次後全部線程的阻塞狀態解除
        CyclicBarrier barrier = new CyclicBarrier(number, new Runnable() {
            
            @Override
            public void run() {
                //  指定數量的線程到達屏障點後執行  barrier()
                barrier();
            }
        });
        
        for (int i = 0; i < number; i++) {
            new Thread(new Runnable() {
                
                @Override
                public void run() {
                    System.out.println("此線程任務已經完成");
                    try {
                        // 調用await方法告訴CyclicBarrier我已經到達了屏障,而後當前線程被阻塞。
                        barrier.await();
                        System.out.println("全部線程執行完成");
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }

雙方形式的柵欄 Exchanger 

相關文章
相關標籤/搜索