java併發:阻塞隊列

第一節 阻塞隊列html

1.1 初識阻塞隊列java

  隊列以一種先進先出的方式管理數據,阻塞隊列(BlockingQueue)是一個支持兩個附加操做的隊列,這兩個附加的操做是:在隊列爲空時,獲取元素的線程會等待隊列變爲非空;當隊列滿時,存儲元素的線程會等待隊列可用。在多線程進行合做時,阻塞隊列是頗有用的工具。node

  生產者-消費者模式:阻塞隊列經常使用於生產者和消費者的場景,生產者線程能夠按期的把中間結果存到阻塞隊列中,而消費者線程把中間結果取出並在未來修改它們。隊列會自動平衡負載,若是生產者線程集運行的比消費者線程集慢,則消費者線程集在等待結果時就會阻塞;若是生產者線程集運行的快,那麼它將等待消費者線程集遇上來。算法

    

  簡單解說一下如何理解上表,好比說阻塞隊列的插入方法,add(e)、offer(e)、put(e)等均爲阻塞隊列的插入方法,但它們的處理方式不同,add(e)方法可能會拋出異常,而put(e)方法則可能一直處於阻塞狀態,下面解說一下這些處理方式:數組

  A、拋出異常:所謂拋出異常是指當阻塞隊列滿時,再往隊列裏插入元素,會拋出IllegalStateException("Queue full")異常;當隊列爲空時,從隊列裏獲取元素時會拋出NoSuchElementException異常 。緩存

  B、返回特殊值:插入方法,該方法會返回是否成功,成功則返回true;移除方法,該方法是從隊列裏拿出一個元素,若是沒有則返回null數據結構

  C、一直阻塞:當阻塞隊列滿時,若是生產者線程往隊列裏put元素,隊列會一直阻塞生產者線程,直到將數據放入隊列或是響應中斷退出;當隊列爲空時,消費者線程試圖從隊列裏take元素,隊列也會一直阻塞消費者線程,直到隊列可用。多線程

  D、超時退出:當阻塞隊列滿時,隊列會阻塞生產者線程一段時間,若是超過必定的時間,生產者線程就會退出。併發

 

