阻塞隊列BlockingQueue

阻塞隊列

概念

隊列

隊列就能夠想成是一個數組,從一頭進入,一頭出去,排隊買飯java

阻塞隊列

BlockingQueue 阻塞隊列,排隊擁堵,首先它是一個隊列,而一個阻塞隊列在數據結構中所起的做用大體以下圖所示:程序員

線程1往阻塞隊列中添加元素,而線程2從阻塞隊列中移除元素數組

  • 當阻塞隊列是空時,從隊列中獲取元素的操做將會被阻塞安全

    • 當蛋糕店的櫃子空的時候,沒法從櫃子裏面獲取蛋糕
  • 當阻塞隊列是滿時,從隊列中添加元素的操做將會被阻塞數據結構

    • 當蛋糕店的櫃子滿的時候,沒法繼續向櫃子裏面添加蛋糕了

也就是說 試圖從空的阻塞隊列中獲取元素的線程將會被阻塞,直到其它線程往空的隊列插入新的元素多線程

同理,試圖往已經滿的阻塞隊列中添加新元素的線程,直到其它線程往滿的隊列中移除一個或多個元素,或者徹底清空隊列後,使隊列從新變得空閒起來,並後續新增架構

爲何要用?

去海底撈吃飯,大廳滿了,須要進候廳等待,可是這些等待的客戶可以對商家帶來利潤,所以咱們很是歡迎他們阻塞測試

在多線程領域:所謂的阻塞,在某些清空下會掛起線程(即阻塞),一旦條件知足,被掛起的線程又會自動喚醒this

爲何須要BlockingQueue

好處是咱們不須要關心何時須要阻塞線程,何時須要喚醒線程,由於這一切BlockingQueue都幫你一手包辦了atom

在concurrent包發佈之前,在多線程環境下,咱們每一個程序員都必須本身取控制這些細節,尤爲還要兼顧效率和線程安全,而這會給咱們的程序帶來不小的複雜度。

架構

// 你用過List集合類

// ArrayList集合類熟悉麼?

// 還用過 CopyOnWriteList  和 BlockingQueue

BlockingQueue阻塞隊列是屬於一個接口,底下有七個實現類

  • ArrayBlockQueue:由數組結構組成的有界阻塞隊列
  • LinkedBlockingQueue:由鏈表結構組成的有界(可是默認大小 Integer.MAX_VALUE)的阻塞隊列
    • 有界,可是界限很是大,至關於無界,能夠當成無界
  • PriorityBlockQueue:支持優先級排序的無界阻塞隊列
  • DelayQueue:使用優先級隊列實現的延遲無界阻塞隊列
  • SynchronousQueue:不存儲元素的阻塞隊列,也即單個元素的隊列
    • 生產一個,消費一個,不存儲元素,不消費不生產
  • LinkedTransferQueue:由鏈表結構組成的無界阻塞隊列
  • LinkedBlockingDeque:由鏈表結構組成的雙向阻塞隊列

這裏須要掌握的是:ArrayBlockQueue、LinkedBlockingQueue、SynchronousQueue

BlockingQueue核心方法

拋出異常 當阻塞隊列滿時:在往隊列中add插入元素會拋出 IIIegalStateException:Queue full 當阻塞隊列空時:再往隊列中remove移除元素,會拋出NoSuchException
特殊性 插入方法,成功true,失敗false 移除方法:成功返回出隊列元素,隊列沒有就返回空
一直阻塞 當阻塞隊列滿時,生產者繼續往隊列裏put元素,隊列會一直阻塞生產線程直到put數據or響應中斷退出, 當阻塞隊列空時,消費者線程試圖從隊列裏take元素,隊列會一直阻塞消費者線程直到隊列可用。
超時退出 當阻塞隊列滿時,隊裏會阻塞生產者線程必定時間,超過限時後生產者線程會退出

拋出異常組

但執行add方法,向已經滿的ArrayBlockingQueue中添加元素時候,會拋出異常

