Locks are the simplest way to handle concurrency, but they are also the most expensive. A kernel-level lock requires the operating system to perform a context switch; acquiring and releasing locks causes many context switches and scheduling delays, and a thread waiting for a lock is suspended until the lock is released. During a context switch, the instructions and data the CPU previously cached become invalid, which hurts performance badly. User-level locks avoid these problems, but in practice they are only effective when there is no real contention.
Before JDK 1.5, Java relied on the synchronized keyword for synchronization. By coordinating access to shared state through a consistent locking protocol, it ensures that whichever thread holds the lock guarding a set of variables accesses them exclusively. If multiple threads contend for the lock, some of them are suspended; when a suspended thread resumes, it must wait for other threads to finish their time slices before it can be scheduled again, and suspending and resuming carry significant overhead. Locks have other drawbacks too: while a thread is waiting for a lock, it can do nothing else. If a thread holding a lock is delayed, no thread that needs that lock can make progress. And if a blocked thread has high priority while the lock holder has low priority, the result is priority inversion.
An exclusive lock is a pessimistic lock; synchronized is one. It assumes the worst case and executes only after ensuring that no other thread can interfere, suspending every other thread that needs the lock until the holder releases it. A more effective alternative is the optimistic lock: instead of locking, assume there is no conflict, attempt the operation, and if it fails because of a conflict, retry until it succeeds.
Compared with locks, volatile variables are a lighter-weight synchronization mechanism, because using them involves no context switches or thread scheduling. But volatile has its limits: it cannot be used to build atomic compound operations, so it cannot be used when a variable's new value depends on its old value. (See: On volatile)
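For instance, here is a minimal sketch (a hypothetical class, not from the original article) of why volatile cannot replace locking for compound operations:

public class VolatileCounter {
    private volatile long count = 0;

    // NOT atomic: count++ is a read-modify-write sequence, so two threads can
    // both read 5 and both write 6, losing one increment despite volatile.
    public void increment() {
        count++;
    }

    // Reads are atomic and always see the latest write, thanks to volatile.
    public long get() {
        return count;
    }
}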
原子操做指的是在一步以內就完成並且不能被中斷。原子操做在多線程環境中是線程安全的,無需考慮同步的問題。在java中,下列操做是原子操做:git
So the question arises: why isn't assignment to a long atomic? For example:
long foo = 65465498L;
In fact, Java is allowed to write the long in two steps: first 32 bits, then the remaining 32 bits. That makes the write thread-unsafe. Changing it to the following makes it thread-safe:
private volatile long foo;
because volatile guarantees that reads and writes of the long happen atomically and are visible to other threads. (It does not, however, make compound operations such as foo++ atomic.)
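If you want to see the tearing itself, here is a hedged demo (a hypothetical test class; the torn value only shows up on JVMs that actually split 64-bit writes, typically 32-bit ones, and on most 64-bit JVMs the reader loop will simply spin forever):

public class LongTearingDemo {
    static long value = 0L; // declare this volatile to make the writes atomic

    public static void main(String[] args) {
        final long A = 0x0000000000000000L;
        final long B = 0xFFFFFFFFFFFFFFFFL;
        new Thread(new Runnable() {
            public void run() {
                for (;;) { value = A; value = B; } // keep flipping between A and B
            }
        }).start();
        for (;;) {
            long v = value;
            if (v != A && v != B) { // one half of A mixed with one half of B
                System.out.println("torn read: 0x" + Long.toHexString(v));
                return;
            }
        }
    }
}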
There are several ways to implement lock-free, non-blocking algorithms; CAS (compare-and-swap) is a well-known one. CAS is a CPU instruction available on most processor architectures, including IA32 and SPARC. Its semantics are: "I think V's value should be A; if it is, set V to B; otherwise do not modify it and tell me what V's value actually is." CAS is an optimistic-locking technique: when multiple threads use CAS to update the same variable at the same time, only one of them succeeds; the others fail, but instead of being suspended they are told they lost the race and may try again. CAS takes three operands: the memory value V, the expected old value A, and the new value B. If and only if A equals V, V is set to B; otherwise nothing is done. A C implementation of the CAS primitive looks like this:
int compare_and_swap(int* reg, int oldval, int newval)
{
    ATOMIC();                   /* the body executes atomically, guaranteed by hardware */
    int old_reg_val = *reg;
    if (old_reg_val == oldval)
        *reg = newval;
    END_ATOMIC();
    return old_reg_val;         /* caller compares this with oldval to see if the swap happened */
}
In pseudo-code, the CAS compare-and-swap loop looks like:
do {
    back up the old value;
    build the new value from the old value;
} while (!CAS(memory address, backed-up old value, new value))
When the comparison finds the two values equal, it proves the shared data has not been modified, so it is replaced with the new value and execution continues; if they are not equal, the shared data has been modified, so the work done so far is discarded and the operation is redone from the start. Clearly, CAS rests on the assumption that the shared data will not be modified, a commit-retry pattern much like a database's. When synchronization conflicts are rare, this assumption yields a substantial performance gain.
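In Java, the same pattern looks like the sketch below (modeled on the NonblockingCounter from the IBM developerWorks article referenced later; here AtomicInteger supplies the CAS):

import java.util.concurrent.atomic.AtomicInteger;

public class NonblockingCounter {
    private final AtomicInteger value = new AtomicInteger(0);

    public int increment() {
        int oldValue;
        int newValue;
        do {
            oldValue = value.get();      // back up the old value
            newValue = oldValue + 1;     // build the new value from it
        } while (!value.compareAndSet(oldValue, newValue)); // retry on conflict
        return newValue;
    }
}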
Before JDK 1.5, there was no way to perform a CAS without writing explicit native code. JDK 1.5 introduced low-level support, exposing CAS operations on int, long, and object references, and the JVM compiles them into the most efficient means the underlying hardware provides: on platforms that support CAS, the runtime compiles them into the corresponding machine instruction; where the processor/CPU does not support CAS, the JVM falls back to a spin lock. Note, therefore, that the CAS solution is tightly tied to the platform/compiler (on x86, for example, the corresponding assembly instruction is lock cmpxchg, and for a 64-bit exchange lock cmpxchg8b is used; in .NET you can use the Interlocked.CompareExchange function).
The atomic variable classes, the AtomicXXX classes in java.util.concurrent.atomic, all use this low-level JVM support to provide efficient CAS operations for numeric types and references, and most classes in java.util.concurrent use these atomic variable classes, directly or indirectly, in their implementations.
The Java 1.6 implementation of AtomicLong.incrementAndGet():
/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/licenses/publicdomain
 */

package java.util.concurrent.atomic;
import sun.misc.Unsafe;

/**
 * A <tt>long</tt> value that may be updated atomically. See the
 * {@link java.util.concurrent.atomic} package specification for
 * description of the properties of atomic variables. An
 * <tt>AtomicLong</tt> is used in applications such as atomically
 * incremented sequence numbers, and cannot be used as a replacement
 * for a {@link java.lang.Long}. However, this class does extend
 * <tt>Number</tt> to allow uniform access by tools and utilities that
 * deal with numerically-based classes.
 *
 * @since 1.5
 * @author Doug Lea
 */
public class AtomicLong extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 1927816293512124184L;

    // setup to use Unsafe.compareAndSwapLong for updates
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;

    /**
     * Records whether the underlying JVM supports lockless
     * CompareAndSet for longs. While the unsafe.CompareAndSetLong
     * method works in either case, some constructions should be
     * handled at Java level to avoid locking user-visible locks.
     */
    static final boolean VM_SUPPORTS_LONG_CAS = VMSupportsCS8();

    /**
     * Returns whether underlying JVM supports lockless CompareAndSet
     * for longs. Called only once and cached in VM_SUPPORTS_LONG_CAS.
     */
    private static native boolean VMSupportsCS8();

    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicLong.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }

    private volatile long value;

    /**
     * Creates a new AtomicLong with the given initial value.
     *
     * @param initialValue the initial value
     */
    public AtomicLong(long initialValue) {
        value = initialValue;
    }

    /**
     * Creates a new AtomicLong with initial value <tt>0</tt>.
     */
    public AtomicLong() {
    }

    /**
     * Gets the current value.
     *
     * @return the current value
     */
    public final long get() {
        return value;
    }

    /**
     * Sets to the given value.
     *
     * @param newValue the new value
     */
    public final void set(long newValue) {
        value = newValue;
    }

    /**
     * Eventually sets to the given value.
     *
     * @param newValue the new value
     * @since 1.6
     */
    public final void lazySet(long newValue) {
        unsafe.putOrderedLong(this, valueOffset, newValue);
    }

    /**
     * Atomically sets to the given value and returns the old value.
     *
     * @param newValue the new value
     * @return the previous value
     */
    public final long getAndSet(long newValue) {
        while (true) {
            long current = get();
            if (compareAndSet(current, newValue))
                return current;
        }
    }

    /**
     * Atomically sets the value to the given updated value
     * if the current value <tt>==</tt> the expected value.
     *
     * @param expect the expected value
     * @param update the new value
     * @return true if successful. False return indicates that
     * the actual value was not equal to the expected value.
     */
    public final boolean compareAndSet(long expect, long update) {
        return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
    }

    /**
     * Atomically sets the value to the given updated value
     * if the current value <tt>==</tt> the expected value.
     * May fail spuriously and does not provide ordering guarantees,
     * so is only rarely an appropriate alternative to <tt>compareAndSet</tt>.
     *
     * @param expect the expected value
     * @param update the new value
     * @return true if successful.
     */
    public final boolean weakCompareAndSet(long expect, long update) {
        return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
    }

    /**
     * Atomically increments by one the current value.
     *
     * @return the previous value
     */
    public final long getAndIncrement() {
        while (true) {
            long current = get();
            long next = current + 1;
            if (compareAndSet(current, next))
                return current;
        }
    }

    /**
     * Atomically decrements by one the current value.
     *
     * @return the previous value
     */
    public final long getAndDecrement() {
        while (true) {
            long current = get();
            long next = current - 1;
            if (compareAndSet(current, next))
                return current;
        }
    }

    /**
     * Atomically adds the given value to the current value.
     *
     * @param delta the value to add
     * @return the previous value
     */
    public final long getAndAdd(long delta) {
        while (true) {
            long current = get();
            long next = current + delta;
            if (compareAndSet(current, next))
                return current;
        }
    }

    /**
     * Atomically increments by one the current value.
     *
     * @return the updated value
     */
    public final long incrementAndGet() {
        for (;;) {
            long current = get();
            long next = current + 1;
            if (compareAndSet(current, next))
                return next;
        }
    }

    /**
     * Atomically decrements by one the current value.
     *
     * @return the updated value
     */
    public final long decrementAndGet() {
        for (;;) {
            long current = get();
            long next = current - 1;
            if (compareAndSet(current, next))
                return next;
        }
    }

    /**
     * Atomically adds the given value to the current value.
     *
     * @param delta the value to add
     * @return the updated value
     */
    public final long addAndGet(long delta) {
        for (;;) {
            long current = get();
            long next = current + delta;
            if (compareAndSet(current, next))
                return next;
        }
    }

    /**
     * Returns the String representation of the current value.
     * @return the String representation of the current value.
     */
    public String toString() {
        return Long.toString(get());
    }


    public int intValue() {
        return (int)get();
    }

    public long longValue() {
        return (long)get();
    }

    public float floatValue() {
        return (float)get();
    }

    public double doubleValue() {
        return (double)get();
    }

}
As the source shows, AtomicLong.incrementAndGet is implemented with the optimistic-locking technique: it calls the CAS operation in the sun.misc.Unsafe library, implementing a lock-free increment with a CPU instruction. That is why incrementing with AtomicLong.incrementAndGet is several times faster than incrementing under a synchronized lock.
public final int getAndIncrement() {
    for (;;) {
        int current = get();
        int next = current + 1;
        if (compareAndSet(current, next))
            return current;
    }
}

public final boolean compareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
Here is the benchmark code; you can see that AtomicLong.incrementAndGet performs several times better than synchronized.
package console;

import java.util.concurrent.atomic.AtomicLong;

public class main {

    /**
     * @param args
     */
    public static void main(String[] args) {
        System.out.println("START -- ");
        calc();
        calcSynchro();
        calcAtomic();
        testThreadsSync();
        testThreadsAtomic();
        testThreadsSync2();
        testThreadsAtomic2();
        System.out.println("-- FINISHED ");
    }

    private static void calc() {
        stopwatch sw = new stopwatch();
        sw.start();
        long val = 0;
        while (val < 10000000L) {
            val++;
        }
        sw.stop();
        long milSecds = sw.getElapsedTime();
        System.out.println(" calc() elapsed (ms): " + milSecds);
    }

    private static void calcSynchro() {
        stopwatch sw = new stopwatch();
        sw.start();
        long val = 0;
        while (val < 10000000L) {
            synchronized (main.class) {
                val++;
            }
        }
        sw.stop();
        long milSecds = sw.getElapsedTime();
        System.out.println(" calcSynchro() elapsed (ms): " + milSecds);
    }

    private static void calcAtomic() {
        stopwatch sw = new stopwatch();
        sw.start();
        AtomicLong val = new AtomicLong(0);
        while (val.incrementAndGet() < 10000000L) {
        }
        sw.stop();
        long milSecds = sw.getElapsedTime();
        System.out.println(" calcAtomic() elapsed (ms): " + milSecds);
    }

    private static void testThreadsSync() {
        stopwatch sw = new stopwatch();
        sw.start();
        Thread t1 = new Thread(new LoopSync());
        t1.start();
        Thread t2 = new Thread(new LoopSync());
        t2.start();
        while (t1.isAlive() || t2.isAlive()) {
        }
        sw.stop();
        long milSecds = sw.getElapsedTime();
        System.out.println(" testThreadsSync() 1 thread elapsed (ms): " + milSecds);
    }

    private static void testThreadsAtomic() {
        stopwatch sw = new stopwatch();
        sw.start();
        Thread t1 = new Thread(new LoopAtomic());
        t1.start();
        Thread t2 = new Thread(new LoopAtomic());
        t2.start();
        while (t1.isAlive() || t2.isAlive()) {
        }
        sw.stop();
        long milSecds = sw.getElapsedTime();
        System.out.println(" testThreadsAtomic() 1 thread elapsed (ms): " + milSecds);
    }

    private static void testThreadsSync2() {
        stopwatch sw = new stopwatch();
        sw.start();
        Thread t1 = new Thread(new LoopSync());
        t1.start();
        Thread t2 = new Thread(new LoopSync());
        t2.start();
        while (t1.isAlive() || t2.isAlive()) {
        }
        sw.stop();
        long milSecds = sw.getElapsedTime();
        System.out.println(" testThreadsSync() 2 threads elapsed (ms): " + milSecds);
    }

    private static void testThreadsAtomic2() {
        stopwatch sw = new stopwatch();
        sw.start();
        Thread t1 = new Thread(new LoopAtomic());
        t1.start();
        Thread t2 = new Thread(new LoopAtomic());
        t2.start();
        while (t1.isAlive() || t2.isAlive()) {
        }
        sw.stop();
        long milSecds = sw.getElapsedTime();
        System.out.println(" testThreadsAtomic() 2 threads elapsed (ms): " + milSecds);
    }

    private static class LoopAtomic implements Runnable {
        public void run() {
            AtomicLong val = new AtomicLong(0);
            while (val.incrementAndGet() < 10000000L) {
            }
        }
    }

    private static class LoopSync implements Runnable {
        public void run() {
            long val = 0;
            while (val < 10000000L) {
                synchronized (main.class) {
                    val++;
                }
            }
        }
    }
}

// stopwatch.java (a separate source file, since both classes are public)
public class stopwatch {
    private long startTime = 0;
    private long stopTime = 0;
    private boolean running = false;

    public void start() {
        this.startTime = System.currentTimeMillis();
        this.running = true;
    }

    public void stop() {
        this.stopTime = System.currentTimeMillis();
        this.running = false;
    }

    public long getElapsedTime() {
        long elapsed;
        if (running) {
            elapsed = (System.currentTimeMillis() - startTime);
        } else {
            elapsed = (stopTime - startTime);
        }
        return elapsed;
    }

    public long getElapsedTimeSecs() {
        long elapsed;
        if (running) {
            elapsed = ((System.currentTimeMillis() - startTime) / 1000);
        } else {
            elapsed = ((stopTime - startTime) / 1000);
        }
        return elapsed;
    }

    // sample usage
    // public static void main(String[] args) {
    //     StopWatch s = new StopWatch();
    //     s.start();
    //     // code you want to time goes here
    //     s.stop();
    //     System.out.println("elapsed time in milliseconds: " + s.getElapsedTime());
    // }
}
Here is a CAS example slightly more complex than the non-blocking increment: a non-blocking stack, ConcurrentStack. Its push() and pop() operations are structurally similar to the NonblockingCounter sketched earlier, but they do their work somewhat speculatively, hoping that the underlying assumptions have not been invalidated by the time the work is "committed". push() observes the current top node, builds a new node to place on the stack, and then, if the topmost node has not changed since the initial observation, installs the new node. If the CAS fails, it means another thread has already modified the stack, and the process starts over.
import java.util.concurrent.atomic.AtomicReference;

public class ConcurrentStack<E> {
    AtomicReference<Node<E>> head = new AtomicReference<Node<E>>();

    public void push(E item) {
        Node<E> newHead = new Node<E>(item);
        Node<E> oldHead;
        do {
            oldHead = head.get();
            newHead.next = oldHead;
        } while (!head.compareAndSet(oldHead, newHead));
    }

    public E pop() {
        Node<E> oldHead;
        Node<E> newHead;
        do {
            oldHead = head.get();
            if (oldHead == null)
                return null;
            newHead = oldHead.next;
        } while (!head.compareAndSet(oldHead, newHead));
        return oldHead.item;
    }

    static class Node<E> {
        final E item;
        Node<E> next;

        public Node(E item) {
            this.item = item;
        }
    }
}
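A quick usage sketch (hypothetical driver code): the stack can be shared freely between threads with no lock in sight.

ConcurrentStack<String> stack = new ConcurrentStack<String>();
stack.push("a");
stack.push("b");
String top = stack.pop();    // "b": LIFO order
String next = stack.pop();   // "a"
String empty = stack.pop();  // null: the stack is empty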
Under light to moderate contention, non-blocking algorithms outperform blocking ones, because most of the time the CAS succeeds on the first try, and the cost when contention does occur involves no thread suspension or context switch, just a few extra loop iterations. An uncontended CAS is much cheaper than an uncontended lock acquisition (this must be true, since an uncontended lock involves a CAS plus additional work), and a contended CAS involves a shorter delay than a contended lock acquisition.
Under heavy contention (when many threads continually pound on a single memory location), lock-based algorithms start to offer better throughput than non-blocking ones, because when a thread blocks it stops contending and patiently waits its turn, avoiding further contention. Such extreme contention is uncommon, though: most of the time, threads interleave thread-local computation with operations on shared data, giving other threads a chance to use that data.
The examples above (an increment counter and a stack) are very simple non-blocking algorithms, easy to imitate once you have mastered using CAS in a loop. For more complex data structures, non-blocking algorithms are far more complicated, because modifying a linked list, tree, or hash table can involve updating several pointers. CAS supports an atomic conditional update of a single pointer, not of two or more. So to build a non-blocking linked list, tree, or hash table, you need a way to update multiple pointers with CAS without ever leaving the data structure in an inconsistent state.
Inserting an element at the tail of a linked list usually involves updating two pointers: the "tail" pointer, which always refers to the last element in the list, and the "next" pointer of the formerly last element, which must refer to the newly inserted one. Because two pointers must be updated, two CAS operations are needed, and updating two pointers in separate CAS operations raises two potential problems: what happens if the first CAS succeeds but the second fails, and what happens if another thread tries to access the list between the first and second CAS?
The "trick" to building non-blocking algorithms for more complex data structures is to ensure the structure is always in a consistent state, even between the moment a thread begins a modification and the moment it finishes, and to ensure that other threads can tell not only whether the first thread has completed its update or is still mid-way through it, but also what operations would be needed to finish the update if the first thread went AWOL. If a thread finds the data structure in the middle of an update, it can "help" the thread performing the update to finish, and then carry on with its own operation. When the first thread comes back and tries to complete its own update, it finds the work is no longer needed and simply returns, because the CAS detects the helping thread's intervention (in this case, a constructive one).
This "help your neighbor" requirement is necessary to protect the data structure from the failure of any single thread. If a thread that found the structure mid-update simply waited for the other thread to finish, it might wait forever should that thread fail mid-operation. Even absent failure, this approach would perform poorly, since the newly arriving thread would have to give up the processor, causing a context switch, or wait until its time slice expired (which is worse).
import java.util.concurrent.atomic.AtomicReference;

public class LinkedQueue<E> {
    private static class Node<E> {
        final E item;
        final AtomicReference<Node<E>> next;

        Node(E item, Node<E> next) {
            this.item = item;
            this.next = new AtomicReference<Node<E>>(next);
        }
    }

    // head always refers to a sentinel node whose item is null
    private AtomicReference<Node<E>> head
        = new AtomicReference<Node<E>>(new Node<E>(null, null));
    // tail must be a separate reference: the original listing's "tail = head"
    // aliased the two, so every tail update would have moved head as well
    private AtomicReference<Node<E>> tail
        = new AtomicReference<Node<E>>(head.get());

    public boolean put(E item) {
        Node<E> newNode = new Node<E>(item, null);
        while (true) {
            Node<E> curTail = tail.get();
            Node<E> residue = curTail.next.get();
            if (curTail == tail.get()) {
                if (residue == null) /* A: queue is in the quiescent state */ {
                    if (curTail.next.compareAndSet(null, newNode)) /* C: link the new node */ {
                        tail.compareAndSet(curTail, newNode) /* D: swing the tail */;
                        return true;
                    }
                } else {
                    tail.compareAndSet(curTail, residue) /* B: help finish another thread's insertion */;
                }
            }
        }
    }
}
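The article shows only put(); a matching poll() might look like the hedged sketch below (following the standard Michael-Scott dequeue, with the same helping step for a lagging tail; recall that head refers to a sentinel node):

public E poll() {
    while (true) {
        Node<E> curHead = head.get();
        Node<E> curTail = tail.get();
        Node<E> first = curHead.next.get();
        if (curHead == head.get()) {                 // consistent snapshot?
            if (curHead == curTail) {
                if (first == null)
                    return null;                     // queue is empty
                tail.compareAndSet(curTail, first);  // help advance a lagging tail
            } else if (head.compareAndSet(curHead, first)) {
                return first.item;                   // first becomes the new sentinel
            }
        }
    }
}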
See IBM developerWorks for the algorithm's details.
Java 5's ConcurrentHashMap is thread-safe and cleverly designed: it uses bucket-granularity locks, avoiding locking the whole map in put and get. In get especially, at most a single HashEntry is involved in lock-style handling, so the performance gain is obvious.
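The bucket-level locking idea can be sketched in a few lines (a hypothetical toy StripedMap, not the JDK code): hash each key to one of N independently locked segments, so threads touching different segments never contend.

import java.util.HashMap;
import java.util.Map;

public class StripedMap<K, V> {
    private static final int SEGMENTS = 16;   // mirrors the default concurrencyLevel
    private final Object[] locks = new Object[SEGMENTS];
    private final Map<K, V>[] tables;

    @SuppressWarnings("unchecked")
    public StripedMap() {
        tables = new Map[SEGMENTS];
        for (int i = 0; i < SEGMENTS; i++) {
            locks[i] = new Object();
            tables[i] = new HashMap<K, V>();
        }
    }

    private int segmentFor(Object key) {
        return (key.hashCode() & 0x7fffffff) % SEGMENTS; // pick the key's segment
    }

    public V put(K key, V value) {
        int s = segmentFor(key);
        synchronized (locks[s]) {   // lock only this key's segment, not the whole map
            return tables[s].put(key, value);
        }
    }

    public V get(K key) {
        int s = segmentFor(key);
        synchronized (locks[s]) {   // the real ConcurrentHashMap avoids even this lock on reads
            return tables[s].get(key);
        }
    }
}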
The real implementation builds on this lock-striping mechanism; this thread discusses it in great detail, and here is an analysis of ConcurrentHashMap in terms of the Java memory model. Below is the JDK 6 source of ConcurrentHashMap:
/* * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. Oracle designates this * particular file as subject to the "Classpath" exception as provided * by Oracle in the LICENSE file that accompanied this code. * * This code is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * version 2 for more details (a copy is included in the LICENSE file that * accompanied this code). * * You should have received a copy of the GNU General Public License version * 2 along with this work; if not, write to the Free Software Foundation, * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. * * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA * or visit www.oracle.com if you need additional information or have any * questions. */ /* * This file is available under and governed by the GNU General Public * License version 2 only, as published by the Free Software Foundation. * However, the following notice accompanied the original version of this * file: * * Written by Doug Lea with assistance from members of JCP JSR-166 * Expert Group and released to the public domain, as explained at * http://creativecommons.org/licenses/publicdomain */ package java.util.concurrent; import java.util.concurrent.locks.*; import java.util.*; import java.io.Serializable; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.ObjectStreamField; /** * A hash table supporting full concurrency of retrievals and * adjustable expected concurrency for updates. This class obeys the * same functional specification as {@link java.util.Hashtable}, and * includes versions of methods corresponding to each method of * <tt>Hashtable</tt>. However, even though all operations are * thread-safe, retrieval operations do <em>not</em> entail locking, * and there is <em>not</em> any support for locking the entire table * in a way that prevents all access. This class is fully * interoperable with <tt>Hashtable</tt> in programs that rely on its * thread safety but not on its synchronization details. * * <p> Retrieval operations (including <tt>get</tt>) generally do not * block, so may overlap with update operations (including * <tt>put</tt> and <tt>remove</tt>). Retrievals reflect the results * of the most recently <em>completed</em> update operations holding * upon their onset. For aggregate operations such as <tt>putAll</tt> * and <tt>clear</tt>, concurrent retrievals may reflect insertion or * removal of only some entries. Similarly, Iterators and * Enumerations return elements reflecting the state of the hash table * at some point at or since the creation of the iterator/enumeration. * They do <em>not</em> throw {@link ConcurrentModificationException}. * However, iterators are designed to be used by only one thread at a time. * * <p> The allowed concurrency among update operations is guided by * the optional <tt>concurrencyLevel</tt> constructor argument * (default <tt>16</tt>), which is used as a hint for internal sizing. The * table is internally partitioned to try to permit the indicated * number of concurrent updates without contention. 
Because placement * in hash tables is essentially random, the actual concurrency will * vary. Ideally, you should choose a value to accommodate as many * threads as will ever concurrently modify the table. Using a * significantly higher value than you need can waste space and time, * and a significantly lower value can lead to thread contention. But * overestimates and underestimates within an order of magnitude do * not usually have much noticeable impact. A value of one is * appropriate when it is known that only one thread will modify and * all others will only read. Also, resizing this or any other kind of * hash table is a relatively slow operation, so, when possible, it is * a good idea to provide estimates of expected table sizes in * constructors. * * <p>This class and its views and iterators implement all of the * <em>optional</em> methods of the {@link Map} and {@link Iterator} * interfaces. * * <p> Like {@link Hashtable} but unlike {@link HashMap}, this class * does <em>not</em> allow <tt>null</tt> to be used as a key or value. * * <p>This class is a member of the * <a href="{@docRoot}/../technotes/guides/collections/index.html"> * Java Collections Framework</a>. * * @since 1.5 * @author Doug Lea * @param <K> the type of keys maintained by this map * @param <V> the type of mapped values */ public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable { private static final long serialVersionUID = 7249069246763182397L; /* * The basic strategy is to subdivide the table among Segments, * each of which itself is a concurrently readable hash table. To * reduce footprint, all but one segments are constructed only * when first needed (see ensureSegment). To maintain visibility * in the presence of lazy construction, accesses to segments as * well as elements of segment's table must use volatile access, * which is done via Unsafe within methods segmentAt etc * below. These provide the functionality of AtomicReferenceArrays * but reduce the levels of indirection. Additionally, * volatile-writes of table elements and entry "next" fields * within locked operations use the cheaper "lazySet" forms of * writes (via putOrderedObject) because these writes are always * followed by lock releases that maintain sequential consistency * of table updates. * * Historical note: The previous version of this class relied * heavily on "final" fields, which avoided some volatile reads at * the expense of a large initial footprint. Some remnants of * that design (including forced construction of segment 0) exist * to ensure serialization compatibility. */ /* ---------------- Constants -------------- */ /** * The default initial capacity for this table, * used when not otherwise specified in a constructor. */ static final int DEFAULT_INITIAL_CAPACITY = 16; /** * The default load factor for this table, used when not * otherwise specified in a constructor. */ static final float DEFAULT_LOAD_FACTOR = 0.75f; /** * The default concurrency level for this table, used when not * otherwise specified in a constructor. */ static final int DEFAULT_CONCURRENCY_LEVEL = 16; /** * The maximum capacity, used if a higher value is implicitly * specified by either of the constructors with arguments. MUST * be a power of two <= 1<<30 to ensure that entries are indexable * using ints. */ static final int MAXIMUM_CAPACITY = 1 << 30; /** * The minimum capacity for per-segment tables. 
Must be a power * of two, at least two to avoid immediate resizing on next use * after lazy construction. */ static final int MIN_SEGMENT_TABLE_CAPACITY = 2; /** * The maximum number of segments to allow; used to bound * constructor arguments. Must be power of two less than 1 << 24. */ static final int MAX_SEGMENTS = 1 << 16; // slightly conservative /** * Number of unsynchronized retries in size and containsValue * methods before resorting to locking. This is used to avoid * unbounded retries if tables undergo continuous modification * which would make it impossible to obtain an accurate result. */ static final int RETRIES_BEFORE_LOCK = 2; /* ---------------- Fields -------------- */ /** * Mask value for indexing into segments. The upper bits of a * key's hash code are used to choose the segment. */ final int segmentMask; /** * Shift value for indexing within segments. */ final int segmentShift; /** * The segments, each of which is a specialized hash table. */ final Segment<K,V>[] segments; transient Set<K> keySet; transient Set<Map.Entry<K,V>> entrySet; transient Collection<V> values; /** * ConcurrentHashMap list entry. Note that this is never exported * out as a user-visible Map.Entry. */ static final class HashEntry<K,V> { final int hash; final K key; volatile V value; volatile HashEntry<K,V> next; HashEntry(int hash, K key, V value, HashEntry<K,V> next) { this.hash = hash; this.key = key; this.value = value; this.next = next; } /** * Sets next field with volatile write semantics. (See above * about use of putOrderedObject.) */ final void setNext(HashEntry<K,V> n) { UNSAFE.putOrderedObject(this, nextOffset, n); } // Unsafe mechanics static final sun.misc.Unsafe UNSAFE; static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class k = HashEntry.class; nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } } /** * Gets the ith element of given table (if nonnull) with volatile * read semantics. */ @SuppressWarnings("unchecked") static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) { return (tab == null) ? null : (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)i << TSHIFT) + TBASE); } /** * Sets the ith element of given table, with volatile write * semantics. (See above about use of putOrderedObject.) */ static final <K,V> void setEntryAt(HashEntry<K,V>[] tab, int i, HashEntry<K,V> e) { UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e); } /** * Applies a supplemental hash function to a given hashCode, which * defends against poor quality hash functions. This is critical * because ConcurrentHashMap uses power-of-two length hash tables, * that otherwise encounter collisions for hashCodes that do not * differ in lower or upper bits. */ private static int hash(int h) { // Spread bits to regularize both segment and index locations, // using variant of single-word Wang/Jenkins hash. h += (h << 15) ^ 0xffffcd7d; h ^= (h >>> 10); h += (h << 3); h ^= (h >>> 6); h += (h << 2) + (h << 14); return h ^ (h >>> 16); } /** * Segments are specialized versions of hash tables. This * subclasses from ReentrantLock opportunistically, just to * simplify some locking and avoid separate construction. */ static final class Segment<K,V> extends ReentrantLock implements Serializable { /* * Segments maintain a table of entry lists that are always * kept in a consistent state, so can be read (via volatile * reads of segments and tables) without locking. 
This * requires replicating nodes when necessary during table * resizing, so the old lists can be traversed by readers * still using old version of table. * * This class defines only mutative methods requiring locking. * Except as noted, the methods of this class perform the * per-segment versions of ConcurrentHashMap methods. (Other * methods are integrated directly into ConcurrentHashMap * methods.) These mutative methods use a form of controlled * spinning on contention via methods scanAndLock and * scanAndLockForPut. These intersperse tryLocks with * traversals to locate nodes. The main benefit is to absorb * cache misses (which are very common for hash tables) while * obtaining locks so that traversal is faster once * acquired. We do not actually use the found nodes since they * must be re-acquired under lock anyway to ensure sequential * consistency of updates (and in any case may be undetectably * stale), but they will normally be much faster to re-locate. * Also, scanAndLockForPut speculatively creates a fresh node * to use in put if no node is found. */ private static final long serialVersionUID = 2249069246763182397L; /** * The maximum number of times to tryLock in a prescan before * possibly blocking on acquire in preparation for a locked * segment operation. On multiprocessors, using a bounded * number of retries maintains cache acquired while locating * nodes. */ static final int MAX_SCAN_RETRIES = Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1; /** * The per-segment table. Elements are accessed via * entryAt/setEntryAt providing volatile semantics. */ transient volatile HashEntry<K,V>[] table; /** * The number of elements. Accessed only either within locks * or among other volatile reads that maintain visibility. */ transient int count; /** * The total number of mutative operations in this segment. * Even though this may overflows 32 bits, it provides * sufficient accuracy for stability checks in CHM isEmpty() * and size() methods. Accessed only either within locks or * among other volatile reads that maintain visibility. */ transient int modCount; /** * The table is rehashed when its size exceeds this threshold. * (The value of this field is always <tt>(int)(capacity * * loadFactor)</tt>.) */ transient int threshold; /** * The load factor for the hash table. Even though this value * is same for all segments, it is replicated to avoid needing * links to outer object. * @serial */ final float loadFactor; Segment(float lf, int threshold, HashEntry<K,V>[] tab) { this.loadFactor = lf; this.threshold = threshold; this.table = tab; } final V put(K key, int hash, V value, boolean onlyIfAbsent) { HashEntry<K,V> node = tryLock() ? 
null : scanAndLockForPut(key, hash, value); V oldValue; try { HashEntry<K,V>[] tab = table; int index = (tab.length - 1) & hash; HashEntry<K,V> first = entryAt(tab, index); for (HashEntry<K,V> e = first;;) { if (e != null) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } e = e.next; } else { if (node != null) node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1; if (c > threshold && first != null && tab.length < MAXIMUM_CAPACITY) rehash(node); else setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { unlock(); } return oldValue; } /** * Doubles size of table and repacks entries, also adding the * given node to new table */ @SuppressWarnings("unchecked") private void rehash(HashEntry<K,V> node) { /* * Reclassify nodes in each list to new table. Because we * are using power-of-two expansion, the elements from * each bin must either stay at same index, or move with a * power of two offset. We eliminate unnecessary node * creation by catching cases where old nodes can be * reused because their next fields won't change. * Statistically, at the default threshold, only about * one-sixth of them need cloning when a table * doubles. The nodes they replace will be garbage * collectable as soon as they are no longer referenced by * any reader thread that may be in the midst of * concurrently traversing table. Entry accesses use plain * array indexing because they are followed by volatile * table write. */ HashEntry<K,V>[] oldTable = table; int oldCapacity = oldTable.length; int newCapacity = oldCapacity << 1; threshold = (int)(newCapacity * loadFactor); HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity]; int sizeMask = newCapacity - 1; for (int i = 0; i < oldCapacity ; i++) { HashEntry<K,V> e = oldTable[i]; if (e != null) { HashEntry<K,V> next = e.next; int idx = e.hash & sizeMask; if (next == null) // Single node on list newTable[idx] = e; else { // Reuse consecutive sequence at same slot HashEntry<K,V> lastRun = e; int lastIdx = idx; for (HashEntry<K,V> last = next; last != null; last = last.next) { int k = last.hash & sizeMask; if (k != lastIdx) { lastIdx = k; lastRun = last; } } newTable[lastIdx] = lastRun; // Clone remaining nodes for (HashEntry<K,V> p = e; p != lastRun; p = p.next) { V v = p.value; int h = p.hash; int k = h & sizeMask; HashEntry<K,V> n = newTable[k]; newTable[k] = new HashEntry<K,V>(h, p.key, v, n); } } } } int nodeIndex = node.hash & sizeMask; // add the new node node.setNext(newTable[nodeIndex]); newTable[nodeIndex] = node; table = newTable; } /** * Scans for a node containing given key while trying to * acquire lock, creating and returning one if not found. Upon * return, guarantees that lock is held. UNlike in most * methods, calls to method equals are not screened: Since * traversal speed doesn't matter, we might as well help warm * up the associated code and accesses as well. 
* * @return a new node if key not found, else null */ private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) { HashEntry<K,V> first = entryForHash(this, hash); HashEntry<K,V> e = first; HashEntry<K,V> node = null; int retries = -1; // negative while locating node while (!tryLock()) { HashEntry<K,V> f; // to recheck first below if (retries < 0) { if (e == null) { if (node == null) // speculatively create node node = new HashEntry<K,V>(hash, key, value, null); retries = 0; } else if (key.equals(e.key)) retries = 0; else e = e.next; } else if (++retries > MAX_SCAN_RETRIES) { lock(); break; } else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) { e = first = f; // re-traverse if entry changed retries = -1; } } return node; } /** * Scans for a node containing the given key while trying to * acquire lock for a remove or replace operation. Upon * return, guarantees that lock is held. Note that we must * lock even if the key is not found, to ensure sequential * consistency of updates. */ private void scanAndLock(Object key, int hash) { // similar to but simpler than scanAndLockForPut HashEntry<K,V> first = entryForHash(this, hash); HashEntry<K,V> e = first; int retries = -1; while (!tryLock()) { HashEntry<K,V> f; if (retries < 0) { if (e == null || key.equals(e.key)) retries = 0; else e = e.next; } else if (++retries > MAX_SCAN_RETRIES) { lock(); break; } else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) { e = first = f; retries = -1; } } } /** * Remove; match on key only if value null, else match both. */ final V remove(Object key, int hash, Object value) { if (!tryLock()) scanAndLock(key, hash); V oldValue = null; try { HashEntry<K,V>[] tab = table; int index = (tab.length - 1) & hash; HashEntry<K,V> e = entryAt(tab, index); HashEntry<K,V> pred = null; while (e != null) { K k; HashEntry<K,V> next = e.next; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { V v = e.value; if (value == null || value == v || value.equals(v)) { if (pred == null) setEntryAt(tab, index, next); else pred.setNext(next); ++modCount; --count; oldValue = v; } break; } pred = e; e = next; } } finally { unlock(); } return oldValue; } final boolean replace(K key, int hash, V oldValue, V newValue) { if (!tryLock()) scanAndLock(key, hash); boolean replaced = false; try { HashEntry<K,V> e; for (e = entryForHash(this, hash); e != null; e = e.next) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { if (oldValue.equals(e.value)) { e.value = newValue; ++modCount; replaced = true; } break; } } } finally { unlock(); } return replaced; } final V replace(K key, int hash, V value) { if (!tryLock()) scanAndLock(key, hash); V oldValue = null; try { HashEntry<K,V> e; for (e = entryForHash(this, hash); e != null; e = e.next) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; e.value = value; ++modCount; break; } } } finally { unlock(); } return oldValue; } final void clear() { lock(); try { HashEntry<K,V>[] tab = table; for (int i = 0; i < tab.length ; i++) setEntryAt(tab, i, null); ++modCount; count = 0; } finally { unlock(); } } } // Accessing segments /** * Gets the jth element of given segment array (if nonnull) with * volatile element access semantics via Unsafe. */ @SuppressWarnings("unchecked") static final <K,V> Segment<K,V> segmentAt(Segment<K,V>[] ss, int j) { long u = (j << SSHIFT) + SBASE; return ss == null ? 
null : (Segment<K,V>) UNSAFE.getObjectVolatile(ss, u); } /** * Returns the segment for the given index, creating it and * recording in segment table (via CAS) if not already present. * * @param k the index * @return the segment */ @SuppressWarnings("unchecked") private Segment<K,V> ensureSegment(int k) { final Segment<K,V>[] ss = this.segments; long u = (k << SSHIFT) + SBASE; // raw offset Segment<K,V> seg; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { Segment<K,V> proto = ss[0]; // use segment 0 as prototype int cap = proto.table.length; float lf = proto.loadFactor; int threshold = (int)(cap * lf); HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap]; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck Segment<K,V> s = new Segment<K,V>(lf, threshold, tab); while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) break; } } } return seg; } // Hash-based segment and entry accesses /** * Get the segment for the given hash */ @SuppressWarnings("unchecked") private Segment<K,V> segmentForHash(int h) { long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; return (Segment<K,V>) UNSAFE.getObjectVolatile(segments, u); } /** * Gets the table entry for the given segment and hash */ @SuppressWarnings("unchecked") static final <K,V> HashEntry<K,V> entryForHash(Segment<K,V> seg, int h) { HashEntry<K,V>[] tab; return (seg == null || (tab = seg.table) == null) ? null : (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); } /* ---------------- Public operations -------------- */ /** * Creates a new, empty map with the specified initial * capacity, load factor and concurrency level. * * @param initialCapacity the initial capacity. The implementation * performs internal sizing to accommodate this many elements. * @param loadFactor the load factor threshold, used to control resizing. * Resizing may be performed when the average number of elements per * bin exceeds this threshold. * @param concurrencyLevel the estimated number of concurrently * updating threads. The implementation performs internal sizing * to try to accommodate this many threads. * @throws IllegalArgumentException if the initial capacity is * negative or the load factor or concurrencyLevel are * nonpositive. */ @SuppressWarnings("unchecked") public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // Find power-of-two sizes best matching arguments int sshift = 0; int ssize = 1; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } this.segmentShift = 32 - sshift; this.segmentMask = ssize - 1; if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1; // create segments and segments[0] Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss; } /** * Creates a new, empty map with the specified initial capacity * and load factor and with the default concurrencyLevel (16). 
* * @param initialCapacity The implementation performs internal * sizing to accommodate this many elements. * @param loadFactor the load factor threshold, used to control resizing. * Resizing may be performed when the average number of elements per * bin exceeds this threshold. * @throws IllegalArgumentException if the initial capacity of * elements is negative or the load factor is nonpositive * * @since 1.6 */ public ConcurrentHashMap(int initialCapacity, float loadFactor) { this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL); } /** * Creates a new, empty map with the specified initial capacity, * and with default load factor (0.75) and concurrencyLevel (16). * * @param initialCapacity the initial capacity. The implementation * performs internal sizing to accommodate this many elements. * @throws IllegalArgumentException if the initial capacity of * elements is negative. */ public ConcurrentHashMap(int initialCapacity) { this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); } /** * Creates a new, empty map with a default initial capacity (16), * load factor (0.75) and concurrencyLevel (16). */ public ConcurrentHashMap() { this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); } /** * Creates a new map with the same mappings as the given map. * The map is created with a capacity of 1.5 times the number * of mappings in the given map or 16 (whichever is greater), * and a default load factor (0.75) and concurrencyLevel (16). * * @param m the map */ public ConcurrentHashMap(Map<? extends K, ? extends V> m) { this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1, DEFAULT_INITIAL_CAPACITY), DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); putAll(m); } /** * Returns <tt>true</tt> if this map contains no key-value mappings. * * @return <tt>true</tt> if this map contains no key-value mappings */ public boolean isEmpty() { /* * Sum per-segment modCounts to avoid mis-reporting when * elements are concurrently added and removed in one segment * while checking another, in which case the table was never * actually empty at any point. (The sum ensures accuracy up * through at least 1<<31 per-segment modifications before * recheck.) Methods size() and containsValue() use similar * constructions for stability checks. */ long sum = 0L; final Segment<K,V>[] segments = this.segments; for (int j = 0; j < segments.length; ++j) { Segment<K,V> seg = segmentAt(segments, j); if (seg != null) { if (seg.count != 0) return false; sum += seg.modCount; } } if (sum != 0L) { // recheck unless no modifications for (int j = 0; j < segments.length; ++j) { Segment<K,V> seg = segmentAt(segments, j); if (seg != null) { if (seg.count != 0) return false; sum -= seg.modCount; } } if (sum != 0L) return false; } return true; } /** * Returns the number of key-value mappings in this map. If the * map contains more than <tt>Integer.MAX_VALUE</tt> elements, returns * <tt>Integer.MAX_VALUE</tt>. * * @return the number of key-value mappings in this map */ public int size() { // Try a few times to get accurate count. On failure due to // continuous async changes in table, resort to locking. 
final Segment<K,V>[] segments = this.segments; int size; boolean overflow; // true if size overflows 32 bits long sum; // sum of modCounts long last = 0L; // previous sum int retries = -1; // first iteration isn't retry try { for (;;) { if (retries++ == RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) ensureSegment(j).lock(); // force creation } sum = 0L; size = 0; overflow = false; for (int j = 0; j < segments.length; ++j) { Segment<K,V> seg = segmentAt(segments, j); if (seg != null) { sum += seg.modCount; int c = seg.count; if (c < 0 || (size += c) < 0) overflow = true; } } if (sum == last) break; last = sum; } } finally { if (retries > RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) segmentAt(segments, j).unlock(); } } return overflow ? Integer.MAX_VALUE : size; } /** * Returns the value to which the specified key is mapped, * or {@code null} if this map contains no mapping for the key. * * <p>More formally, if this map contains a mapping from a key * {@code k} to a value {@code v} such that {@code key.equals(k)}, * then this method returns {@code v}; otherwise it returns * {@code null}. (There can be at most one such mapping.) * * @throws NullPointerException if the specified key is null */ public V get(Object key) { int hash = hash(key.hashCode()); for (HashEntry<K,V> e = entryForHash(segmentForHash(hash), hash); e != null; e = e.next) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) return e.value; } return null; } /** * Tests if the specified object is a key in this table. * * @param key possible key * @return <tt>true</tt> if and only if the specified object * is a key in this table, as determined by the * <tt>equals</tt> method; <tt>false</tt> otherwise. * @throws NullPointerException if the specified key is null */ public boolean containsKey(Object key) { int hash = hash(key.hashCode()); for (HashEntry<K,V> e = entryForHash(segmentForHash(hash), hash); e != null; e = e.next) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) return true; } return false; } /** * Returns <tt>true</tt> if this map maps one or more keys to the * specified value. Note: This method requires a full internal * traversal of the hash table, and so is much slower than * method <tt>containsKey</tt>. 
* * @param value value whose presence in this map is to be tested * @return <tt>true</tt> if this map maps one or more keys to the * specified value * @throws NullPointerException if the specified value is null */ public boolean containsValue(Object value) { // Same idea as size() if (value == null) throw new NullPointerException(); final Segment<K,V>[] segments = this.segments; boolean found = false; long last = 0; int retries = -1; try { outer: for (;;) { if (retries++ == RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) ensureSegment(j).lock(); // force creation } long hashSum = 0L; int sum = 0; for (int j = 0; j < segments.length; ++j) { HashEntry<K,V>[] tab; Segment<K,V> seg = segmentAt(segments, j); if (seg != null && (tab = seg.table) != null) { for (int i = 0 ; i < tab.length; i++) { HashEntry<K,V> e; for (e = entryAt(tab, i); e != null; e = e.next) { V v = e.value; if (v != null && value.equals(v)) { found = true; break outer; } } } sum += seg.modCount; } } if (retries > 0 && sum == last) break; last = sum; } } finally { if (retries > RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) segmentAt(segments, j).unlock(); } } return found; } /** * Legacy method testing if some key maps into the specified value * in this table. This method is identical in functionality to * {@link #containsValue}, and exists solely to ensure * full compatibility with class {@link java.util.Hashtable}, * which supported this method prior to introduction of the * Java Collections framework. * @param value a value to search for * @return <tt>true</tt> if and only if some key maps to the * <tt>value</tt> argument in this table as * determined by the <tt>equals</tt> method; * <tt>false</tt> otherwise * @throws NullPointerException if the specified value is null */ public boolean contains(Object value) { return containsValue(value); } /** * Maps the specified key to the specified value in this table. * Neither the key nor the value can be null. * * <p> The value can be retrieved by calling the <tt>get</tt> method * with a key that is equal to the original key. * * @param key key with which the specified value is to be associated * @param value value to be associated with the specified key * @return the previous value associated with <tt>key</tt>, or * <tt>null</tt> if there was no mapping for <tt>key</tt> * @throws NullPointerException if the specified key or value is null */ public V put(K key, V value) { if (value == null) throw new NullPointerException(); int hash = hash(key.hashCode()); int j = (hash >>> segmentShift) & segmentMask; Segment<K,V> s = segmentAt(segments, j); if (s == null) s = ensureSegment(j); return s.put(key, hash, value, false); } /** * {@inheritDoc} * * @return the previous value associated with the specified key, * or <tt>null</tt> if there was no mapping for the key * @throws NullPointerException if the specified key or value is null */ public V putIfAbsent(K key, V value) { if (value == null) throw new NullPointerException(); int hash = hash(key.hashCode()); int j = (hash >>> segmentShift) & segmentMask; Segment<K,V> s = segmentAt(segments, j); if (s == null) s = ensureSegment(j); return s.put(key, hash, value, true); } /** * Copies all of the mappings from the specified map to this one. * These mappings replace any mappings that this map had for any of the * keys currently in the specified map. * * @param m mappings to be stored in this map */ public void putAll(Map<? extends K, ? extends V> m) { for (Map.Entry<? extends K, ? 
extends V> e : m.entrySet()) put(e.getKey(), e.getValue()); } /** * Removes the key (and its corresponding value) from this map. * This method does nothing if the key is not in the map. * * @param key the key that needs to be removed * @return the previous value associated with <tt>key</tt>, or * <tt>null</tt> if there was no mapping for <tt>key</tt> * @throws NullPointerException if the specified key is null */ public V remove(Object key) { int hash = hash(key.hashCode()); Segment<K,V> s = segmentForHash(hash); return s == null ? null : s.remove(key, hash, null); } /** * {@inheritDoc} * * @throws NullPointerException if the specified key is null */ public boolean remove(Object key, Object value) { int hash = hash(key.hashCode()); Segment<K,V> s; return value != null && (s = segmentForHash(hash)) != null && s.remove(key, hash, value) != null; } /** * {@inheritDoc} * * @throws NullPointerException if any of the arguments are null */ public boolean replace(K key, V oldValue, V newValue) { int hash = hash(key.hashCode()); if (oldValue == null || newValue == null) throw new NullPointerException(); Segment<K,V> s = segmentForHash(hash); return s != null && s.replace(key, hash, oldValue, newValue); } /** * {@inheritDoc} * * @return the previous value associated with the specified key, * or <tt>null</tt> if there was no mapping for the key * @throws NullPointerException if the specified key or value is null */ public V replace(K key, V value) { int hash = hash(key.hashCode()); if (value == null) throw new NullPointerException(); Segment<K,V> s = segmentForHash(hash); return s == null ? null : s.replace(key, hash, value); } /** * Removes all of the mappings from this map. */ public void clear() { final Segment<K,V>[] segments = this.segments; for (int j = 0; j < segments.length; ++j) { Segment<K,V> s = segmentAt(segments, j); if (s != null) s.clear(); } } /** * Returns a {@link Set} view of the keys contained in this map. * The set is backed by the map, so changes to the map are * reflected in the set, and vice-versa. The set supports element * removal, which removes the corresponding mapping from this map, * via the <tt>Iterator.remove</tt>, <tt>Set.remove</tt>, * <tt>removeAll</tt>, <tt>retainAll</tt>, and <tt>clear</tt> * operations. It does not support the <tt>add</tt> or * <tt>addAll</tt> operations. * * <p>The view's <tt>iterator</tt> is a "weakly consistent" iterator * that will never throw {@link ConcurrentModificationException}, * and guarantees to traverse elements as they existed upon * construction of the iterator, and may (but is not guaranteed to) * reflect any modifications subsequent to construction. */ public Set<K> keySet() { Set<K> ks = keySet; return (ks != null) ? ks : (keySet = new KeySet()); } /** * Returns a {@link Collection} view of the values contained in this map. * The collection is backed by the map, so changes to the map are * reflected in the collection, and vice-versa. The collection * supports element removal, which removes the corresponding * mapping from this map, via the <tt>Iterator.remove</tt>, * <tt>Collection.remove</tt>, <tt>removeAll</tt>, * <tt>retainAll</tt>, and <tt>clear</tt> operations. It does not * support the <tt>add</tt> or <tt>addAll</tt> operations. 
* * <p>The view's <tt>iterator</tt> is a "weakly consistent" iterator * that will never throw {@link ConcurrentModificationException}, * and guarantees to traverse elements as they existed upon * construction of the iterator, and may (but is not guaranteed to) * reflect any modifications subsequent to construction. */ public Collection<V> values() { Collection<V> vs = values; return (vs != null) ? vs : (values = new Values()); } /** * Returns a {@link Set} view of the mappings contained in this map. * The set is backed by the map, so changes to the map are * reflected in the set, and vice-versa. The set supports element * removal, which removes the corresponding mapping from the map, * via the <tt>Iterator.remove</tt>, <tt>Set.remove</tt>, * <tt>removeAll</tt>, <tt>retainAll</tt>, and <tt>clear</tt> * operations. It does not support the <tt>add</tt> or * <tt>addAll</tt> operations. * * <p>The view's <tt>iterator</tt> is a "weakly consistent" iterator * that will never throw {@link ConcurrentModificationException}, * and guarantees to traverse elements as they existed upon * construction of the iterator, and may (but is not guaranteed to) * reflect any modifications subsequent to construction. */ public Set<Map.Entry<K,V>> entrySet() { Set<Map.Entry<K,V>> es = entrySet; return (es != null) ? es : (entrySet = new EntrySet()); } /** * Returns an enumeration of the keys in this table. * * @return an enumeration of the keys in this table * @see #keySet() */ public Enumeration<K> keys() { return new KeyIterator(); } /** * Returns an enumeration of the values in this table. * * @return an enumeration of the values in this table * @see #values() */ public Enumeration<V> elements() { return new ValueIterator(); } /* ---------------- Iterator Support -------------- */ abstract class HashIterator { int nextSegmentIndex; int nextTableIndex; HashEntry<K,V>[] currentTable; HashEntry<K, V> nextEntry; HashEntry<K, V> lastReturned; HashIterator() { nextSegmentIndex = segments.length - 1; nextTableIndex = -1; advance(); } /** * Set nextEntry to first node of next non-empty table * (in backwards order, to simplify checks). */ final void advance() { for (;;) { if (nextTableIndex >= 0) { if ((nextEntry = entryAt(currentTable, nextTableIndex--)) != null) break; } else if (nextSegmentIndex >= 0) { Segment<K,V> seg = segmentAt(segments, nextSegmentIndex--); if (seg != null && (currentTable = seg.table) != null) nextTableIndex = currentTable.length - 1; } else break; } } final HashEntry<K,V> nextEntry() { HashEntry<K,V> e = nextEntry; if (e == null) throw new NoSuchElementException(); lastReturned = e; // cannot assign until after null check if ((nextEntry = e.next) == null) advance(); return e; } public final boolean hasNext() { return nextEntry != null; } public final boolean hasMoreElements() { return nextEntry != null; } public final void remove() { if (lastReturned == null) throw new IllegalStateException(); ConcurrentHashMap.this.remove(lastReturned.key); lastReturned = null; } } final class KeyIterator extends HashIterator implements Iterator<K>, Enumeration<K> { public final K next() { return super.nextEntry().key; } public final K nextElement() { return super.nextEntry().key; } } final class ValueIterator extends HashIterator implements Iterator<V>, Enumeration<V> { public final V next() { return super.nextEntry().value; } public final V nextElement() { return super.nextEntry().value; } } /** * Custom Entry class used by EntryIterator.next(), that relays * setValue changes to the underlying map. 
*/ final class WriteThroughEntry extends AbstractMap.SimpleEntry<K,V> { WriteThroughEntry(K k, V v) { super(k,v); } /** * Set our entry's value and write through to the map. The * value to return is somewhat arbitrary here. Since a * WriteThroughEntry does not necessarily track asynchronous * changes, the most recent "previous" value could be * different from what we return (or could even have been * removed in which case the put will re-establish). We do not * and cannot guarantee more. */ public V setValue(V value) { if (value == null) throw new NullPointerException(); V v = super.setValue(value); ConcurrentHashMap.this.put(getKey(), value); return v; } } final class EntryIterator extends HashIterator implements Iterator<Entry<K,V>> { public Map.Entry<K,V> next() { HashEntry<K,V> e = super.nextEntry(); return new WriteThroughEntry(e.key, e.value); } } final class KeySet extends AbstractSet<K> { public Iterator<K> iterator() { return new KeyIterator(); } public int size() { return ConcurrentHashMap.this.size(); } public boolean isEmpty() { return ConcurrentHashMap.this.isEmpty(); } public boolean contains(Object o) { return ConcurrentHashMap.this.containsKey(o); } public boolean remove(Object o) { return ConcurrentHashMap.this.remove(o) != null; } public void clear() { ConcurrentHashMap.this.clear(); } } final class Values extends AbstractCollection<V> { public Iterator<V> iterator() { return new ValueIterator(); } public int size() { return ConcurrentHashMap.this.size(); } public boolean isEmpty() { return ConcurrentHashMap.this.isEmpty(); } public boolean contains(Object o) { return ConcurrentHashMap.this.containsValue(o); } public void clear() { ConcurrentHashMap.this.clear(); } } final class EntrySet extends AbstractSet<Map.Entry<K,V>> { public Iterator<Map.Entry<K,V>> iterator() { return new EntryIterator(); } public boolean contains(Object o) { if (!(o instanceof Map.Entry)) return false; Map.Entry<?,?> e = (Map.Entry<?,?>)o; V v = ConcurrentHashMap.this.get(e.getKey()); return v != null && v.equals(e.getValue()); } public boolean remove(Object o) { if (!(o instanceof Map.Entry)) return false; Map.Entry<?,?> e = (Map.Entry<?,?>)o; return ConcurrentHashMap.this.remove(e.getKey(), e.getValue()); } public int size() { return ConcurrentHashMap.this.size(); } public boolean isEmpty() { return ConcurrentHashMap.this.isEmpty(); } public void clear() { ConcurrentHashMap.this.clear(); } } /* ---------------- Serialization Support -------------- */ /** * Save the state of the <tt>ConcurrentHashMap</tt> instance to a * stream (i.e., serialize it). * @param s the stream * @serialData * the key (Object) and value (Object) * for each key-value mapping, followed by a null pair. * The key-value mappings are emitted in no particular order. */ private void writeObject(java.io.ObjectOutputStream s) throws IOException { // force all segments for serialization compatibility for (int k = 0; k < segments.length; ++k) ensureSegment(k); s.defaultWriteObject(); final Segment<K,V>[] segments = this.segments; for (int k = 0; k < segments.length; ++k) { Segment<K,V> seg = segmentAt(segments, k); seg.lock(); try { HashEntry<K,V>[] tab = seg.table; for (int i = 0; i < tab.length; ++i) { HashEntry<K,V> e; for (e = entryAt(tab, i); e != null; e = e.next) { s.writeObject(e.key); s.writeObject(e.value); } } } finally { seg.unlock(); } } s.writeObject(null); s.writeObject(null); } /** * Reconstitute the <tt>ConcurrentHashMap</tt> instance from a * stream (i.e., deserialize it). 
* @param s the stream */ @SuppressWarnings("unchecked") private void readObject(java.io.ObjectInputStream s) throws IOException, ClassNotFoundException { // Don't call defaultReadObject() ObjectInputStream.GetField oisFields = s.readFields(); final Segment<K,V>[] oisSegments = (Segment<K,V>[])oisFields.get("segments", null); final int ssize = oisSegments.length; if (ssize < 1 || ssize > MAX_SEGMENTS || (ssize & (ssize-1)) != 0 ) // ssize not power of two throw new java.io.InvalidObjectException("Bad number of segments:" + ssize); int sshift = 0, ssizeTmp = ssize; while (ssizeTmp > 1) { ++sshift; ssizeTmp >>>= 1; } UNSAFE.putIntVolatile(this, SEGSHIFT_OFFSET, 32 - sshift); UNSAFE.putIntVolatile(this, SEGMASK_OFFSET, ssize - 1); UNSAFE.putObjectVolatile(this, SEGMENTS_OFFSET, oisSegments); // Re-initialize segments to be minimally sized, and let grow. int cap = MIN_SEGMENT_TABLE_CAPACITY; final Segment<K,V>[] segments = this.segments; for (int k = 0; k < segments.length; ++k) { Segment<K,V> seg = segments[k]; if (seg != null) { seg.threshold = (int)(cap * seg.loadFactor); seg.table = (HashEntry<K,V>[]) new HashEntry[cap]; } } // Read the keys and values, and put the mappings in the table for (;;) { K key = (K) s.readObject(); V value = (V) s.readObject(); if (key == null) break; put(key, value); } } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long SBASE; private static final int SSHIFT; private static final long TBASE; private static final int TSHIFT; private static final long SEGSHIFT_OFFSET; private static final long SEGMASK_OFFSET; private static final long SEGMENTS_OFFSET; static { int ss, ts; try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class tc = HashEntry[].class; Class sc = Segment[].class; TBASE = UNSAFE.arrayBaseOffset(tc); SBASE = UNSAFE.arrayBaseOffset(sc); ts = UNSAFE.arrayIndexScale(tc); ss = UNSAFE.arrayIndexScale(sc); SEGSHIFT_OFFSET = UNSAFE.objectFieldOffset( ConcurrentHashMap.class.getDeclaredField("segmentShift")); SEGMASK_OFFSET = UNSAFE.objectFieldOffset( ConcurrentHashMap.class.getDeclaredField("segmentMask")); SEGMENTS_OFFSET = UNSAFE.objectFieldOffset( ConcurrentHashMap.class.getDeclaredField("segments")); } catch (Exception e) { throw new Error(e); } if ((ss & (ss-1)) != 0 || (ts & (ts-1)) != 0) throw new Error("data type scale not a power of two"); SSHIFT = 31 - Integer.numberOfLeadingZeros(ss); TSHIFT = 31 - Integer.numberOfLeadingZeros(ts); } }
To achieve both high throughput and thread safety in a highly concurrent environment, there are two approaches: well-tuned locking, or lock-free structures. Non-blocking algorithms, however, are far more complex than lock-based ones. Developing them takes quite specialized expertise, and proving them correct is extremely difficult: correctness depends on the specific target machine and compiler, and demands intricate techniques and rigorous testing. Still, although lock-free programming is very hard, it usually delivers higher throughput than lock-based programming, and it behaves well with respect to thread termination, priority inversion, and signal safety. Lock-free programming is therefore a very promising technique.
Class diagram of the disruptor lock-free message-queue data structure, and technical documentation download.
Beyond minimizing resource contention, another design approach borrows the single-threaded event-loop model of nginx/node.js: use a single thread, or as many threads as there are CPUs, with a large queue; under concurrency, tasks are pushed onto the queue, and the threads poll it and execute tasks one by one in order. Since everything uses asynchronous I/O, no thread blocks. The big queue can be RabbitMQ, a JDK synchronous/blocking queue (somewhat slower), or the Disruptor lock-free queue (Java). Task processing, all the creates, reads, updates, and deletes, can run entirely in memory (multi-level caches, read/write splitting, ConcurrentHashMap, even a distributed cache such as Redis), with Quartz jobs periodically syncing the cached data back to the DB. Of course, this is a design for small and mid-sized systems; a large distributed system is far more complex and must be divided and conquered along SOA lines; see the diagram in this article.
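A minimal sketch of that queue-plus-workers shape, assuming a JDK BlockingQueue as the big queue (Disruptor or RabbitMQ could be substituted):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class QueueWorkerSketch {
    public static void main(String[] args) throws InterruptedException {
        final BlockingQueue<Runnable> tasks = new ArrayBlockingQueue<Runnable>(1024);
        int workers = Runtime.getRuntime().availableProcessors();
        for (int i = 0; i < workers; i++) {       // one polling thread per CPU
            new Thread(new Runnable() {
                public void run() {
                    try {
                        for (;;)
                            tasks.take().run();   // drain the queue, one task at a time
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }).start();
        }
        // producers only enqueue; they never touch shared mutable state directly
        tasks.put(new Runnable() {
            public void run() { System.out.println("task executed"); }
        });
    }
}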
Dig into the JVM and the operating system and you will find non-blocking algorithms everywhere. The garbage collector uses them to speed up concurrent and parallel collection; the scheduler uses them to schedule threads and processes efficiently and to implement intrinsic locking. In Mustang (Java 6.0), the lock-based SynchronousQueue algorithm was replaced by a new non-blocking version. Few developers use SynchronousQueue directly, but it serves as the work queue for thread pools built with the Executors.newCachedThreadPool() factory. Benchmarks comparing cached thread pool performance show that the new non-blocking synchronous queue implementation is almost three times as fast as the old one, and further improvements are planned for the release following Mustang (code-named Dolphin).