1.2 Java中的阻塞隊列dom

  java.util.concurrent包提供了幾種不一樣形式的阻塞隊列,如數組阻塞隊列ArrayBlockingQueue、鏈表阻塞隊列LinkedBlockingQueue、優先級阻塞隊列PriorityBlockingQueue和延時隊列DelayQueue等,下面簡單介紹一下這幾個阻塞隊列:

  數組阻塞隊列:ArrayBlockingQueue是一個由數組支持的有界阻塞隊列,內部維持着一個定長的數據緩衝隊列(該隊列由數組構成),此隊列按照先進先出(FIFO)的原則對元素進行排序,在構造時須要給定容量。ArrayBlockingQueue內部還保存着兩個整形變量,分別標識着隊列的頭部和尾部在數組中的位置。

  對於數組阻塞隊列,能夠選擇是否須要公平性,所謂公平訪問隊列是指阻塞的全部生產者線程或消費者線程,當隊列可用時,能夠按照阻塞的前後順序訪問隊列,即先阻塞的生產者線程,能夠先往隊列裏插入元素,先阻塞的消費者線程,能夠先從隊列裏獲取元素。一般,公平性會使你在性能上付出代價,只有在的確很是須要的時候再使用它。

  咱們可使用如下代碼建立一個公平的阻塞隊列:

ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);

  數組阻塞隊列的公平性是使用可重入鎖實現的,其構造函數代碼以下:

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
      throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull = lock.newCondition();
}

  鏈表阻塞隊列:LinkedBlockingQueue基於鏈表的有界阻塞隊列,內部維持着一個數據緩衝隊列(該隊列由鏈表構成),此隊列按照先進先出的原則對元素進行排序。當生產者往隊列中放入一個數據時,隊列會從生產者手中獲取數據,並緩存在隊列內部,而生產者當即返回;只有當隊列緩衝區達到最大值緩存容量時(能夠經過LinkedBlockingQueue的構造函數指定該值),纔會阻塞生產者隊列,直到消費者從隊列中消費掉一份數據,生產者線程將會被喚醒,反之對於消費者這端的處理也基於一樣的原理。須要注意的是,若是構造一個LinkedBlockingQueue對象,而沒有指定其容量大小,LinkedBlockingQueue會默認一個相似無限大小(Integer.Max_VALUE)的容量,這樣的話,若是生產者的速度一旦大於消費者的速度,也許尚未等到隊列滿阻塞產生,系統內存就有可能已經被消耗殆盡了。

  LinkedBlockingQueue之因此可以高效的處理併發數據,是由於其對於生產者端和消費者端分別採用了獨立的鎖來控制數據同步,這也意味着在高併發的狀況下生產者和消費者能夠並行地操做隊列中的數據,以此來提升整個隊列的併發性能。

  優先級阻塞隊列:PriorityBlockingQueue是一個支持優先級排序的無界阻塞隊列,默認狀況下元素採起天然順序排列,也能夠經過構造函數傳入的Compator對象來決定。在實現PriorityBlockingQueue時,內部控制線程同步的鎖採用的是公平鎖。須要注意的是PriorityBlockingQueue並不會阻塞數據生產者,而只是在沒有可消費的數據時阻塞數據的消費者,所以使用的時候要特別注意,生產者生產數據的速度絕對不能快於消費者消費數據的速度,不然時間一長,會最終耗盡全部的可用堆內存空間。

  延時隊列:DelayQueue是一個支持延時獲取元素的使用優先級隊列實現的無界阻塞隊列。隊列中的元素必須實現Delayed接口和Comparable接口(用以指定元素的順序),也就是說DelayQueue裏面的元素必須有public void compareTo(To)和long getDelay(TimeUnit unit)方法存在;在建立元素時能夠指定多久才能從隊列中獲取當前元素,只有在延遲期滿時才能從隊列中提取元素。

  鏈表雙向阻塞隊列:LinkedBlockingDeque是由一個鏈表結構組成的雙向阻塞隊列。所謂雙向隊列指的是你能夠從隊列的兩端插入和移出元素,雙端隊列因多了一個操做入口,在多線程同時入隊時減小了一半的競爭。在初始化LinkedBlockingDeque時,能夠設置容量,防止其過渡膨脹,相比其餘的阻塞隊列,LinkedBlockingDeque多了addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast等方法,以First單詞結尾的方法,表示插入,獲取(peek)或移除雙端隊列的第一個元素;以Last單詞結尾的方法,表示插入,獲取或移除雙端隊列的最後一個元素;插入方法add等同於addLast,移除方法remove等同於removeFirst。雙向阻塞隊列能夠運用在「工做竊取」模式中。

  鏈表傳輸隊列:LinkedTransferQueue是一個由鏈表結構組成的無界傳輸阻塞隊列,相對於其餘阻塞隊列,LinkedTransferQueue多了tryTransfer()方法和transfer()方法。

  transfer()方法:若是當前有消費者正在等待接收元素(消費者使用take()方法或帶時間限制的poll()方法),transfer()方法能夠把生產者傳入的元素馬上傳輸給消費者;若是沒有消費者在等待接收元素,transfer()方法會將元素存放到隊列的tail節點,並等到該元素被消費者消費了才返回。

  transfer()方法的關鍵代碼以下:

Node pred = tryAppend(s, haveData);
return awaitMatch(s, pred, e, (how == TIMED), nanos);

  代碼解說:第一行代碼是試圖把存放當前元素的s節點做爲tail節點,第二行代碼是讓CPU自旋等待消費者消費元素。由於自旋會消耗CPU,因此自旋必定的次數後使用Thread.yield()方法來暫停當前正在執行的線程,並執行其餘線程。

  tryTransfer()方法:該方法是用來試探生產者傳入的元素是否能直接傳給消費者,若是沒有消費者等待接收元素,則返回false。與transfer()方法的區別:tryTransfer()方法是當即返回(不管消費者是否接收),transfer()方法是必須等到消費者消費了才返回。對於帶有時間限制的tryTransfer(E e, long timeout, TimeUnit unit)方法,則是試圖把生產者傳入的元素直接傳給消費者,可是若是沒有消費者消費該元素則等待指定的時間以後再返回,若是超時還沒消費元素,則返回false,若是在超時時間內消費了元素,則返回true。

  欲瞭解LinkedTransferQueue的更多內容,可查看如下文章:http://ifeve.com/java-transfer-queue/、http://www.tuicool.com/articles/ZFriEz、http://guojuanjun.blog.51cto.com/277646/948298/,本文不細述。

  SynchronousQueue:SynchronousQueue是一種無界、無緩衝的阻塞隊列,能夠認爲SynchronousQueue是一個緩存值爲1的阻塞隊列,可是SynchronousQueue內部並無數據緩存空間,數據是在配對的生產者和消費者線程之間直接傳遞的。能夠這樣來理解:SynchronousQueue是一個傳球手,SynchronousQueue不存儲數據元素,隊列頭元素是第一個排隊要插入數據的線程,而不是要交換的數據,SynchronousQueue負責把生產者線程處理的數據直接傳遞給消費者線程,生產者和消費者互相等待對方,握手,而後一塊兒離開。SynchronousQueue的吞吐量高於LinkedBlockingQueue 和 ArrayBlockingQueue。

 

