java併發編程

 

常見線程池java

newSingleThreadExecutor
單個線程的線程池,即線程池中每次只有一個線程工做,單線程串行執行任務
 
newFixedThreadExecutor(n)
固定數量的線程池,沒提交一個任務就是一個線程,直到達到線程池的最大數量,而後後面進入等待隊列直到前面的任務完成才繼續執行.
 
newCacheThreadExecutor(推薦使用)
可緩存線程池,當線程池大小超過了處理任務所需的線程,那麼就會回收部分空閒(通常是60秒無執行)的線程,當有任務來時,又智能的添加新線程來執行。
 
newScheduleThreadExecutor
大小無限制的線程池,支持定時和週期性的執行線程
 
Java裏面線程池的頂級接口是Executor,可是嚴格意義上講Executor並非一個線程池,而只是一個執行線程的工具。真正的線程池接口是ExecutorService。
比較重要的幾個類:

ExecutorService程序員

真正的線程池接口。編程

ScheduledExecutorService數組

能和Timer/TimerTask相似,解決那些須要任務重複執行的問題。緩存

ThreadPoolExecutor安全

ExecutorService的默認實現。網絡

ScheduledThreadPoolExecutor數據結構

繼承ThreadPoolExecutor的ScheduledExecutorService接口實現,週期性任務調度的類實現。多線程

 

 

要配置一個線程池是比較複雜的,尤爲是對於線程池的原理不是很清楚的狀況下,頗有可能配置的線程池不是較優的,所以在Executors類裏面提供了一些靜態工廠,生成一些經常使用的線程池。併發

1. newSingleThreadExecutor

建立一個單線程的線程池。這個線程池只有一個線程在工做,也就是至關於單線程串行執行全部任務。若是這個惟一的線程由於異常結束,那麼會有一個新的線程來替代它。此線程池保證全部任務的執行順序按照任務的提交順序執行。

2.newFixedThreadPool

建立固定大小的線程池。每次提交一個任務就建立一個線程,直到線程達到線程池的最大大小。線程池的大小一旦達到最大值就會保持不變,若是某個線程由於執行異常而結束,那麼線程池會補充一個新線程。

3. newCachedThreadPool

建立一個可緩存的線程池。若是線程池的大小超過了處理任務所須要的線程,

那麼就會回收部分空閒(60秒不執行任務)的線程,當任務數增長時,此線程池又能夠智能的添加新線程來處理任務。此線程池不會對線程池大小作限制,線程池大小徹底依賴於操做系統(或者說JVM)可以建立的最大線程大小。

4.newScheduledThreadPool

建立一個大小無限的線程池。此線程池支持定時以及週期性執行任務的需求。

 

例子

public class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + "正在執行....");
    }
}

newSingleThreadExecutor

public class MainTest {
    public static void main(String[] args) {
        
        //建立一個可重用固定線程數的線程池
        ExecutorService pool = Executors.newSingleThreadExecutor();

        Thread t1 = new MyThread();
        Thread t2 = new MyThread();
        Thread t3 = new MyThread();
        Thread t4 = new MyThread();
        Thread t5 = new MyThread();
        
        pool.execute(t1);
        pool.execute(t2);
        pool.execute(t3);
        pool.execute(t4);
        pool.execute(t5);
        
        pool.shutdown();
    }
}

newFixedThreadPool

public class MainTest {
    public static void main(String[] args) {
        // 建立一個可重用固定線程數的線程池
        ExecutorService pool = Executors.newFixedThreadPool(2);

        Thread t1 = new MyThread();
        Thread t2 = new MyThread();
        Thread t3 = new MyThread();
        Thread t4 = new MyThread();
        Thread t5 = new MyThread();

        pool.execute(t1);
        pool.execute(t2);
        pool.execute(t3);
        pool.execute(t4);
        pool.execute(t5);

        pool.shutdown();
    }
}

newCachedThreadPool

public class MainTest {
    public static void main(String[] args) {
        // 建立一個可緩存的線程池
        ExecutorService pool = Executors.newCachedThreadPool();

        Thread t1 = new MyThread();
        Thread t2 = new MyThread();
        Thread t3 = new MyThread();
        Thread t4 = new MyThread();
        Thread t5 = new MyThread();

        pool.execute(t1);
        pool.execute(t2);
        pool.execute(t3);
        pool.execute(t4);
        pool.execute(t5);

        pool.shutdown();
    }
}

newScheduledThreadPool

public class MainTest {
    public static void main(String[] args) {
        // 建立一個大小無限的線程池。此線程池支持定時以及週期性執行任務的需求。
        ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(1);

        // 每隔一段時間打印系統時間,證實二者是互不影響的
        exec.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                System.out.println(System.nanoTime());

            }
        }, 1000, 2000, TimeUnit.MILLISECONDS);
    }
}

volatile

在多線程併發編程中synchronized和volatile都扮演着重要的角色,volatile是輕量級的synchronized,它在多處理器開發中保證了共享變量的「可見性」。可見性的意思是當一個線程修改一個共享變量時,另一個線程能讀到這個修改的值。若是volatile變量修飾符使用恰當的話,它比synchronized的使用和執行成本更低,由於它不會引發線程上下文的切換和調度。

transient

java語言的關鍵字,變量修飾符,若是用transient聲明一個實例變量,當對象存儲時,它的值不須要維持。換句話來講就是,用transient關鍵字標記的成員變量不參與序列化過程。

synchronized

Java語言的關鍵字,可用來給對象和方法或者代碼塊加鎖,當它鎖定一個方法或者一個代碼塊的時候,同一時刻最多隻有一個線程執行這段代碼。當兩個併發線程訪問同一個對象object中的這個加鎖同步代碼塊時,一個時間內只能有一個線程獲得執行。另外一個線程必須等待當前線程執行完這個代碼塊之後才能執行該代碼塊。然而,當一個線程訪問object的一個加鎖代碼塊時,另外一個線程仍能夠訪問該object中的非加鎖代碼塊。

Java中的每個對象均可以做爲鎖。具體表現爲如下3種形式。

·對於普通同步方法,鎖是當前實例對象。

·對於靜態同步方法,鎖是當前類的Class對象。

·對於同步方法塊,鎖是Synchonized括號裏配置的對象。

鎖的升級與對比

在Java SE 1.6中,鎖一共有4種狀態,級別從低到高依次是:無鎖狀態偏向鎖狀態輕量級鎖狀態重量級鎖狀態.

這幾個狀態會隨着競爭狀況逐漸升級。鎖能夠升級但不能降級,意味着偏向鎖升級成輕量級鎖後不能降級成偏向鎖。這種鎖升級卻不能降級的策略,目的是爲了提升得到鎖和釋放鎖的效率。

cas操做

CAS是Compare-and-swap(比較與替換)的簡寫,CAS操做的就是樂觀鎖,每次不加鎖而是假設沒有衝突而去完成某項操做,若是由於衝突失敗就重試,直到成功爲止。synchronized是悲觀鎖。

Java實現原子操做

在Java中能夠經過鎖和循環CAS的方式來實現原子操做。

