HashMap 是一個以鍵值對存儲數據的容器,可是在它是線程不安全的,在多線程的環境下它有不少潛在的問題。ConcurrentHashMap 做爲 HashMap 的併發版本,是一個線程安全的容器,在高併發的環境下相比 HashTable 依然能維持良好的性能。ConcurrentHashMap 在 jdk1.8以前是採用的 segment 分段鎖的思想,可是在 jdk1.8 以後做了很是大的改動,取消了分段鎖,而且加入了紅黑樹來提升查找速度。下面經過閱讀jdk1.8源代碼,分析併發容器 ConcurrentHashMap 內部結構以及實現原理。java
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable
繼承自 AbstractMap 實現了 ConcurrentMap 接口node
public interface ConcurrentMap<K, V> extends Map<K, V> { @Override default V getOrDefault(Object key, V defaultValue) { ... } @Override default void forEach(BiConsumer<? super K, ? super V> action) { ... } V putIfAbsent(K key, V value); boolean remove(Object key, Object value); boolean replace(K key, V oldValue, V newValue); V replace(K key, V value); @Override default void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) { ... } @Override default V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) { ... } @Override default V computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) { ... } @Override default V compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) { ... } @Override default V merge(K key, V value, BiFunction<? super V, ? super V, ? extends V> remappingFunction) { ... } }
基本上是增長了一些默認的方法。關於java1.8默認方法有一段解釋能夠看這裏。數據庫
繼承體系基本和 HashMap 是差很少的。數組
static class Node<K,V> implements Map.Entry<K,V> { final int hash; final K key; volatile V val; volatile Node<K,V> next; Node(int hash, K key, V val, Node<K,V> next) { this.hash = hash; this.key = key; this.val = val; this.next = next; } public final K getKey() { return key; } public final V getValue() { return val; } public final int hashCode() { return key.hashCode() ^ val.hashCode(); } public final String toString(){ return key + "=" + val; } public final V setValue(V value) { throw new UnsupportedOperationException(); } public final boolean equals(Object o) { Object k, v, u; Map.Entry<?,?> e; return ((o instanceof Map.Entry) && (k = (e = (Map.Entry<?,?>)o).getKey()) != null && (v = e.getValue()) != null && (k == key || k.equals(key)) && (v == (u = val) || v.equals(u))); } /** * Virtualized support for map.get(); overridden in subclasses. */ Node<K,V> find(int h, Object k) { Node<K,V> e = this; if (k != null) { do { K ek; if (e.hash == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; } while ((e = e.next) != null); } return null; } } transient volatile Node<K,V>[] table; private transient volatile Node<K,V>[] nextTable;
一個 table 數組存放 Node, Node類繼承自 Map.Entry,還有一個 nextTable 數組(這個是擴容時候使用的臨時數組,後面會講到)。和 HashMap 的內部結構很是的類似,可是不一樣的地方是 Node 類裏面的 val 和 next 都被設置了 volatile 關鍵字(可見性,修改內容以後當即寫入內存) ,table/nextTable 也被設置爲了 volatile。安全
大體結構示意圖是這樣的數據結構
是一個數組+鏈表+紅黑樹的結構多線程
ConcurrentHashMap 1.8 以前採用的是 Reentrantlock,經過鎖住一個 segment 來減小鎖的競爭,不一樣 segment 的鎖之間沒有競爭關係,從而提升併發性能。 可是在 1.8 以後,作了很是大的修改,取消了 segment,採用了和 HashMap 類似的數據結構,使用了 synchronized 和大量的 CAS 操做來保證原子性,而且引入紅黑樹來提升查詢的效率。併發
什麼是 CAS (Compare And Swap) 操做?和 數據庫樂觀鎖 的概念比較類似,不知道的能夠搜索一下,大概意思是 基於計算機硬件實現一個原子操做,有三個參數:內存地址,指望值,要修改的新值,當指望值和內存當中的值進行比較不相等的時候,表示內存中的值已經被別線程改動過,這時候失敗返回,當相等的時候,將內存中的值改成新的值,並返回成功。app
下面將會看到不少相似這樣的代碼dom
U.compareAndSwapInt(this, SIZECTL, sc, -1)
意思爲獲取到當前對象的 SIZECTL 偏移量(其實就是獲取到了 sizeCtl 變量的值),與 sc 變量做比較,若是相等則將 sizeCtl 的值更新爲 -1,而且返回true,若是 sizeCtl 和 sc 的值不相等的話,直接返回 false。 Java 是經過 Unsafe 類的 native 方法,調用的底層 cpu 指令來完成 CAS 操做的。
// Unsafe mechanics private static final sun.misc.Unsafe U; private static final long SIZECTL; private static final long TRANSFERINDEX; private static final long BASECOUNT; private static final long CELLSBUSY; private static final long CELLVALUE; private static final long ABASE; private static final int ASHIFT; static { try { U = sun.misc.Unsafe.getUnsafe(); Class<?> k = ConcurrentHashMap.class; SIZECTL = U.objectFieldOffset(k.getDeclaredField("sizeCtl")); TRANSFERINDEX = U.objectFieldOffset(k.getDeclaredField("transferIndex")); BASECOUNT = U.objectFieldOffset(k.getDeclaredField("baseCount")); CELLSBUSY = U.objectFieldOffset(k.getDeclaredField("cellsBusy")); Class<?> ck = CounterCell.class; CELLVALUE = U.objectFieldOffset(ck.getDeclaredField("value")); Class<?> ak = Node[].class; ABASE = U.arrayBaseOffset(ak); int scale = U.arrayIndexScale(ak); if ((scale & (scale - 1)) != 0) throw new Error("data type scale not a power of two"); ASHIFT = 31 - Integer.numberOfLeadingZeros(scale); } catch (Exception e) { throw new Error(e); } }
先通讀一遍源代碼,帶着疑問,一行一行分析做者思路。
public V put(K key, V value) { return putVal(key, value, false); } final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); return null; }
主要分爲如下幾個步驟:
1.先經過 spread 方法計算出 key 的hash值
static final int HASH_BITS = 0x7fffffff; static final int spread(int h) { return (h ^ (h >>> 16)) & HASH_BITS; }
spread 方法主要是 異或 傳入key的高16位和低16位,而且將結果和 HASH_BITS 按位與(0x7fffffff 二進制表示爲 011..31個1..1),目的爲消除異或出來的結果的符號位,以避免接下來計算數組位置是一個負數。
2.判斷 table 是否爲空,若是是空的就進行初始化。
private static final int DEFAULT_CAPACITY = 16; private transient volatile int sizeCtl; private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc; } break; } } return tab; }
先判斷一下 table 是否已被初始化,若是沒有,使用 CAS 操做將 sizeCtl 更新爲 -1,而後新建一個長度爲 16 的 Node數組,結束以前將 sizeCtl 設置爲 12 (sc = n - ( n >>> 2),n 初始化的時候爲 16, n >>> 2 無符號右移2位就是 4,16 - 4 = 12。其實就是 16 * 負載因子0.75 = 12 )。數組內部超過12個位置被賦值的時候會進行擴容。
3.經過 (n-1) & hash 擾動函數 計算出數組位置,使用 CAS 操做 tabAt 獲取該位置上的值,若是爲空,新建一個 Node 放入這個位置。
4.判斷該位置的hash值是否爲 MOVED
static final int MOVED = -1;
若是爲 MOVED 表示此時數組正在發生擴容,那麼當前線程幫助數組一塊兒進行擴容操做。(後面會詳細說到)
5.若是以上2種狀況都不是,那麼表示當前位置上存在 Node,判斷當前節點下若是是鏈表,就遍歷整個鏈表,若是找到相同的hash值和key直接返回舊值,若是沒有找到則新建一個 Node 放到鏈表的最後。若是當前節點是一個紅黑樹(鏈表長度超過8會自動轉爲紅黑樹),那麼按照紅黑樹的方法查找。注意這個地方使用了 synchronized 關鍵字,鎖住了一個數組的位置,防止其餘線程執行put操做的時候把鏈表上的值修改掉了。
6.出方法前有一個 addCount 方法
/** * Adds to count, and if table is too small and not already * resizing, initiates transfer. If already resizing, helps * perform transfer if work is available. Rechecks occupancy * after a transfer to see if another resize is already needed * because resizings are lagging additions. * * @param x the count to add * @param check if <0, don't check resize, if <= 1 only check if uncontended */ private final void addCount(long x, int check) { CounterCell[] as; long b, s; if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return; } if (check <= 1) return; s = sumCount(); } if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); if (sc < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); s = sumCount(); } } }
這個方法不是特別看得懂,可是看註釋的意思是若是數組過小了,就擴容一下。若是已經在擴容了,就順便幫着一塊兒擴容。
相比 1.8 以前的代碼仍是很容易看懂的。
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
get 方法仍是一如既讓的簡單,根據key和hash值來進行查找,get方法是不加鎖的。
public V remove(Object key) { return replaceNode(key, null, null); } final V replaceNode(Object key, V value, Object cv) { int hash = spread(key.hashCode()); for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0 || (f = tabAt(tab, i = (n - 1) & hash)) == null) break; else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; boolean validated = false; synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { validated = true; for (Node<K,V> e = f, pred = null;;) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { V ev = e.val; if (cv == null || cv == ev || (ev != null && cv.equals(ev))) { oldVal = ev; if (value != null) e.val = value; else if (pred != null) pred.next = e.next; else setTabAt(tab, i, e.next); } break; } pred = e; if ((e = e.next) == null) break; } } else if (f instanceof TreeBin) { validated = true; TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> r, p; if ((r = t.root) != null && (p = r.findTreeNode(hash, key, null)) != null) { V pv = p.val; if (cv == null || cv == pv || (pv != null && cv.equals(pv))) { oldVal = pv; if (value != null) p.val = value; else if (t.removeTreeNode(p)) setTabAt(tab, i, untreeify(t.first)); } } } } } if (validated) { if (oldVal != null) { if (value == null) addCount(-1L, -1); return oldVal; } break; } } } return null; }
remove 方法經過一遍代碼也是比較簡單的,一樣也是鎖住數組的一個位置,而後遍歷這個位置上的鏈表,若是找到hash值和key相同的節點,將前一個節點的next指向下一個節點。注意到remove中也會判斷數組是否處在擴容的階段,若是是,會幫助一塊兒擴容。
以上的部分都和 HashMap 差很少,可是讀到這裏纔是 ConcurrentHashMap 精髓的地方,在 put 和 remove 方法裏面都有一個 helpTransfer 方法。
/** * Helps transfer if a resize is in progress. */ final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { int rs = resizeStamp(tab.length); while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { transfer(tab, nextTab); break; } } return nextTab; } return table; }
看註釋 Helps transfer if a resize is in progress. 若是數組正在發生擴容,那麼幫着一塊兒擴容。牛逼啊,併發擴容,多線程一塊兒擴容。 看下 transfer 方法,具體是怎麼實現的。代碼比較長,稍微加了一些註釋方便閱讀。
static final class ForwardingNode<K,V> extends Node<K,V> { final Node<K,V>[] nextTable; ForwardingNode(Node<K,V>[] tab) { super(MOVED, null, null, null); this.nextTable = tab; } 。。。 } /** * Moves and/or copies the nodes in each bin to new table. See * above for explanation. */ private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { /** * 將 n 右移3位 至關於除8,而後除以 CPU核心數。若是獲得的結果小於 MIN_TRANSFER_STRIDE(16),那麼就使用 16。 * 若是臨時表(nextTab)沒有初始化,那麼以2倍的大小初始化(n << 1),sizeCtl 設置爲 Integer 的最大值 * transferIndex 設置爲 tab 的長度 */ int n = tab.length, stride; if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; } /** * ForwardingNode 是一個標示類,它的hash字段值爲 MOVED(-1),若是看到節點爲 ForwardingNode 類表示這個位置 * 已經被處理過了,這個位置上面的數據已經被搬走了,不須要處理了。 * advance 爲 true 表示當前節點已經處理完了,能夠繼續處理下一個節點 */ int nextn = nextTab.length; ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance = true; boolean finishing = false; // to ensure sweep before committing nextTab /** * 從這裏開始 遍歷數組開始處理擴容 */ for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; // 4 while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } /** * 這裏是一些擴容完以後的賦值操做, sizeCtl 最終被設置爲 新長度 * 負載因子 的結果 */ if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); return; } if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit } } /** * 若是老的數組位置上爲null,那麼直接將該位置放一個 ForwardingNode,表示處理完 */ else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); /** * 若是已是 ForwardingNode,那麼直接跳過,處理下一個 */ else if ((fh = f.hash) == MOVED) advance = true; // already processed else { /** * 這裏是真正遷移數組的地方,分別是鏈表和紅黑樹的狀況。fh >= 0 節點的hash值大於0,表示是鏈表,由於紅黑樹的hash值彷佛爲負數 * 這裏的遷移數據邏輯比較特別,歸納一下是這樣的 * 將當前鏈表的每一個節點的 hash 值與數組的長度按位與。結果只有2種,一種是0,另一種是n(也就是數組的長度) * 將一個鏈表分紅2組數據,而後一個循環,2組數據分別造成2個和原來順序相反的鏈表 * 剛纔按位與結果爲0的鏈表放在臨時數組的原來序號位置,按位與結果不等於0的鏈表放在 i + n 的位置(原來位置加上數組長度的位置), * 遷移完將原來數組的位置的節點設置爲 ForwardingNode, 而後進行下一輪。 * 這個地方 synchronized(f) 使得不一樣的線程能夠處理不一樣的節點並且互不影響。 */ synchronized (f) { if (tabAt(tab, i) == f) { Node<K,V> ln, hn; if (fh >= 0) { int runBit = fh & n; Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } } }
總結來講,擴容方法就是
上面的註釋再結合下面的圖一塊兒來理解,模擬了一些數據
ConcurrentHashMap<String, Integer> concurrentHashMap = new ConcurrentHashMap(); concurrentHashMap.put("a", 1); concurrentHashMap.put("q", 2); concurrentHashMap.put("A", 1); concurrentHashMap.put("AAA", 1); concurrentHashMap.put("1", 1); concurrentHashMap.put("122", 1); concurrentHashMap.put("uha", 1); concurrentHashMap.put("8y0", 1); concurrentHashMap.put("01nf", 1); concurrentHashMap.put("maog", 1); concurrentHashMap.put("b", 1); concurrentHashMap.put("c", 1); concurrentHashMap.put("d", 1); concurrentHashMap.put("e", 1); concurrentHashMap.put("f", 1); concurrentHashMap.put("g", 1); concurrentHashMap.put("h", 1); concurrentHashMap.put("i", 1); concurrentHashMap.put("j", 1); concurrentHashMap.put("k", 1); concurrentHashMap.put("l", 1); concurrentHashMap.put("m", 1); concurrentHashMap.put("n", 1);
在map中依次放入這麼多值
當put到k的時候超過了原數組的容量超過負載因子發生擴容,擴容前index=1位置和index=15位置的地方分別爲2個鏈表,擴容後index=1的部分數據被轉移到了index=17的位置,index=15位置上的部分數據被轉移到了index=31的位置上。這麼作的好處顯而易見,縮短了鏈表的長度,維持良好的性能。
可是爲何要這麼作,爲何鏈表要拆成拆成2份,爲何要和數組長度按位與,爲何一份放高位一份放地位,爲何這樣拆分以後下一次get的能正確的找到數組的位置?網上分析源碼的文章不少,可是我基本沒有看到解釋這些問題的。
首先看一下 a 和 q 進太高低位異或以後的hash值二進制按位與的結果
按位與的結果主要取決於第5個位上面的值,若是第5位是0的放在原來的序號位不動,若是是1的放在 index + 原數組長度的位置。
那爲何要這麼作,是由於要讓map在擴容以後經過擾動函數可以取到正確的值。
當n=16的時候取得是最後4位,當n=32的時候取得是最後5位,做爲數組下標
當n=16的時候取得的數組位置是 0001 對應 table[1] 的位置,當n=32的時候取得的數組位置是 10001 = 10000 + 0001 = 16 + 1 = 17 也就是 table[17] 的位置。
當第5位按位與爲1的話,結果一定是 10000 加上某個數,而 10000 是 2的4次方 正好是16,正好是原來數組的長度。牛逼。
因此在遷移數據的時候纔會有那種奇怪的操做,看似不經意的幾行代碼,實際上是通過精心設計的,實在是佩服。