淺談Java隊列Queue

隊列Queue就是一個先進先出的數據結構,與List、Set同一級別,繼承了Collection接口。java

1、Queue的實現數組

一、阻塞隊列(BlockingQueue)緩存

① 插入:隊列不滿時可執行插入元素線程,直到隊列滿。安全

② 移除:隊列不爲空均可移除,直到隊列爲空。數據結構

拋出異常:滿的時候插入,空的時候取出都會拋異常。多線程

返回特殊值:插入成功返回true併發

一直阻塞:滿時put和空時take會阻塞線程,直到隊列可用。app

超時退出:滿時會阻塞插入一段時間,若是超過必定時間,線程就會退出。ide

JDK7提供了7個阻塞隊列:高併發

① ArrayListBlockingQueue

ArrayBlockingQueue是一個由數組支持的有界阻塞隊列。在讀寫操做上都須要鎖住整個容器,所以吞吐量與通常的實現是類似的,適合於實現「生產者消費者」模式。 
② LinkedBlockingQueue

一個由鏈表結構組成的雙向阻塞隊列。

基於鏈表的阻塞隊列,內部維持這一關數據緩衝隊列。當生產者往隊列中放入一個數據時,隊列會從生產者手中獲取數據,並緩存在隊列內部,而生產者當即返回;當隊列緩衝區達到最大緩衝容量時,纔會阻塞生產者隊列,直到消費者從隊列中消費掉一份數據,生成者線程纔會被喚醒,反之對消費者的處理也基於一樣的原理。

LinkedBlockingQueue之因此可以高效的處理併發數據,還由於其對於生產者和消費者分別採用了獨立的鎖來控制數據同步,也就意味着在高併發的狀況下生產者和消費者能夠並行的操做隊列中的數據,以此來提升整個隊列的併發性能。

③ SynchronousQueue

一個不存儲元素的阻塞隊列,在某次添加元素後必須等待其餘線程取走後才能繼續添加。

④ PriorityBlockingQueue 

是一個帶優先級的隊列,而不是先進先出隊列,該隊列也沒有上限,可是若是隊列爲空,那麼取元素的操做take就會阻塞。

⑤ DelayQueue

是一個存放Delayed 元素的無界阻塞隊列,只有在延遲期滿時才能從中提取元素。

ArrayListBlockingQueue和LinkedBlockingQueue的區別?

① 隊列中鎖的實現不一樣

ArrayBlockingQueue生產者消費者使用同一個鎖。

LinkedBlockingQueue生產用的是putLock,消費是takeLock。

② 在生產和消費時操做不一樣

ArrayBlockingQueue實現的隊列中在生產和消費的時候,是直接將枚舉對象插入或移除的;

LinkedBlockingQueue實現的隊列中在生產和消費的時候,須要把枚舉對象轉換爲節點進行插入或移除,會影響性能。

③ 隊列大小初始化方式不一樣

ArrayBlockingQueue實現的隊列必須指定大小

LinkedBlockingQueue能夠不指定大小,默認是Integer.MAX_VALUE

ArrayBlockingQueue性能要比LinkedBlockingQueue性能要好,執行速度更快,ArrayBlockingQueue優先使用!

二、非阻塞隊列

ConcurrentLinkedQueue是一個基於連接節點的無界線程安全隊列,它採用先進先出的規則對節點進行排序,當咱們添加一個元素的時候,它會添加到隊列的尾部;當咱們獲取一個元素時,它會返回隊列頭部的元素。

入隊和出隊操做均利用CAS(compare and set)更新,這樣容許多個線程併發執行,而且不會由於加鎖而阻塞線程,使得併發性能更好。

注:CAS用於實現多線程同步的原子指令,它將內存位置的內容與給定值進行比較,只有在相同的狀況下,將該內存位置的內容修改成新的給定值。 

2、代碼實例

package OSChina.Client;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BlockingQueueTest {
    static final SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
    //定義裝蘋果的籃子
    public static class Basket{
        // 籃子,可以容納3個蘋果
       static BlockingQueue<String> queue = new ArrayBlockingQueue<String>(3);
        //生產蘋果,放入籃子
        public void produce() throws InterruptedException{
            queue.put("An apple");
        }
        // 消費蘋果,從籃子中取走
        public String consume() throws InterruptedException{
            String apple = queue.take();
            return apple;
        }
        public static int getAppleNumber(){
            return queue.size();
        }
    }

    public static void testBasket(){
        // 創建一個裝蘋果的籃子
        final Basket basket = new Basket();
        //定義蘋果生產者
        class Producer implements Runnable{
            @Override
            public void run() {
                try {
                    while (true){
                        System.out.println("生產者準備生產蘋果:"+sdf.format(new Date())+",籃子中蘋果數量:"+Basket.getAppleNumber());
                        basket.produce();
                        System.out.println("生產者生產蘋果完畢:" + sdf.format(new Date())+",籃子中蘋果數量:"+Basket.getAppleNumber());
                        Thread.sleep(300);
                    }
                }catch (InterruptedException  ex){
                }
            }
        }
        //定義蘋果消費者
        class Consumer implements Runnable{
            @Override
            public void run() {
                try {
                    while (true){
                        System.out.println("消費者準備消費蘋果:" + sdf.format(new Date())+",籃子中蘋果數量:"+Basket.getAppleNumber());
                        basket.consume();
                        System.out.println("消費完後有蘋果:" + sdf.format(new Date())+",籃子中蘋果數量:"+Basket.getAppleNumber());
                        Thread.sleep(1000);
                    }
                }catch (InterruptedException ex){
                }
            }
        }

        ExecutorService service = Executors.newFixedThreadPool(2);
        Producer producer = new Producer();
        Consumer consumer = new Consumer();
        service.submit(producer);
        service.submit(consumer);
        // 程序運行10s後,全部任務中止
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        service.shutdownNow();
    }

    public static void main(String[] args) {
        BlockingQueueTest.testBasket();
    }
}

 

江疏影講Java@目錄

相關文章
相關標籤/搜索