// 阻塞隊列,須要填入默認值
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);

System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));

System.out.println(blockingQueue.add("XXX"));

運行後:

true
true
true
Exception in thread "main" java.lang.IllegalStateException: Queue full
	at java.util.AbstractQueue.add(AbstractQueue.java:98)
	at java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:312)
	at com.moxi.interview.study.queue.BlockingQueueDemo.main(BlockingQueueDemo.java:25)

同時若是咱們多取出元素的時候,也會拋出異常,咱們假設只存儲了3個值,可是取的時候,取了四次

// 阻塞隊列,須要填入默認值
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));

System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());

那麼出現異常

true
true
true
a
b
c
Exception in thread "main" java.util.NoSuchElementException
	at java.util.AbstractQueue.remove(AbstractQueue.java:117)
	at com.moxi.interview.study.queue.BlockingQueueDemo.main(BlockingQueueDemo.java:30)

布爾類型組

咱們使用 offer的方法,添加元素時候,若是阻塞隊列滿了後,會返回false,否者返回true

同時在取的時候,若是隊列已空,那麼會返回null

BlockingQueue blockingQueue = new ArrayBlockingQueue(3);

System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
System.out.println(blockingQueue.offer("d"));

System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());

運行結果

true
true
true
false
a
b
c
null

阻塞隊列組

咱們使用 put的方法,添加元素時候,若是阻塞隊列滿了後,添加消息的線程,會一直阻塞,直到隊列元素減小,會被清空,纔會喚醒

通常在消息中間件,好比RabbitMQ中會使用到,由於須要保證消息百分百不丟失,所以只有讓它阻塞

BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
blockingQueue.put("a");
blockingQueue.put("b");
blockingQueue.put("c");
System.out.println("================");

blockingQueue.take();
blockingQueue.take();
blockingQueue.take();
blockingQueue.take();

同時使用take取消息的時候,若是內容不存在的時候,也會被阻塞

不見不散組

offer( ) , poll 加時間

使用offer插入的時候,須要指定時間,若是2秒尚未插入,那麼就放棄插入

BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.offer("a", 2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("b", 2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("c", 2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("d", 2L, TimeUnit.SECONDS));

同時取的時候也進行判斷

System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS));

若是2秒內取不出來,那麼就返回null

SynchronousQueue

SynchronousQueue沒有容量,與其餘BlockingQueue不一樣,SynchronousQueue是一個不存儲的BlockingQueue,每個put操做必須等待一個take操做,否者不能繼續添加元素

下面咱們測試SynchronousQueue添加元素的過程

首先咱們建立了兩個線程,一個線程用於生產,一個線程用於消費

生產的線程分別put了 A、B、C這三個字段

BlockingQueue<String> blockingQueue = new SynchronousQueue<>();

