高級程序員需知的併發編程知識(二)

說明

本篇是繼上一篇併發編程未討論完的內容的續篇。上一篇傳送門:java

Java併發編程一萬字總結(吐血整理)編程

活躍性問題

在上一篇咱們討論併發編程帶來的風險的時候,說到其中 一個風險就是活躍性問題。活躍性問題其實就是咱們的程序在某些場景或條件下執行不下去了。在這個話題下咱們會去了解什麼是死鎖、活鎖以及飢餓,該如何避免這些狀況的發生。緩存

死鎖

咱們通常使用加鎖來保證線程安全,可是過分地使用加鎖,可能致使死鎖發生。安全

哲學家進餐問題bash

「哲學家進餐」問題能很好地描述死鎖的場景。5個哲學家去吃火鍋,坐在一張圓桌上。它們有5根筷子(不是5雙),這5根筷子放在每一個人的中間。哲學家時而思考,時而進餐。每一個人都要取到一雙筷子才能吃到東西,而且在吃完後將筷子放回原處。網絡

在這裏插入圖片描述
能夠考慮一下這種狀況,若是每一個人都當即抓住本身左邊的筷子,而後等待本身右邊的筷子空出來,但同時都不放手本身已經拿到的筷子。會出現什麼狀況。能夠想到,每一個人都吃不上火鍋了,只等涼涼了。多線程

什麼是死鎖併發

每一個人都擁有其餘人須要的資源,同時又等待其餘人已經擁有的資源,而且每一個人在得到所需資源以前都不會放棄已經擁有的資源。這就是一種死鎖。app

再使用線程的術語描述一下。在線程A持有鎖L並想得到鎖M的同時,線程B持有鎖M並嘗試獲取鎖L,那麼這兩個線程將永遠地等待下去。框架

簡單死鎖代碼示例

public class LeftRightDeadLock {
    private final Object left = new Object();
    private final Object right = new Object();
    
    public void leftRight(){
        synchronized (left){
            synchronized (right){
                doSomething();
            }
        }
    }
    
    public void rightLeft(){
        synchronized (right){
            synchronized (left){
                doSomething();
            }
        }
    }
}

上面的代碼中,若是一個線程執行leftRight()方法,另外一個線程調用rightLeft()方法,則會發生死鎖。

上面生產死鎖的緣由是,兩個線程視圖以不一樣的順序來得到相同的鎖。若是按照相同的順序請求鎖,那麼就不會出現循環的加鎖依賴性,所以就不會產生死鎖。

產生死鎖的四個條件

有個叫Coffman的牛人幫咱們總結了產生死鎖的四個條件:

  1. 互斥,共享資源X和Y只能被一個線程佔用
  2. 佔用且等待,線程T1已經得到了共享資源X,在等待共享資源Y的時候,不釋放共享資源X;
  3. 不可搶佔,其餘線程不能強行搶佔線程T1佔用的資源;
  4. 循環等待,線程T1等待線程T2佔有的資源,線程T2等待線程T1佔有的資源,就是循環等待。

反過來講,咱們只要破壞掉四個條件中的一個,就能夠避免死鎖的發生。

首先第一個互斥條件無法破壞,由於加鎖就是互斥的語義。

  1. 對於「佔用且等待」的條件,咱們能夠一次性申請全部資源;
  2. 對於「不可搶佔」這個條件,佔用部分資源的線程在申請其餘資源時,若是申請不到,能夠主動釋放它佔有的資源。
  3. 對於「循環等待」這個條件,能夠按照固定的順序申請資源,全部線程都按照規定的順序得到鎖,這樣就不存在循環等待了。

活鎖

活鎖是另外一種形式的活躍性問題,該問題儘管不會阻塞線程,但也不能繼續執行下去,由於線程將不斷重複執行相同的操做,並且總會失敗。

當多個相互協做的線程都對彼此進行響應從而修改各自的狀態,並使得任何一個線程都沒法繼續執行時,就發生了活鎖。就好比路上兩我的相遇,出於禮貌,都給對方讓路,結果每次都碰到一塊兒。