使用循環CAS實現原子操做

JVM中的CAS操做正是利用了處理器提供的CMPXCHG指令實現的。自旋CAS實現的基本思路就是循環進行CAS操做直到成功爲止,如下代碼實現了一個基於CAS線程安全的計數器方法safeCount和一個非線程安全的計數器count。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class Counter {
    private AtomicInteger atomicI = new AtomicInteger(0);
    private int i = 0;

    public static void main(String[] args) {
        final Counter cas = new Counter();
        List<Thread> ts = new ArrayList<Thread>(600);
        long start = System.currentTimeMillis();
        for (int j = 0; j < 100; j++) {
            Thread t = new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int i = 0; i < 10000; i++) {
                        cas.count();
                        cas.safeCount();
                    }
                }
            });
            ts.add(t);
        }
        
        for (Thread t : ts) {
            t.start();
        }
        
        // 等待全部線程執行完成
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        
        System.out.println(cas.i);
        System.out.println(cas.atomicI.get());
        System.out.println(System.currentTimeMillis() - start);
    }

    // 使用CAS實現線程安全計數器
    private void safeCount() {
        for (;;) {
            int i = atomicI.get();
            boolean suc = atomicI.compareAndSet(i, ++i);
            if (suc) {
                break;
            }
        }
    }

    // 非線程安全計數器
    private void count() {
        i++;
    }
}

 從Java 1.5開始,JDK的併發包裏提供了一些類來支持原子操做,如AtomicBoolean(用原子方式更新的boolean值)、AtomicInteger(用原子方式更新的int值)和AtomicLong(用原子方式更新的long值)。這些原子包裝類還提供了有用的工具方法,好比以原子的方式將當前值自增1和自減1。

CAS實現原子操做的三大問題

在Java併發包中有一些併發框架也使用了自旋CAS的方式來實現原子操做,好比LinkedTransferQueue類的Xfer方法。CAS雖然很高效地解決了原子操做,可是CAS仍然存在三大問題。ABA問題,循環時間長開銷大,以及只能保證一個共享變量的原子操做。

1)ABA問題。由於CAS須要在操做值的時候,檢查值有沒有發生變化,若是沒有發生變化則更新,可是若是一個值原來是A,變成了B,又變成了A,那麼使用CAS進行檢查時會發現它的值沒有發生變化,可是實際上卻變化了。ABA問題的解決思路就是使用版本號。在變量前面追加上版本號,每次變量更新的時候把版本號加1,那麼A→B→A就會變成1A→2B→3A。從Java 1.5開始,JDK的Atomic包裏提供了一個類AtomicStampedReference來解決ABA問題。這個類的compareAndSet方法的做用是首先檢查當前引用是否等於預期引用,而且檢查當前標誌是否等於預期標誌,若是所有相等,則以原子方式將該引用和該標誌的值設置爲給定的更新值。

2)循環時間長開銷大。自旋CAS若是長時間不成功,會給CPU帶來很是大的執行開銷。若是JVM能支持處理器提供的pause指令,那麼效率會有必定的提高。pause指令有兩個做用:第一,它能夠延遲流水線執行指令(de-pipeline),使CPU不會消耗過多的執行資源,延遲的時間取決於具體實現的版本,在一些處理器上延遲時間是零;第二,它能夠避免在退出循環的時候因內存順序衝突(Memory Order Violation)而引發CPU流水線被清空(CPU Pipeline Flush),從而提升CPU的執行效率。

3)只能保證一個共享變量的原子操做。當對一個共享變量執行操做時,咱們可使用循環CAS的方式來保證原子操做,可是對多個共享變量操做時,循環CAS就沒法保證操做的原子性,這個時候就能夠用鎖。還有一個取巧的辦法,就是把多個共享變量合併成一個共享變量來操做。好比,有兩個共享變量i=2,j=a,合併一下ij=2a,而後用CAS來操做ij。從Java 1.5開始,JDK提供了AtomicReference類來保證引用對象之間的原子性,就能夠把多個變量放在一個對象裏來進行CAS操做。

使用鎖機制實現原子操做

鎖機制保證了只有得到鎖的線程纔可以操做鎖定的內存區域。JVM內部實現了不少種鎖機制,有偏向鎖、輕量級鎖和互斥鎖。有意思的是除了偏向鎖,JVM實現鎖的方式都用了循環CAS,即當一個線程想進入同步塊的時候使用循環CAS的方式來獲取鎖,當它退出同步塊的時候使用循環CAS釋放鎖。

併發編程模型的兩個關鍵問題

線程之間如何通訊及線程之間如何同步。

在命令式編程中,線程之間的通訊機制有兩種:共享內存和消息傳遞。

Java的併發採用的是共享內存模型,Java線程之間的通訊老是隱式進行,整個通訊過程對程序員徹底透明。若是編寫多線程程序的Java程序員不理解隱式進行的線程之間通訊的工做機制,極可能會遇到各類奇怪的內存可見性問題。

Java內存模型的抽象結構

在Java中,全部實例域、靜態域和數組元素都存儲在堆內存中,堆內存在線程之間共享。

局部變量(Local Variables),方法定義參數(Java語言規範稱之爲Formal Method Parameters)和異常處理器參數(Exception Handler Parameters)不會在線程之間共享,它們不會有內存可見性問題,也不受內存模型的影響。

Java線程之間的通訊由Java內存模型(本文簡稱爲JMM)控制,JMM決定一個線程對共享變量的寫入什麼時候對另外一個線程可見。從抽象的角度來看,JMM定義了線程和主內存之間的抽象關係:線程之間的共享變量存儲在主內存(Main Memory)中,每一個線程都有一個私有的本地內存(Local Memory),本地內存中存儲了該線程以讀/寫共享變量的副本。本地內存是JMM的一個抽象概念,並不真實存在。它涵蓋了緩存、寫緩衝區、寄存器以及其餘的硬件和編譯器優化。

線程A與線程B之間要通訊的話,必需要經歷下面2個步驟

1)線程A把本地內存A中更新過的共享變量刷新到主內存中去。

2)線程B到主內存中去讀取線程A以前已更新過的共享變量。

數據依賴性

數據依賴分爲下列3種類型:
  寫後讀

  寫後寫

  讀後寫

譯器和處理器可能會對操做作重排序。編譯器和處理器在重排序時,會遵照數據依賴性,編譯器和處理器不會改變存在數據依賴關係的兩個操做的執行順序。

數據競爭與順序一致性

當程序未正確同步時,就可能會存在數據競爭。Java內存模型規範對數據競爭的定義以下。

在一個線程中寫一個變量,

在另外一個線程讀同一個變量,

並且寫和讀沒有經過同步來排序。

當代碼中包含數據競爭時,程序的執行每每產生違反直覺的結果(前一章的示例正是如此)。若是一個多線程程序能正確同步,這個程序將是一個沒有數據競爭的程序。

順序一致性內存模型

順序一致性模型有一個單一的全局內存,這個內存經過一個左右擺動的開關能夠鏈接到任意一個線程,同時每個線程必須按照程序的順序來執行內存讀/寫操做。(即在順序一致性模型中,全部操做之間具備全序關係)

