併發(3) 容器

  容器類中提供的ArrayList、HashMap、HashSet不是線程安全的,併發包下提供了相似功能的線程安全的集合。java

說明 原理
ConcurrentHashMap    
ConcurrentSkipListMap    
ConcurrentSkipListSet    
CopyOnWriteArrayList    
CopyOnWriteArraySet    

 

  隊列是一種數據結構,它以一種先進先出的方式管理數據。若是你試圖向一個 已經滿了的阻塞隊列中添加一個元素或者是從一個空的阻塞隊列中移除一個元索,將致使線程阻塞。node

隊列操做:數組

方法  說明
boolean add(E e)  添加一個元素到隊列中,若是隊列已滿,則拋出異常
E remove() 移除並返回隊列頭部的元素,若是隊列爲空,則拋出異常
E element() 返回隊列頭部的元素,若是隊列爲空,則拋出異常
boolean offer(E e) 添加一個元素到隊列中,若是隊列已滿,返回false
offer(E e, long timeout, TimeUnit unit) 添加一個元素到隊列中,等待指定時間,若是隊列已滿,返回false
E poll() 移除並返回隊列頭部的元素,若是隊列爲空,返回null
E poll(long timeout, TimeUnit unit) 移除並返回隊列頭部的元素,等待指定時間,若是隊列爲空,返回null
E peek() 返回隊列頭部的元素,若是隊列爲空,返回null
void put(E e) 返回隊列頭部的元素,若是隊列已滿,阻塞
E take() 移除並返回隊列頭部的元素,若是隊列爲空,阻塞

數組實現安全

ArrayBlockingQueue數據結構

  Qeueu的數組實現,底層使用一個數組實現,數組大小不可變,使用一個count表示當前元素個數,使用putIndex表示當前尾的index,使用takeIndex表示當前頭的index,putindex不必定比takeindex大,是在數組連續的循環。使用一個ReentrantLock控制讀寫併發。使用兩個Condition來阻塞數組爲空時消費或者數組滿時生產的線程,當數組中有數據或者有空間時喚醒。迭代器中使用一個index指向下一個元素位置;併發

  1 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
  2         implements BlockingQueue<E>, java.io.Serializable {
  3 
  4     //存儲數據數組
  5     final Object[] items;
  6     //頭index
  7     int takeIndex;
  8     //尾index
  9     int putIndex;
 10     //隊列長度
 11     int count;
 12     //
 13     final ReentrantLock lock;
 14     //非空條件
 15     private final Condition notEmpty;
 16     //非滿條件
 17     private final Condition notFull;
 18     //初始化
 19     public ArrayBlockingQueue(int capacity) {
 20         this(capacity, false);
 21     }
 22     public ArrayBlockingQueue(int capacity, boolean fair) {
 23         if (capacity <= 0)
 24             throw new IllegalArgumentException();
 25         this.items = new Object[capacity];
 26         lock = new ReentrantLock(fair);
 27         notEmpty = lock.newCondition();
 28         notFull =  lock.newCondition();
 29     }
 30     public ArrayBlockingQueue(int capacity, boolean fair,
 31                               Collection<? extends E> c) {
 32         this(capacity, fair);
 33 
 34         final ReentrantLock lock = this.lock;
 35         lock.lock(); // Lock only for visibility, not mutual exclusion
 36         try {
 37             int i = 0;
 38             try {
 39                 for (E e : c) {
 40                     checkNotNull(e);
 41                     items[i++] = e;
 42                 }
 43             } catch (ArrayIndexOutOfBoundsException ex) {
 44                 throw new IllegalArgumentException();
 45             }
 46             count = i;
 47             putIndex = (i == capacity) ? 0 : i;
 48         } finally {
 49             lock.unlock();
 50         }
 51     }
 52     //新增一個元素
 53     public boolean add(E e) {
 54         return super.add(e);
 55     }
 56 
 57     //新增一個元素
 58     public boolean offer(E e) {
 59         checkNotNull(e);
 60         final ReentrantLock lock = this.lock;
 61         lock.lock();
 62         try {
 63             //若是已滿,返回false,不然添加到隊列中
 64             if (count == items.length)
 65                 return false;
 66             else {
 67                 enqueue(e);
 68                 return true;
 69             }
 70         } finally {
 71             lock.unlock();
 72         }
 73     }
 74     //新增一個元素
 75     public void put(E e) throws InterruptedException {
 76         checkNotNull(e);
 77         final ReentrantLock lock = this.lock;
 78         lock.lockInterruptibly();
 79         try {
 80             //若是已滿,等待,不然添加到隊列中
 81             while (count == items.length)
 82                 notFull.await();
 83             enqueue(e);
 84         } finally {
 85             lock.unlock();
 86         }
 87     }
 88     //添加一個元素
 89     private void enqueue(E x) {
 90         final Object[] items = this.items;
 91         items[putIndex] = x;
 92         if (++putIndex == items.length)
 93             putIndex = 0;
 94         count++;
 95         notEmpty.signal();
 96     }
 97     //獲取頭部元素
 98     public E poll() {
 99         final ReentrantLock lock = this.lock;
100         lock.lock();
101         try {
102             //若是爲空返回null,不然返回頭部元素
103             return (count == 0) ? null : dequeue();
104         } finally {
105             lock.unlock();
106         }
107     }
108     //獲取頭部元素
109     public E take() throws InterruptedException {
110         final ReentrantLock lock = this.lock;
111         lock.lockInterruptibly();
112         try {
113             //若是爲空阻塞,不然返回頭部元素
114             while (count == 0)
115                 notEmpty.await();
116             return dequeue();
117         } finally {
118             lock.unlock();
119         }
120     }
121     //獲取頭部元素
122     private E dequeue() {
123         // assert lock.getHoldCount() == 1;
124         // assert items[takeIndex] != null;
125         final Object[] items = this.items;
126         @SuppressWarnings("unchecked")
127         E x = (E) items[takeIndex];
128         items[takeIndex] = null;
129         if (++takeIndex == items.length)
130             takeIndex = 0;
131         count--;
132         if (itrs != null)
133             itrs.elementDequeued();
134         notFull.signal();
135         return x;
136     }
137 }
View Code