要解決這種活鎖問題,須要在重試機制中加入隨機性。好比,在網絡上,兩臺機器使用相同的載波來發送數據包,那麼這些數據包就會發生衝突。這兩臺機器都檢查到了衝突,並都在稍後再次重發。若是兩者都選擇了在1秒後重試,那麼又會發生衝突,而且不斷地衝突下去,於是即便有大量閒置的帶寬,也沒法將數據包發送出去。爲避免這種狀況的發生,須要讓他們分別等待一段隨機的時間,這樣就能避免活鎖的發生了。

飢餓

「飢餓」就是當線程因爲沒法訪問它所須要的資源而不能繼續執行時的場景。所謂「不患寡而患不均」。當某些線程一直獲取不到CPU執行資源的時候,就發生了「飢餓」。

一些容易致使飢餓的場景:

  1. 在應用中對Java線程優先級的使用不當。(由於JVM會將Thread API中的10個優先級映射到操做系統的調度優先級上,這就可能存在兩個不一樣的優先級被映射到了操做系統層的同一個優先級,所以儘可能不要改變線程優先級)
  2. 持有鎖的線程,若是執行的時間過程或者存在無限循環,也可能致使「飢餓」問題。

解決「飢餓」問題的通常方案就是使用公平鎖(注意synchronized術語非公平鎖)。

JUC工具類庫

Java併發包給咱們提供了很是豐富的構建併發程序的基礎模塊,例如線程安全容器類、同步工具類,阻塞隊列等。

這些工具類都在java.util.concurrent包下面,因此簡稱J.U.C工具包。

同步容器和併發容器

同步容器類有哪些

主要有Vector和Hashtable,這兩個都是早期JDK的一部分,還有一些封裝器類是由Collections.sychronizedXxx等工廠方法建立的。
這些類實現線程安全的方式都是:將他們的狀態封裝起來,並對每一個公有方法都進行同步,也就是使用synchronized內置鎖的方式,使得每次只有一個線程能訪問容器的狀態。

同步容器類的問題

同步容器類雖然是線程安全的類,可是在某些場景下可能須要額外的客戶端加鎖來保護複合操做的線程安全性。好比迭代(反覆訪問元素,直到遍歷完容器中的全部元素)、條件運算(若是沒有則添加)。下面給出一個示例說明下:

public class GetLastElement implements Runnable{
    private List<Object> list;

    public GetLastElement(List<Object> list) {
        this.list = list;
    }

    public Object getLast(){
        int index = list.size() - 1;
        return list.get(index);
    }

    public void run() {
        getLast();
    }
}

public class RemoveLastElement implements Runnable{
    private List<Object> list;

    public RemoveLastElement(List<Object> list) {
        this.list = list;
    }

    public void deleteLast(){
        int index = list.size() - 1;
        list.remove(index);
    }

    public void run() {
        deleteLast();
        System.out.println(list);
    }

    public static void main(String[] args) {
        List<Object> list = new Vector<Object>();
        list.add(123);
        list.add("hello");

        Thread thread1 = new Thread(new GetLastElement(list));
        Thread thread2 = new Thread(new RemoveLastElement(list));
        thread1.start();
        thread2.start();
    }
}

上面的代碼中,可能會出現,線程1在獲取get(2)的時候,2位置上的元素已經被線程2給remove(2)掉了。此時線程1會拋出異常,但這並非咱們指望獲得的結果。

所以爲了保證複合操做的原子性,就須要在get和remove方法上加鎖。像下面這樣:

public Object getLast(){
        synchronized (list){
            int index = list.size() - 1;
            return list.get(index);
        }        
    }

    public void deleteLast(){
        synchronized (list){
            int index = list.size() - 1;
            list.remove(index);
        }
    }

這樣就保證了在調用size()和get()方法之間,不會有機會讓另一個線程進來remove掉元素了。

迭代器和ConcurrentModificationException

Java中的集合類庫都是實現了Iterator迭代器接口。不管是使用標準的迭代仍是for-each循還語法中,對容器類進行迭代遍歷的方式都是Iterator。然而,若是有其餘線程併發地修改容器,那麼即便是使用迭代器也沒法避免在迭代期間對容器加鎖。