同步程序的順序一致性效果

在JMM中就沒有這個保證。未同步程序在JMM中不但總體的執行順序是無序的,並且全部線程看到的操做執行順序也可能不一致。

public class SynchronizedExample {
    int a = 0;
    boolean flag = false;

    public synchronized void writer() { // 獲取鎖
        a = 1;
        flag = true;
    } // 釋放鎖

    public synchronized void reader() { // 獲取鎖
        if (flag) {
            int i = a;
            // ……釋放鎖
        }
    }
}

在上面示例代碼中,假設A線程執行writer()方法後,B線程執行reader()方法。這是一個正確同步的多線程程序。

兩個內存模型中的執行時序對比圖

JMM在具體實現上的基本方針爲:在不改變(正確同步的)程序執行結果的前提下,儘量地爲編譯器和處理器的優化打開方便之門。

volatile的特性

理解volatile特性的一個好方法是把對volatile變量的單個讀/寫,當作是使用同一個鎖對這些單個讀/寫操做作了同步。下面經過具體的示例來講明,示例代碼以下。

class VolatileFeaturesExample {
       volatile long vl = 0L;                  // 使用volatile聲明64位的long型變量
       public void set(long l) {
           vl = l;                             // 單個volatile變量的寫
       }
       public void getAndIncrement () {
           vl++;                               // 複合(多個)volatile變量的讀/寫
       }
       public long get() {
           return vl;                          // 單個volatile變量的讀
       }
}

假設有多個線程分別調用上面程序的3個方法,這個程序在語義上和下面程序等價。

class VolatileFeaturesExample {
       long vl = 0L;                           // 64位的long型普通變量
       public synchronized void set(long l) {  // 對單個的普通變量的寫用同一個鎖同步
           vl = l;
       }
       public void getAndIncrement () {        // 普通方法調用
           long temp = get();                  // 調用已同步的讀方法
           temp += 1L;                         // 普通寫操做
           set(temp);                          // 調用已同步的寫方法
       }
       public synchronized long get() {        // 對單個的普通變量的讀用同一個鎖同步
           return vl;
       }
}

如上面示例程序所示,一個volatile變量的單個讀/寫操做,與一個普通變量的讀/寫操做都是使用同一個鎖來同步,它們之間的執行效果相同。

鎖的happens-before規則保證釋放鎖和獲取鎖的兩個線程之間的內存可見性,這意味着對一個volatile變量的讀,老是能看到(任意線程)對這個volatile變量最後的寫入。

鎖的語義決定了臨界區代碼的執行具備原子性。這意味着,即便是64位的long型和double型變量,只要它是volatile變量,對該變量的讀/寫就具備原子性。若是是多個volatile操做或相似於volatile++這種複合操做,這些操做總體上不具備原子性。

簡而言之,volatile變量自身具備下列特性。

·可見性。對一個volatile變量的讀,老是能看到(任意線程)對這個volatile變量最後的寫入。

·原子性:對任意單個volatile變量的讀/寫具備原子性,但相似於volatile++這種複合操做不具備原子性。

volatile寫的內存語義以下

當寫一個volatile變量時,JMM會把該線程對應的本地內存中的共享變量值刷新到主內存。

volatile讀的內存語義以下

當讀一個volatile變量時,JMM會把該線程對應的本地內存置爲無效。線程接下來將從主內存中讀取共享變量

下面對volatile寫和volatile讀的內存語義作個總結

·線程A寫一個volatile變量,實質上是線程A向接下來將要讀這個volatile變量的某個線程發出了(其對共享變量所作修改的)消息。

·線程B讀一個volatile變量,實質上是線程B接收了以前某個線程發出的(在寫這個volatile變量以前對共享變量所作修改的)消息。

·線程A寫一個volatile變量,隨後線程B讀這個volatile變量,這個過程實質上是線程A經過主內存向線程B發送消息。

 

鎖的內存語義

·線程A釋放一個鎖,實質上是線程A向接下來將要獲取這個鎖的某個線程發出了(線程A對共享變量所作修改的)消息。

·線程B獲取一個鎖,實質上是線程B接收了以前某個線程發出的(在釋放這個鎖以前對共享變量所作修改的)消息。

·線程A釋放鎖,隨後線程B獲取這個鎖,這個過程實質上是線程A經過主內存向線程B發送消息。

鎖內存語義的實現

藉助ReentrantLock的源代碼,來分析鎖內存語義的具體實現機制。

class ReentrantLockExample {
    int a = 0;
    ReentrantLock lock = new ReentrantLock();
    public void writer() {
        lock.lock();       // 獲取鎖
        try {
            a++;
        } finally {
            lock.unlock();  // 釋放鎖
        }
    }
    public void reader () {
        lock.lock();      // 獲取鎖
        try {
            int i = a;
            ……
        } finally {
            lock.unlock();  // 釋放鎖
        }
    }
}

在ReentrantLock中,調用lock()方法獲取鎖;調用unlock()方法釋放鎖。

ReentrantLock的實現依賴於Java同步器框架AbstractQueuedSynchronizer(本文簡稱之爲AQS)。AQS使用一個整型的volatile變量(命名爲state)來維護同步狀態,立刻咱們會看到,這個volatile變量是ReentrantLock內存語義實現的關鍵。

ReentrantLock分爲公平鎖和非公平鎖,咱們首先分析公平鎖。

使用公平鎖時,加鎖方法lock()調用軌跡以下。

1)ReentrantLock:lock()。

2)FairSync:lock()。

3)AbstractQueuedSynchronizer:acquire(int arg)。

4)ReentrantLock:tryAcquire(int acquires)。

在第4步真正開始加鎖,下面是該方法的源代碼。

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();    // 獲取鎖的開始,首先讀volatile變量state
    if (c == 0) {
        if (isFirst(current) &&
            compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)  
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

從上面源代碼中咱們能夠看出,加鎖方法首先讀volatile變量state。

在使用公平鎖時,解鎖方法unlock()調用軌跡以下。

1)ReentrantLock:unlock()。

2)AbstractQueuedSynchronizer:release(int arg)。

3)Sync:tryRelease(int releases)。

在第3步真正開始釋放鎖,下面是該方法的源代碼。

protected final boolean tryRelease(int releases) {
       int c = getState() - releases;
       if (Thread.currentThread() != getExclusiveOwnerThread())
           throw new IllegalMonitorStateException();
       boolean free = false;
       if (c == 0) {
           free = true;
           setExclusiveOwnerThread(null);
       }
       setState(c);     // 釋放鎖的最後,寫volatile變量state
       return free;
}

從上面的源代碼能夠看出,在釋放鎖的最後寫volatile變量state。

公平鎖在釋放鎖的最後寫volatile變量state,在獲取鎖時首先讀這個volatile變量。根據volatile的happens-before規則,釋放鎖的線程在寫volatile變量以前可見的共享變量,在獲取鎖的線程讀取同一個volatile變量後將當即變得對獲取鎖的線程可見。

