背景(註釋):java
一個併發的相似ConcurrentNavigableMap的實現。node
這個map經過實現Comparable或者提供一個Comparator來實現排列的,經過構造函數來提供。算法
這個實現是一個SkipLists的併發版本而且爲containsKey/get/put/remove操做提供了log(n)的消耗。插入、刪除、更新和讀取能夠在多個線程之間安全併發。數組
Iterators和spliterators是弱兼容的。從小到大的key排列中的視圖比從大到小的要快。安全
全部從這些方法中返回的Map.Entry只是某時刻的一個快照。他們不提供setValue方法。(注意到你能夠經過改變映射經過使用put、putIfAbsent、replace方法,依賴於你本身想要的效果)併發
注意不像其餘大部分的集合那樣,size方法不是一個常量時間運算。由於這個map的異步特性,決定了這個map的元素個數必須經過遍歷獲得,因此在遍歷過程當中改變了map那麼就會獲得一個不精確的結果。而且,這些以大量數據位參數的方法像putAll、equals、toArray、containsValue以及clear不會保證以原子的方式執行。好比,一個遍歷操做和putAll操做併發,那麼只會看到部分添加的元素。app
這個類和它的視圖還有迭代器實現了全部的可選的Map和Iterator接口的方法。像其餘的併發結合,這個類不支持把null做爲key或者value,由於null做爲返回值沒法區分是否缺乏元素。dom
算法(註釋):異步
這個類實現了一個相似於樹的二維跳錶,標識段經過連接包含不一樣數據的基本節點來展現。有兩個緣由說明爲什麼使用這種方法代替類數組的結構:函數
爲了使用刪除標記,這個鏈表使用null的方法來指示刪除,一種相似於延遲刪除的模型。若是一個節點的value爲null,那麼它被認爲是局部刪除,就算它仍是可達的。這樣須要組織合適的併發控制在代替vs刪除操做--一個代替操做必須失敗,若是刪除操做首先nulling,以及一個刪除操做必須返回刪除以前的值。(這個刪除是能夠和其餘方法併發的,若是其餘方法返回null說明不存在這個元素)
這裏有一個當節點刪除時的事件序列(b:前驅,n:當前節點,f:後繼),初始化:
+------+ +------+ +------+
... | b |------>| n |----->| f | ...
+------+ +------+ +------+
1 首先經過CAS操做讓n的value從non-null變爲null。如今沒有公共操做會認爲這個映射(n)會存在了。固然,其餘的不間斷的插入或者刪除操做仍是可能改變n的指向下一個的引用的。
2 CAS操做n的next引用指向一個新的標記節點。如今沒有其餘節點可以被附加到n的後面。從而可以在基於CAS的鏈表中避免刪除錯誤。
+------+ +------+ +------+ +------+
... | b |------>| n |----->|marker|------>| f | ...
+------+ +------+ +------+ +------+
3 CAS操做b的next引用從而忽略了n和他的標記節點。如今,沒有新的遍歷會遭遇n,最後n和marker會被垃圾回收。
+------+ +------+
... | b |----------------------------------->| f | ...
+------+ +------+
第一步的失敗會致使簡單的重試(由於另外一個操做而競爭失敗)。2、三兩步失敗是由於其餘線程在遍歷的過程當中注意到一個節點有null值,經過協做的方式幫忙marking或者unlinking了。這種協做的方式保證了沒有線程會由於執行刪除的線程尚未進展而卡住等待。這種標記節點的用法稍微複雜化了協做代碼,由於遍歷過程必須確保一直地讀取四個節點(b,n,marker,f),不是僅僅(b,n,f),當一個節點的next域指向了一個marker,它就不會改變。
跳錶的模型中增長了段,因此基礎遍歷開始於接近目的地--常常只須要遍歷不多的節點。不須要改變算法除了只要確保遍歷開始於前驅(here,b),沒有被刪除(結構上),不然在處理這個刪除以後重試。
段層級以鏈表的形式經過volatile的next來使用CAS操做。在段上的競爭會好比新增/刪除節點會致使連接失敗。就算這個發生時,段鏈表依然保持有序,從而能夠做爲劃分。這個會影響性能,可是跳錶自己就是依賴機率的,結果就是"p"值可能會小於虛值。這個競爭窗口會保持得足夠小,從而在實際上失敗是很是少的,甚至在大量競爭的狀況下。
由於使用了一些重試邏輯從而使得base和index鏈表的重試是比較廉價的。遍歷會在嘗試了大多數」協做「CAS操做以後執行。這個不是很是必要,可是隱含的價值是能夠幫助減小其餘下游的CAS失敗操做,從而好於從新開始的開銷。這樣惡化了壞狀況,可是改進了高度競爭的狀況。
區別於其餘的跳錶實現,段插入和刪除須要一個分開的遍歷過程在基本層面的動做以後,增長或者刪除段節點。這樣增長了一個線程的消耗,可是提高了多個線程的競爭性能,經過縮小干擾窗口,刪除使得全部index節點不可達在刪除操做返回後,從而也避免了垃圾回收。這個在這裏是很是重要的,由於咱們不可以直接把擁有key的節點直接去除,由於他們仍然可能被讀取。
段使用了保持良好性能的稀疏策略:初始的k=1,p=0.5意味着四分之一的節點有段中的下標。。。。。。這個期待的總共的空間比咱們的java.util.TreeMap稍微少點。
改變段的層級(這個相似樹結構的高度)使用CAS操做。初始化的高度爲1.當建立一個比當前層級高的段時會在頭上增長一個層級。爲了保持良好的性能,刪除方法中會使用啓發式的方法去下降層級,若是最高的層級上是空的。這樣可能出如今沒有層級的段上遭遇競爭。這樣不會形成多大傷害,實際上相對於沒有限制的提高層級,這是個更好的選擇。
實現這些的代碼比你想象的更詳細。大多數運算會涉及定位元素(或者插入元素的位置)。這些代碼不能被很是好的分塊,由於子運算須要立刻獲得前面運算的結果,不這樣作的話會增長GC的負擔。(這是又一個我但願JAVA提供宏的地方)findPredecessor()操做搜僅僅索段節點,返回最底層節點的前驅。findNode()操做完成最底層節點的搜索。這種方法一樣出現了一點代碼的複製。
爲了在線程之間參數隨機的值,咱們使用了JDK中的隨機支持(經過"secondary seed")。
實現:
讓咱們來看源代碼,首先是put方法:
public V put(K key, V value) { if (value == null) throw new NullPointerException(); return doPut(key, value, false); }
private V doPut(K key, V value, boolean onlyIfAbsent) { Node<K,V> z; // added node if (key == null) throw new NullPointerException(); Comparator<? super K> cmp = comparator; outer: for (;;) { for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) { if (n != null) { Object v; int c; Node<K,V> f = n.next; if (n != b.next) // inconsistent read break; if ((v = n.value) == null) { // n is deleted n.helpDelete(b, f); break; } if (b.value == null || v == n) // b is deleted break; if ((c = cpr(cmp, key, n.key)) > 0) { b = n; n = f; continue; } if (c == 0) { if (onlyIfAbsent || n.casValue(v, value)) { @SuppressWarnings("unchecked") V vv = (V)v; return vv; } break; // restart if lost race to replace value } // else c < 0; fall through } z = new Node<K,V>(key, value, n); if (!b.casNext(n, z)) break; // restart if lost race to append to b break outer; } } int rnd = ThreadLocalRandom.nextSecondarySeed(); if ((rnd & 0x80000001) == 0) { // test highest and lowest bits int level = 1, max; while (((rnd >>>= 1) & 1) != 0) ++level; Index<K,V> idx = null; HeadIndex<K,V> h = head; if (level <= (max = h.level)) { for (int i = 1; i <= level; ++i) idx = new Index<K,V>(z, idx, null); } else { // try to grow by one level level = max + 1; // hold in array and later pick the one to use @SuppressWarnings("unchecked")Index<K,V>[] idxs = (Index<K,V>[])new Index<?,?>[level+1]; for (int i = 1; i <= level; ++i) idxs[i] = idx = new Index<K,V>(z, idx, null); for (;;) { h = head; int oldLevel = h.level; if (level <= oldLevel) // lost race to add level break; HeadIndex<K,V> newh = h; Node<K,V> oldbase = h.node; for (int j = oldLevel+1; j <= level; ++j) newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j); if (casHead(h, newh)) { h = newh; idx = idxs[level = oldLevel]; break; } } } // find insertion points and splice in splice: for (int insertionLevel = level;;) { int j = h.level; for (Index<K,V> q = h, r = q.right, t = idx;;) { if (q == null || t == null) break splice; if (r != null) { Node<K,V> n = r.node; // compare before deletion check avoids needing recheck int c = cpr(cmp, key, n.key); if (n.value == null) { if (!q.unlink(r)) break; r = q.right; continue; } if (c > 0) { q = r; r = r.right; continue; } } if (j == insertionLevel) { if (!q.link(r, t)) break; // restart if (t.node.value == null) { findNode(key); break splice; } if (--insertionLevel == 0) break splice; } if (--j >= insertionLevel && j < level) t = t.down; q = q.down; r = q.right; } } } return null; }
首先put方法直接調用doPut方法,這裏的關鍵加入了參數onlyIfAbsent,用於指明是否只在缺乏的狀況下添加。
因爲doPut方法比較長(超過100行),咱們把它分爲兩部分來看(詳情以下):
第一部分是outer嵌套的雙層循環:
private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) { if (key == null) throw new NullPointerException(); // don't postpone errors for (;;) { for (Index<K,V> q = head, r = q.right, d;;) { if (r != null) { Node<K,V> n = r.node; K k = n.key; if (n.value == null) { if (!q.unlink(r)) break; // restart r = q.right; // reread r continue; } if (cpr(cmp, key, k) > 0) { q = r; r = r.right; continue; } } if ((d = q.down) == null) return q.node; q = d; r = d.right; } } }
private Node<K,V> findNode(Object key) { if (key == null) throw new NullPointerException(); // don't postpone errors Comparator<? super K> cmp = comparator; outer: for (;;) { for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) { Object v; int c; if (n == null) break outer; Node<K,V> f = n.next; if (n != b.next) // inconsistent read break; if ((v = n.value) == null) { // n is deleted n.helpDelete(b, f); break; } if (b.value == null || v == n) // b is deleted break; if ((c = cpr(cmp, key, n.key)) == 0) return n; if (c < 0) break outer; b = n; n = f; } } return null; }findNode的原理實際上也在doPut中出現過了,它會進行以下操做:
public V remove(Object key) { return doRemove(key, null); }
final V doRemove(Object key, Object value) { if (key == null) throw new NullPointerException(); Comparator<? super K> cmp = comparator; outer: for (;;) { for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) { Object v; int c; if (n == null) break outer; Node<K,V> f = n.next; if (n != b.next) // inconsistent read break; if ((v = n.value) == null) { // n is deleted n.helpDelete(b, f); break; } if (b.value == null || v == n) // b is deleted break; if ((c = cpr(cmp, key, n.key)) < 0) break outer; if (c > 0) { b = n; n = f; continue; } if (value != null && !value.equals(v)) break outer; if (!n.casValue(v, null)) break; if (!n.appendMarker(f) || !b.casNext(n, f)) findNode(key); // retry via findNode else { findPredecessor(key, cmp); // clean index if (head.right == null) tryReduceLevel(); } @SuppressWarnings("unchecked") V vv = (V)v; return vv; } } return null; }
背景(註釋):
一個併發的相似ConcurrentNavigableMap的實現。
這個map經過實現Comparable或者提供一個Comparator來實現排列的,經過構造函數來提供。
這個實現是一個SkipLists的併發版本而且爲containsKey/get/put/remove操做提供了log(n)的消耗。插入、刪除、更新和讀取能夠在多個線程之間安全併發。
Iterators和spliterators是弱兼容的。從小到大的key排列中的視圖比從大到小的要快。
全部從這些方法中返回的Map.Entry只是某時刻的一個快照。他們不提供setValue方法。(注意到你能夠經過改變映射經過使用put、putIfAbsent、replace方法,依賴於你本身想要的效果)
注意不像其餘大部分的集合那樣,size方法不是一個常量時間運算。由於這個map的異步特性,決定了這個map的元素個數必須經過遍歷獲得,因此在遍歷過程當中改變了map那麼就會獲得一個不精確的結果。而且,這些以大量數據位參數的方法像putAll、equals、toArray、containsValue以及clear不會保證以原子的方式執行。好比,一個遍歷操做和putAll操做併發,那麼只會看到部分添加的元素。
這個類和它的視圖還有迭代器實現了全部的可選的Map和Iterator接口的方法。像其餘的併發結合,這個類不支持把null做爲key或者value,由於null做爲返回值沒法區分是否缺乏元素。
算法(註釋):
這個類實現了一個相似於樹的二維跳錶,標識段經過連接包含不一樣數據的基本節點來展現。有兩個緣由說明爲什麼使用這種方法代替類數組的結構:
爲了使用刪除標記,這個鏈表使用null的方法來指示刪除,一種相似於延遲刪除的模型。若是一個節點的value爲null,那麼它被認爲是局部刪除,就算它仍是可達的。這樣須要組織合適的併發控制在代替vs刪除操做--一個代替操做必須失敗,若是刪除操做首先nulling,以及一個刪除操做必須返回刪除以前的值。(這個刪除是能夠和其餘方法併發的,若是其餘方法返回null說明不存在這個元素)
這裏有一個當節點刪除時的事件序列(b:前驅,n:當前節點,f:後繼),初始化:
+------+ +------+ +------+
... | b |------>| n |----->| f | ...
+------+ +------+ +------+
1 首先經過CAS操做讓n的value從non-null變爲null。如今沒有公共操做會認爲這個映射(n)會存在了。固然,其餘的不間斷的插入或者刪除操做仍是可能改變n的指向下一個的引用的。
2 CAS操做n的next引用指向一個新的標記節點。如今沒有其餘節點可以被附加到n的後面。從而可以在基於CAS的鏈表中避免刪除錯誤。
+------+ +------+ +------+ +------+
... | b |------>| n |----->|marker|------>| f | ...
+------+ +------+ +------+ +------+
3 CAS操做b的next引用從而忽略了n和他的標記節點。如今,沒有新的遍歷會遭遇n,最後n和marker會被垃圾回收。
+------+ +------+
... | b |----------------------------------->| f | ...
+------+ +------+
第一步的失敗會致使簡單的重試(由於另外一個操做而競爭失敗)。2、三兩步失敗是由於其餘線程在遍歷的過程當中注意到一個節點有null值,經過協做的方式幫忙marking或者unlinking了。這種協做的方式保證了沒有線程會由於執行刪除的線程尚未進展而卡住等待。這種標記節點的用法稍微複雜化了協做代碼,由於遍歷過程必須確保一直地讀取四個節點(b,n,marker,f),不是僅僅(b,n,f),當一個節點的next域指向了一個marker,它就不會改變。
跳錶的模型中增長了段,因此基礎遍歷開始於接近目的地--常常只須要遍歷不多的節點。不須要改變算法除了只要確保遍歷開始於前驅(here,b),沒有被刪除(結構上),不然在處理這個刪除以後重試。
段層級以鏈表的形式經過volatile的next來使用CAS操做。在段上的競爭會好比新增/刪除節點會致使連接失敗。就算這個發生時,段鏈表依然保持有序,從而能夠做爲劃分。這個會影響性能,可是跳錶自己就是依賴機率的,結果就是"p"值可能會小於虛值。這個競爭窗口會保持得足夠小,從而在實際上失敗是很是少的,甚至在大量競爭的狀況下。
由於使用了一些重試邏輯從而使得base和index鏈表的重試是比較廉價的。遍歷會在嘗試了大多數」協做「CAS操做以後執行。這個不是很是必要,可是隱含的價值是能夠幫助減小其餘下游的CAS失敗操做,從而好於從新開始的開銷。這樣惡化了壞狀況,可是改進了高度競爭的狀況。
區別於其餘的跳錶實現,段插入和刪除須要一個分開的遍歷過程在基本層面的動做以後,增長或者刪除段節點。這樣增長了一個線程的消耗,可是提高了多個線程的競爭性能,經過縮小干擾窗口,刪除使得全部index節點不可達在刪除操做返回後,從而也避免了垃圾回收。這個在這裏是很是重要的,由於咱們不可以直接把擁有key的節點直接去除,由於他們仍然可能被讀取。
段使用了保持良好性能的稀疏策略:初始的k=1,p=0.5意味着四分之一的節點有段中的下標。。。。。。這個期待的總共的空間比咱們的java.util.TreeMap稍微少點。
改變段的層級(這個相似樹結構的高度)使用CAS操做。初始化的高度爲1.當建立一個比當前層級高的段時會在頭上增長一個層級。爲了保持良好的性能,刪除方法中會使用啓發式的方法去下降層級,若是最高的層級上是空的。這樣可能出如今沒有層級的段上遭遇競爭。這樣不會形成多大傷害,實際上相對於沒有限制的提高層級,這是個更好的選擇。
實現這些的代碼比你想象的更詳細。大多數運算會涉及定位元素(或者插入元素的位置)。這些代碼不能被很是好的分塊,由於子運算須要立刻獲得前面運算的結果,不這樣作的話會增長GC的負擔。(這是又一個我但願JAVA提供宏的地方)findPredecessor()操做搜僅僅索段節點,返回最底層節點的前驅。findNode()操做完成最底層節點的搜索。這種方法一樣出現了一點代碼的複製。
爲了在線程之間參數隨機的值,咱們使用了JDK中的隨機支持(經過"secondary seed")。
實現:
讓咱們來看源代碼,首先是put方法:
public V put(K key, V value) { if (value == null) throw new NullPointerException(); return doPut(key, value, false); }
private V doPut(K key, V value, boolean onlyIfAbsent) { Node<K,V> z; // added node if (key == null) throw new NullPointerException(); Comparator<? super K> cmp = comparator; outer: for (;;) { for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) { if (n != null) { Object v; int c; Node<K,V> f = n.next; if (n != b.next) // inconsistent read break; if ((v = n.value) == null) { // n is deleted n.helpDelete(b, f); break; } if (b.value == null || v == n) // b is deleted break; if ((c = cpr(cmp, key, n.key)) > 0) { b = n; n = f; continue; } if (c == 0) { if (onlyIfAbsent || n.casValue(v, value)) { @SuppressWarnings("unchecked") V vv = (V)v; return vv; } break; // restart if lost race to replace value } // else c < 0; fall through } z = new Node<K,V>(key, value, n); if (!b.casNext(n, z)) break; // restart if lost race to append to b break outer; } } int rnd = ThreadLocalRandom.nextSecondarySeed(); if ((rnd & 0x80000001) == 0) { // test highest and lowest bits int level = 1, max; while (((rnd >>>= 1) & 1) != 0) ++level; Index<K,V> idx = null; HeadIndex<K,V> h = head; if (level <= (max = h.level)) { for (int i = 1; i <= level; ++i) idx = new Index<K,V>(z, idx, null); } else { // try to grow by one level level = max + 1; // hold in array and later pick the one to use @SuppressWarnings("unchecked")Index<K,V>[] idxs = (Index<K,V>[])new Index<?,?>[level+1]; for (int i = 1; i <= level; ++i) idxs[i] = idx = new Index<K,V>(z, idx, null); for (;;) { h = head; int oldLevel = h.level; if (level <= oldLevel) // lost race to add level break; HeadIndex<K,V> newh = h; Node<K,V> oldbase = h.node; for (int j = oldLevel+1; j <= level; ++j) newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j); if (casHead(h, newh)) { h = newh; idx = idxs[level = oldLevel]; break; } } } // find insertion points and splice in splice: for (int insertionLevel = level;;) { int j = h.level; for (Index<K,V> q = h, r = q.right, t = idx;;) { if (q == null || t == null) break splice; if (r != null) { Node<K,V> n = r.node; // compare before deletion check avoids needing recheck int c = cpr(cmp, key, n.key); if (n.value == null) { if (!q.unlink(r)) break; r = q.right; continue; } if (c > 0) { q = r; r = r.right; continue; } } if (j == insertionLevel) { if (!q.link(r, t)) break; // restart if (t.node.value == null) { findNode(key); break splice; } if (--insertionLevel == 0) break splice; } if (--j >= insertionLevel && j < level) t = t.down; q = q.down; r = q.right; } } } return null; }
首先put方法直接調用doPut方法,這裏的關鍵加入了參數onlyIfAbsent,用於指明是否只在缺乏的狀況下添加。
因爲doPut方法比較長(超過100行),咱們把它分爲兩部分來看(詳情以下):
第一部分是outer嵌套的雙層循環:
private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) { if (key == null) throw new NullPointerException(); // don't postpone errors for (;;) { for (Index<K,V> q = head, r = q.right, d;;) { if (r != null) { Node<K,V> n = r.node; K k = n.key; if (n.value == null) { if (!q.unlink(r)) break; // restart r = q.right; // reread r continue; } if (cpr(cmp, key, k) > 0) { q = r; r = r.right; continue; } } if ((d = q.down) == null) return q.node; q = d; r = d.right; } } }
private Node<K,V> findNode(Object key) { if (key == null) throw new NullPointerException(); // don't postpone errors Comparator<? super K> cmp = comparator; outer: for (;;) { for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) { Object v; int c; if (n == null) break outer; Node<K,V> f = n.next; if (n != b.next) // inconsistent read break; if ((v = n.value) == null) { // n is deleted n.helpDelete(b, f); break; } if (b.value == null || v == n) // b is deleted break; if ((c = cpr(cmp, key, n.key)) == 0) return n; if (c < 0) break outer; b = n; n = f; } } return null; }findNode的原理實際上也在doPut中出現過了,它會進行以下操做:
public V remove(Object key) { return doRemove(key, null); }
final V doRemove(Object key, Object value) { if (key == null) throw new NullPointerException(); Comparator<? super K> cmp = comparator; outer: for (;;) { for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) { Object v; int c; if (n == null) break outer; Node<K,V> f = n.next; if (n != b.next) // inconsistent read break; if ((v = n.value) == null) { // n is deleted n.helpDelete(b, f); break; } if (b.value == null || v == n) // b is deleted break; if ((c = cpr(cmp, key, n.key)) < 0) break outer; if (c > 0) { b = n; n = f; continue; } if (value != null && !value.equals(v)) break outer; if (!n.casValue(v, null)) break; if (!n.appendMarker(f) || !b.casNext(n, f)) findNode(key); // retry via findNode else { findPredecessor(key, cmp); // clean index if (head.right == null) tryReduceLevel(); } @SuppressWarnings("unchecked") V vv = (V)v; return vv; } } return null; }