LinkedBlockingQueueide

  Qeueu的列表實現,底層使用一個單向鏈表實現。大小可變也能夠設定大小。使用一個節點做爲頭節點,不存儲數據;使用一個節點做爲尾節點,存儲數據,使用兩個ReentrantLock分別控制讀寫鎖,頭節點不存儲數據也是避免讀寫併發衝突,count使用了原子變量也是爲了不讀寫衝突。this

  由於使用了讀寫鎖,因此吞吐量要比ArrayBlockingQueue好。對內存和GC的影響會大於ArrayBlockingQueue。spa

  1 public class LinkedBlockingQueue<E> extends AbstractQueue<E>
  2         implements BlockingQueue<E>, java.io.Serializable {
  3     private static final long serialVersionUID = -6903933977591709194L;
  4     //鏈表節點
  5     static class Node<E> {
  6         E item;
  7 
  8         /**
  9          * One of:
 10          * - the real successor Node
 11          * - this Node, meaning the successor is head.next
 12          * - null, meaning there is no successor (this is the last node)
 13          */
 14         Node<E> next;
 15 
 16         Node(E x) { item = x; }
 17     }
 18     //隊列容量
 19     private final int capacity;
 20     //隊列元素個數
 21     private final AtomicInteger count = new AtomicInteger();
 22     //頭節點
 23     transient Node<E> head;
 24     //尾節點
 25     private transient Node<E> last;
 26     //取數據鎖
 27     private final ReentrantLock takeLock = new ReentrantLock();
 28     
 29     private final Condition notEmpty = takeLock.newCondition();
 30     //存數據鎖
 31     private final ReentrantLock putLock = new ReentrantLock();
 32 
 33     private final Condition notFull = putLock.newCondition();
 34     
 35     public LinkedBlockingQueue() {
 36         this(Integer.MAX_VALUE);
 37     }
 38 
 39     public LinkedBlockingQueue(int capacity) {
 40         if (capacity <= 0) throw new IllegalArgumentException();
 41         this.capacity = capacity;
 42         last = head = new Node<E>(null);
 43     }
 44     //存放元素
 45     public void put(E e) throws InterruptedException {
 46         if (e == null) throw new NullPointerException();
 47         int c = -1;
 48         Node<E> node = new Node<E>(e);
 49         final ReentrantLock putLock = this.putLock;
 50         final AtomicInteger count = this.count;
 51         putLock.lockInterruptibly();
 52         try {
 53             //若是到達容量,則等待
 54             while (count.get() == capacity) {
 55                 notFull.await();
 56             }
 57             enqueue(node);
 58             c = count.getAndIncrement();
 59             if (c + 1 < capacity)
 60                 notFull.signal();
 61         } finally {
 62             putLock.unlock();
 63         }
 64         if (c == 0)
 65             signalNotEmpty();
 66     }
 67     
 68     //存放元素
 69     public boolean offer(E e) {
 70         if (e == null) throw new NullPointerException();
 71         final AtomicInteger count = this.count;
 72         if (count.get() == capacity)
 73             return false;
 74         int c = -1;
 75         Node<E> node = new Node<E>(e);
 76         final ReentrantLock putLock = this.putLock;
 77         putLock.lock();
 78         try {
 79             //若是到達容量,返回false
 80             if (count.get() < capacity) {
 81                 enqueue(node);
 82                 c = count.getAndIncrement();
 83                 if (c + 1 < capacity)
 84                     notFull.signal();
 85             }
 86         } finally {
 87             putLock.unlock();
 88         }
 89         if (c == 0)
 90             signalNotEmpty();
 91         return c >= 0;
 92     }
 93     private void enqueue(Node<E> node) {
 94         // assert putLock.isHeldByCurrentThread();
 95         // assert last.next == null;
 96         last = last.next = node;
 97     }
 98     //獲取元素
 99     public E take() throws InterruptedException {
100         E x;
101         int c = -1;
102         final AtomicInteger count = this.count;
103         final ReentrantLock takeLock = this.takeLock;
104         takeLock.lockInterruptibly();
105         try {
106             //若是隊列爲空,則等待
107             while (count.get() == 0) {
108                 notEmpty.await();
109             }
110             x = dequeue();
111             c = count.getAndDecrement();
112             if (c > 1)
113                 notEmpty.signal();
114         } finally {
115             takeLock.unlock();
116         }
117         if (c == capacity)
118             signalNotFull();
119         return x;
120     }
121     //獲取元素
122     public E poll() {
123         final AtomicInteger count = this.count;
124         if (count.get() == 0)
125             return null;
126         E x = null;
127         int c = -1;
128         final ReentrantLock takeLock = this.takeLock;
129         takeLock.lock();
130         try {
131             //若是隊列爲空,返回null
132             if (count.get() > 0) {
133                 x = dequeue();
134                 c = count.getAndDecrement();
135                 if (c > 1)
136                     notEmpty.signal();
137             }
138         } finally {
139             takeLock.unlock();
140         }
141         if (c == capacity)
142             signalNotFull();
143         return x;
144     }
145     private E dequeue() {
146         // assert takeLock.isHeldByCurrentThread();
147         // assert head.item == null;
148         Node<E> h = head;
149         Node<E> first = h.next;
150         h.next = h; // help GC
151         head = first;
152         E x = first.item;
153         first.item = null;
154         return x;
155     }
156     //獲取元素
157     public E peek() {
158         if (count.get() == 0)
159             return null;
160         final ReentrantLock takeLock = this.takeLock;
161         takeLock.lock();
162         try {
163             Node<E> first = head.next;
164             if (first == null)
165                 return null;
166             else
167                 return first.item;
168         } finally {
169             takeLock.unlock();
170         }
171     }
172 }
View Code

