1、前言
concurrentHashMap與ConcurrentSkipListMap性能測試
在4線程1.6萬數據的條件下,ConcurrentHashMap 存取速度是ConcurrentSkipListMap 的4倍左右。 html
但ConcurrentSkipListMap有幾個ConcurrentHashMap 不能比擬的優勢: java
一、ConcurrentSkipListMap 的key是有序的。 node
二、ConcurrentSkipListMap 支持更高的併發。ConcurrentSkipListMap 的存取時間是log(N),和線程數幾乎無關。也就是說在數據量必定的狀況下,併發的線程越多,ConcurrentSkipListMap越能體現出他的優點。 算法
2、使用建議
在非多線程的狀況下,應當儘可能使用TreeMap。此外對於併發性相對較低的並行程序可使用Collections.synchronizedSortedMap將TreeMap進行包裝,也能夠提供較好的效率。對於高併發程序,應當使用ConcurrentSkipListMap,可以提供更高的併發度。 安全
因此在多線程程序中,若是須要對Map的鍵值進行排序時,請儘可能使用ConcurrentSkipListMap,可能獲得更好的併發度。
注意,調用ConcurrentSkipListMap的size時,因爲多個線程能夠同時對映射表進行操做,因此映射表須要遍歷整個鏈表才能返回元素個數,這個操做是個O(log(n))的操做。 數據結構
2、什麼是SkipList
Skip list(跳錶)是一種能夠代替平衡樹的數據結構,默認是按照Key值升序的。Skip list讓已排序的數據分佈在多層鏈表中,以0-1隨機數決定一個數據的向上攀升與否,經過「空間來換取時間」的一個算法,在每一個節點中增長了向前的指針,在插入、刪除、查找時能夠忽略一些不可能涉及到的結點,從而提升了效率。 多線程
從機率上保持數據結構的平衡比顯示的保持數據結構平衡要簡單的多。對於大多數應用,用Skip list要比用樹算法相對簡單。因爲Skip list比較簡單,實現起來會比較容易,雖然和平衡樹有着相同的時間複雜度(O(logn)),可是skip list的常數項會相對小不少。Skip list在空間上也比較節省。一個節點平均只須要1.333個指針(甚至更少)。
圖1-1 Skip list結構圖(以7,14,21,32,37,71,85序列爲例) 併發
Skip list的性質
(1) 由不少層結構組成,level是經過必定的機率隨機產生的。
(2) 每一層都是一個有序的鏈表,默認是升序,也能夠根據建立映射時所提供的Comparator進行排序,具體取決於使用的構造方法。
(3) 最底層(Level 1)的鏈表包含全部元素。
(4) 若是一個元素出如今Level i 的鏈表中,則它在Level i 之下的鏈表也都會出現。
(5) 每一個節點包含兩個指針,一個指向同一鏈表中的下一個元素,一個指向下面一層的元素。 app
3、什麼是ConcurrentSkipListMap
ConcurrentSkipListMap提供了一種線程安全的併發訪問的排序映射表。內部是SkipList(跳錶)結構實現,在理論上可以在O(log(n))時間內完成查找、插入、刪除操做。
注意,調用ConcurrentSkipListMap的size時,因爲多個線程能夠同時對映射表進行操做,因此映射表須要遍歷整個鏈表才能返回元素個數,這個操做是個O(log(n))的操做。 框架
ConcurrentSkipListMap存儲結構
ConcurrentSkipListMap存儲結構圖
跳躍表(SkipList):(如上圖所示)
1.多條鏈構成,是關鍵字升序排列的數據結構;
2.包含多個級別,一個head引用指向最高的級別,最低(底部)的級別,包含全部的key;
3.每個級別都是其更低級別的子集,而且是有序的;
4.若是關鍵字 key在 級別level=i中出現,則,level<=i的鏈表中都會包含該關鍵字key;
------------------------
ConcurrentSkipListMap主要用到了Node和Index兩種節點的存儲方式,經過volatile關鍵字實現了併發的操做
- static final class Node<K,V> {
- final K key;
- volatile Object value;//value值
- volatile Node<K,V> next;//next引用
- ……
- }
- static class Index<K,V> {
- final Node<K,V> node;
- final Index<K,V> down;//downy引用
- volatile Index<K,V> right;//右邊引用
- ……
- }
------------------------
ConcurrentSkipListMap的查找
經過SkipList的方式進行查找操做:(下圖以「查找91」進行說明:)
紅色虛線,表示查找的路徑,藍色向右箭頭表示right引用;黑色向下箭頭表示down引用;
/get方法,經過doGet操做實現
- public V get(Object key) {
- return doGet(key);
- }
- //doGet的實現
- private V doGet(Object okey) {
- Comparable<? super K> key = comparable(okey);
- Node<K,V> bound = null;
- Index<K,V> q = head;//把頭結點做爲當前節點的前驅節點
- Index<K,V> r = q.right;//前驅節點的右節點做爲當前節點
- Node<K,V> n;
- K k;
- int c;
- for (;;) {//遍歷
- Index<K,V> d;
- // 依次遍歷right節點
- if (r != null && (n = r.node) != bound && (k = n.key) != null) {
- if ((c = key.compareTo(k)) > 0) {//因爲key都是升序排列的,全部當前關鍵字大於所要查找的key時繼續向右遍歷
- q = r;
- r = r.right;
- continue;
- } else if (c == 0) {
- //若是找到了相等的key節點,則返回該Node的value若是value爲空多是其餘併發delete致使的,因而經過另外一種
- //遍歷findNode的方式再查找
- Object v = n.value;
- return (v != null)? (V)v : getUsingFindNode(key);
- } else
- bound = n;
- }
- //若是一個鏈表中right沒能找到key對應的value,則調整到其down的引用處繼續查找
- if ((d = q.down) != null) {
- q = d;
- r = d.right;
- } else
- break;
- }
- // 若是經過上面的遍歷方式,還沒能找到key對應的value,再經過Node.next的方式進行查找
- for (n = q.node.next; n != null; n = n.next) {
- if ((k = n.key) != null) {
- if ((c = key.compareTo(k)) == 0) {
- Object v = n.value;
- return (v != null)? (V)v : getUsingFindNode(key);
- } else if (c < 0)
- break;
- }
- }
- return null;
- }
------------------------------------------------
ConcurrentSkipListMap的刪除
經過SkipList的方式進行刪除操做:(下圖以「刪除23」進行說明:)
紅色虛線,表示查找的路徑,藍色向右箭頭表示right引用;黑色向下箭頭表示down引用;
- //remove操做,經過doRemove實現,把全部level中出現關鍵字key的地方都delete掉
- public V remove(Object key) {
- return doRemove(key, null);
- }
- final V doRemove(Object okey, Object value) {
- Comparable<? super K> key = comparable(okey);
- for (;;) {
- Node<K,V> b = findPredecessor(key);//獲得key的前驅(就是比key小的最大節點)
- Node<K,V> n = b.next;//前驅節點的next引用
- for (;;) {//遍歷
- if (n == null)//若是next引用爲空,直接返回
- return null;
- Node<K,V> f = n.next;
- if (n != b.next) // 若是兩次得到的b.next不是相同的Node,就跳轉到第一層循環從新得到b和n
- break;
- Object v = n.value;
- if (v == null) { // 當n被其餘線程delete的時候,其value==null,此時作輔助處理,並從新獲取b和n
- n.helpDelete(b, f);
- break;
- }
- if (v == n || b.value == null) // 當其前驅被delet的時候直接跳出,從新獲取b和n
- break;
- int c = key.compareTo(n.key);
- if (c < 0)
- return null;
- if (c > 0) {//當key較大時就繼續遍歷
- b = n;
- n = f;
- continue;
- }
- if (value != null && !value.equals(v))
- return null;
- if (!n.casValue(v, null))
- break;
- if (!n.appendMarker(f) || !b.casNext(n, f))//casNext方法就是經過比較和設置b(前驅)的next節點的方式來實現刪除操做
- findNode(key); // 經過嘗試findNode的方式繼續find
- else {
- findPredecessor(key); // Clean index
- if (head.right == null) //若是head的right引用爲空,則表示不存在該level
- tryReduceLevel();
- }
- return (V)v;
- }
- }
- }
-------------------------------------
ConcurrentSkipListMap的插入
經過SkipList的方式進行插入操做:(下圖以「添加55」的兩種狀況,進行說明:)
在level=2(該level存在)的狀況下添加55的圖示:只需在level<=2的合適位置插入55便可
--------
在level=4(該level不存在,圖示level4是新建的)的狀況下添加55的狀況:首先新建level4,而後在level<=4的合適位置插入55
-----------
- //put操做,經過doPut實現
- public V put(K key, V value) {
- if (value == null)
- throw new NullPointerException();
- return doPut(key, value, false);
- }
- private V doPut(K kkey, V value, boolean onlyIfAbsent) {
- Comparable<? super K> key = comparable(kkey);
- for (;;) {
- Node<K,V> b = findPredecessor(key);//前驅
- Node<K,V> n = b.next;
- //定位的過程就是和get操做類似
- for (;;) {
- if (n != null) {
- Node<K,V> f = n.next;
- if (n != b.next) // 先後值不一致的狀況下,跳轉到第一層循環從新得到b和n
- break;;
- Object v = n.value;
- if (v == null) { // n被delete的狀況下
- n.helpDelete(b, f);
- break;
- }
- if (v == n || b.value == null) // b 被delete的狀況,從新獲取b和n
- break;
- int c = key.compareTo(n.key);
- if (c > 0) {
- b = n;
- n = f;
- continue;
- }
- if (c == 0) {
- if (onlyIfAbsent || n.casValue(v, value))
- return (V)v;
- else
- break; // restart if lost race to replace value
- }
- // else c < 0; fall through
- }
- Node<K,V> z = new Node<K,V>(kkey, value, n);
- if (!b.casNext(n, z))
- break; // restart if lost race to append to b
- int level = randomLevel();//獲得一個隨機的level做爲該key-value插入的最高level
- if (level > 0)
- insertIndex(z, level);//進行插入操做
- return null;
- }
- }
- }
-
- /**
- * 得到一個隨機的level值
- */
- private int randomLevel() {
- int x = randomSeed;
- x ^= x << 13;
- x ^= x >>> 17;
- randomSeed = x ^= x << 5;
- if ((x & 0x8001) != 0) // test highest and lowest bits
- return 0;
- int level = 1;
- while (((x >>>= 1) & 1) != 0) ++level;
- return level;
- }
- //執行插入操做:如上圖所示,有兩種可能的狀況:
- //1.當level存在時,對level<=n都執行insert操做
- //2.當level不存在(大於目前的最大level)時,首先添加新的level,而後在執行操做1
- private void insertIndex(Node<K,V> z, int level) {
- HeadIndex<K,V> h = head;
- int max = h.level;
- if (level <= max) {//狀況1
- Index<K,V> idx = null;
- for (int i = 1; i <= level; ++i)//首先獲得一個包含1~level個級別的down關係的鏈表,最後的inx爲最高level
- idx = new Index<K,V>(z, idx, null);
- addIndex(idx, h, level);//把最高level的idx傳給addIndex方法
- } else { // 狀況2 增長一個新的級別
- level = max + 1;
- Index<K,V>[] idxs = (Index<K,V>[])new Index[level+1];
- Index<K,V> idx = null;
- for (int i = 1; i <= level; ++i)//該步驟和狀況1相似
- idxs[i] = idx = new Index<K,V>(z, idx, null);
- HeadIndex<K,V> oldh;
- int k;
- for (;;) {
- oldh = head;
- int oldLevel = oldh.level;
- if (level <= oldLevel) { // lost race to add level
- k = level;
- break;
- }
- HeadIndex<K,V> newh = oldh;
- Node<K,V> oldbase = oldh.node;
- for (int j = oldLevel+1; j <= level; ++j)
- newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);//建立新的
- if (casHead(oldh, newh)) {
- k = oldLevel;
- break;
- }
- }
- addIndex(idxs[k], oldh, k);
- }
- }
- /**
- *在1~indexlevel層中插入數據
- */
- private void addIndex(Index<K,V> idx, HeadIndex<K,V> h, int indexLevel) {
- // insertionLevel 表明要插入的level,該值會在indexLevel~1間遍歷一遍
- int insertionLevel = indexLevel;
- Comparable<? super K> key = comparable(idx.node.key);
- if (key == null) throw new NullPointerException();
- // 和get操做相似,不一樣的就是查找的同時在各個level上加入了對應的key
- for (;;) {
- int j = h.level;
- Index<K,V> q = h;
- Index<K,V> r = q.right;
- Index<K,V> t = idx;
- for (;;) {
- if (r != null) {
- Node<K,V> n = r.node;
- // compare before deletion check avoids needing recheck
- int c = key.compareTo(n.key);
- if (n.value == null) {
- if (!q.unlink(r))
- break;
- r = q.right;
- continue;
- }
- if (c > 0) {
- q = r;
- r = r.right;
- continue;
- }
- }
- if (j == insertionLevel) {//在該層level中執行插入操做
- // Don't insert index if node already deleted
- if (t.indexesDeletedNode()) {
- findNode(key); // cleans up
- return;
- }
- if (!q.link(r, t))//執行link操做,其實就是inset的實現部分
- break; // restart
- if (--insertionLevel == 0) {
- // need final deletion check before return
- if (t.indexesDeletedNode())
- findNode(key);
- return;
- }
- }
- if (--j >= insertionLevel && j < indexLevel)//key移動到下一層level
- t = t.down;
- q = q.down;
- r = q.right;
- }
- }
- }
參考:
集合框架 Map篇(5)----ConcurrentSkipListMap http://hi.baidu.com/yao1111yao/item/0f3008163c4b82c938cb306d Java裏多個Map的性能比較(TreeMap、HashMap、ConcurrentSkipListMap) http://blog.hongtium.com/java-map-skiplist/ 跳錶SkipList的原理和實現 http://imtinx.iteye.com/blog/1291165