如今咱們來分析非公平鎖的內存語義的實現。非公平鎖的釋放和公平鎖徹底同樣,因此這裏僅僅分析非公平鎖的獲取。使用非公平鎖時,加鎖方法lock()調用軌跡以下。

1)ReentrantLock:lock()。

2)NonfairSync:lock()。

3)AbstractQueuedSynchronizer:compareAndSetState(int expect,int update)。

在第3步真正開始加鎖,下面是該方法的源代碼。

protected final boolean compareAndSetState(int expect, int update) {
       return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

該方法以原子操做的方式更新state變量,本文把Java的compareAndSet()方法調用簡稱爲CAS。JDK文檔對該方法的說明以下:若是當前狀態值等於預期值,則以原子方式將同步狀態設置爲給定的更新值。此操做具備volatile讀和寫的內存語義。

如今對公平鎖和非公平鎖的內存語義作個總結。

·公平鎖和非公平鎖釋放時,最後都要寫一個volatile變量state。

·公平鎖獲取時,首先會去讀volatile變量。

·非公平鎖獲取時,首先會用CAS更新volatile變量,這個操做同時具備volatile讀和volatile寫的內存語義。

從本文對ReentrantLock的分析能夠看出,鎖釋放-獲取的內存語義的實現至少有下面兩種方式。

1)利用volatile變量的寫-讀所具備的內存語義。

2)利用CAS所附帶的volatile讀和volatile寫的內存語義。

concurrent包的實現

因爲Java的CAS同時具備volatile讀和volatile寫的內存語義,所以Java線程之間的通訊如今有了下面4種方式。

1)A線程寫volatile變量,隨後B線程讀這個volatile變量。

2)A線程寫volatile變量,隨後B線程用CAS更新這個volatile變量。

3)A線程用CAS更新一個volatile變量,隨後B線程用CAS更新這個volatile變量。

4)A線程用CAS更新一個volatile變量,隨後B線程讀這個volatile變量。

Java的CAS會使用現代處理器上提供的高效機器級別的原子指令,這些原子指令以原子方式對內存執行讀-改-寫操做,這是在多處理器中實現同步的關鍵(從本質上來講,可以支持原子性讀-改-寫指令的計算機,是順序計算圖靈機的異步等價機器,所以任何現代的多處理器都會去支持某種能對內存執行原子性讀-改-寫操做的原子指令)。同時,volatile變量的讀/寫和CAS能夠實現線程之間的通訊。把這些特性整合在一塊兒,就造成了整個concurrent包得以實現的基石。若是咱們仔細分析concurrent包的源代碼實現,會發現一個通用化的實現模式。

首先,聲明共享變量爲volatile。

而後,使用CAS的原子條件更新來實現線程之間的同步。

同時,配合以volatile的讀/寫和CAS所具備的volatile讀和寫的內存語義來實現線程之間的通訊。

AQS,非阻塞數據結構和原子變量類(java.util.concurrent.atomic包中的類),這些concurrent包中的基礎類都是使用這種模式來實現的,而concurrent包中的高層類又是依賴於這些基礎類來實現的。從總體來看,concurrent包的實現示意圖如3-28所示。

JMM的設計

首先,讓咱們來看JMM的設計意圖。從JMM設計者的角度,在設計JMM時,須要考慮兩個關鍵因素。

·程序員對內存模型的使用。程序員但願內存模型易於理解、易於編程。程序員但願基於一個強內存模型來編寫代碼。

·編譯器和處理器對內存模型的實現。編譯器和處理器但願內存模型對它們的束縛越少越好,這樣它們就能夠作儘量多的優化來提升性能。編譯器和處理器但願實現一個弱內存模型。

因爲這兩個因素互相矛盾,因此JSR-133專家組在設計JMM時的核心目標就是找到一個好的平衡點:一方面,要爲程序員提供足夠強的內存可見性保證;另外一方面,對編譯器和處理器的限制要儘量地放鬆。

雙重檢查鎖定的由來

下面是非線程安全的延遲初始化對象的示例代碼。

public class UnsafeLazyInitialization {
    private static Instance instance;
    public static Instance getInstance() {
        if (instance == null)               // 1:A線程執行
            instance = new Instance();      // 2:B線程執行
        return instance;
    }
}

在UnsafeLazyInitialization類中,假設A線程執行代碼1的同時,B線程執行代碼2。此時,線程A可能會看到instance引用的對象尚未完成初始化

public class SafeLazyInitialization {
    private static Instance instance;
    public synchronized static Instance getInstance() {
        if (instance == null)
            instance = new Instance();
        return instance;
    }
}

因爲對getInstance()方法作了同步處理,synchronized將致使性能開銷。若是getInstance()方法被多個線程頻繁的調用,將會致使程序執行性能的降低。反之,若是getInstance()方法不會被多個線程頻繁的調用,那麼這個延遲初始化方案將能提供使人滿意的性能。

在早期的JVM中,synchronized(甚至是無競爭的synchronized)存在巨大的性能開銷。所以,人們想出了一個「聰明」的技巧:雙重檢查鎖定(Double-Checked Locking)。人們想經過雙重檢查鎖定來下降同步的開銷。下面是使用雙重檢查鎖定來實現延遲初始化的示例代碼。

public class DoubleCheckedLocking {                      // 1
    private static Instance instance;                    // 2
    public static Instance getInstance() {               // 3
        if (instance == null) {                          // 4:第一次檢查
            synchronized (DoubleCheckedLocking.class) {  // 5:加鎖
                if (instance == null)                    // 6:第二次檢查
                    instance = new Instance();           // 7:問題的根源出在這裏
            }                                            // 8
        }                                                // 9
        return instance;                                 // 10
    }                                                    // 11
}

如上面代碼所示,若是第一次檢查instance不爲null,那麼就不須要執行下面的加鎖和初始化操做。所以,能夠大幅下降synchronized帶來的性能開銷。上面代碼表面上看起來,彷佛一箭雙鵰。

·多個線程試圖在同一時間建立對象時,會經過加鎖來保證只有一個線程能建立對象。

·在對象建立好以後,執行getInstance()方法將不須要獲取鎖,直接返回已建立好的對象。

雙重檢查鎖定看起來彷佛很完美,但這是一個錯誤的優化!在線程執行到第4行,代碼讀取到instance不爲null時,instance引用的對象有可能尚未完成初始化。

問題的根源

前面的雙重檢查鎖定示例代碼的第7行(instance=new Singleton();)建立了一個對象。這一行代碼能夠分解爲以下的3行僞代碼。

memory = allocate();  // 1:分配對象的內存空間
ctorInstance(memory);  // 2:初始化對象
instance = memory;    // 3:設置instance指向剛分配的內存地址

上面3行僞代碼中的2和3之間,可能會被重排序(在一些JIT編譯器上,這種重排序是真實發生的,詳情見參考文獻1的「Out-of-order writes」部分)。2和3之間重排序以後的執行時序以下。

memory = allocate();  // 1:分配對象的內存空間
instance = memory;    // 3:設置instance指向剛分配的內存地址                                       
                        // 注意,此時對象尚未被初始化!
ctorInstance(memory);  // 2:初始化對象

