Java併發讀書筆記:如何實現線程間正確通訊

在併發編程中,保證線程同步,從而實現線程之間正確通訊,是一個值得考慮的問題。本篇將參考許多著名書籍,學習如何讓多個線程之間相互配合,完成咱們指定的任務。面試

1、synchronized 與 volatile

synchronized關鍵字是Java提供的互斥的內置鎖,該鎖機制不用顯式加鎖或者釋放鎖。互斥執行的特性能夠確保對整個臨界區代碼的執行具備原子性,同步機制保證了共享數據在同一個時刻只被一個線程使用編程

回顧如下synchronized的底層實現:併發

咱們能夠對下面這段代碼進行反編譯:javap -v TestData.classide

public class TestData {
    public static synchronized void m1(){}
    public synchronized void m2(){}
    public static void main(String[] args) {
        synchronized (TestData.class){
        }
    }
}

編譯結果以下:
學習

雖然同步方法和代碼塊的實現細節不一樣,可是歸根結底:JVM對於方法或者代碼塊的實現是基於對Monitor對象的進入和退出操做測試

以同步代碼塊舉例:this

  • monitorenter指令被安排到了代碼塊開始位置,monitorexit被安排到代碼塊正常結束和異常處
  • 任何對象都有一個monitor與之相關聯,當一個monitor被持有以後,它將會出於鎖定狀態。
  • 當JVM執行到monitorenter指令時,將會嘗試去獲取當前對象對應的monitor的全部權。
    • 若其餘前程已經有monitor的全部權,那麼當前線程將會進入同步隊列(SynchronizedQueue),陷入阻塞狀態(BLOCKED),直到monitor被釋放。
    • 若monitor進入數爲0,線程能夠進入monitor,此時該線程稱爲monitor的持有者(owner),並計數加一。
    • 若當前線程已經擁有monitor,是容許從新進入該monitor的,此時計數加一。
  • 得到鎖,鎖計數加一。失去鎖,計數減一。計數爲0,即爲釋放鎖。釋放鎖的操做將會喚醒阻塞在同步隊列中的的線程,使其從新得到嘗試對monitor的獲取。

下圖源自《Java併發編程得藝術》4-2
線程


新Java內存模型中提供了比鎖更加輕量級的通訊機制,它加強了volatile的內存語義,讓volatile擁有和鎖同樣的語義:告知程序任何對volatile修飾變量的訪問都要從共享內存中獲取,對它的改變必須同步刷新回共享內存,保證了線程對變量訪問的可見性code

關於volatile的重點學習,以後再作總結。

2、等待/通知機制

等待/通知相關的方法被定義在java.lang.Object上,這些方法必須由鎖對象來調用。同步實例方法爲this,靜態方法爲類對象,代碼塊的鎖是括號裏的玩意兒。

這些方法必須須要獲取鎖對象以後才能調用,也就是必需要在同步塊中或同步方法中調用,不然會拋出IllegalMonitorStateException的異常。

等待

wait() : 調用該方法的線程進入WAITING狀態,並釋放對象的鎖,此時當前線程只有被其餘線程通知或中斷纔會返回。

wait(long)wait(long, int):進入TIMED_WAITING狀態,釋放鎖,當前線程有通知或中斷會返回,時間到了也會返回。

通知

notify() : 當前線程通知一個在該對象上等待的另外一線程,被喚醒的線程從等待隊列(WAITING)被移動到同步隊列(BLOCKED)中,意思是被喚醒的線程不會當即執行,須要等當前線程釋放鎖以後,而且在同步隊列中的線程獲得了鎖才能執行。
notifyAll() :當前線程通知全部等待在該對象上的線程,將全部在等待隊列中的線程所有移到同步隊列中。

假設A和B須要獲取同一把鎖,A進入以後,B進入同步隊列,陷入阻塞(BLOCKED)。

若是A中調用鎖的wait()方法,A釋放鎖,並陷入等待(WAITING)。此時另一個線程B獲取的當前鎖,B運行。

若是此時B中調用鎖的notify()方法,A被喚醒,從等待隊列轉移到同步隊列,只有B運行完畢了,鎖被釋放了,A拿到鎖了,A纔出來運行。

等待/通知機制依託於同步機制,確保等待線程從wait()方法返回時可以感知到通知線程對變量作出的修改

面試常問的幾個問題

sleep方法和wait方法的區別

sleep()和wait()方法均可以讓線程放棄CPU一段時間,進入等待(WAITING)狀態

sleep()靜態方法定義在Thread類中,wait()定義在Object類中。

