Java多線程之併發協做生產者消費者設計模式

1、兩個線程一個生產者一個消費者java

需求情景多線程

  • 兩個線程,一個負責生產,一個負責消費,生產者生產一個,消費者消費一個。

涉及問題併發

  • 同步問題:如何保證同一資源被多個線程併發訪問時的完整性。經常使用的同步方法是採用標記或加鎖機制。
  • wait() / nofity() 方法是基類Object的兩個方法,也就意味着全部Java類都會擁有這兩個方法,這樣,咱們就能夠爲任何對象實現同步機制。
  • wait()方法:當緩衝區已滿/空時,生產者/消費者線程中止本身的執行,放棄鎖,使本身處於等待狀態,讓其餘線程執行。
  • notify()方法:當生產者/消費者向緩衝區放入/取出一個產品時,向其餘等待的線程發出可執行的通知,同時放棄鎖,使本身處於等待狀態。

代碼實現(共三個類和一個main方法的測試類)ide

Resource.java函數

package com.demo.ProducerConsumer;

/**
 * 資源
 * @author lixiaoxi
 *
 */
public class Resource {

    /*資源序號*/
    private int number = 0;
    /*資源標記*/
    private boolean flag = false;

    /**
     * 生產資源
     */
    public synchronized void create() {
        if (flag) {//先判斷標記是否已經生產了,若是已經生產,等待消費;
            try {
                wait();//讓生產線程等待
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        number++;//生產一個
        System.out.println(Thread.currentThread().getName() + "生產者------------" + number);
        flag = true;//將資源標記爲已經生產
        notify();//喚醒在等待操做資源的線程(隊列)
    }

    /**
     * 消費資源
     */
    public synchronized void destroy() {
        if (!flag) {
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        System.out.println(Thread.currentThread().getName() + "消費者****" + number);

        flag = false;
        notify();
    }
}

Producer.java測試

package com.demo.ProducerConsumer;

/**
 * 生產者
 * @author lixiaoxi
 *
 */
public class Producer implements Runnable{

    private Resource resource;

    public Producer(Resource resource) {
        this.resource = resource;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            resource.create();
        }

    }
}

Consumer.javathis

package com.demo.ProducerConsumer;

/**
 * 消費者
 * @author lixiaoxi
 *
 */
public class Consumer implements Runnable{

    private Resource resource;

    public Consumer(Resource resource) {
        this.resource = resource;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            resource.destroy();
        }

    }
}

ProducerConsumerTest.javaspa

package com.demo.ProducerConsumer;

public class ProducerConsumerTest {

    public static void main(String args[]) {
        Resource resource = new Resource();
        new Thread(new Producer(resource)).start();//生產者線程
        new Thread(new Consumer(resource)).start();//消費者線程

    }
}

打印結果:線程

以上打印結果能夠看出沒有任何問題。3d

2、多個線程,多個生產者和多個消費者的問題

需求情景

  • 四個線程,兩個個負責生產,兩個個負責消費,生產者生產一個,消費者消費一個。

涉及問題

  • notifyAll()方法:當生產者/消費者向緩衝區放入/取出一個產品時,向其餘等待的全部線程發出可執行的通知,同時放棄鎖,使本身處於等待狀態。

再次測試代碼

ProducerConsumerTest.java

package com.demo.ProducerConsumer;

public class ProducerConsumerTest {

    public static void main(String args[]) {
        Resource resource = new Resource();
        new Thread(new Producer(resource)).start();//生產者線程
        new Thread(new Producer(resource)).start();//生產者線程
        new Thread(new Consumer(resource)).start();//消費者線程
        new Thread(new Consumer(resource)).start();//消費者線程

    }
}

運行結果:

經過以上打印結果發現問題

  • 147生產了一次,消費了兩次。
  • 169生產了,而沒有消費。

緣由分析

  • 當兩個線程同時操做生產者生產或者消費者消費時,若是有生產者或消費者的兩個線程都wait()時,再次notify(),因爲其中一個線程已經改變了標記而另一個線程再次往下直接執行的時候沒有判斷標記而致使的。
  • if判斷標記,只有一次,會致使不應運行的線程運行了。出現了數據錯誤的狀況。

解決方案

