容器類中提供的ArrayList、HashMap、HashSet不是線程安全的,併發包下提供了相似功能的線程安全的集合。java
類 | 說明 | 原理 |
ConcurrentHashMap | ||
ConcurrentSkipListMap | ||
ConcurrentSkipListSet | ||
CopyOnWriteArrayList | ||
CopyOnWriteArraySet |
隊列是一種數據結構,它以一種先進先出的方式管理數據。若是你試圖向一個 已經滿了的阻塞隊列中添加一個元素或者是從一個空的阻塞隊列中移除一個元索,將致使線程阻塞。node
隊列操做:數組
方法 | 說明 |
boolean add(E e) | 添加一個元素到隊列中,若是隊列已滿,則拋出異常 |
E remove() | 移除並返回隊列頭部的元素,若是隊列爲空,則拋出異常 |
E element() | 返回隊列頭部的元素,若是隊列爲空,則拋出異常 |
boolean offer(E e) | 添加一個元素到隊列中,若是隊列已滿,返回false |
offer(E e, long timeout, TimeUnit unit) | 添加一個元素到隊列中,等待指定時間,若是隊列已滿,返回false |
E poll() | 移除並返回隊列頭部的元素,若是隊列爲空,返回null |
E poll(long timeout, TimeUnit unit) | 移除並返回隊列頭部的元素,等待指定時間,若是隊列爲空,返回null |
E peek() | 返回隊列頭部的元素,若是隊列爲空,返回null |
void put(E e) | 返回隊列頭部的元素,若是隊列已滿,阻塞 |
E take() | 移除並返回隊列頭部的元素,若是隊列爲空,阻塞 |
數組實現安全
ArrayBlockingQueue數據結構
Qeueu的數組實現,底層使用一個數組實現,數組大小不可變,使用一個count表示當前元素個數,使用putIndex表示當前尾的index,使用takeIndex表示當前頭的index,putindex不必定比takeindex大,是在數組連續的循環。使用一個ReentrantLock控制讀寫併發。使用兩個Condition來阻塞數組爲空時消費或者數組滿時生產的線程,當數組中有數據或者有空間時喚醒。迭代器中使用一個index指向下一個元素位置;併發
1 public class ArrayBlockingQueue<E> extends AbstractQueue<E> 2 implements BlockingQueue<E>, java.io.Serializable { 3 4 //存儲數據數組 5 final Object[] items; 6 //頭index 7 int takeIndex; 8 //尾index 9 int putIndex; 10 //隊列長度 11 int count; 12 //鎖 13 final ReentrantLock lock; 14 //非空條件 15 private final Condition notEmpty; 16 //非滿條件 17 private final Condition notFull; 18 //初始化 19 public ArrayBlockingQueue(int capacity) { 20 this(capacity, false); 21 } 22 public ArrayBlockingQueue(int capacity, boolean fair) { 23 if (capacity <= 0) 24 throw new IllegalArgumentException(); 25 this.items = new Object[capacity]; 26 lock = new ReentrantLock(fair); 27 notEmpty = lock.newCondition(); 28 notFull = lock.newCondition(); 29 } 30 public ArrayBlockingQueue(int capacity, boolean fair, 31 Collection<? extends E> c) { 32 this(capacity, fair); 33 34 final ReentrantLock lock = this.lock; 35 lock.lock(); // Lock only for visibility, not mutual exclusion 36 try { 37 int i = 0; 38 try { 39 for (E e : c) { 40 checkNotNull(e); 41 items[i++] = e; 42 } 43 } catch (ArrayIndexOutOfBoundsException ex) { 44 throw new IllegalArgumentException(); 45 } 46 count = i; 47 putIndex = (i == capacity) ? 0 : i; 48 } finally { 49 lock.unlock(); 50 } 51 } 52 //新增一個元素 53 public boolean add(E e) { 54 return super.add(e); 55 } 56 57 //新增一個元素 58 public boolean offer(E e) { 59 checkNotNull(e); 60 final ReentrantLock lock = this.lock; 61 lock.lock(); 62 try { 63 //若是已滿,返回false,不然添加到隊列中 64 if (count == items.length) 65 return false; 66 else { 67 enqueue(e); 68 return true; 69 } 70 } finally { 71 lock.unlock(); 72 } 73 } 74 //新增一個元素 75 public void put(E e) throws InterruptedException { 76 checkNotNull(e); 77 final ReentrantLock lock = this.lock; 78 lock.lockInterruptibly(); 79 try { 80 //若是已滿,等待,不然添加到隊列中 81 while (count == items.length) 82 notFull.await(); 83 enqueue(e); 84 } finally { 85 lock.unlock(); 86 } 87 } 88 //添加一個元素 89 private void enqueue(E x) { 90 final Object[] items = this.items; 91 items[putIndex] = x; 92 if (++putIndex == items.length) 93 putIndex = 0; 94 count++; 95 notEmpty.signal(); 96 } 97 //獲取頭部元素 98 public E poll() { 99 final ReentrantLock lock = this.lock; 100 lock.lock(); 101 try { 102 //若是爲空返回null,不然返回頭部元素 103 return (count == 0) ? null : dequeue(); 104 } finally { 105 lock.unlock(); 106 } 107 } 108 //獲取頭部元素 109 public E take() throws InterruptedException { 110 final ReentrantLock lock = this.lock; 111 lock.lockInterruptibly(); 112 try { 113 //若是爲空阻塞,不然返回頭部元素 114 while (count == 0) 115 notEmpty.await(); 116 return dequeue(); 117 } finally { 118 lock.unlock(); 119 } 120 } 121 //獲取頭部元素 122 private E dequeue() { 123 // assert lock.getHoldCount() == 1; 124 // assert items[takeIndex] != null; 125 final Object[] items = this.items; 126 @SuppressWarnings("unchecked") 127 E x = (E) items[takeIndex]; 128 items[takeIndex] = null; 129 if (++takeIndex == items.length) 130 takeIndex = 0; 131 count--; 132 if (itrs != null) 133 itrs.elementDequeued(); 134 notFull.signal(); 135 return x; 136 } 137 }
LinkedBlockingQueueide
Qeueu的列表實現,底層使用一個單向鏈表實現。大小可變也能夠設定大小。使用一個節點做爲頭節點,不存儲數據;使用一個節點做爲尾節點,存儲數據,使用兩個ReentrantLock分別控制讀寫鎖,頭節點不存儲數據也是避免讀寫併發衝突,count使用了原子變量也是爲了不讀寫衝突。this
由於使用了讀寫鎖,因此吞吐量要比ArrayBlockingQueue好。對內存和GC的影響會大於ArrayBlockingQueue。spa
1 public class LinkedBlockingQueue<E> extends AbstractQueue<E> 2 implements BlockingQueue<E>, java.io.Serializable { 3 private static final long serialVersionUID = -6903933977591709194L; 4 //鏈表節點 5 static class Node<E> { 6 E item; 7 8 /** 9 * One of: 10 * - the real successor Node 11 * - this Node, meaning the successor is head.next 12 * - null, meaning there is no successor (this is the last node) 13 */ 14 Node<E> next; 15 16 Node(E x) { item = x; } 17 } 18 //隊列容量 19 private final int capacity; 20 //隊列元素個數 21 private final AtomicInteger count = new AtomicInteger(); 22 //頭節點 23 transient Node<E> head; 24 //尾節點 25 private transient Node<E> last; 26 //取數據鎖 27 private final ReentrantLock takeLock = new ReentrantLock(); 28 29 private final Condition notEmpty = takeLock.newCondition(); 30 //存數據鎖 31 private final ReentrantLock putLock = new ReentrantLock(); 32 33 private final Condition notFull = putLock.newCondition(); 34 35 public LinkedBlockingQueue() { 36 this(Integer.MAX_VALUE); 37 } 38 39 public LinkedBlockingQueue(int capacity) { 40 if (capacity <= 0) throw new IllegalArgumentException(); 41 this.capacity = capacity; 42 last = head = new Node<E>(null); 43 } 44 //存放元素 45 public void put(E e) throws InterruptedException { 46 if (e == null) throw new NullPointerException(); 47 int c = -1; 48 Node<E> node = new Node<E>(e); 49 final ReentrantLock putLock = this.putLock; 50 final AtomicInteger count = this.count; 51 putLock.lockInterruptibly(); 52 try { 53 //若是到達容量,則等待 54 while (count.get() == capacity) { 55 notFull.await(); 56 } 57 enqueue(node); 58 c = count.getAndIncrement(); 59 if (c + 1 < capacity) 60 notFull.signal(); 61 } finally { 62 putLock.unlock(); 63 } 64 if (c == 0) 65 signalNotEmpty(); 66 } 67 68 //存放元素 69 public boolean offer(E e) { 70 if (e == null) throw new NullPointerException(); 71 final AtomicInteger count = this.count; 72 if (count.get() == capacity) 73 return false; 74 int c = -1; 75 Node<E> node = new Node<E>(e); 76 final ReentrantLock putLock = this.putLock; 77 putLock.lock(); 78 try { 79 //若是到達容量,返回false 80 if (count.get() < capacity) { 81 enqueue(node); 82 c = count.getAndIncrement(); 83 if (c + 1 < capacity) 84 notFull.signal(); 85 } 86 } finally { 87 putLock.unlock(); 88 } 89 if (c == 0) 90 signalNotEmpty(); 91 return c >= 0; 92 } 93 private void enqueue(Node<E> node) { 94 // assert putLock.isHeldByCurrentThread(); 95 // assert last.next == null; 96 last = last.next = node; 97 } 98 //獲取元素 99 public E take() throws InterruptedException { 100 E x; 101 int c = -1; 102 final AtomicInteger count = this.count; 103 final ReentrantLock takeLock = this.takeLock; 104 takeLock.lockInterruptibly(); 105 try { 106 //若是隊列爲空,則等待 107 while (count.get() == 0) { 108 notEmpty.await(); 109 } 110 x = dequeue(); 111 c = count.getAndDecrement(); 112 if (c > 1) 113 notEmpty.signal(); 114 } finally { 115 takeLock.unlock(); 116 } 117 if (c == capacity) 118 signalNotFull(); 119 return x; 120 } 121 //獲取元素 122 public E poll() { 123 final AtomicInteger count = this.count; 124 if (count.get() == 0) 125 return null; 126 E x = null; 127 int c = -1; 128 final ReentrantLock takeLock = this.takeLock; 129 takeLock.lock(); 130 try { 131 //若是隊列爲空,返回null 132 if (count.get() > 0) { 133 x = dequeue(); 134 c = count.getAndDecrement(); 135 if (c > 1) 136 notEmpty.signal(); 137 } 138 } finally { 139 takeLock.unlock(); 140 } 141 if (c == capacity) 142 signalNotFull(); 143 return x; 144 } 145 private E dequeue() { 146 // assert takeLock.isHeldByCurrentThread(); 147 // assert head.item == null; 148 Node<E> h = head; 149 Node<E> first = h.next; 150 h.next = h; // help GC 151 head = first; 152 E x = first.item; 153 first.item = null; 154 return x; 155 } 156 //獲取元素 157 public E peek() { 158 if (count.get() == 0) 159 return null; 160 final ReentrantLock takeLock = this.takeLock; 161 takeLock.lock(); 162 try { 163 Node<E> first = head.next; 164 if (first == null) 165 return null; 166 else 167 return first.item; 168 } finally { 169 takeLock.unlock(); 170 } 171 } 172 }
SynchronousQueue線程
PriorityBlockingQueue
優先級隊列,按照自定義的優先級順序進行讀取。底層使用一個數組實現二叉堆。大小可變且無邊界。
DelayQueue
延時隊列,底層使用一個PriorityQueue實現,使用一個ReentrantLock控制併發。其實就是在每次往優先級隊列中添加元素,而後以元素的delay/過時值做爲排序的因素,以此來達到先過時的元素會拍在隊首,每次從隊列裏取出來都是最早要過時的元素
1 public class DelayQueue<E extends Delayed> extends AbstractQueue<E> 2 implements BlockingQueue<E> { 3 4 private final transient ReentrantLock lock = new ReentrantLock(); 5 private final PriorityQueue<E> q = new PriorityQueue<E>(); 6 private Thread leader = null; 7 private final Condition available = lock.newCondition(); 8 9 public DelayQueue() {} 10 //向隊列中添加元素,由於是無邊界隊列,因此不會拋異常 11 public boolean add(E e) { 12 return offer(e); 13 } 14 //向隊列中添加元素 15 public boolean offer(E e) { 16 final ReentrantLock lock = this.lock; 17 lock.lock(); 18 try { 19 q.offer(e); 20 if (q.peek() == e) { 21 leader = null; 22 available.signal(); 23 } 24 return true; 25 } finally { 26 lock.unlock(); 27 } 28 } 29 //向隊列中添加元素,由於是無邊界隊列,因此不會阻塞 30 public void put(E e) { 31 offer(e); 32 } 33 //從隊列總獲取數據 34 public E poll() { 35 final ReentrantLock lock = this.lock; 36 lock.lock(); 37 try { 38 E first = q.peek(); 39 if (first == null || first.getDelay(NANOSECONDS) > 0) 40 return null; 41 else 42 return q.poll(); 43 } finally { 44 lock.unlock(); 45 } 46 } 47 //從隊列總獲取數據,若是隊列爲空,則阻塞 48 public E take() throws InterruptedException { 49 final ReentrantLock lock = this.lock; 50 lock.lockInterruptibly(); 51 try { 52 for (;;) { 53 E first = q.peek(); 54 if (first == null) 55 available.await(); 56 else { 57 long delay = first.getDelay(NANOSECONDS); 58 if (delay <= 0) 59 return q.poll(); 60 first = null; // don't retain ref while waiting 61 if (leader != null) 62 available.await(); 63 else { 64 Thread thisThread = Thread.currentThread(); 65 leader = thisThread; 66 try { 67 available.awaitNanos(delay); 68 } finally { 69 if (leader == thisThread) 70 leader = null; 71 } 72 } 73 } 74 } 75 } finally { 76 if (leader == null && q.peek() != null) 77 available.signal(); 78 lock.unlock(); 79 } 80 } 81 82 public E poll(long timeout, TimeUnit unit) throws InterruptedException { 83 long nanos = unit.toNanos(timeout); 84 final ReentrantLock lock = this.lock; 85 lock.lockInterruptibly(); 86 try { 87 for (;;) { 88 E first = q.peek(); 89 if (first == null) { 90 if (nanos <= 0) 91 return null; 92 else 93 nanos = available.awaitNanos(nanos); 94 } else { 95 long delay = first.getDelay(NANOSECONDS); 96 if (delay <= 0) 97 return q.poll(); 98 if (nanos <= 0) 99 return null; 100 first = null; // don't retain ref while waiting 101 if (nanos < delay || leader != null) 102 nanos = available.awaitNanos(nanos); 103 else { 104 Thread thisThread = Thread.currentThread(); 105 leader = thisThread; 106 try { 107 long timeLeft = available.awaitNanos(delay); 108 nanos -= delay - timeLeft; 109 } finally { 110 if (leader == thisThread) 111 leader = null; 112 } 113 } 114 } 115 } 116 } finally { 117 if (leader == null && q.peek() != null) 118 available.signal(); 119 lock.unlock(); 120 } 121 } 122 // 123 public E peek() { 124 final ReentrantLock lock = this.lock; 125 lock.lock(); 126 try { 127 return q.peek(); 128 } finally { 129 lock.unlock(); 130 } 131 } 132 133 134 }