根據《The Java Language Specification,Java SE 7 Edition》(後文簡稱爲Java語言規範),全部線程在執行Java程序時必需要遵照intra-thread semantics。intra-thread semantics保證重排序不會改變單線程內的程序執行結果。換句話說,intra-thread semantics容許那些在單線程內,不會改變單線程程序執行結果的重排序。上面3行僞代碼的2和3之間雖然被重排序了,但這個重排序並不會違反intra-thread semantics。這個重排序在沒有改變單線程程序執行結果的前提下,能夠提升程序的執行性能。

爲了更好地理解intra-thread semantics,請看如圖3-37所示的示意圖(假設一個線程A在構造對象後,當即訪問這個對象)。

如圖3-37所示,只要保證2排在4的前面,即便2和3之間重排序了,也不會違反intra-thread semantics。

下面,再讓咱們查看多線程併發執行的狀況。如圖3-38所示。



因爲單線程內要遵照intra-thread semantics,從而能保證A線程的執行結果不會被改變。可是,當線程A和B按圖3-38的時序執行時,B線程將看到一個尚未被初始化的對象。

回到本文的主題,DoubleCheckedLocking示例代碼的第7行(instance=new Singleton();)若是發生重排序,另外一個併發執行的線程B就有可能在第4行判斷instance不爲null。線程B接下來將訪問instance所引用的對象,但此時這個對象可能尚未被A線程初始化!表3-6是這個場景的具體執行時序。

 

 

這裏A2和A3雖然重排序了,但Java內存模型的intra-thread semantics將確保A2必定會排在A4前面執行。所以,線程A的intra-thread semantics沒有改變,但A2和A3的重排序,將致使線程B在B1處判斷出instance不爲空,線程B接下來將訪問instance引用的對象。此時,線程B將會訪問到一個還未初始化的對象。

在知曉了問題發生的根源以後,咱們能夠想出兩個辦法來實現線程安全的延遲初始化。

1)不容許2和3重排序。

2)容許2和3重排序,但不容許其餘線程「看到」這個重排序。

後文介紹的兩個解決方案,分別對應於上面這兩點。

基於volatile的解決方案

public class SafeDoubleCheckedLocking {
    private volatile static Instance instance;
    public static Instance getInstance() {
        if (instance == null) {
            synchronized (SafeDoubleCheckedLocking.class) {
                if (instance == null)
                    instance = new Instance();         // instance爲volatile,如今沒問題了
            }
        }
        return instance;
    }
}

這個解決方案須要JDK 5或更高版本(由於從JDK 5開始使用新的JSR-133內存模型規範,這個規範加強了volatile的語義)。

當聲明對象的引用爲volatile後,3.8.2節中的3行僞代碼中的2和3之間的重排序,在多線程環境中將會被禁止。上面示例代碼將按以下的時序執行,如圖3-39所示。

這個方案本質上是經過禁止圖3-39中的2和3之間的重排序,來保證線程安全的延遲初始化。

基於類初始化的解決方案

public class InstanceFactory {
    private static class InstanceHolder {
        public static Instance instance = new Instance();
    }
    public static Instance getInstance() {
        return InstanceHolder.instance ;  // 這裏將致使InstanceHolder類被初始化
    }
}

這個方案的實質是:容許3.8.2節中的3行僞代碼中的2和3重排序,但不容許非構造線程(這裏指線程B)「看到」這個重排序。

初始化一個類,包括執行這個類的靜態初始化和初始化在這個類中聲明的靜態字段。根據Java語言規範,在首次發生下列任意一種狀況時,一個類或接口類型T將被當即初始化。

1)T是一個類,並且一個T類型的實例被建立。

2)T是一個類,且T中聲明的一個靜態方法被調用。

3)T中聲明的一個靜態字段被賦值。

4)T中聲明的一個靜態字段被使用,並且這個字段不是一個常量字段。

5)T是一個頂級類(Top Level Class,見Java語言規範的§7.6),並且一個斷言語句嵌套在T內部被執行。

在InstanceFactory示例代碼中,首次執行getInstance()方法的線程將致使InstanceHolder類被初始化(符合狀況4)。

各類內存模型之間的關係

JMM是一個語言級的內存模型,處理器內存模型是硬件級的內存模型,順序一致性內存模型是一個理論參考模型。下面是語言內存模型、處理器內存模型和順序一致性內存模型的強弱對比示意圖,如圖3-49所示。

從圖中能夠看出:常見的4種處理器內存模型比經常使用的3中語言內存模型要弱,處理器內存模型和語言內存模型都比順序一致性內存模型要弱。同處理器內存模型同樣,越是追求執行性能的語言,內存模型設計得會越弱。

一個Java程序從main()方法開始執行,而後按照既定的代碼邏輯執行,看似沒有其餘線程參與,但實際上Java程序天生就是多線程程序,由於執行main()方法的是一個名稱爲main的線程。下面使用JMX來查看一個普通的Java程序包含哪些線程

public class MultiThread{
    public static void main(String[] args) {
        // 獲取Java線程管理MXBean
        ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
        // 不須要獲取同步的monitor和synchronizer信息,僅獲取線程和線程堆棧信息
        ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(false, false);
        // 遍歷線程信息,僅打印線程ID和線程名稱信息
        for (ThreadInfo threadInfo : threadInfos) {
            System.out.println("[" + threadInfo.getThreadId() + "] " + threadInfo.
            getThreadName());
        }
    }
}

輸出以下所示(輸出內容可能不一樣)。

[4] Signal Dispatcher  // 分發處理髮送給JVM信號的線程
[3] Finalizer       // 調用對象finalize方法的線程
[2] Reference Handler   // 清除Reference的線程
[1] main           // main線程,用戶程序入口

能夠看到,一個Java程序的運行不只僅是main()方法的運行,而是main線程和多個其餘線程的同時運行。

線程的狀態

Java線程在運行的生命週期中可能處於表4-1所示的6種不一樣的狀態,在給定的一個時刻,線程只能處於其中的一個狀態。

線程在自身的生命週期中,並非固定地處於某個狀態,而是隨着代碼的執行在不一樣的狀態之間進行切換,Java線程狀態變遷如圖4-1示。

由圖4-1中能夠看到,線程建立以後,調用start()方法開始運行。當線程執行wait()方法以後,線程進入等待狀態。進入等待狀態的線程須要依靠其餘線程的通知纔可以返回到運行狀態,而超時等待狀態至關於在等待狀態的基礎上增長了超時限制,也就是超時時間到達時將會返回到運行狀態。當線程調用同步方法時,在沒有獲取到鎖的狀況下,線程將會進入到阻塞狀態。線程在執行Runnable的run()方法以後將會進入到終止狀態。

Daemon線程

Daemon線程是一種支持型線程,由於它主要被用做程序中後臺調度以及支持性工做。這意味着,當一個Java虛擬機中不存在非Daemon線程的時候,Java虛擬機將會退出。能夠經過調用Thread.setDaemon(true)將線程設置爲Daemon線程。

注意 Daemon屬性須要在啓動線程以前設置,不能在啓動線程以後設置。