快速失敗(fail-fast)機制

所謂fail-fast,其實就是一種警示機制,就是當它們發現容器在迭代過程當中被修改是,就會拋出一個ConcurrentModificationException異常。

好比下面這段代碼:

List<String> list = new Vector<String>();
        list.add("java");
        for(String str : list){
            list.remove(str);
        }

運行時就會拋出ConcurrentModificationException異常,事實上咱們換成ArrayLlist也同樣存在這種fail-fast機制。

Exception in thread "main" java.util.ConcurrentModificationException
	at java.util.Vector$Itr.checkForComodification(Vector.java:1184)
	at java.util.Vector$Itr.next(Vector.java:1137)
	at thread.syncontainer.FailFast.main(FailFast.java:10)

另一點,同步容器類將全部對容器狀態的訪問都串行化,以此來實現線程安全性,但這種方法嚴重下降了併發性,當多個線程競爭容器的鎖時,吞吐量將嚴重下降。

併發容器

Java5提供了多種併發容器用來改進同步容器的性能問題。

  • 增長了ConcurrentHashMap,用來代替同步且基於散列的Map;
  • CopyOnWriteArrayList,用於在遍歷操做爲主要操做的狀況下代替同步的List。
  • 新增了Queue和BlockingQueue;
  • Java6又引入了ConcurrentSkipListMap和ConcurrentSkipListSet,分別做爲同步的SortedMap和SortedSet的併發替代品。

ConcurrentHashMap

與HashMap同樣,ConcurrentHashMap也是一個基於散列的Map,可是它使用了一種徹底不一樣的加鎖策略來提供更高的併發性和伸縮性。ConcurrentHashMap並非將每一個方法都在同一把鎖上同步並使得每次只能有一個線程訪問容器,而是使用一種粒度更細的加鎖機制來實現更大程度的共享,這種機制成爲分段鎖。在這種機制中,多個讀線程能夠併發訪問Map,讀線程和寫線程也能夠併發地訪問Map。而且必定數量的寫入線程能夠併發地修改Map。ConcurrentHashMap帶來的結果就是,在併發訪問環境下實現更高的吞吐量。

同時ConcurrentHashMap提供的迭代器不會拋出ConcurrentModificationException異常。另一點是實現了ConcurrentMap,該接口提供了一些諸如「如沒有則添加」,「若相等則移除」,「若相等則替換」等複合操做,而且能保證是原子操做。

CopyOnWriteArrayList

「寫入時複製(copy-on-write)」容器的線程安全性在於,只要正確地發佈一個事實不可變的對象,那麼在訪問該對象那個時就再也不須要進一步的同步。在每次修改時,都會建立並從新發佈一個新的容器副本,從而實現可變性。

阻塞隊列

阻塞隊列提供了可阻塞的put和take方法,以及支持定時的offer和poll方法。若是隊列已經滿了,那麼put方法將阻塞知道有空間可用;若是隊列爲空,那麼take方法將會阻塞直到有元素可用。隊列可使有界的,也能夠是無界的,無界隊列永遠都不會充滿,所以無界隊列上的put方法也永遠不會阻塞。

Java類庫中阻塞隊列BlockingQueue是一個接口,它有多種實現,其中LinkedBlockingQueue和ArrayBlockingQueue是FIFO(先進先出)隊列,兩者分別和LinkedList和ArrayList相似,但比同步List擁有更好的併發性能。PriorityBlockingQueue是一個按優先級排序的隊列,當你但願按照某種順序而不是FIFO來處理元素時,可使用這個隊列。

基於阻塞隊列,能夠很是容易地實現生產者-消費者模型。當數據生成時,生產者只須要把數據放入阻塞隊列中,而消費者從隊列中拿數據消費就能夠了。相比wait/notify實現起來要簡單許多。

CountDownLatch

CountDownLatch是一種閉鎖的實現。那什麼是閉鎖呢?閉鎖就是一個同步工具類,能夠延遲線程的進度直到其到達終止狀態。閉鎖就至關於一扇門:在閉鎖到達結束狀態以前,這扇門一直是關閉的,而且沒有任何線程能經過,當到達結束狀態時(通常就是計數器爲0),這扇門會打開並容許全部線程經過。