SynchronousQueue線程

  

PriorityBlockingQueue

  優先級隊列,按照自定義的優先級順序進行讀取。底層使用一個數組實現二叉堆。大小可變且無邊界。

DelayQueue

  延時隊列,底層使用一個PriorityQueue實現,使用一個ReentrantLock控制併發。其實就是在每次往優先級隊列中添加元素,而後以元素的delay/過時值做爲排序的因素,以此來達到先過時的元素會拍在隊首,每次從隊列裏取出來都是最早要過時的元素

  1 public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
  2         implements BlockingQueue<E> {
  3 
  4     private final transient ReentrantLock lock = new ReentrantLock();
  5     private final PriorityQueue<E> q = new PriorityQueue<E>();
  6     private Thread leader = null;
  7     private final Condition available = lock.newCondition();
  8 
  9     public DelayQueue() {}
 10     //向隊列中添加元素,由於是無邊界隊列,因此不會拋異常
 11     public boolean add(E e) {
 12         return offer(e);
 13     }
 14     //向隊列中添加元素
 15     public boolean offer(E e) {
 16         final ReentrantLock lock = this.lock;
 17         lock.lock();
 18         try {
 19             q.offer(e);
 20             if (q.peek() == e) {
 21                 leader = null;
 22                 available.signal();
 23             }
 24             return true;
 25         } finally {
 26             lock.unlock();
 27         }
 28     }
 29     //向隊列中添加元素,由於是無邊界隊列,因此不會阻塞
 30     public void put(E e) {
 31         offer(e);
 32     }
 33     //從隊列總獲取數據
 34     public E poll() {
 35         final ReentrantLock lock = this.lock;
 36         lock.lock();
 37         try {
 38             E first = q.peek();
 39             if (first == null || first.getDelay(NANOSECONDS) > 0)
 40                 return null;
 41             else
 42                 return q.poll();
 43         } finally {
 44             lock.unlock();
 45         }
 46     }
 47     //從隊列總獲取數據,若是隊列爲空,則阻塞
 48     public E take() throws InterruptedException {
 49         final ReentrantLock lock = this.lock;
 50         lock.lockInterruptibly();
 51         try {
 52             for (;;) {
 53                 E first = q.peek();
 54                 if (first == null)
 55                     available.await();
 56                 else {
 57                     long delay = first.getDelay(NANOSECONDS);
 58                     if (delay <= 0)
 59                         return q.poll();
 60                     first = null; // don't retain ref while waiting
 61                     if (leader != null)
 62                         available.await();
 63                     else {
 64                         Thread thisThread = Thread.currentThread();
 65                         leader = thisThread;
 66                         try {
 67                             available.awaitNanos(delay);
 68                         } finally {
 69                             if (leader == thisThread)
 70                                 leader = null;
 71                         }
 72                     }
 73                 }
 74             }
 75         } finally {
 76             if (leader == null && q.peek() != null)
 77                 available.signal();
 78             lock.unlock();
 79         }
 80     }
 81 
 82     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
 83         long nanos = unit.toNanos(timeout);
 84         final ReentrantLock lock = this.lock;
 85         lock.lockInterruptibly();
 86         try {
 87             for (;;) {
 88                 E first = q.peek();
 89                 if (first == null) {
 90                     if (nanos <= 0)
 91                         return null;
 92                     else
 93                         nanos = available.awaitNanos(nanos);
 94                 } else {
 95                     long delay = first.getDelay(NANOSECONDS);
 96                     if (delay <= 0)
 97                         return q.poll();
 98                     if (nanos <= 0)
 99                         return null;
100                     first = null; // don't retain ref while waiting
101                     if (nanos < delay || leader != null)
102                         nanos = available.awaitNanos(nanos);
103                     else {
104                         Thread thisThread = Thread.currentThread();
105                         leader = thisThread;
106                         try {
107                             long timeLeft = available.awaitNanos(delay);
108                             nanos -= delay - timeLeft;
109                         } finally {
110                             if (leader == thisThread)
111                                 leader = null;
112                         }
113                     }
114                 }
115             }
116         } finally {
117             if (leader == null && q.peek() != null)
118                 available.signal();
119             lock.unlock();
120         }
121     }
122     //
123     public E peek() {
124         final ReentrantLock lock = this.lock;
125         lock.lock();
126         try {
127             return q.peek();
128         } finally {
129             lock.unlock();
130         }
131     }
132 
133 
134 }
View Code
相關文章
相關標籤/搜索