阻塞隊列之LinkedBlockingQueue

概述html

LinkedBlockingQueue內部由單鏈表實現,只能從head取元素,從tail添加元素。添加元素和獲取元素都有獨立的鎖,也就是說LinkedBlockingQueue是讀寫分離的,讀寫操做能夠並行執行。LinkedBlockingQueue採用可重入鎖(ReentrantLock)來保證在併發狀況下的線程安全。node

構造器安全

LinkedBlockingQueue一共有三個構造器,分別是無參構造器、能夠指定容量的構造器、能夠穿入一個容器的構造器。若是在建立實例的時候調用的是無參構造器,LinkedBlockingQueue的默認容量是Integer.MAX_VALUE,這樣作極可能會致使隊列尚未滿,可是內存卻已經滿了的狀況(內存溢出)。併發

1 public LinkedBlockingQueue();   //設置容量爲Integer.MAX
2 
3 public LinkedBlockingQueue(int capacity);  //設置指定容量
4 
5 public LinkedBlockingQueue(Collection<? extends E> c);  //穿入一個容器,若是調用該構造器,容量默認也是Integer.MAX_VALUE

LinkedBlockingQueue經常使用操做this

取數據spa

take():首選。當隊列爲空時阻塞線程

poll():彈出隊頂元素,隊列爲空時,返回空code

peek():和poll烈性,返回隊隊頂元素,但頂元素不彈出。隊列爲空時返回nullhtm

remove(Object o):移除某個元素,隊列爲空時拋出異常。成功移除返回trueblog

 

添加數據

put():首選。隊盡是阻塞

offer():隊滿時返回false

 

判斷隊列是否爲空

size()方法會遍歷整個隊列,時間複雜度爲O(n),因此最好選用isEmtpy

 

put元素原理

基本過程:

1.判斷元素是否爲null,爲null拋出異常

2.加鎖(可中斷鎖)

3.判斷隊列長度是否到達容量,若是到達一直等待

4.若是沒有隊滿,enqueue()在隊尾加入元素

5.隊列長度加1,此時若是隊列尚未滿,調用signal喚醒其餘堵塞隊列

 1  if (e == null) throw new NullPointerException();
 2        
 3         int c = -1;
 4         Node<E> node = new Node<E>(e);
 5         final ReentrantLock putLock = this.putLock;
 6         final AtomicInteger count = this.count;
 7         putLock.lockInterruptibly();
 8         try {
 9             while (count.get() == capacity) {
10                 notFull.await();
11             }
12             enqueue(node);
13             c = count.getAndIncrement();
14             if (c + 1 < capacity)
15                 notFull.signal();
16         } finally {
17             putLock.unlock();
18         }

 

take元素原理

 基本過程:

1.加鎖(依舊是ReentrantLock),注意這裏的鎖和寫入是不一樣的兩把鎖

2.判斷隊列是否爲空,若是爲空就一直等待

3.經過dequeue方法取得數據

3.取走元素後隊列是否爲空,若是不爲空喚醒其餘等待中的隊列

 1 public E take() throws InterruptedException {
 2         E x;
 3         int c = -1;
 4         final AtomicInteger count = this.count;
 5         final ReentrantLock takeLock = this.takeLock;
 6         takeLock.lockInterruptibly();
 7         try {
 8             while (count.get() == 0) {
 9                 notEmpty.await();
10             }
11             x = dequeue();
12             c = count.getAndDecrement();
13             if (c > 1)
14                 notEmpty.signal();
15         } finally {
16             takeLock.unlock();
17         }
18         if (c == capacity)
19             signalNotFull();
20         return x;
21     }

enqueue()和dequeue()方法實現都比較簡單,無非就是將元素添加到隊尾,從隊頂取走元素,感興趣的朋友能夠本身去看一下,這裏就不粘貼了。

 

LinkedBlockingQueue與LinkedBlockingDeque比較

 