Daemon線程被用做完成支持性工做,可是在Java虛擬機退出時Daemon線程中的finally塊並不必定會執行,示例如代碼清單4-5所示。

public class Daemon {
    public static void main(String[] args) {
        Thread thread = new Thread(new DaemonRunner(), "DaemonRunner");
        thread.setDaemon(true);
        thread.start();
  }
    static class DaemonRunner implements Runnable {
        @Override
        public void run() {
            try {
                SleepUtils.second(10);
            } finally {
                System.out.println("DaemonThread finally run.");
            }
        }
    }
}

運行Daemon程序,能夠看到在終端或者命令提示符上沒有任何輸出。main線程(非Daemon線程)在啓動了線程DaemonRunner以後隨着main方法執行完畢而終止,而此時Java虛擬機中已經沒有非Daemon線程,虛擬機須要退出。Java虛擬機中的全部Daemon線程都須要當即終止,所以DaemonRunner當即終止,可是DaemonRunner中的finally塊並無執行。

注意 在構建Daemon線程時,不能依靠finally塊中的內容來確保執行關閉或清理資源的邏輯。

構造線程

private void init(ThreadGroup g, Runnable target, String name,long stackSize, 
AccessControlContext acc) {
      if (name == null) {
             throw new NullPointerException("name cannot be null");
      }
      // 當前線程就是該線程的父線程
      Thread parent = currentThread();
      this.group = g;
      // 將daemon、priority屬性設置爲父線程的對應屬性
      this.daemon = parent.isDaemon();
      this.priority = parent.getPriority();
      this.name = name.toCharArray();
      this.target = target;
      setPriority(priority);
      // 將父線程的InheritableThreadLocal複製過來
    if (parent.inheritableThreadLocals != null) 
        this.inheritableThreadLocals=ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
      // 分配一個線程ID
      tid = nextThreadID();
}

在上述過程當中,一個新構造的線程對象是由其parent線程來進行空間分配的,而child線程繼承了parent是否爲Daemon、優先級和加載資源的contextClassLoader以及可繼承的ThreadLocal,同時還會分配一個惟一的ID來標識這個child線程。至此,一個可以運行的線程對象就初始化好了,在堆內存中等待着運行。

啓動線程

線程對象在初始化完成以後,調用start()方法就能夠啓動這個線程。線程start()方法的含義是:當前線程(即parent線程)同步告知Java虛擬機,只要線程規劃器空閒,應當即啓動調用start()方法的線程。

理解中斷

中斷能夠理解爲線程的一個標識位屬性,它表示一個運行中的線程是否被其餘線程進行了中斷操做。中斷比如其餘線程對該線程打了個招呼,其餘線程經過調用該線程的interrupt()方法對其進行中斷操做。

例子

首先建立了兩個線程,SleepThread和BusyThread,前者不停地睡眠,後者一直運行,而後對這兩個線程分別進行中斷操做,觀察兩者的中斷標識位。

public class Interrupted {
    public static void main(String[] args) throws Exception {
        // sleepThread不停的嘗試睡眠
        Thread sleepThread = new Thread(new SleepRunner(), "SleepThread");
        sleepThread.setDaemon(true);
        // busyThread不停的運行
        Thread busyThread = new Thread(new BusyRunner(), "BusyThread");
        busyThread.setDaemon(true);
        sleepThread.start();
        busyThread.start();
        // 休眠5秒,讓sleepThread和busyThread充分運行
        TimeUnit.SECONDS.sleep(5);
        sleepThread.interrupt();
        busyThread.interrupt();
        System.out.println("SleepThread interrupted is " + sleepThread.isInterrupted());
        System.out.println("BusyThread interrupted is " + busyThread.isInterrupted());
        // 防止sleepThread和busyThread馬上退出
        SleepUtils.second(2);
    }
    static class SleepRunner implements Runnable {
        @Override
        public void run() {
            while (true) {
                SleepUtils.second(10); 
            }
        }
    }
    static class BusyRunner implements Runnable {
        @Override
        public void run() {
            while (true) {
            }
        }
    }
}

從結果能夠看出,拋出InterruptedException的線程SleepThread,其中斷標識位被清除了,而一直忙碌運做的線程BusyThread,中斷標識位沒有被清除。

過時的suspend()、resume()和stop()

音樂播放作出的暫停、恢復和中止操做對應在線程Thread的API就是suspend()、resume()和stop()。

不建議使用的緣由主要有:以suspend()方法爲例,在調用後,線程不會釋放已經佔有的資源(好比鎖),而是佔有着資源進入睡眠狀態,這樣容易引起死鎖問題。一樣,stop()方法在終結一個線程時不會保證線程的資源正常釋放,一般是沒有給予線程完成資源釋放工做的機會,所以會致使程序可能工做在不肯定狀態下。

安全地終止線程

中斷狀態是線程的一個標識位,而中斷操做是一種簡便的線程間交互方式,而這種交互方式最適合用來取消或中止任務。除了中斷之外,還能夠利用一個boolean變量來控制是否須要中止任務並終止該線程。

例子:建立了一個線程CountThread,它不斷地進行變量累加,而主線程嘗試對其進行中斷操做和中止操做。