若是線程持有某個對象的監視器,wait()調用以後,當前線程會釋放鎖,而sleep()則不會釋放這個鎖

關於放棄對象監視器

對於放棄對象監視器,wait()方法和notify()/notifyAll()有必定區別:

鎖對象調用wait()方法以後,會當即釋放對象監視器。而notify()/notifyAll()則不會當即釋放,而是等到線程剩餘代碼執行完畢以後纔會釋放監視器。


3、等待通知典型

經過wait()和notify()/notifyAll()能夠有效地協調多個線程之間的工做,提升了線程通訊的效率。

生產者消費者模型

  • 經過平衡生產者的生產能力和消費者能力來提高整個系統的運行效率。
  • 減小生產者與消費者之間的聯繫。實現很好的解耦。

下面代碼保留主要的思路,具體的視狀況而定。

定義一個簡單的產品類Product,裏面定義一個判斷產品有無的標識位。

//產品
    public class Product {
        public boolean exist = false;
    }

而後定義消費方中的run方法。
首先獲取對象的鎖,若是產品不存在,則等待,不然消費一次,並把標識位置爲false,並喚醒生產線程。

//消費方
    synchronized (product) {
        while (true) {
            TimeUnit.SECONDS.sleep(1);
            while (!product.exist) {
                product.wait();
            }
            System.out.println("消費一次");
            product.exist = false;
            product.notifyAll();
        }
    }

生產方與消費方對應,依舊是先獲取對象的鎖,而後對標識位進行判斷,若是已經有產品了,就等待,不然就生產一次,並把標識位附爲true,最後喚醒正在等待的消費方。

//生產方
    synchronized (product) {
        while (true) {
            TimeUnit.SECONDS.sleep(1);
            while (product.exist) {
                product.wait();
            }
            System.out.println("生產一次");
            product.exist = true;
            product.notifyAll();
        }
    }

這個過程是最基本的,咱們更須要理解wait(),notify()等方法帶來的便利之處。在真實場景更加複雜的狀況下,好比在生產與消費速度不對等的狀況下,須要建立緩衝區等等。

可能會出錯的代碼

//T1
    synchronized (product) {
            product.exist = false;
            product.notifyAll();
        }
    }

    //T2
    synchronized(product){
        while(product.exist)
            product.wait();
    }
    
    //T3
    while (product.exist) {
        //A
        synchronized(product){
            product.wait();
        }
    }

假設T1和T3是通訊雙方,這時就可能會產生通知丟失的狀況:

  • 假設T3尚未得到鎖,運行到A點,這時線程調度器將資源分給T1線程,此時T3在A點阻塞。
  • T1線程中但願阻止T3陷入等待,因而將標識符設置位false,在標識位上出現了競爭。
  • 可是當T1執行完畢,T3繼續執行的時候,並不能知道這個標識位已經發生改變,因而它將會永久陷入等待。

因而,咱們能夠學習到一點,爲了消除多個線程在標識位上出現的競爭,咱們能夠採用T2的形式,給線程上一把鎖,保證被通知以後先檢查條件是否符合。

4、使用顯式的Lock和Condition

咱們以前也學習過,使用顯式的Lock對象來保證線程同步的話,隱式的監視器就不存在了,也就沒法使用wait()和notify()/notifyAll()。

Java提供了Condition接口來保持線程之間的協調通訊,經過Condition對象和Lock對象的配合,能夠完成synchronized同步方法與代碼塊完成的任務。

我很好奇Condition和Lock是怎麼創建聯繫的,因而查看了它們的繼承關係:

Condition接口是JDK1.5出現的,該接口提供的方法被ConditionObject類實現,該類是AbstractQueuedSynchronizer(AQS)的內部類,而ReentrantLock類內部維護了一個Sync對象,Sync擁有一個返回ConditionObject實例的方法,Sync繼承於AQS。

Condition接口內的方法詳解

參考JDK1.8官方文檔

void await() throws InterruptedException;

當前線程進入等待狀態直到被通知或者中斷,當前線程進入運行狀態且從await()方法返回的四種狀況:

  • 其餘線程調用這個條件的signal()方法,且當前線程被選擇爲要喚醒的線程
  • 其餘線程會爲此條件調用signalAll()方法。
  • 其餘線程中斷當前線程。
  • 發生」虛假喚醒「現象,參考:虛假喚醒(spurious wakeup)

須要注意的是:在上面全部狀況下,要想從await()返回,當前線程必須從新獲取與此條件關聯的鎖

void awaitUninterruptibly()

當前線程進入等待狀態直到被通知,它對中斷不敏感,所以他從等待狀態返回的場景區別於await(),僅僅少了第三點中斷場景。