1.4 詳解SynchronousQueue

【本小節主要摘自參考資料,做爲學習筆記O(∩_∩)O】

(1)認識SynchronousQueue 

  SynchronousQueue的isEmpty()方法永遠返回true,remainingCapacity()方法永遠返回0,remove()和removeAll() 方法永遠返回false,iterator()方法永遠返回null,peek()方法永遠返回null,故咱們不能經過調用peek()方法來看隊列中是否有數據元素,由於數據元素只有當你試着取走的時候纔可能存在,不取走而只想偷窺一下是不行的,一樣遍歷這個隊列的操做也是不容許的。

  SynchronousQueue的一個使用場景是在線程池裏,Executors.newCachedThreadPool()就使用了SynchronousQueue,這個線程池根據須要建立新的線程(新任務到來時),固然若是有空閒線程的話,則會複用這些線程。

(2)SynchronousQueue實現機制

  阻塞算法的實現一般是在內部採用一個鎖來保證在多個線程中的put()和take()方法是串行執行的,以下代碼是通常put()和take()方法的實現:

public class NativeSynchronousQueue<E> {
    boolean putting = false;
    E item = null;

    public synchronized E take() throws InterruptedException {
        while (item == null)
            wait();
        E e = item;
        item = null;
        notifyAll();
        return e;
    }

    public synchronized void put(E e) throws InterruptedException {
        if (e==null) return;
        while (putting)
            wait();
        putting = true;
        item = e;
        notifyAll();
        while (item!=null)
            wait();
        putting = false;
        notifyAll();
    }
}

  經典同步隊列的實現採用了三個信號量,代碼以下:

public class SemaphoreSynchronousQueue<E> {
    E item = null;
    Semaphore sync = new Semaphore(0);
    Semaphore send = new Semaphore(1);
    Semaphore recv = new Semaphore(0);

    public E take() throws InterruptedException {
        recv.acquire();
        E x = item;
        sync.release();
        send.release();
        return x;
    }

    public void put (E x) throws InterruptedException{
        send.acquire();
        item = x;
        recv.release();
        sync.acquire();
    }
}

  Java5中SynchronousQueue的實現相對來講作了一些優化,它只使用了一個鎖,使用隊列代替信號量,容許發佈者直接發佈數據,而不是要首先從阻塞在信號量處被喚醒,代碼以下:

public class Java5SynchronousQueue<E> {
    ReentrantLock qlock = new ReentrantLock();
    Queue waitingProducers = new Queue();
    Queue waitingConsumers = new Queue();

    static class Node extends AbstractQueuedSynchronizer {
        E item;
        Node next;

        Node(Object x) { item = x; }
        void waitForTake() { /* (uses AQS) */ }
           E waitForPut() { /* (uses AQS) */ }
    }

    public E take() {
        Node node;
        boolean mustWait;
        qlock.lock();
        node = waitingProducers.pop();
        if(mustWait = (node == null))
           node = waitingConsumers.push(null);
         qlock.unlock();

        if (mustWait)
           return node.waitForPut();
        else
            return node.item;
    }

    public void put(E e) {
         Node node;
         boolean mustWait;
         qlock.lock();
         node = waitingConsumers.pop();
         if (mustWait = (node == null))
             node = waitingProducers.push(e);
         qlock.unlock();

         if (mustWait)
             node.waitForTake();
         else
            node.item = e;
    }
}

  Java6中SynchronousQueue的實現採用了一種性能更好的無鎖算法——擴展的「Dual stack and Dual queue」算法,性能比Java5的實現有較大提高。

  聲明一個SynchronousQueue有兩種不一樣的方式,支持公平和非公平兩種競爭機制,它們之間有着不太同樣的行爲,公平模式和非公平模式的區別:若是採用公平模式,SynchronousQueue會採用公平鎖,並配合一個FIFO隊列來阻塞多餘的生產者和消費者;若是是非公平模式(SynchronousQueue默認),SynchronousQueue會採用非公平鎖,同時配合一個LIFO隊列來管理多餘的生產者和消費者。二者性能至關,通常狀況下,Fifo一般能夠支持更大的吞吐量,但Lifo能夠更大程度的保持線程的本地化,須要注意的是,若採用非公平模式,若是生產者和消費者的處理速度有差距,則很容易出現飢渴的狀況(可能某些生產者或者消費者的數據永遠都得不處處理)。