你能夠向CountDownLatch對象設置一個初始的計數值,任何在這個對象上調用await()方法都將阻塞,知道這個計數值爲0。而其餘任務(或者說線程)在結束其工做時,能夠在該對象上調用countDown()方法來減少這個計數器值。

另外須要注意的一點是,CountDownLatch被設計爲只觸發一次,意思就是計數器減到0以後不會再重置了。

看個圖能更清楚點:

在這裏插入圖片描述

能夠看到Thread-0在CountDownLatch實體對象上調用了await()方法後阻塞,直到其餘兩個線程的運行將計數器count的值減爲0,Thread-0線程才繼續執行。

CountDownLatch的典型用法是將一個任務分紅N個獨立的子任務,N個子任務分別由N個線程執行,建立一個初始值爲N的CountDownLatch閉鎖,主線程調用await()方法阻塞等待子任務的執行結果,子線程在執行完任務是調用countDown遞減計數器值。

public class TaskPortion implements Runnable{
    private static int counter = 0;
    private final int id = counter++;
    private final CountDownLatch countDownLatch;

    public TaskPortion(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    public void run() {
        doWork();
        countDownLatch.countDown();
    }

    public void doWork(){
        System.out.println(this + "完成");
    }

    @Override
    public String toString() {
        return "TaskPortion{" +
                "id=" + id +
                '}';
    }
}

public class MainTask implements Runnable{
    private CountDownLatch countDownLatch;