LinkedBlockingDeque和LinkedBlockingQueue的相同點在於: 
1. 基於鏈表 
2. 容量可選,不設置的話,就是Int的最大值

和LinkedBlockingQueue的不一樣點在於: 
1. 雙端鏈表和單鏈表 
2. 不存在哨兵節點 
3. 一把鎖+兩個條件

實例:

 小記:AtomicInteger的getAndIncrment和getAndDcrement()等方法,這些方法分爲兩步,get和increment(decrement),在get和increment中間可能有其餘線程進入,致使多個線程get到的數值是相同的,也會致使多個線程累加後的值其實累加1.在這種狀況下,使用volatile也是沒有效果的,由於get以後沒有對值進行修改,不能觸發volatile的效果。

 1 public class ProducerAndConsumer {
 2     public static void main(String[] args){
 3 
 4         try{
 5             BlockingQueue queue = new LinkedBlockingQueue(5);
 6 
 7             ExecutorService executor = Executors.newFixedThreadPool(5);
 8             Produer producer = new Produer(queue);
 9             for(int i=0;i<3;i++){
10                 executor.execute(producer);
11             }
12             executor.execute(new Consumer(queue));
13 
14             executor.shutdown();
15         }catch (Exception e){
16             e.printStackTrace();
17         }
18 
19     }
20 }
21 
22 class Produer implements  Runnable{
23 
24     private BlockingQueue queue;
25     private int nums = 20;  //循環次數
26 
27     //標記數據編號
28     private static volatile AtomicInteger count = new AtomicInteger();
29     private boolean isRunning = true;
30     public Produer(){}
31 
32     public Produer(BlockingQueue queue){
33         this.queue = queue;
34     }
35 
36     public void run() {
37         String data = null;
38         try{
39             System.out.println("開始生產數據");
40             System.out.println("-----------------------");
41 
42           while(nums>0){
43                 nums--;
44                 count.decrementAndGet();
45 
46                 Thread.sleep(500);
47                 System.out.println(Thread.currentThread().getId()+ " :生產者生產了一個數據");
48                 queue.put(count.getAndIncrement());
49             }
50         }catch(Exception e){
51             e.printStackTrace();
52             Thread.currentThread().interrupt();
53         }finally{
54             System.out.println("生產者線程退出!");
55         }
56     }
57 }
58 
59 class Consumer implements Runnable{
60 
61     private BlockingQueue queue;
62     private int nums = 20;
63     private boolean isRunning = true;
64 
65     public Consumer(){}
66 
67     public Consumer(BlockingQueue queue){
68         this.queue = queue;
69     }
70 
71     public void run() {
72 
73         System.out.println("消費者開始消費");
74         System.out.println("-------------------------");
75 
76         while(nums>0){
77             nums--;
78             try{
79                 while(isRunning){
80                     int data = (Integer)queue.take();
81                     Thread.sleep(500);
82                     System.out.println("消費者消費的數據是" + data);
83             }
84 
85             }catch(Exception e){
86                 e.printStackTrace();
87                 Thread.currentThread().interrupt();
88             }finally {
89                 System.out.println("消費者線程退出!");
90             }
91 
92         }
93     }
94 }

效果:

 1 12 :生產者生產了一個數據
 2 11 :生產者生產了一個數據
 3 13 :生產者生產了一個數據
 4 12 :生產者生產了一個數據
 5 消費者消費的數據是-3
 6 11 :生產者生產了一個數據
 7 13 :生產者生產了一個數據
 8 12 :生產者生產了一個數據
 9 消費者消費的數據是-3
10 13 :生產者生產了一個數據
11 11 :生產者生產了一個數據
12 12 :生產者生產了一個數據
13 消費者消費的數據是-3
14 13 :生產者生產了一個數據
15 11 :生產者生產了一個數據
16 消費者消費的數據是-3
17 消費者消費的數據是-3

能夠看到,有多個producer在生產數據的時候get到的是相同的值。

相關文章
相關標籤/搜索