(3)參考資料

(1)http://ifeve.com/java-synchronousqueue/

(2)http://blog.itpub.net/29644969/viewspace-1169051/

 

 

第二節 使用示例

2.1 生產者-消費者示例

一個生產者-N個消費者,程序功能:在一個目錄及它的全部子目錄下搜索全部文件,打印出包含指定關鍵字的文件列表。

package com.test;

import java.io.*;
import java.util.*;
import java.util.concurrent.*;

public class BlockingQueueTest {
    public static void main(String[] args) {
        Scanner in = new Scanner(System.in);
        System.out.print("Enter base directory (e.g. /usr/local/jdk1.6.0/src): ");
        String directory = in.nextLine();
        System.out.print("Enter keyword (e.g. volatile): ");
        String keyword = in.nextLine();
        final int FILE_QUEUE_SIZE = 10;
        final int SEARCH_THREADS = 100;
        BlockingQueue<File> queue = new ArrayBlockingQueue<File>(FILE_QUEUE_SIZE);
        FileEnumerationTask enumerator = new FileEnumerationTask(queue,new File(directory));
        new Thread(enumerator).start();
        for (int i = 1; i <= SEARCH_THREADS; i++)
            new Thread(new SearchTask(queue, keyword)).start();
    }
}

/**
 * This task enumerates all files in a directory and its subdirectories.
 */
class FileEnumerationTask implements Runnable {
    /**
     * Constructs a FileEnumerationTask.
     * 
     * @param queue
     *            the blocking queue to which the enumerated files are added
     * @param startingDirectory
     *            the directory in which to start the enumeration
     */
    public FileEnumerationTask(BlockingQueue<File> queue, File startingDirectory) {
        this.queue = queue;
        this.startingDirectory = startingDirectory;
    }

    public void run() {
        try {
            enumerate(startingDirectory);
            queue.put(DUMMY);
        } catch (InterruptedException e) {
        }
    }

    /**
     * Recursively enumerates all files in a given directory and its
     * subdirectories
     * 
     * @param directory
     *            the directory in which to start
     */
    public void enumerate(File directory) throws InterruptedException {
        File[] files = directory.listFiles();
        for (File file : files) {
            if (file.isDirectory())
                enumerate(file);
            else
                queue.put(file);
        }
    }

    public static File DUMMY = new File("");
    private BlockingQueue<File> queue;
    private File startingDirectory;
}

/**
 * This task searches files for a given keyword.
 */
class SearchTask implements Runnable {
    /**
     * Constructs a SearchTask.
     * 
     * @param queue
     *            the queue from which to take files
     * @param keyword
     *            the keyword to look for
     */
    public SearchTask(BlockingQueue<File> queue, String keyword) {
        this.queue = queue;
        this.keyword = keyword;
    }

    public void run() {
        try {
            boolean done = false;
            while (!done) {
                File file = queue.take();
                if (file == FileEnumerationTask.DUMMY) {
                    queue.put(file);
                    done = true;
                } else
                    search(file);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
        }
    }

    /**
     * Searches a file for a given keyword and prints all matching lines.
     * 
     * @param file
     *            the file to search
     */
    public void search(File file) throws IOException {
        Scanner in = new Scanner(new FileInputStream(file));
        int lineNumber = 0;
        while (in.hasNextLine()) {
            lineNumber++;
            String line = in.nextLine().trim();
            if (line.contains(keyword))
                System.out.printf("%s:%d    %s%n", file.getPath(), lineNumber,
                        line);
        }
        in.close();
    }

    private BlockingQueue<File> queue;
    private String keyword;
}

解說:上述程序展現瞭如何使用阻塞隊列來控制線程集,生產者線程枚舉在全部子目錄下的全部文件並把它們放到一個阻塞隊列中,同時咱們還啓動了大量的搜索線程,每一個搜索線程從隊列中取出一個文件,打開它,打印出包含關鍵字的全部行,而後取出下一個文件。

