SynchronousQueue使用實例

本文主要講一下SynchronousQueue。html

定義

SynchronousQueue,實際上它不是一個真正的隊列,由於它不會爲隊列中元素維護存儲空間。與其餘隊列不一樣的是,它維護一組線程,這些線程在等待着把元素加入或移出隊列。java

若是以洗盤子的比喻爲例,那麼這就至關於沒有盤架,而是將洗好的盤子直接放入下一個空閒的烘乾機中。這種實現隊列的方式看似很奇怪,但因爲能夠直接交付工做,從而下降了將數據從生產者移動到消費者的延遲。(在傳統的隊列中,在一個工做單元能夠交付以前,必須經過串行方式首先完成入列[Enqueue]或者出列[Dequeue]等操做。)緩存

直接交付方式還會將更多關於任務狀態的信息反饋給生產者。當交付被接受時,它就知道消費者已經獲得了任務,而不是簡單地把任務放入一個隊列——這種區別就比如將文件直接交給同事,仍是將文件放到她的郵箱中並但願她能儘快拿到文件。dom

由於SynchronousQueue沒有存儲功能,所以put和take會一直阻塞,直到有另外一個線程已經準備好參與到交付過程當中。僅當有足夠多的消費者,而且老是有一個消費者準備好獲取交付的工做時,才適合使用同步隊列。ide

實例

public class SynchronousQueueExample {

    static class SynchronousQueueProducer implements Runnable {

        protected BlockingQueue<String> blockingQueue;
        final Random random = new Random();

        public SynchronousQueueProducer(BlockingQueue<String> queue) {
            this.blockingQueue = queue;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    String data = UUID.randomUUID().toString();
                    System.out.println("Put: " + data);
                    blockingQueue.put(data);
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

    }

    static class SynchronousQueueConsumer implements Runnable {

        protected BlockingQueue<String> blockingQueue;

        public SynchronousQueueConsumer(BlockingQueue<String> queue) {
            this.blockingQueue = queue;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    String data = blockingQueue.take();
                    System.out.println(Thread.currentThread().getName()
                            + " take(): " + data);
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

    }

    public static void main(String[] args) {
        final BlockingQueue<String> synchronousQueue = new SynchronousQueue<String>();

        SynchronousQueueProducer queueProducer = new SynchronousQueueProducer(
                synchronousQueue);
        new Thread(queueProducer).start();

        SynchronousQueueConsumer queueConsumer1 = new SynchronousQueueConsumer(
                synchronousQueue);
        new Thread(queueConsumer1).start();

        SynchronousQueueConsumer queueConsumer2 = new SynchronousQueueConsumer(
                synchronousQueue);
        new Thread(queueConsumer2).start();

    }
}

插入數據的線程和獲取數據的線程,交替執行ui

應用場景

Executors.newCachedThreadPool()this

/**
     * Creates a thread pool that creates new threads as needed, but
     * will reuse previously constructed threads when they are
     * available, and uses the provided
     * ThreadFactory to create new threads when needed.
     * @param threadFactory the factory to use when creating new threads
     * @return the newly created thread pool
     * @throws NullPointerException if threadFactory is null
     */
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

因爲ThreadPoolExecutor內部實現任務提交的時候調用的是工做隊列(BlockingQueue接口的實現類)的非阻塞式入隊列方法(offer方法),所以,在使用SynchronousQueue做爲工做隊列的前提下,客戶端代碼向線程池提交任務時,而線程池中又沒有空閒的線程可以從SynchronousQueue隊列實例中取一個任務,那麼相應的offer方法調用就會失敗(即任務沒有被存入工做隊列)。此時,ThreadPoolExecutor會新建一個新的工做者線程用於對這個入隊列失敗的任務進行處理(假設此時線程池的大小還未達到其最大線程池大小)。線程

因此,使用SynchronousQueue做爲工做隊列,工做隊列自己並不限制待執行的任務的數量。但此時須要限定線程池的最大大小爲一個合理的有限值,而不是Integer.MAX_VALUE,不然可能致使線程池中的工做者線程的數量一直增長到系統資源所沒法承受爲止。code

若是應用程序確實須要比較大的工做隊列容量,而又想避免無界工做隊列可能致使的問題,不妨考慮SynchronousQueue。SynchronousQueue實現上並不使用緩存空間。htm

使用SynchronousQueue的目的就是保證「對於提交的任務,若是有空閒線程,則使用空閒線程來處理;不然新建一個線程來處理任務」。

doc

相關文章
相關標籤/搜索