public class Shutdown {
    public static void main(String[] args) throws Exception {
        Runner one = new Runner();
        Thread countThread = new Thread(one, "CountThread");
        countThread.start();
        // 睡眠1秒,main線程對CountThread進行中斷,使CountThread可以感知中斷而結束
        TimeUnit.SECONDS.sleep(1);
        countThread.interrupt();
        Runner two = new Runner();
     countThread
= new Thread(two, "CountThread"); countThread.start(); // 睡眠1秒,main線程對Runner two進行取消,使CountThread可以感知on爲false而結束 TimeUnit.SECONDS.sleep(1); two.cancel(); } private static class Runner implements Runnable { private long i; private volatile boolean on = true; @Override public void run() { while (on && !Thread.currentThread().isInterrupted()){ i++; } System.out.println("Count i = " + i); } public void cancel() { on = false; } } }

main線程經過中斷操做和cancel()方法都可使CountThread得以終止。這種經過標識位或者中斷操做的方式可以使線程在終止時有機會去清理資源,而不是武斷地將線程中止,所以這種終止線程的作法顯得更加安全和優雅。

線程間通訊

關鍵字volatile能夠用來修飾字段(成員變量),就是告知程序任何對該變量的訪問均須要從共享內存中獲取,而對它的改變必須同步刷新回共享內存,它能保證全部線程對變量訪問的可見性。

關鍵字synchronized能夠修飾方法或者以同步塊的形式來進行使用,它主要確保多個線程在同一個時刻,只能有一個線程處於方法或者同步塊中,它保證了線程對變量訪問的可見性和排他性。

例子:使用了同步塊和同步方法,經過使用javap工具查看生成的class文件信息來分析synchronized關鍵字的實現細節,示例以下。

public class Synchronized {
      public static void main(String[] args) {
           // 對Synchronized Class對象進行加鎖
           synchronized (Synchronized.class) {
           }
           // 靜態同步方法,對Synchronized Class對象進行加鎖
           m();
      }
      public static synchronized void m() {
      }
}

在Synchronized.class同級目錄執行javap–v Synchronized.class,部分相關輸出以下所示:

public static void main(java.lang.String[]);
       // 方法修飾符,表示:public staticflags: ACC_PUBLIC, ACC_STATIC
      Code:
           stack=2, locals=1, args_size=1
           0: ldc       #1  // class com/murdock/books/multithread/book/Synchronized
           2: dup 
           3: monitorenter  // monitorenter:監視器進入,獲取鎖
           4: monitorexit   // monitorexit:監視器退出,釋放鎖
           5: invokestatic  #16 // Method m:()V
           8: return
    public static synchronized void m();
    // 方法修飾符,表示: public static synchronized
    flags: ACC_PUBLIC, ACC_STATIC, ACC_SYNCHRONIZED
           Code:
                   stack=0, locals=0, args_size=0
                  0: return

上面class信息中,對於同步塊的實現使用了monitorenter和monitorexit指令,而同步方法則是依靠方法修飾符上的ACC_SYNCHRONIZED來完成的。不管採用哪一種方式,其本質是對一個對象的監視器(monitor)進行獲取,而這個獲取過程是排他的,也就是同一時刻只能有一個線程獲取到由synchronized所保護對象的監視器。

任意一個對象都擁有本身的監視器,當這個對象由同步塊或者這個對象的同步方法調用時,執行方法的線程必須先獲取到該對象的監視器才能進入同步塊或者同步方法,而沒有獲取到監視器(執行該方法)的線程將會被阻塞在同步塊和同步方法的入口處,進入BLOCKED狀態。

圖4-2描述了對象、對象的監視器、同步隊列和執行線程之間的關係。

從圖4-2中能夠看到,任意線程對Object(Object由synchronized保護)的訪問,首先要得到Object的監視器。若是獲取失敗,線程進入同步隊列,線程狀態變爲BLOCKED。當訪問Object的前驅(得到了鎖的線程)釋放了鎖,則該釋放操做喚醒阻塞在同步隊列中的線程,使其從新嘗試對監視器的獲取。

等待/通知機制

一個線程修改了一個對象的值,而另外一個線程感知到了變化,而後進行相應的操做,整個過程開始於一個線程,而最終執行又是另外一個線程。前者是生產者,後者就是消費者,這種模式隔離了「作什麼」(what)和「怎麼作」(How),在功能層面上實現瞭解耦,體系結構上具有了良好的伸縮性,可是在Java語言中如何實現相似的功能呢?

簡單的辦法是讓消費者線程不斷地循環檢查變量是否符合預期,以下面代碼所示,在while循環中設置不知足的條件,若是條件知足則退出while循環,從而完成消費者的工做。

while (value != desire) {
        Thread.sleep(1000);
}
doSomething();

上面這段僞代碼在條件不知足時就睡眠一段時間,這樣作的目的是防止過快的「無效」嘗試,這種方式看似可以解實現所需的功能,可是卻存在以下問題。

1)難以確保及時性。

2)難以下降開銷。

以上兩個問題,看似矛盾難以調和,可是Java經過內置的等待/通知機制可以很好地解決這個矛盾並實現所需的功能。

等待/通知的相關方法是任意Java對象都具有的,由於這些方法被定義在全部對象的超類java.lang.Object上

notify()

notifyAll()

wait()

waith(long)

waith(long,int)

等待/通知機制,是指一個線程A調用了對象O的wait()方法進入等待狀態,而另外一個線程B調用了對象O的notify()或者notifyAll()方法,線程A收到通知後從對象O的wait()方法返回,進而執行後續操做。上述兩個線程經過對象O來完成交互,而對象上的wait()和notify/notifyAll()的關係就如同開關信號同樣,用來完成等待方和通知方之間的交互工做。

例子:建立了兩個線程——WaitThread和NotifyThread,前者檢查flag值是否爲false,若是符合要求,進行後續操做,不然在lock上等待,後者在睡眠了一段時間後對lock進行通知,示例以下所示。

import java.util.concurrent.TimeUnit;

public class SleepUtils {
    public static final void second(long seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
        }
    }
}
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;

public class WaitNotify {
    static boolean flag = true;
    static Object lock = new Object();

    public static void main(String[] args) throws Exception {
        Thread waitThread = new Thread(new Wait(), "WaitThread");
        waitThread.start();
        TimeUnit.SECONDS.sleep(1);
        Thread notifyThread = new Thread(new Notify(), "NotifyThread");
        notifyThread.start();
    }

    static class Wait implements Runnable {
        public void run() {
            // 加鎖,擁有lock的Monitor
            synchronized (lock) {
                // 當條件不知足時,繼續wait,同時釋放了lock的鎖
                while (flag) {
                    try {
                        System.out.println(Thread.currentThread() + " flag is true. wait @ " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
                        lock.wait();
                    } catch (InterruptedException e) {
                    }
                }
                // 條件知足時,完成工做
                System.out.println(Thread.currentThread() + " flag is false. running @ " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
            }
        }
    }

    static class Notify implements Runnable {
        public void run() {
            // 加鎖,擁有lock的Monitor
             synchronized (lock) {
                 // 獲取lock的鎖,而後進行通知,通知時不會釋放lock的鎖,
                 // 直到當前線程釋放了lock後,WaitThread才能從wait方法中返回
                 System.out.println(Thread.currentThread() + " hold lock. notify @ " + 
                 new SimpleDateFormat("HH:mm:ss").format(new Date()));
                 lock.notifyAll();
                 flag = false;
                 SleepUtils.second(5); 
             }
             // 再次加鎖
             synchronized (lock) {
                 System.out.println(Thread.currentThread() + " hold lock again. sleep @ " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
                 SleepUtils.second(5);
             }
         }
    }
}

上述第3行和第4行輸出的順序可能會互換,而上述例子主要說明了調用wait()、notify()以及notifyAll()時須要注意的細節,以下。

1)使用wait()、notify()和notifyAll()時須要先對調用對象加鎖。

2)調用wait()方法後,線程狀態由RUNNING變爲WAITING,並將當前線程放置到對象的等待隊列。

3)notify()或notifyAll()方法調用後,等待線程依舊不會從wait()返回,須要調用notify()或notifAll()的線程釋放鎖以後,等待線程纔有機會從wait()返回。

4)notify()方法將等待隊列中的一個等待線程從等待隊列中移到同步隊列中,而notifyAll()方法則是將等待隊列中全部的線程所有移到同步隊列,被移動的線程狀態由WAITING變爲BLOCKED。

5)從wait()方法返回的前提是得到了調用對象的鎖。

從上述細節中能夠看到,等待/通知機制依託於同步機制,其目的就是確保等待線程從wait()方法返回時可以感知到通知線程對變量作出的修改。

圖4-3描述了上述示例的過程。

