BlockingQueue-線程的阻塞隊列

 BlockingQueue做爲線程容器,能夠爲線程同步提供有力的保障,其主要用到的方法包括:
[java]
add(E o); //將指定的元素添加到此隊列中(若是當即可行),在成功時返回 true,其餘狀況則拋出 IllegalStateException。 
drainTo(Collection<? super E> c);  //移除此隊列中全部可用的元素,並將它們添加到給定 collection 中。 
drainTo(Collection<? super E> c,int maxElements);//最多今後隊列中移除給定數量的可用元素,並將這些元素添加到給定 collection 中 
offer(E o);  //若是可能的話,將指定元素插入此隊列中。 
offer(E o, long timeout, TimeUnit unit);  //將指定的元素插入此隊列中,若是沒有可用空間,將等待指定的等待時間(若是有必要)。 
poll(long timeout, TimeUnit unit);  //檢索並移除此隊列的頭部,若是此隊列中沒有任何元素,則等待指定等待的時間(若是有必要)。 
put(E o);    //將指定元素添加到此隊列中,若是沒有可用空間,將一直等待(若是有必要)。 
remainingCapacity();  //返回在無阻塞的理想狀況下(不存在內存或資源約束)此隊列能接受的元素數量;若是沒有內部限制,則返回 Integer.MAX_VALUE。 
take();  //檢索並移除此隊列的頭部,若是此隊列不存在任何元素,則一直等待。  java

       上述方法中主要用到的是put()和take()方法,也只有這兩個方法具備阻塞等待功能,另外BlockingQueue 不接受 null 元素。試圖 add、put 或 offer 一個 null 元素時,某些實現會拋出 NullPointerException。null 被用做指示 poll 操做失敗的警惕值。
       BlockingQueue 能夠定義爲限定容量的隊列,它有一個 remainingCapacity容量值,超出此容量,便沒法無阻塞地 put 額外的元素。也能夠定義爲沒有容量限制的隊列,沒有容量約束的 BlockingQueue 老是報告 Integer.MAX_VALUE 的剩餘容量。
       BlockingQueue 實現是線程安全的。全部排隊方法均可以使用內部鎖定或其餘形式的併發控制來自動達到它們的目的。然而,大量的 Collection 操做(addAll、containsAll、retainAll 和 removeAll)沒有 必要自動執行,除非在實現中特別說明。所以,舉例來講,在只添加了 c 中的一些元素後,addAll(c) 有可能失敗(拋出一個異常)。它實質上不 支持使用任何一種「close」或「shutdown」操做來指示再也不添加任何項。這種功能的需求和使用有依賴於實現的傾向。例如,一種經常使用的策略是:對於生產者,插入特殊的 end-of-stream 或 poison 對象,並根據使用者獲取這些對象的時間來對它們進行解釋。
      BlockingQueue 主要用於實現生產者-使用者隊列,但它另外還支持 Collection 接口。所以,舉例來講,使用 remove(x) 從隊列中移除任意一個元素是有可能的。然而,這種操做一般不 會有效執行,只能有計劃地偶爾使用,好比在取消排隊信息時。阻塞隊列與Semaphore有很大類似性,但也有不少不一樣,阻塞隊列通常是一方存數據,另外一方釋放數據,而Semaphore一般是同一方獲取和釋放信號。下面經過一個例子加以說明:
[java]  安全


package com.bw30;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingQueueTest {
	public static void main(String[] args) {
		final BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
		for (int i = 0; i < 2; i++) {
			new Thread() {
				public void run() {
					while (true) {
						System.out.println(Thread.currentThread().getName()+ "準備存放數據");
						try {
							Thread.sleep((long) (Math.random() * 1000));
							queue.put(1);
						} catch (InterruptedException e) {
							e.printStackTrace();
						}
						System.out.println(Thread.currentThread().getName()
								+ "存放數據完成,當前隊列中含有" + queue.size() + "個數據");
					}
				}
			}.start();
		}
		for (int i = 0; i < 2; i++) {
			new Thread() {
				public void run() {//當隊列中無數據時,獲取隊列數據queue.take()方法阻塞
					int i = 0;
					while (true) {
						System.out.println(Thread.currentThread().getName()
								+ "準備獲取數據");
						try {
							Thread.sleep((long) (Math.random() * 1000));
							i = queue.take();
						} catch (InterruptedException e) {
							e.printStackTrace();
						}
						System.out.println(Thread.currentThread().getName()
								+ "獲取數據完成,數據爲:" + i + " 隊列還剩:" + queue.size()
								+ "個元素");
					}
				}
			}.start();
		}
	}
}


       上例中定義了一個最多能夠存放3個數據的BlockingQueue,並建立了兩個用於put()的線程,一個用於take()的線程,這邊可以更容易使阻塞隊列造成滿隊列,當隊列中的有3個數據的時候,兩個put()線程就等待,只有當take()線程取走一個數據時才能夠繼續往隊列中添加數據。運行結果以下(只去部分結果):
[java] 
Thread-1準備放數據! 
Thread-1已經放了數據,隊列目前有1個數據 
Thread-1準備放數據! 
Thread-1已經放了數據,隊列目前有2個數據 
Thread-0準備放數據! 
Thread-0已經放了數據,隊列目前有3個數據 
Thread-0準備放數據! 
Thread-2準備取數據! 
Thread-2已經取走數據,隊列目前有2個數據  併發

      既然阻塞隊列能夠實現線程之間的等待,那麼咱們就能夠經過兩個具備1個空間的阻塞隊列能夠實現線程的同步,關鍵代碼以下:
[java]  dom

BlockingQueue<Integer> queue1 = new ArrayBlockingQueue<Integer>(1); 
 BlockingQueue<Integer> queue2 = new ArrayBlockingQueue<Integer>(1); 
 public  void sub(int i){ 
                try { 
                    queue1.put(1); 
                } catch (InterruptedException e) { 
                    e.printStackTrace(); 
                } 
                for(int j=1;j<=10;j++){ 
                    System.out.println("sub thread sequece of " + j + ",loop of " + i); 
                } 
                try { 
                    queue2.take(); 
                } catch (InterruptedException e) { 
                    // TODO Auto-generated catch block 
                    e.printStackTrace(); 
                } 
          } 
          public  void main(int i){ 
                try { 
                    queue2.put(1); 
                } catch (InterruptedException e1) { 
                    // TODO Auto-generated catch block 
                    e1.printStackTrace(); 
                } 
                for(int j=1;j<=100;j++){ 
                    System.out.println("main thread sequece of " + j + ",loop of " + i); 
                } 
                try { 
                    queue1.take(); www.2cto.com
                } catch (InterruptedException e) { 
                    // TODO Auto-generated catch block 
                    e.printStackTrace(); 
                } 
          }

        上例中定義了兩個方法,一個sub()和一個main(),兩個方法要實現同步,因爲定義的兩個阻塞隊列都是容量爲1,因此只要有一個queue1.put(1);那麼sub()方法就必須等待,只有當main()方法中queue1.take();之後sub()方法才能夠繼續進行,main()方法也相似。 oop

相關文章
相關標籤/搜索