  • while判斷標記,解決了線程獲取執行權後,是否要運行!也就是每次wait()後再notify()時先再次判斷標記。

代碼改進(Resource中的 if -> while)

Resource.java

package com.demo.ProducerConsumer;

/**
 * 資源
 * @author lixiaoxi
 *
 */
public class Resource {

    /*資源序號*/
    private int number = 0;
    /*資源標記*/
    private boolean flag = false;

    /**
     * 生產資源
     */
    public synchronized void create() {
        while (flag) {//先判斷標記是否已經生產了,若是已經生產,等待消費;
            try {
                wait();//讓生產線程等待
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        number++;//生產一個
        System.out.println(Thread.currentThread().getName() + "生產者------------" + number);
        flag = true;//將資源標記爲已經生產
        notify();//喚醒在等待操做資源的線程(隊列)
    }

    /**
     * 消費資源
     */
    public synchronized void destroy() {
        while (!flag) {
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        System.out.println(Thread.currentThread().getName() + "消費者****" + number);

        flag = false;
        notify();
    }
}

運行結果:

再次發現問題

  • 打印到某個值好比生產完187,程序運行卡死了,好像鎖死了同樣。

緣由分析

  • notify:只能喚醒一個線程,若是本方喚醒了本方,沒有意義。並且while判斷標記+notify會致使」死鎖」

解決方案

  • notifyAll解決了本方線程必定會喚醒對方線程的問題

最後代碼改進(Resource中的 notify() -> notifyAll())

Resource.java

package com.demo.ProducerConsumer;

/**
 * 資源
 * @author lixiaoxi
 *
 */
public class Resource {

    /*資源序號*/
    private int number = 0;
    /*資源標記*/
    private boolean flag = false;

    /**
     * 生產資源
     */
    public synchronized void create() {
        while (flag) {//先判斷標記是否已經生產了,若是已經生產,等待消費;
            try {
                wait();//讓生產線程等待
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        number++;//生產一個
        System.out.println(Thread.currentThread().getName() + "生產者------------" + number);
        flag = true;//將資源標記爲已經生產
        notifyAll();//喚醒在等待操做資源的線程(隊列)
    }

    /**
     * 消費資源
     */
    public synchronized void destroy() {
        while (!flag) {
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        System.out.println(Thread.currentThread().getName() + "消費者****" + number);

        flag = false;
        notifyAll();
    }
}

運行結果:

以上就大功告成了,沒有任何問題。

    再來梳理一下整個流程。按照示例,生產者消費者交替運行,每次生產後都有對應的消費者,測試類建立實例,若是是生產者先運行,進入run()方法,進入create()方法,flag默認爲false,number+1,生產者生產一個產品,flag置爲true,同時調用notifyAll()方法,喚醒全部正在等待的線程,接下來若是仍是生產者運行呢?這是flag爲true,進入while循環,執行wait()方法,接下來若是是消費者運行的話,調用destroy()方法,這時flag爲true,消費者購買了一次產品,隨即將flag置爲false,並喚醒全部正在等待的線程。這就是一次完整的多生產者對應多消費者的問題。

3、使用Lock和Condition來解決生產者消費者問題

    上面的代碼有一個問題,就是咱們爲了不全部的線程都處於等待的狀態,使用了notifyAll方法來喚醒全部的線程,即notifyAll喚醒的是本身方和對方線程。若是我須要只是喚醒對方的線程,好比:生產者只能喚醒消費者的線程,消費者只能喚醒生產者的線程。 

在jdk1.5當中爲咱們提供了多線程的升級解決方案: 

1. 將同步synchronized替換成了Lock操做。 

2. 將Object中的wait,notify,notifyAll方法替換成了Condition對象。 

3. 能夠只喚醒對方的線程。

完整代碼:

Resource1.java

package com.demo.ProducerConsumer;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 資源
 * @author lixiaoxi
 *
 */
public class Resource1 {

    /*資源序號*/
    private int number = 0;
    /*資源標記*/
    private boolean flag = false;
    
    private Lock lock = new ReentrantLock();
    //使用lock創建生產者的condition對象
    private Condition condition_pro = lock.newCondition(); 
    //使用lock創建消費者的condition對象
    private Condition condition_con = lock.newCondition(); 


    /**
     * 生產資源
     */
    public void create() throws InterruptedException {
        
        try{
            lock.lock();
            //先判斷標記是否已經生產了,若是已經生產,等待消費
            while(flag){
                //生產者等待
                condition_pro.await();
            }
            //生產一個
            number++;
            System.out.println(Thread.currentThread().getName() + "生產者------------" + number);
            //將資源標記爲已經生產
            flag = true;
            //生產者生產完畢後,喚醒消費者的線程(注意這裏不是signalAll)
            condition_con.signal();
        }finally{
            lock.unlock();
        }
    }

    /**
     * 消費資源
     */
    public void destroy() throws InterruptedException{

        try{
            lock.lock();
            //先判斷標記是否已經消費了,若是已經消費,等待生產
            while(!flag){
                //消費者等待
                condition_con.await();
            }
            
            System.out.println(Thread.currentThread().getName() + "消費者****" + number);
            //將資源標記爲已經消費
            flag = false;
            //消費者消費完畢後,喚醒生產者的線程
            condition_pro.signal();
        }finally{
            lock.unlock();
        }
    }
}

Producer1.java

package com.demo.ProducerConsumer;

/**
 * 生產者
 * @author lixiaoxi
 *
 */
public class Producer1 implements Runnable{

    private Resource1 resource;

    public Producer1(Resource1 resource) {
        this.resource = resource;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(10);
                resource.create();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
}

Consumer1.java

package com.demo.ProducerConsumer;

/**
 * 消費者
 * @author lixiaoxi
 *
 */
public class Consumer1 implements Runnable{

    private Resource1 resource;

    public Consumer1(Resource1 resource) {
        this.resource = resource;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(10);
                resource.destroy();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
}

ProducerConsumerTest1.java

package com.demo.ProducerConsumer;

public class ProducerConsumerTest1 {

    public static void main(String args[]) {
        Resource1 resource = new Resource1();
        new Thread(new Producer1(resource)).start();//生產者線程
        new Thread(new Producer1(resource)).start();//生產者線程
        new Thread(new Consumer1(resource)).start();//消費者線程
        new Thread(new Consumer1(resource)).start();//消費者線程

    }
}

運行結果:

4、總結

一、若是生產者、消費者都是1個,那麼flag標記能夠用if判斷。這裏有多個,必須用while判斷。

二、在while判斷的同時,notify函數可能喚醒本類線程(如一個消費者喚醒另外一個消費者),這會致使全部消費者忙等待,程序沒法繼續往下執行。使用notifyAll函數代替notify能夠解決這個問題,notifyAll能夠保證非本類線程被喚醒(消費者線程能喚醒生產者線程,反之也能夠),解決了忙等待問題。

當心假死

    生產者/消費者模型最終達到的目的是平衡生產者和消費者的處理能力,達到這個目的的過程當中,並不要求只有一個生產者和一個消費者。能夠多個生產者對應多個消費者,能夠一個生產者對應一個消費者,能夠多個生產者對應一個消費者。

    假死就發生在上面三種場景下。假死指的是所有線程都進入了WAITING狀態,那麼程序就再也不執行任何業務功能了,整個項目呈現停滯狀態。

    比方說有生產者A和生產者B,緩衝區因爲空了,消費者處於WAITING。生產者B處於WAITING,生產者A被消費者通知生產,生產者A生產出來的產品本應該通知消費者,結果通知了生產者B,生產者B被喚醒,發現緩衝區滿了,因而繼續WAITING。至此,兩個生產者線程處於WAITING,消費者處於WAITING,系統假死。

    上面的分析能夠看出,假死出現的緣由是由於notify的是同類,因此非單生產者/單消費者的場景,能夠採起兩種方法解決這個問題:

(1)synchronized用notifyAll()喚醒全部線程、ReentrantLock用signalAll()喚醒全部線程。

(2)用ReentrantLock定義兩個Condition,一個表示生產者的Condition,一個表示消費者的Condition,喚醒的時候調用相應的Condition的signal()方法就能夠了。

相關文章
相關標籤/搜索