Java多線程進階(三一)—— J.U.C之collections框架:BlockingQueue接口

圖片描述

本文首發於一世流雲專欄: https://segmentfault.com/blog...

1、引言

從本節開始,咱們將介紹juc-collections框架中的「阻塞隊列」部分。阻塞隊列在實際應用中很是普遍,許多消息中間件中定義的隊列,一般就是一種「阻塞隊列」。算法

那麼「阻塞隊列」和咱們以前討論過的ConcurrentLinkedQueue、ConcurrentLinkedDeque有什麼不一樣呢?

ConcurrentLinkedQueueConcurrentLinkedDeque是以非阻塞算法實現的高性能隊列,其使用場景通常是在併發環境下,須要「隊列」/「棧」這類數據結構時纔會使用;而「阻塞隊列」一般利用了「鎖」來實現,也就是會阻塞調用線程,其使用場景通常是在「生產者-消費者」模式中,用於線程之間的數據交換或系統解耦。segmentfault

Java多線程基礎(七)——Producer-Consumer模式中,咱們曾簡要的談到過「生產者-消費者」這種模式。在這種模式中,「生產者」和「消費者」是相互獨立的,二者之間的通訊須要依靠一個隊列。這個隊列,其實就是本文中的「阻塞隊列」。安全

引入「阻塞隊列」的最大好處就是解耦,在軟件工程中,「高內聚,低耦合」是進行模塊設計的準則之一,這樣「生產者」和「消費者」實際上是互不影響的,未來任意一方須要升級時,能夠保證系統的平滑過渡。數據結構

clipboard.png

2、BlockingQueue簡介

BlockingQueue是在JDK1.5時,隨着J.U.C引入的一個接口:多線程

clipboard.png

BlockingQueue繼承了Queue接口,提供了一些阻塞方法,主要做用以下:併發

  1. 當線程向隊列中插入元素時,若是隊列已滿,則阻塞線程,直到隊列有空閒位置(非滿);
  2. 當線程從隊列中取元素(刪除隊列元素)時,若是隊列未空,則阻塞線程,直到隊列有元素;

既然BlockingQueue是一種隊列,因此也具有隊列的三種基本方法:插入刪除讀取框架

操做類型 拋出異常 返回特殊值 阻塞線程 超時
插入 add(e) offer(e) put(e) offer(e, time, unit)
刪除 remove() poll() take() poll(time, unit)
讀取 element() peek() / /

能夠看到,對於每種基本方法,「拋出異常」和「返回特殊值」的方法定義和Queue是徹底同樣的。BlockingQueue只是增長了兩類和阻塞相關的方法:put(e)take()offer(e, time, unit)poll(time, unit)dom

put(e)take()方法會一直阻塞調用線程,直到線程被中斷或隊列狀態可用;
offer(e, time, unit)poll(time, unit)方法會限時阻塞調用線程,直到超時或線程被中斷或隊列狀態可用。ide

public interface BlockingQueue<E> extends Queue<E> {
    /**
     * 插入元素e至隊尾, 若是隊列已滿, 則阻塞調用線程直到隊列有空閒空間.
     */
    void put(E e) throws InterruptedException;

    /**
     * 插入元素e至隊列, 若是隊列已滿, 則限時阻塞調用線程,直到隊列有空閒空間或超時.
     */
    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * 從隊首刪除元素,若是隊列爲空, 則阻塞調用線程直到隊列中有元素.
     */
    E take() throws InterruptedException;

    /**
     * 從隊首刪除元素,若是隊列爲空, 則限時阻塞調用線程,直到隊列中有元素或超時.
     */
    E poll(long timeout, TimeUnit unit) throws InterruptedException;

    // ...
}

除此以外,BlockingQueue還具備如下特色:性能

  • BlockingQueue隊列中不能包含null元素;
  • BlockingQueue接口的實現類都必須是線程安全的,實現類通常經過「鎖」保證線程安全;
  • BlockingQueue 能夠是限定容量的。remainingCapacity()方法用於返回剩餘可用容量,對於沒有容量限制的BlockingQueue實現,該方法老是返回Integer.MAX_VALUE 。

3、再談「生產者-消費者」模式

最後,咱們來看下如何利用BlockingQueue來實現生產者-消費者模式。在生產者-消費者模式中,一共有四類角色:生產者消費者消息隊列消息體。咱們利用BlockingQueue來實現消息隊列,其他部分沒有什麼變化。

Producer(生產者)

生產者生產消息體(Data),並將消息體(Data)傳遞給通道(Channel)。

/**
 * 生產者
 */
public class Producer implements Runnable {

    private Channel channel;

    public Producer(Channel channel) {
        this.channel = channel;
    }

    @Override
    public void run() {

        while (true) {
            String v = String.valueOf(ThreadLocalRandom.current().nextInt());
            Data data = new Data(v);
            try {
                channel.put(data);
                System.out.println(Thread.currentThread().getName() + " produce :" + data);

            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            Thread.yield();
        }
    }
}

Consumer(消費者)

消費者從通道(Channel)中獲取數據,進行處理。

/**
 * 消費者
 */
public class Consumer implements Runnable {
    private final Channel channel;

    public Consumer(Channel channel) {
        this.channel = channel;
    }

    @Override
    public void run() {
        while (true) {

            try {
                Object obj = channel.take();
                System.out.println(Thread.currentThread().getName() + " consume :" + obj.toString());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            Thread.yield();
        }
    }
}

Channel(通道)

至關於消息的隊列,對消息進行排隊,控制消息的傳輸。

/**
 * 通道類
 */
public class Channel {

    private final BlockingQueue blockingQueue;


    public Channel(BlockingQueue blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    public Object take() throws InterruptedException {
        return blockingQueue.take();
    }

    public void put(Object o) throws InterruptedException {
        blockingQueue.put(o);
    }

}

Data(消息體/數據)

Data表明了實際生產或消費的數據。

/**
 * 數據/消息
 */
public class Data<T> implements Serializable {
    private T data;

    public Data(T data) {
        this.data = data;
    }

    public T getData() {
        return data;
    }

    public void setData(T data) {
        this.data = data;
    }

    @Override
    public String toString() {
        return "Data{" +
            "data=" + data +
            '}';
    }
}

調用以下:

public class Main {
    public static void main(String[] args) {
        BlockingQueue blockingQueue = new SomeQueueImplementation();
        Channel channel = new Channel(blockingQueue);

        Producer p = new Producer(channel);
        Consumer c1 = new Consumer(channel);
        Consumer c2 = new Consumer(channel);

        new Thread(p).start();
        new Thread(c1).start();
        new Thread(c2).start();

    }
}
相關文章
相關標籤/搜索