long awaitNanos(long nanosTimeout) throws InterruptedException

該方法致使當前線程等待,直到被通知、中斷,或超時。該方法根據返回時提供的nanosTimeout值,返回剩餘等待的納秒數的估計值,若是超時,則返回小於或等於零的值。此值可用於肯定在等待返回但等待條件仍然無效的狀況下是否須要從新等待,以及須要多長時間從新等待。

還有零一個相似的方法就不贅述了。

boolean awaitUntil(Date deadline) throws InterruptedException

該方法致使當前線程等待,直到被通知或中斷,或到了指定的截止日期。若是返回時截止日期已通過了,則爲false,不然爲true。

void signal()

喚醒一個正在等待的線程,被喚醒的線程想要從await方法返回須要從新得到Condition相關聯的鎖。

void signalAll()

喚醒全部等待的線程,一樣的,想要從await方法返回就必須從新得到Condition相關聯鎖。

Condition與Lock配合

Condition接口依賴於Lock,咱們能夠這樣建立特定Lock實例的Condition實例:

Lock lock = new ReentrantLock(); //建立Lock對象
Condition condition = lock.newCondition(); //利用lock對象的newCondition()建立Condition對象

既然具備依賴關係,那麼只有獲取了lock,才能夠調用Condition中提供的方法,也就是隻能在Lock.lock()與Lock.unlock()之間調用。

官方文檔給出的定義:Condition的存在能夠將對象監視器方法(wait、notify和notifyAll)分解到不一樣的對象中,經過將它們與任意Lock實現結合使用,實現每一個對象具備多個等待集的效果。鎖代替同步方法和語句的使用,條件代替對象監視器方法的使用。

一個Lock對象能夠關聯多個Condition對象,分別做爲不一樣的條件檢測,這裏給一個簡易版生產者消費者模型的Demo:

  • 首先定義一個公共產品類,在類中定義相應的生產、消費邏輯。
public class Product {
    //共享產品編號
    private int count = 0;
    //標識位,標識是否還有產品
    private boolean flag = false;

    //建立Lock鎖對象
    private Lock lock = new ReentrantLock();
    //建立兩個Condition對象,做爲兩種條件檢測
    private Condition condProducer = lock.newCondition();
    private Condition condConsumer = lock.newCondition();