  在上述代碼中,咱們使用了一個小技巧來在工做結束後終止線程,爲了發出完成信號,枚舉線程把一個虛擬對象放入隊列,當搜索線程取到這個虛擬對象時,就將其放回並終止(這相似於在行李輸送帶上放一個寫着「最後一個包」的虛擬包)。

注意:在這個程序中,咱們使用的是ArrayBlockingQueue,使用隊列數據結構做爲一種同步機制,這裏不須要人任何顯示的線程同步。

對比分析:

  ArrayBlockingQueue在生產者放入數據和消費者獲取數據,都是共用同一個鎖對象,由此也意味着二者沒法真正並行運行,這點尤爲不一樣於LinkedBlockingQueue;按照實現原理來分析,ArrayBlockingQueue徹底能夠採用分離鎖,從而實現生產者和消費者操做的徹底並行運行。Doug Lea之因此沒這樣去作,也許是由於ArrayBlockingQueue的數據寫入和獲取操做已經足夠輕巧,以致於引入獨立的鎖機制,除了給代碼帶來額外的複雜性外,其在性能上徹底佔不到任何便宜。

  ArrayBlockingQueue和LinkedBlockingQueue間還有一個明顯的不一樣之處在於,前者在插入或刪除元素時不會產生或銷燬任何額外的對象實例,然後者則會生成一個額外的Node對象,這在長時間內須要高效併發地處理大批量數據的系統中,其對於GC的影響仍是存在必定的區別。

 

2.2 DelayQueue使用示例

咱們能夠將延時隊列DelayQueue運用在如下場景中:

  (1)緩存系統的設計:能夠用DelayQueue保存緩存元素的有效期,使用一個線程循環查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示緩存有效期到了。

  (2)定時任務調度:使用DelayQueue保存當天將要執行的任務和執行時間,一旦從DelayQueue中獲取到任務就開始執行任務,好比TimerQueue就是使用DelayQueue實現的。

DelayQueue使用實例以下:

(1)實現一個Student對象做爲DelayQueue的元素,Student必須實現Delayed接口的兩個方法

package com.test;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class Student implements Delayed {//必須實現Delayed接口
    
    private String name;
    private long submitTime;// 交卷時間
    private long workTime;// 考試時間

    public Student(String name, long submitTime) {
        this.name = name;
        this.workTime = submitTime;
        this.submitTime = TimeUnit.NANOSECONDS.convert(submitTime, TimeUnit.MILLISECONDS) + System.nanoTime();
        System.out.println(this.name + " 交卷,用時" + workTime);
    }

    public String getName() {
        return this.name + " 交卷,用時" + workTime;
    }
    
    //必須實現getDelay方法
    public long getDelay(TimeUnit unit) {
        //返回一個延遲時間
        return unit.convert(submitTime - System.nanoTime(), unit.NANOSECONDS);
    }

    //必須實現compareTo方法
    public int compareTo(Delayed o) {
        Student that = (Student) o;
        return submitTime > that.submitTime ? 1 : (submitTime < that.submitTime ? -1 : 0);
    }

}

(2)主線程程序

package com.test;
import java.util.concurrent.DelayQueue;
public class DelayQueueTest {
    public static void main(String[] args) throws Exception {
        
        // 新建一個等待隊列
        final DelayQueue<Student> bq = new DelayQueue<Student>(); for (int i = 0; i < 5; i++) {
            Student student = new Student("學生"+i,Math.round((Math.random()*10+i)));
            bq.put(student); // 將數據存到隊列裏!
        }
        //獲取但不移除此隊列的頭部;若是此隊列爲空,則返回 null。
        System.out.println(bq.peek().getName());
    }
}

上述程序運行結果以下:

學生0 交卷,用時8
學生1 交卷,用時9
學生2 交卷,用時4
學生3 交卷,用時9
學生4 交卷,用時12
學生2 交卷,用時4

 

 

第三節 使用阻塞式隊列處理大數據

 鄙人暫時尚未研究這部份內容,此處僅貼出兩個資源,以供後續學習

(1)http://blog.csdn.net/lifetragedy/article/details/50593588

(2)http://download.csdn.net/detail/lifetragedy/9419773

 

第四節 參考資料

(1)http://www.cnblogs.com/dolphin0520/p/3932906.html

相關文章
相關標籤/搜索