在圖4-3中,WaitThread首先獲取了對象的鎖,而後調用對象的wait()方法,從而放棄了鎖並進入了對象的等待隊列WaitQueue中,進入等待狀態。因爲WaitThread釋放了對象的鎖,NotifyThread隨後獲取了對象的鎖,並調用對象的notify()方法,將WaitThread從WaitQueue移到SynchronizedQueue中,此時WaitThread的狀態變爲阻塞狀態。NotifyThread釋放了鎖以後,WaitThread再次獲取到鎖並從wait()方法返回繼續執行。

等待/通知的經典範式

從4.3.2節中的WaitNotify示例中能夠提煉出等待/通知的經典範式,該範式分爲兩部分,分別針對等待方(消費者)和通知方(生產者)。

等待方遵循以下原則。

1)獲取對象的鎖。

2)若是條件不知足,那麼調用對象的wait()方法,被通知後仍要檢查條件。

3)條件知足則執行對應的邏輯。

對應的僞代碼以下。

synchronized(對象) {
       while(條件不知足) {
              對象.wait();
       }
       對應的處理邏輯
}

通知方遵循以下原則。

1)得到對象的鎖。

2)改變條件。

3)通知全部等待在對象上的線程。

對應的僞代碼以下。

synchronized(對象) {
       改變條件
       對象.notifyAll();
}

管道輸入/輸出流

管道輸入/輸出流和普通的文件輸入/輸出流或者網絡輸入/輸出流不一樣之處在於,它主要用於線程之間的數據傳輸,而傳輸的媒介爲內存。

管道輸入/輸出流主要包括了以下4種具體實現:PipedOutputStream、PipedInputStream、PipedReader和PipedWriter,前兩種面向字節,然後兩種面向字符。

Thread.join()的使用

若是一個線程A執行了thread.join()語句,其含義是:當前線程A等待thread線程終止以後才從thread.join()返回。線程Thread除了提供join()方法以外,還提供了join(long millis)和join(long millis,int nanos)兩個具有超時特性的方法。這兩個超時方法表示,若是線程thread在給定的超時時間裏沒有終止,那麼將會從該超時方法中返回。

在代碼清單4-13所示的例子中,建立了10個線程,編號0~9,每一個線程調用前一個線程的join()方法,也就是線程0結束了,線程1才能從join()方法中返回,而線程0須要等待main線程結束。

public class Join {
    public static void main(String[] args) throws Exception {
        Thread previous = Thread.currentThread();
        for (int i = 0; i < 10; i++) {
            // 每一個線程擁有前一個線程的引用,須要等待前一個線程終止,才能從等待中返回
            Thread thread = new Thread(new Domino(previous), String.valueOf(i));
            thread.start();
            previous = thread;
        }
        TimeUnit.SECONDS.sleep(5);
        System.out.println(Thread.currentThread().getName() + " terminate.");
    }
    static class Domino implements Runnable {
        private Thread thread;
        public Domino(Thread thread) {
            this.thread = thread;
        }
        public void run() {
            try {
                thread.join();
            } catch (InterruptedException e) {
            }
            System.out.println(Thread.currentThread().getName() + " terminate.");
        }
    }
}

從上述輸出能夠看到,每一個線程終止的前提是前驅線程的終止,每一個線程等待前驅線程終止後,才從join()方法返回,這裏涉及了等待/通知機制(等待前驅線程結束,接收前驅線程結束通知)。

代碼清單4-14是JDK中Thread.join()方法的源碼(進行了部分調整)。

代碼清單4-14 Thread.java

// 加鎖當前線程對象
public final synchronized void join() throws InterruptedException {
       // 條件不知足,繼續等待
       while (isAlive()) {
              wait(0);
       }
       // 條件符合,方法返回
}

當線程終止時,會調用線程自身的notifyAll()方法,會通知全部等待在該線程對象上的線程。能夠看到join()方法的邏輯結構與4.3.3節中描述的等待/通知經典範式一致,即加鎖、循環和處理邏輯3個步驟。

ThreadLocal的使用

hreadLocal,即線程變量,是一個以ThreadLocal對象爲鍵、任意對象爲值的存儲結構。這個結構被附帶在線程上,也就是說一個線程能夠根據一個ThreadLocal對象查詢到綁定在這個線程上的一個值。

能夠經過set(T)方法來設置一個值,在當前線程下再經過get()方法獲取到原先設置的值。

在代碼清單4-15所示的例子中,構建了一個經常使用的Profiler類,它具備begin()和end()兩個方法,而end()方法返回從begin()方法調用開始到end()方法被調用時的時間差,單位是毫秒。

代碼清單4-15 Profiler.java

public class Profiler {
    // 第一次get()方法調用時會進行初始化(若是set方法沒有調用),每一個線程會調用一次
    private static final ThreadLocal<Long> TIME_THREADLOCAL = new ThreadLocal<Long>() {
        protected Long initialValue() {
            return System.currentTimeMillis();
        }
    };
    public static final void begin() {
        TIME_THREADLOCAL.set(System.currentTimeMillis());
    }
    public static final long end() {
        return System.currentTimeMillis() - TIME_THREADLOCAL.get();
    }
    public static void main(String[] args) throws Exception {
        Profiler.begin();
        TimeUnit.SECONDS.sleep(1);
        System.out.println("Cost: " + Profiler.end() + " mills");
    }
}

Profiler能夠被複用在方法調用耗時統計的功能上,在方法的入口前執行begin()方法,在方法調用後執行end()方法,好處是兩個方法的調用不用在一個方法或者類中,好比在AOP(面向方面編程)中,能夠在方法調用前的切入點執行begin()方法,而在方法調用後的切入點執行end()方法,這樣依舊能夠得到方法的執行耗時。

線程應用實例

等待超時模式

開發人員常常會遇到這樣的方法調用場景:調用一個方法時等待一段時間(通常來講是給定一個時間段),若是該方法可以在給定的時間段以內獲得結果,那麼將結果馬上返回,反之,超時返回默認結果。

前面的章節介紹了等待/通知的經典範式,即加鎖、條件循環和處理邏輯3個步驟,而這種範式沒法作到超時等待。而超時等待的加入,只須要對經典範式作出很是小的改動,改動內容以下所示。

假設超時時間段是T,那麼能夠推斷出在當前時間now+T以後就會超時。

定義以下變量。

·等待持續時間:REMAINING=T。

·超時時間:FUTURE=now+T。

這時僅須要wait(REMAINING)便可,在wait(REMAINING)返回以後會將執行:REMAINING=FUTURE–now。若是REMAINING小於等於0,表示已經超時,直接退出,不然將繼續執行wait(REMAINING)。

上述描述等待超時模式的僞代碼以下。

// 對當前對象加鎖
public synchronized Object get(long mills) throws InterruptedException {
       long future = System.currentTimeMillis() + mills;
       long remaining = mills;
       // 當超時大於0而且result返回值不知足要求
       while ((result == null) && remaining > 0) {
              wait(remaining);
              remaining = future - System.currentTimeMillis();
       }
              return result;
}

能夠看出,等待超時模式就是在等待/通知範式基礎上增長了超時控制,這使得該模式相比原有範式更具備靈活性,由於即便方法執行時間過長,也不會「永久」阻塞調用者,而是會按照調用者的要求「按時」返回。

相關文章
相關標籤/搜索