    public MainTask(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    public void run() {
        try {
            countDownLatch.await();
            System.out.println("主任務完成");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();
        CountDownLatch countDownLatch = new CountDownLatch(10);
        for(int i = 0; i < 10; i++){
            exec.execute(new TaskPortion(countDownLatch));
        }
        exec.execute(new MainTask(countDownLatch));
        exec.shutdown();
    }
}

運行結果:

TaskPortion{id=1}完成
TaskPortion{id=3}完成
TaskPortion{id=0}完成
TaskPortion{id=5}完成
TaskPortion{id=9}完成
TaskPortion{id=4}完成
TaskPortion{id=8}完成
TaskPortion{id=6}完成
TaskPortion{id=2}完成
TaskPortion{id=7}完成
主任務完成

能夠看到當全部子任務都完成了,主任務才能完成。

CyclicBarrier

柵欄(Barrier)相似於閉鎖,它能阻塞一組線程直到某個事件的發生。柵欄和閉鎖的關鍵區別在於,全部線程必須同時到達柵欄位置,才能繼續執行。

CyclicBarrier是柵欄的一種實現,並且從名字中Cycilc能夠判斷是能夠循環利用的柵欄(Barrier)。它適合這種一種場景:你但願建立一組任務,它們並行地執行工做,而後在進行下一個步驟前等待,直至全部任務都完成。它使得全部的並行任務都將在柵欄處列隊,所以能夠一致地向前移動。

下面引用《Java編程思想》中的賽馬遊戲中的例子作爲示例:

public class Horse implements Runnable{
    private static int counter = 0;
    private final int id = counter++;
    private int strides = 0;
    private static Random rand = new Random(7);
    private static CyclicBarrier barrier;
    public Horse(CyclicBarrier b){ barrier = b; }
    public synchronized int getStrides(){  return strides; }

    public void run() {
        try {
            while (!Thread.interrupted()){
                synchronized (this){
                    strides += rand.nextInt(3);
                }
                barrier.await();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }

    public String toString(){ return "Horse " + id + " "; }

    public String tracks(){
        StringBuilder s = new StringBuilder();
        for(int i = 0; i < getStrides(); i++ ){
            s.append("*");
        }
        s.append(id);
        return s.toString();
    }
}

public class HorseRace {
    static final int FINISH_LINE = 15;
    private List<Horse> horses = new ArrayList<Horse>();
    private ExecutorService exec = Executors.newCachedThreadPool();
    private CyclicBarrier barrier;

    public HorseRace(int nHorses, final int pause){
        barrier = new CyclicBarrier(nHorses, new Runnable() {
            public void run() {
                StringBuilder s = new StringBuilder();
                for(int i = 0; i< FINISH_LINE; i++){
                    s.append("=");
                }
                System.out.println(s);

                for(Horse horse : horses){
                    System.out.println(horse.tracks());
                }
                for(Horse horse : horses){
                    if(horse.getStrides() >= FINISH_LINE){
                        System.out.println(horse + "won!");
                        exec.shutdownNow();
                        return;
                    }
                }

                try {
                    TimeUnit.MILLISECONDS.sleep(pause);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        for (int i = 0; i < nHorses; i++){
            Horse horse = new Horse(barrier);
            horses.add(horse);
            exec.execute(horse);
        }
    }

    public static void main(String[] args) {
        int nHorses = 7;
        int pause = 200;
        new HorseRace(nHorses,pause);
    }
}

這個程序挺有意思,這裏我把馬的匹數調小到3匹,FINISH_LINE 的值調小一些,運行結果以下:

===============
*0
1
**2
===============
**0
*1
***2
===============
****0
*1
****2
===============
****0
*1
****2
===============
****0
**1
*****2
===============
****0
**1
******2
===============
*****0
**1
******2
===============
*******0
***1
******2
===============
*********0
*****1
******2
===============
***********0
*****1
*******2
===============
***********0
******1
*******2
===============
***********0
******1
*********2
===============
***********0
******1
**********2
===============
*************0
********1
************2
===============
*************0
********1
*************2
===============
*************0
*********1
**************2
===============
*************0
*********1
***************2
Horse 2 won!

CyclicBarrier的構造器中有兩個參數,第一個是並行任務數量,第二個是一個Runnable類型的柵欄任務,就是當咱們的計數器減爲0的時候,會自動執行這個柵欄任務。

這裏的計數器遞減的動做不是咱們作的(代碼中咱們只調用了CyclicBarrier的await()方法),而是CyclicBarrier自動幫咱們作了,在CyclicBarrier中有個count域,經過看下面註釋咱們知道當全部並行的線程都到達柵欄處了,就會減爲0,而後在新一輪開始時又會被重置。

/**
     * Number of parties still waiting. Counts down from parties to 0
     * on each generation.  It is reset to parties on each new
     * generation or when broken.
     */
    private int count;

Semaphore

正常的鎖(Lock顯示鎖或Jvm內建鎖synchronized)在任什麼時候刻都只容許一個任務訪問一項資源,而計數信號量(Semaphore)容許n個任務同時訪問這個資源。

Semaphore中管理着一組虛擬的許可(permit),許可的初始數量能夠經過構造函數指定。在執行操做時能夠首先得到許可(只要還有剩餘的許可),並在使用之後釋放許可。若是沒有許可,那麼acquire()方法將阻塞直到有許可(或者直到被中斷或操做超時)。release()方法將返回一個許可給信號量。

Semaphore通常能夠用來實現「對象池」,它管理着數量有限的對象,當要使用對象時能夠簽出它們,而在用戶使用完畢時,能夠將它們籤回。

public class Pool<T> {
    private int size;
    private List<T> items = new ArrayList<T>();
    private volatile boolean [] checkedOut;
    private Semaphore available;
    public Pool(Class<T> classObject, int size){
        this.size = size;
        checkedOut = new boolean[size];
        available = new Semaphore(size,true);
        for (int i = 0; i < size; i++){
            try {
                items.add(classObject.newInstance());
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
    
    public T checkOut() throws InterruptedException {
        available.acquire();
        return getItem();
    }
    public void checkIn(T x){
        if(releaseItem(x)){
            available.release();
        }
    }
    
    private synchronized T getItem(){
        for (int i = 0; i < size; i++){
            if(!checkedOut[i]){
                checkedOut[i] = true;
                return items.get(i);
            }
        }
        return null;
    }
    
    private synchronized boolean releaseItem(T item){
        int index = items.indexOf(item);
        if(index == -1) return false;
        if(checkedOut[index]){ 
            checkedOut[index] = false;
            return true;
        }
        return false;
    }
}

線程池

不要在建立野生線程了

像下面這樣建立的線程,咱們叫作「野生線程」,做爲測試demo沒有問題,若是要上生產的系統,最好不要這麼作。

Thread t = new Thread(r)//r是runnable對象

由於這樣去建立線程,很容易形成無限制的線程建立,並且線程管理起來很是不便。

咱們知道建立線程的開銷是比較高的,所以線程須要複用,須要池化管理。

Executor框架

Executor框架實際上是Java類庫提供給咱們的任務管理框架,線程池是其中的一部分。

在Java類庫中,任務執行的主要抽象不是Thread,而是Executor。Executor是一個接口,以下:

public interface Executor {
    void execute(Runnable command);
}

Executor它提供了一種標準的方法將任務的提交過程與執行過程解耦開來,並用Runnable來表示任務。Executor的實現類還提供了對生命週期的支持,以及統計信息收集、應用程序管理機制和性能監視機制。

將任務的提交和執行解耦開來以後,就能夠靈活地爲某種類型的任務指定和修改執行策略。在執行策略中定義了任務執行的方方面面,這包括:

  • 在什麼線程中執行任務?
  • 任務按照什麼順序執行(FIFO、LIFO、優先級)?
  • 有多少個任務能併發執行?
  • 在隊列中有多少個任務在等待執行?
  • 若是系統因爲過載而須要拒絕一個任務,那麼該選擇哪個任務?另外,如何通知應用程序有任務被拒絕?
  • 在執行一個任務以前或以後,應該進行哪些動做?

經過Executors工具類中的靜態工廠方法建立線程池

首先要知道Executors是一個工具類,是幫助咱們建立一些經常使用的線程池的,這些線程池提供了一些默認的配置。

好比咱們能夠經過Executors的靜態工廠方法建立下面幾種經常使用線程池:

newFixedThreadPool

建立一個固定長度的線程池,每當提交一個任務時就建立一個線程,直到達到線程池的最大數量,這時線程池的規模將再也不變化。

newCachedThreadPool

建立一個能夠緩存的線程池,若是線程池的當前規模超過了吹需求時,那麼將會回收空閒的線程,而當需求增長時,則能夠添加新的線程,線程池的規模不存在任何限制。

newSingleThreadExecutor

是一個單線程的Executor,它建立單個工做線程來執行任務,若是這個線程異常結束,會建立另外一個線程來代替。

ExecutorService接口提供生命週期管理方法

ExecutorService是一個接口,繼承自Executor,擴展了Executor接口的功能,提供了服務的生命週期管理的一些方法:

public interface ExecutorService extends Executor {

    /**
     * 關閉執行器, 主要有如下特色:
     * 1. 已經提交給該執行器的任務將會繼續執行, 可是再也不接受新任務的提交;
     * 2. 若是執行器已經關閉了, 則再次調用沒有反作用.
     */
    void shutdown();

    /**
     * 當即關閉執行器, 主要有如下特色:
     * 1. 嘗試中止全部正在執行的任務, 沒法保證可以中止成功, 但會盡力嘗試(例如, 經過 Thread.interrupt中斷任務, 可是不響應中斷的任務可能沒法終止);
     * 2. 暫停處理已經提交但未執行的任務;
     *
     * @return 返回已經提交但未執行的任務列表
     */
    List<Runnable> shutdownNow();

    /**
     * 若是該執行器已經關閉, 則返回true.
     */
    boolean isShutdown();

    /**
     * 判斷執行器是否已經【終止】.
     * <p>
     * 僅當執行器已關閉且全部任務都已經執行完成, 才返回true.
     * 注意: 除非首先調用 shutdown 或 shutdownNow, 不然該方法永遠返回false.
     */
    boolean isTerminated();

    /**
     * 阻塞調用線程, 等待執行器到達【終止】狀態.
     *
     * @return {@code true} 若是執行器最終到達終止狀態, 則返回true; 不然返回false
     * @throws InterruptedException if interrupted while waiting
     */
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

    /**
     * 提交一個具備返回值的任務用於執行.
     * 注意: Future的get方法在成功完成時將會返回task的返回值.
     *
     * @param task 待提交的任務
     * @param <T>  任務的返回值類型
     * @return 返回該任務的Future對象
     * @throws RejectedExecutionException 若是任務沒法安排執行
     * @throws NullPointerException       if the task is null
     */
    <T> Future<T> submit(Callable<T> task);

    /**
     * 提交一個 Runnable 任務用於執行.
     * 注意: Future的get方法在成功完成時將會返回給定的結果(入參時指定).
     *
     * @param task   待提交的任務
     * @param result 返回的結果
     * @param <T>    返回的結果類型
     * @return 返回該任務的Future對象
     * @throws RejectedExecutionException 若是任務沒法安排執行
     * @throws NullPointerException       if the task is null
     */
    <T> Future<T> submit(Runnable task, T result);

    /**
     * 提交一個 Runnable 任務用於執行.
     * 注意: Future的get方法在成功完成時將會返回null.
     *
     * @param task 待提交的任務
     * @return 返回該任務的Future對象
     * @throws RejectedExecutionException 若是任務沒法安排執行
     * @throws NullPointerException       if the task is null
     */
    Future<?> submit(Runnable task);

    /**
     * 執行給定集合中的全部任務, 當全部任務都執行完成後, 返回保持任務狀態和結果的 Future 列表.
     * <p>
     * 注意: 該方法爲同步方法. 返回列表中的全部元素的Future.isDone() 爲 true.
     *
     * @param tasks 任務集合
     * @param <T>   任務的返回結果類型
     * @return 任務的Future對象列表,列表順序與集合中的迭代器所生成的順序相同,
     * @throws InterruptedException       若是等待時發生中斷, 會將全部未完成的任務取消.
     * @throws NullPointerException       任一任務爲 null
     * @throws RejectedExecutionException 若是任一任務沒法安排執行
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;

    /**
     * 執行給定集合中的全部任務, 當全部任務都執行完成後或超時期滿時(不管哪一個首先發生), 返回保持任務狀態和結果的 Future 列表.
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;

    /**
     * 執行給定集合中的任務, 只有其中某個任務率先成功完成(未拋出異常), 則返回其結果.
     * 一旦正常或異常返回後, 則取消還沒有完成的任務.
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;

    /**
     * 執行給定集合中的任務, 若是在給定的超時期滿前, 某個任務已成功完成(未拋出異常), 則返回其結果.
     * 一旦正常或異常返回後, 則取消還沒有完成的任務.
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

詳解線程池ThreadPoolExecutor

計算線程池的大小

線程池的理想大小取決於被提交的任務類型以及所部署系統的特性。在代碼中一般不會硬編碼寫死一個值,而應該是經過某種配置機制提供。

要避免過大或者太小。設置過大,那麼大量的線程將在相對不多的CPU和內存資源上發生競爭,這不只會致使更高的內存使用量,並且還可能耗盡資源。若是設置太小,那麼將致使許多空閒的處理器沒法執行工做,形成資源浪費,從而下降吞吐率。

《併發編程實踐》中給我提供了一個經驗公式:

要是處理器達到指望的使用率,線程池的最優大小等於:

  Nthreads = Ncpu * Ucpu * (1 + W/C),其中

  Ncpu = CPU核心數

  Ucpu = CPU使用率,0~1

  W/C = 等待時間與計算時間的比率

能夠經過Runtime來得到CPU的數目:

int N_CPUS = Runtime.getRuntime().availableProcessors();

配置ThreadPoolExecutor

前面咱們經過Executors工具類中的工廠方法建立的newFixedThreadPool、newCachedThreadPool這些線程池,其池返回的就是ThreadPoolExecutor對象。

在這裏插入圖片描述

當默認的執行策略不能知足需求時,咱們就能夠經過ThreadPoolExecutor來定製本身的線程池。

咱們來看下ThreadPoolExecutor的構造函數:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

總共有4個重載的構造函數,咱們選擇參數最多的進行分析,其餘幾個都調用了上面這個。

先看下參數:

corePoolSize:表示線程池的基本大小,也叫核心線程數,而且當工做隊列滿了的時候,若是在有任務提交上來,會建立超過這個數量的線程。

maximumPoolSize:最大線程數,表示該線程池最多能夠建立這麼多線程。

keepAliveTime:空閒線程的存活時間,若是某個線程的空閒時間超過了存活時間,將會被回收。

unit:存活時間的時間單位。

workQueue:阻塞隊列,提交上來的任務若是沒有可用的線程執行時,會被放入該隊列中等待執行。

threadFactory:線程工廠,用於指定如何建立一個線程,好比能夠指定一個有意義的名字,指定一個UncaughtExceptionHandler等。

RejectedExecutionHandler :拒絕策略或者飽和策略處理器。當有界隊列被填滿後,該如何處理提交上來的任務。JDK提供了四種RejectedExecutionHandler的實現。AbortPolicy、DiscardPolicy、DiscardOldestPolicy和CallerRunsPolicy。

AbortPolicy策略是默認的飽和策略,該策略會拋出未檢查的RejectedExecutionExcetion異常。調用者能夠捕獲異常進行相應地處理。

DiscardPolicy策略會拋棄任務,DiscardOldestPolicy會拋棄下一個將要被回字形的任務,而後嘗試從新提交新的任務。

CallerRunsPolicy不會拋棄任務,也不會拋出異常,而是將某些任務回退到調用者,從而下降新任務的流量。

下圖描述了線程池的整體工做流程:

在這裏插入圖片描述

JAVA內存模型

Java內存模型是個很複雜的規範,這裏簡單介紹下。

Java內存模型規定了全部的變量都存儲在主內存中,每條線程還有本身的工做內存,線程的工做內存中保存了該線程中是用到的變量的主內存副本拷貝,線程對變量的全部操做都必須在工做內存中進行,而不能直接讀寫主內存。不一樣的線程之間也沒法直接訪問對方工做內存中的變量,線程間變量的傳遞均須要本身的工做內存和主存之間進行數據同步進行。

JMM是一種規範,目的是解決因爲多線程經過共享內存進行通訊時,存在的本地內存數據不一致、編譯器會對代碼指令重排序、處理器會對代碼亂序執行等帶來的問題。目的是保證併發編程場景中的原子性、可見性和有序性。

happen-before規則

happen-before規則是JMM中限制指令重排序的一些規則。意思是前面一個操做的結果對後續操做是可見的。這裏總結6條happen-before規則:

一、程序的順序性規則。

意思就是程序前面對某個變量的修改必定是對後續操做可見的。

public void read(){
        int x = 20;
        boolean b = true;
    }

x = 20 happen-before b = true,這也符合咱們的直覺。

二、volatile變量規則

對volatile白你兩的寫操做 happen-before 於對該變量的讀操做。這也是volatile類型的變量能保證可見性的緣由。

三、傳遞性規則

若是A happen-before B,B happen-before C。則A happen-before C。

四、監視器鎖規則

監視器鎖指的就是synchronized。對一個鎖的解鎖 happen-before 於後續對這個鎖的加鎖。

public void read(){
        int x = 20;
        boolean b = true;
        synchronized (this){// 這裏會自動加鎖
            x = 10;
        }//執行結束,自動釋放鎖
    }

這條規則說的就是,若是線程A執行完釋放鎖後將x改成10了,線程B在得到鎖進入臨界區後是能夠看到線程A對x的寫操做的,也就是B能看到x=10。

五、線程啓動規則

它是指主線程A啓動子線程B後,子線程B可以看到主線程在啓動子線程B前的操做。

int x = 10;
        Thread B = new Thread(()->{
            // 此處是能夠看到主線程修改的x=30的
        });
        x = 30;
        B.start();

六、線程join規則

這條是關於線程等待的。它是指主線程A等待子線程B完成(主線程A經過調用子線程B的join()方法實現),當子線程B完成後(主線程A中join()方法返回),主線程可以看到子線程的操做。

int x = 10;
        Thread B = new Thread(()->{
            // 對共享變量的修改
            x = 15;
        });
        B.start();
        B.join();
        
        // 主線程在調用了join以後是能
        // 看到x = 15的

後記

CAS原理,AQS和顯式鎖ReentrantLock原理性的東西后期單獨總結。

相關文章
相關標籤/搜索