new Thread(() -> {
    try {       
        System.out.println(Thread.currentThread().getName() + "\t put A ");
        blockingQueue.put("A");
       
        System.out.println(Thread.currentThread().getName() + "\t put B ");
        blockingQueue.put("B");        
        
        System.out.println(Thread.currentThread().getName() + "\t put C ");
        blockingQueue.put("C");        
        
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}, "t1").start();

消費線程使用take,消費阻塞隊列中的內容,而且每次消費前,都等待5秒

new Thread(() -> {
            try {

                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                blockingQueue.take();
                System.out.println(Thread.currentThread().getName() + "\t take A ");

                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                blockingQueue.take();
                System.out.println(Thread.currentThread().getName() + "\t take B ");

                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                blockingQueue.take();
                System.out.println(Thread.currentThread().getName() + "\t take C ");

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "t2").start();

最後結果輸出爲:

t1	 put A 
t2	 take A 

5秒後...

t1	 put B 
t2	 take B 

5秒後...

t1	 put C 
t2	 take C

咱們從最後的運行結果能夠看出,每次t1線程向隊列中添加阻塞隊列添加元素後,t1輸入線程就會等待 t2消費線程,t2消費後,t2處於掛起狀態,等待t1在存入,從而周而復始,造成 一存一取的狀態

阻塞隊列的用處

生產者消費者模式

一個初始值爲0的變量,兩個線程對其交替操做,一個加1,一個減1,來5輪

關於多線程的操做,咱們須要記住下面幾句

  • 線程 操做 資源類
  • 判斷 幹活 通知
  • 防止虛假喚醒機制

咱們下面實現一個簡單的生產者消費者模式,首先有資源類ShareData

/**
 * 資源類
 */
class ShareData {

    private int number = 0;

    private Lock lock = new ReentrantLock();

    private Condition condition = lock.newCondition();

    public void increment() throws Exception{
        // 同步代碼塊,加鎖
        lock.lock();
        try {
            // 判斷
            while(number != 0) {
                // 等待不能生產
                condition.await();
            }

            // 幹活
            number++;

            System.out.println(Thread.currentThread().getName() + "\t " + number);

            // 通知 喚醒
            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void decrement() throws Exception{
        // 同步代碼塊,加鎖
        lock.lock();
        try {
            // 判斷
            while(number == 0) {
                // 等待不能消費
                condition.await();
            }

            // 幹活
            number--;

            System.out.println(Thread.currentThread().getName() + "\t " + number);

            // 通知 喚醒
            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

}

裏面有一個number變量,同時提供了increment 和 decrement的方法,分別讓number 加1和減1

可是咱們在進行判斷的時候,爲了防止出現虛假喚醒機制,不能使用if來進行判斷,而應該使用while

// 判斷
while(number != 0) {
    // 等待不能生產
    condition.await();
}

不能使用 if判斷

// 判斷
if(number != 0) {
    // 等待不能生產
    condition.await();
}

完整代碼

/**
 * 生產者消費者 傳統版
 * 題目:一個初始值爲0的變量,兩個線程對其交替操做,一個加1,一個減1,來5輪
 */
/**
 * 線程 操做 資源類
 * 判斷 幹活 通知
 * 防止虛假喚醒機制
 */

/**
 * 資源類
 */
class ShareData {

    private int number = 0;

    private Lock lock = new ReentrantLock();

    private Condition condition = lock.newCondition();

    public void increment() throws Exception{
        // 同步代碼塊,加鎖
        lock.lock();
        try {
            // 判斷
            while(number != 0) {
                // 等待不能生產
                condition.await();
            }

            // 幹活
            number++;

            System.out.println(Thread.currentThread().getName() + "\t " + number);

            // 通知 喚醒
            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void decrement() throws Exception{
        // 同步代碼塊,加鎖
        lock.lock();
        try {
            // 判斷
            while(number == 0) {
                // 等待不能消費
                condition.await();
            }

            // 幹活
            number--;

            System.out.println(Thread.currentThread().getName() + "\t " + number);

            // 通知 喚醒
            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

}
public class ProdConsumerTraditionDemo {

    public static void main(String[] args) {

        // 高內聚,低耦合    內聚指的是,一個空調,自身帶有調節溫度高低的方法
        ShareData shareData = new ShareData();

        // t1線程,生產
        new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                try {
                    shareData.increment();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, "t1").start();

        // t2線程,消費
        new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                try {
                    shareData.decrement();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, "t2").start();
    }
}

最後運行成功後,咱們一個進行生產,一個進行消費

t1	 1
t2	 0
t1	 1
t2	 0
t1	 1
t2	 0
t1	 1
t2	 0
t1	 1
t2	 0

生成者和消費者3.0

在concurrent包發佈之前,在多線程環境下,咱們每一個程序員都必須本身去控制這些細節,尤爲還要兼顧效率和線程安全,則這會給咱們的程序帶來不小的時間複雜度

如今咱們使用新版的阻塞隊列版生產者和消費者,使用:volatile、CAS、atomicInteger、BlockQueue、線程交互、原子引用

/**
 * 生產者消費者  阻塞隊列版
 * 使用:volatile、CAS、atomicInteger、BlockQueue、線程交互、原子引用
 *
 */

class MyResource {
    // 默認開啓,進行生產消費
    // 這裏用到了volatile是爲了保持數據的可見性,也就是當TLAG修改時,要立刻通知其它線程進行修改
    private volatile boolean FLAG = true;

    // 使用原子包裝類,而不用number++
    private AtomicInteger atomicInteger = new AtomicInteger();

    // 這裏不能爲了知足條件,而實例化一個具體的SynchronousBlockingQueue
    BlockingQueue<String> blockingQueue = null;

    // 而應該採用依賴注入裏面的,構造注入方法傳入
    public MyResource(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
        // 查詢出傳入的class是什麼
        System.out.println(blockingQueue.getClass().getName());
    }

    /**
     * 生產
     * @throws Exception
     */
    public void myProd() throws Exception{
        String data = null;
        boolean retValue;
        // 多線程環境的判斷,必定要使用while進行,防止出現虛假喚醒
        // 當FLAG爲true的時候,開始生產
        while(FLAG) {
            data = atomicInteger.incrementAndGet() + "";

            // 2秒存入1個data
            retValue = blockingQueue.offer(data, 2L, TimeUnit.SECONDS);
            if(retValue) {
                System.out.println(Thread.currentThread().getName() + "\t 插入隊列:" + data  + "成功" );
            } else {
                System.out.println(Thread.currentThread().getName() + "\t 插入隊列:" + data  + "失敗" );
            }

            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        System.out.println(Thread.currentThread().getName() + "\t 中止生產,表示FLAG=false,生產介紹");
    }

    /**
     * 消費
     * @throws Exception
     */
    public void myConsumer() throws Exception{
        String retValue;
        // 多線程環境的判斷,必定要使用while進行,防止出現虛假喚醒
        // 當FLAG爲true的時候,開始生產
        while(FLAG) {
            // 2秒存入1個data
            retValue = blockingQueue.poll(2L, TimeUnit.SECONDS);
            if(retValue != null && retValue != "") {
                System.out.println(Thread.currentThread().getName() + "\t 消費隊列:" + retValue  + "成功" );
            } else {
                FLAG = false;
                System.out.println(Thread.currentThread().getName() + "\t 消費失敗,隊列中已爲空,退出" );

                // 退出消費隊列
                return;
            }
        }
    }

    /**
     * 中止生產的判斷
     */
    public void stop() {
        this.FLAG = false;
    }

}
public class ProdConsumerBlockingQueueDemo {

    public static void main(String[] args) {
        // 傳入具體的實現類, ArrayBlockingQueue
        MyResource myResource = new MyResource(new ArrayBlockingQueue<String>(10));

        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + "\t 生產線程啓動");
            System.out.println("");
            System.out.println("");
            try {
                myResource.myProd();
                System.out.println("");
                System.out.println("");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "prod").start();


        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + "\t 消費線程啓動");

            try {
                myResource.myConsumer();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "consumer").start();

        // 5秒後,中止生產和消費
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("");
        System.out.println("");
        System.out.println("5秒中後,生產和消費線程中止,線程結束");
        myResource.stop();
    }
}

最後運行結果

java.util.concurrent.ArrayBlockingQueue
prod	 生產線程啓動


consumer	 消費線程啓動
prod	 插入隊列:1成功
consumer	 消費隊列:1成功
prod	 插入隊列:2成功
consumer	 消費隊列:2成功
prod	 插入隊列:3成功
consumer	 消費隊列:3成功
prod	 插入隊列:4成功
consumer	 消費隊列:4成功
prod	 插入隊列:5成功
consumer	 消費隊列:5成功


5秒中後,生產和消費線程中止,線程結束
prod	 中止生產,表示FLAG=false,生產介紹
相關文章
相關標籤/搜索