    //生產方法
    public void produce() {
        lock.lock(); //上鎖
        try {
            //驅使線程等待的條件
            while (flag) {
                condProducer.await(); //若是flag爲true,則不用生產
            }
            count++;
            System.out.println(Thread.currentThread().getName() + "生產產品一件,產品編號" + count);
            //生產完成,將標識爲改成false
            flag = true;
            //喚醒conConsumer條件下的全部線程(固然,這裏只有一個)
            condConsumer.signalAll();

        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();//在finally中,保證解鎖
        }
    }
    //消費方法
    public void consume() {
        lock.lock();
        try {
            //驅使線程等待的條件
            while (!flag) {
                condConsumer.await(); //若是flag爲false,則不用消費
            }

            //消費的邏輯
            System.out.println(Thread.currentThread().getName() + "消費產品一件,產品編號" + count);
            flag = false;
            condProducer.signalAll();

        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}
  • 而後建立生產者Producer和消費者Consumer兩個線程類,只需將公共產品對象傳入構造器中,使其創建聯繫。
//生產者線程
class Producer implements Runnable {
    private Product product;

    Producer(Product product) {
        this.product = product;
    }

    @Override
    public void run() {
        //每一個生產者線程生產會生產五件產品
        for (int i = 0; i < 5; i++) {
            product.produce();
        }
    }
}
//消費者線程
class Consumer implements Runnable {
    private Product product;

    Consumer(Product product) {
        this.product = product;
    }

    @Override
    public void run() {
        //每一個消費者線程會消費五件產品
        for (int i = 0; i < 5; i++) {
            product.consume();
        }
    }
}

仍是那個問題:咱們須要用wait()方法須要被包含在while循環語句中,防止過早或意外的通知,保證只有不符合等待的條件才能退出循環。換句話說,使用while循環而不用if判斷能夠有效防止」虛假喚醒「的現象。

Condition接口與Object監視器

下表參考自《Java併發編程的藝術》方騰飛

對比項 Object Monitor Method Condition
前置條件 獲取對象的鎖 Lock.lock()獲取鎖、Lock.newCondition()獲取Condition對象
調用方式 如object.wait() 如condition.await()
等待隊列個數 一個 多個
釋放鎖、進入等待 支持、如void wait() 支持、如void await()
釋放鎖、進入超時等待 支持、如void wait(long ) 支持long awaitNanos(long)
釋放鎖、進入等待狀態到未來某個時間 不支持 支持、例如 long awaitNanos(long)
等待狀態不響應中斷 不支持 支持、例如 void awaitUninterruptibly()
喚醒等待隊列中的一個線程 支持、如void notify() 支持、如 void signal()
喚醒等待隊列中的全部線程 支持、如void notifyAll() 支持、如void signalAll()

關於Condition和Lock,以後會有相關文章對它們進行更詳細的系統學習,本篇文章主要理解它們進行線程通訊的基本方法。

5、管道輸入、輸出流

管道輸入輸出流主要用於線程之間的數據傳輸,傳輸媒介爲內存。

面向字節:PipedOutputStream、PipedInputStream
面向字符:PipedWriter、PipedReader

下面是一個經過管道輸入輸出流完成線程間通訊:

public class Piped {
    public static void main(String[] args) throws IOException {
        //建立管道輸入輸出流
        PipedWriter out = new PipedWriter();
        PipedReader in = new PipedReader();

        //將輸入輸出流連結起來,不然在使用的時候會拋出異常
        out.connect(in);
        Thread printThread = new Thread(new Print(in),"PrintThread");
        printThread.start();
        
        //標準輸入流轉化到管道輸出流
        int receive;
        try{
            while((receive = System.in.read())!=-1){
                out.write(receive);
            }
        }finally {
            out.close();
        }
    }

    //定義線程類,接收管道輸入流,寫入標準輸出流
    static class Print implements Runnable{
        private PipedReader in;
        public Print(PipedReader in){
            this.in = in;
        }
        @Override
        public void run() {
            int receive;
            try{
                while((receive = in.read())!=-1){
                    System.out.print((char)receive);
                }
            }catch (IOException e){
                e.printStackTrace();
            }
        }
    }
}

6、Thread.join()

關於join方法,以前已經作過相應的總結,這邊就再也不作詳細的說明。

官方解釋簡潔明瞭:Waits for this thread to die.,很明顯,針對線程來講,誰調用,等誰死。舉個例子:當在A線程中調用B線程的join()方法時,A線程將會陷入等待或超時等待,直到B線程執行完畢消亡才轉變爲阻塞

join()方法具體有三個:

//等待該線程消亡。
public final void join()
//等待該線程消亡,只不過最多等millis毫秒。
public final synchronized void join(long millis)
//等待該線程消亡,只不過最多等millis毫秒+nanos納秒(毫微秒)。
public final synchronized void join(long millis, int nanos)

7、利用ThreadLocal

一樣的,關於ThreadLocal更詳細的學習會在以後出爐,本篇着重理解通訊方法。

ThreadLocal,是線程局部變量,它是一個以ThreadLocal對象爲鍵、任意對象爲值的存儲結構,該結構被附帶在線程上,線程能夠根據一個Thread對象查詢到綁定在這個線程上的值。

它爲每個使用該變量的線程都提供了一個變量值的副本,使得每個線程均可以獨立地改變本身的副本,而不會產生多個線程在操做共享數據通過主內存時產生的數據競爭的問題。

咱們能夠利用set和get,設置和取出局部變量的值。須要明確的是:無論有多少個線程,用ThreadLocal定義了局部變量,就會在線程中各自產生一份副本,自此,各個線程之間的讀和寫操做互不相關,咱們能夠利用這一性質,完成咱們特殊的需求。

public class Profiler {
    // 定義一個ThreadLocal類型的變量,該變量是一個線程局部變量
    private static final ThreadLocal<Long> TIME_THREADLOCAL = new ThreadLocal<Long>(){
        //重寫方法,爲該局部變量賦初始值
        protected Long initialValue(){
            return System.currentTimeMillis();
        }
    };
    //public void set(T value),設置該局部變量值
    public static  final void begin(){
        TIME_THREADLOCAL.set(System.currentTimeMillis());
    }
    //public T get() ,取出該局部變量的值
    public static final long cost(){
        return System.currentTimeMillis() - TIME_THREADLOCAL.get();
    }
    //測試
    public static void main(String[] args) throws Exception{
        Profiler.begin();
        TimeUnit.SECONDS.sleep(1);
        System.out.println("Cost: "+ Profiler.cost()+" mills");

    }
}

上面使用案例摘自《Java併發編程的藝術》,關於ThreadLocal更詳細的分析,以後會再作總結。


參考:《Java併發編程的藝術》、JDK官方文檔

相關文章
相關標籤/搜索