曹工說JDK源碼(2)--ConcurrentHashMap的多線程擴容,說白了,就是分段取任務

前言

先預先說明,我這邊jdk的代碼版本爲1.8.0_11,同時,由於我直接在本地jdk源碼上進行了部分修改、調試,因此,致使你們看到的我這邊貼的代碼,和你們的不太同樣。html

不過,我對源碼進行修改、重構時,會保證和原始代碼的功能、邏輯嚴格一致,更多時候,可能只是修改變量名,方便理解。java

你們也知道,jdk代碼寫得實在是比較深奧,變量名常常都是單字符,i,j,k啥的,實在是很難理解,因此,我通常會根據本身的理解,去重命名,爲了減輕咱們的頭腦負擔。node

至於怎麼去修改代碼並調試,能夠參考我以前的文章:git

曹工力薦:調試 jdk 中 rt.jar 包部分的源碼(可自由增長註釋,修改代碼並debug)數組

文章中,我改過的代碼放在:多線程

https://gitee.com/ckl111/jdk-debugjvm

sizeCtl field的初始化

你們知道,concurrentHashMap底層是數組+鏈表+紅黑樹,數組的長度假設爲n,在hashmap初始化的時候,這個n除了做爲數組長度,還會做爲另外一個關鍵field的值。ide

/**
     * Table initialization and resizing control.  When negative, the
     * table is being initialized or resized: -1 for initialization,
     * else -(1 + the number of active resizing threads).  Otherwise,
     * when table is null, holds the initial table size to use upon
     * creation, or 0 for default. After initialization, holds the
     * next element count value upon which to resize the table.
     */
    private transient volatile int sizeCtl;

該字段很是關鍵,根據取值不一樣,有不一樣的功能。函數

使用默認構造函數時

public ConcurrentHashMap() {
    }

此時,sizeCtl被初始化爲0.fetch

使用帶初始容量的構造函數時

此時,sizeCtl也是32,和容量一致。

使用另外一個map來初始化時

public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
        this.sizeCtl = DEFAULT_CAPACITY;
        putAll(m);
    }

此時,sizeCtl,直接使用了默認值,16.

使用初始容量、負載因子來初始化時

public ConcurrentHashMap(int initialCapacity, float loadFactor) {
        this(initialCapacity, loadFactor, 1);
    }

這裏重載了:

這裏,咱們傳入的負載因子爲0.75,這也是默認的負載因子,傳入的初始容量爲14.

這裏面會根據: 1 + 14/0.75 = 19,拿到真正的size,而後根據size,獲取到第一個大於19的2的n次方,即32,來做爲數組容量,而後sizeCtl也被設置爲32.

initTable時,對sizeCtl field的修改

實際上,new一個hashmap的時候,咱們並無建立支撐數組,那,何時建立數組呢?是在真正往裏面放數據的時候,好比put的時候。

/** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());

        int binCount = 0;
        ConcurrentHashMapPutResultVO vo = new ConcurrentHashMapPutResultVO();
        vo.setBinCount(0);
        for (Node<K,V>[] tab = table;;) {
            int tableLength;
            // 1
            if (tab == null) {
                tab = initTable();
                continue;
            }
            ...
        }

1處,即會去初始化table。

/**
     * Initializes table, using the size recorded in sizeCtl.
     * 初始化hashmap,使用sizeCtl做爲容量
     */
    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            sc = sizeCtl;
            if (sc < 0){
                Thread.yield(); // lost initialization race; just spin
                continue;
            }

            /**
             * 走到這裏,說明sizeCtl大於0,大於0,表明什麼,能夠去看下其構造函數,此時,sizeCtl表示
             * capacity的大小。
             * {@link #ConcurrentHashMap(int)}
             *
             * cas修改成-1,若是成功修改成-1,則表示搶到了鎖,能夠進行初始化
             *
             */
            // 1
            boolean bGotChanceToInit = U.compareAndSwapInt(this, SIZECTL, sc, -1);
            if (bGotChanceToInit) {
                try {
                    tab = table;
                    /**
                     * 若是當前表爲空,還沒有初始化,則進行初始化,分配空間
                     */
                    if (tab == null || tab.length == 0) {
                        /**
                         * sc大於0,則以sc爲準,不然使用默認的容量
                         */
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;

                        Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n];
                        table = tab = nt;
                        /**
                         * n >>> 2,無符號右移2位,則是n的四分之一。
                         * n- n/4,結果爲3/4 * n
                         * 則,這裏修改sc爲 3/4 * n
                         * 好比,默認容量爲16,則修改sc爲12
                         */
                        // 2
                        sc = n - (n >>> 2);
                    }
                } finally {
                    /**
                     * 修改sizeCtl到field
                     */
                    // 3
                    sizeCtl = sc;
                }
                break;
            }
        }

        return tab;
    }
  • 1處,cas修改sizeCtl爲-1,成功了的,得到初始化table的權利
  • 2處,修改局部變量sc爲: n - (n >>> 2),也就是修改成 0.75n,假設此時的數組容量爲16,那麼sc就是16 * 0.75 = 12.
  • 3處,將sc賦值到field: sizeCtl

通過上面的分析,initTable時,這個字段可能有兩種取值:

  • -1,有線程正在對該table進行初始化
  • 0.75*數組長度,此時,已經初始化完成

上面說的是,在put的時候去initTable,實際上,這個initTable,也會在如下函數中被調用,其共同點就是,都是往裏面放數據的操做:

擴容時機

上面說了不少,目前,咱們知道的是,在initTable後,sizeCtl的值,是舊的數組的長度 * 0.75。

接下來,咱們看看擴容時機,在put時,會調用putVal,這個函數的大致步驟:

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    // 1
    int hash = spread(key.hashCode());

    int binCount = 0;
    System.out.println("binCount:" + binCount);
    // 2
    ConcurrentHashMapPutResultVO vo = new ConcurrentHashMapPutResultVO();
    vo.setBinCount(0);
    for (Node<K,V>[] tab = table;;) {
        int tableLength;
        // 3
        if (tab == null) {
            tab = initTable();
            continue;
        }
        
        tableLength = tab.length;
        if (tableLength == 0) {
            tab = initTable();
            continue;
        }

        int entryNodeHashCode;
		
        // 4
        int entryNodeIndex = (tableLength - 1) & hash;
        Node<K,V> entryNode = tabAt(tab,entryNodeIndex);

        /**
         * 5 若是咱們要放的桶,仍是個空的,則直接cas放進去
         */
        if (entryNode == null) {
            Node<K, V> node = new Node<>(hash, key, value, null);

            // no lock when adding to empty bin
            boolean bSuccess = casTabAt(tab, entryNodeIndex, null, node);
            if (bSuccess) {
                break;
            } else {
                /**
                 * 若是沒成功,則繼續下一輪循環
                 */
                continue;
            }
        }
		
        entryNodeHashCode = entryNode.hash;
        /**
         * 6 若是要放的這個桶,正在遷移,則幫助遷移
         */
        if (entryNodeHashCode == MOVED){
            tab = helpTransfer(tab, entryNode);
            continue;
        }


        /**
         * 7 對entryNode加鎖
         */
        V oldVal = null;
        System.out.println("sync");
        synchronized (entryNode) {
            /**
             * 這一行是判斷,在咱們執行前面的一堆方法的時候,看看entryNodeIndex處的node是否變化
             */
            if (tabAt(tab, entryNodeIndex) != entryNode) {
                continue;
            }

            /**
             * 8 hashCode大於0,說明不是處於遷移狀態
             */
            if (entryNodeHashCode >= 0) {
                /**
                 * 9 鏈表中找到合適的位置並放入
                 */
                findPositionAndPut(key, value, onlyIfAbsent, hash, vo, entryNode);
                binCount = vo.getBinCount();
                oldVal = (V) vo.getOldValue();
            }
            else if (entryNode instanceof TreeBin) {
                ...
            }
        }
		
        System.out.println("binCount:" + binCount);
        // 10
        if (binCount != 0) {
            if (binCount >= TREEIFY_THRESHOLD)
                treeifyBin(tab, entryNodeIndex);
            if (oldVal != null)
                return oldVal;
            break;
        }
    }
    // 11
    addCount(1L, binCount);
    return null;
}
  • 1處,計算key的hashcode

  • 2處,我這邊new了一個對象,裏面兩個字段:

    public class ConcurrentHashMapPutResultVO<V> {
        int binCount;
    
        V oldValue;
    }

    其中,oldValue用來存放,若是put進去的key/value,其中key已經存在的話,通常會直接覆蓋以前的舊值,這裏主要存放以前的舊值,由於咱們須要返回舊值。

    binCount,則存放:在找到對應的hash桶以後,在鏈表中,遍歷了多少個元素,該值後面會使用,做爲一個標誌,當該標誌大於0的時候,纔去進一步檢查,看看是否擴容。

  • 3處,若是table爲null,說明table裏沒有任何一個鍵值對,數組也還沒建立,則初始化table

  • 4處,根據hashcode,和(數組長度 - 1)相與,計算出應該存放的哈希桶在數組中的索引

  • 5處,若是要放的哈希桶,仍是空的,則直接cas設置進去,成功則跳出循環,不然重試

  • 6處,若是要放的這個桶,該節點的hashcode爲MOVED(一個常量,值爲-1),說明有其餘線程正在擴容該hashmap,則幫助擴容

  • 7處,對要存放的hash桶的頭節點加鎖

  • 8處,若是頭節點的hashcode大於0,說明是拉了一條鏈表,則調用子方法(我這邊本身抽的),去找到合適的位置並插入到鏈表

  • 9處,findPositionAndPut,在鏈表中,找到合適的位置,並插入

  • 10處,在findPositionAndPut函數中,會返回:爲了找到合適的位置,遍歷了多少個元素,這個值,就是binCount。

    若是這個binCount大於8,則說明遍歷了8個元素,則須要轉紅黑樹了。

  • 11處,由於咱們新增了一個元素,總數天然要加1,這裏面會去增長總數,和檢查是否須要擴容。

其中,第9步,由於是本身抽的函數,因此這裏貼出來給你們看下:

/**
     * 遍歷鏈表,找到應該放的位置;若是遍歷完了還沒找到,則放到最後
     * @param key
     * @param value
     * @param onlyIfAbsent
     * @param hash
     * @param vo
     * @param entryNode
     */
    private void findPositionAndPut(K key, V value, boolean onlyIfAbsent, int hash, ConcurrentHashMapPutResultVO vo, Node<K, V> entryNode) {
        vo.setBinCount(1);

        for (Node<K,V> currentIterateNode = entryNode;
                ;
             vo.setBinCount(vo.getBinCount() + 1)) {


            /**
             * 若是當前遍歷指向的節點的hash值,與參數中的key的hash值相等,則,
             * 繼續判斷
             */
            K currentIterateNodeKey = currentIterateNode.key;
            boolean bKeyEqualOrNot = Objects.equals(currentIterateNodeKey, key);
            /**
             * key的hash值相等,且equals比較也相等,則就是咱們要找的
             */
            if (currentIterateNode.hash == hash && bKeyEqualOrNot) {
                /**
                 * 獲取舊的值
                 */
                vo.setOldValue(currentIterateNode.val);

                /**
                 * 覆蓋舊的node的val
                 */
                if (!onlyIfAbsent)
                    currentIterateNode.val = value;
                // 這裏直接break跳出循環
                break;
            }

            /**
             * 把當前節點保存起來
             */
            Node<K,V> pred = currentIterateNode;
            /**
             * 獲取下一個節點
             */
            currentIterateNode = currentIterateNode.next;
            /**
             * 若是下一個節點爲null,說明當前已是鏈表的最後一個node了
             */
            if ( currentIterateNode  == null) {
                /**
                 * 則在當前節點後面,掛上新的節點
                 */
                pred.next = new Node<K,V>(hash, key,
                        value, null);
                break;
            }
        }

    }

第11步,也是咱們要看的重點:

private final void addCount(long delta, int check) {
        CounterCell[] counterCellsArray = counterCells;
		// 1
        long b = baseCount;
    	// 2
        long newBaseCount = b + delta;

        /**
         * 3 直接cas在baseCount上增長
         */
        boolean bSuccess = U.compareAndSwapLong(this, BASECOUNT, b, newBaseCount);
        if ( counterCellsArray != null ||  !bSuccess) {
			...
            newBaseCount = sumCount();
        }
		
    	// 4
        if (check >= 0) {
            while (true) {

                Node<K,V>[] tab = table;
                Node<K,V>[] nt;
                int n = 0;
                // 5
                int sc =  sizeCtl;
                // 6
                boolean bSumExteedSizeControl = newBaseCount >= (long) sc;
				// 7
                boolean bContinue = bSumExteedSizeControl && tab != null && (n = tab.length) < MAXIMUM_CAPACITY;
                if (bContinue) {
                    int rs = resizeStamp(n);
                    if (sc < 0) {
                        if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                                sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                                transferIndex <= 0)
                            break;
                        if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                            transfer(tab, nt);
                    } else if (U.compareAndSwapInt(this, SIZECTL, sc,
                            (rs << RESIZE_STAMP_SHIFT) + 2))
                        // 8
                        transfer(tab, null);
                    newBaseCount = sumCount();
                } else {
                    break;
                }
            }

        }
    }
  • 1處,baseCount是一個field,存儲當前hashmap中,有多少個鍵值對,你put一次,就一個;remove一次,就減一個。

  • 2處,b + delta,其中,b就是baseCount,是舊的數量;dalta,咱們傳入的是1,就是要增長的元素數量

    因此,b + delta,獲得的,就是通過此次put後,預期的數量

  • 3處,直接cas,修改baseCount這個field爲 新值,也就是第二步拿到的值。

  • 4處,這裏檢查check是否大於0,check,是第二個形參;這個參數,咱們外邊怎麼傳的?

    addCount(1L, binCount);

    不就是bincount嗎,也就是說,這裏檢查:咱們在put過程當中,在鏈表中遍歷了幾個元素,若是遍歷了至少1個元素,這裏要進入下面的邏輯:檢查是否要擴容,由於,你binCount大於0,說明可能已經開始出現哈希衝突了。

  • 5處,取field:sizeCtl的值,給局部變量sc

  • 6處,判斷當前的新的鍵值對總數,是否大於sc了;好比容量是16,那麼sizeCtl是12,若是此時,hashmap中存放的鍵值對已經大於等於12了,則要檢查是否擴容了

  • 7處,幾個組合條件,查看是否要擴容,其中,主要的條件就是第6步的那個。

  • 8處,調用transfer,進行擴容

總結一下,通過前面的第6處,咱們知道,若是存放的鍵值對總數,已經大於等於0.75*哈希桶(也就是底層數組的長度)的數量了,那麼,就基本要擴容了。

擴容的大致過程

擴容也是一個相對複雜的過程,這裏只說大概,詳細的放下講。

假設,如今底層數組長度,128,也就是128個哈希桶,當存放的鍵值對數量,大於等於 128 * 0.75的時候,就會開始擴容,擴容的過程,大概是:

  • 申請一個256(容量翻倍)的數組
  • 如今有128個桶,至關於,須要對128個桶進行遍歷,遍歷每一個桶拉出去的鏈表或紅黑樹,查看每一個鍵值對,是須要放到新數組的什麼位置

這個過程,昨天的博文,畫了個圖,這裏再貼一下。

擴容後:

但是,若是咱們要一個個去遍歷全部哈希桶,而後遍歷對應的鏈表/紅黑樹,會不會太慢了?徹底是單線程工做啊。

換個思路,咱們能不能加快點呢?好比,線程1能夠去處理數組的 0 -15這16個桶,16- 31這16個桶,徹底可讓線程2去作啊,這樣的話,不就多線程了嗎,不是就快了嗎?

沒錯,jdk就是這麼幹的。

jdk維護了一個field,這個field,專門用來存當前能夠獲取的任務的索引,舉個例子:

你們看上圖就懂了,一開始,這裏假設咱們有128個桶,每次每一個線程,去拿16個桶來處理。

剛開始的時候,field:transferIndex就等於127,也就是最後一個桶的位置,而後咱們要從後往前取,那麼,127 到112,恰好就是16個桶,因此,申請任務的時候,就會用cas去更新field爲112,則表示,本身取到了112 到127這一個區間的hash桶遷移任務。

若是自始至終,只有一個線程呢,它處理完了112 - 127這一批hash桶後,會繼續取下一波任務,96 - 112;以此類推。

若是多線程的話呢,也是相似的,反正都是去嘗試cas更新transferIndex的值爲任務區間的開始下標的值,成功了,就算任務認領成功了。

多線程,怎麼知道須要去幫助擴容呢? 發起擴容的線程,在處理完bucket[k]時,會把老的table中的對應的bucket[k]的頭節點,修改成下面這種類型的節點:

static final class ForwardingNode<K,V> extends Node<K,V> {
        final Node<K,V>[] nextTable;
        ForwardingNode(Node<K,V>[] tab) {
            super(MOVED, null, null, null);
            this.nextTable = tab;
        }
    }

其餘線程,在put或者其餘操做時,發現頭結點變成了這個,就會去協助擴容了。

多線程擴容,和分段取任務的差異?

我我的感受,差異不大,多線程擴容,就是多線程去獲取本身的那一段任務,而後來完成。我這邊寫了簡單的demo,不過感受仍是頗有用的,能夠幫助咱們理解。

import sun.misc.Unsafe;

import java.lang.reflect.Field;
import java.util.concurrent.*;
import java.util.concurrent.locks.LockSupport;

public class ConcurrentTaskFetch {

    /**
     * 空閒任務索引,獲取任務時,從該下標開始,往前獲取。
     * 好比當前下標爲10,表示tasks數組中,0-10這個區間的任務,沒人領取
     */
    // 0
    private  volatile int freeTaskIndexForFetch;
	
    // 1
    private static final int TASK_COUNT_PER_FETCH = 16;
	
    // 2
    private String[] tasks = new String[128];

    public static void main(String[] args) {
        ConcurrentTaskFetch fetch = new ConcurrentTaskFetch();
        // 3
        fetch.init();

        ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));
        executor.prestartAllCoreThreads();

        CyclicBarrier cyclicBarrier = new CyclicBarrier(10);
		
        // 4
        for (int i = 0; i < 10; i++) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        cyclicBarrier.await();
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
					
                    // 5
                    FetchedTaskInfo fetchedTaskInfo = fetch.fetchTask();
                    if (fetchedTaskInfo != null) {
                        System.out.println("thread:" + Thread.currentThread().getName() + ",get task success:" + fetchedTaskInfo);
                        try {
                            TimeUnit.SECONDS.sleep(3);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }

                        System.out.println("thread:" + Thread.currentThread().getName()  +  ", process task finished");
                    }
                }
            });
        }


        LockSupport.park();
    }

    public void init() {
        for (int i = 0; i < 128; i++) {
            tasks[i] = "task" + i;
        }
        freeTaskIndexForFetch = tasks.length;
    }

	// 6
    public FetchedTaskInfo fetchTask() {
        System.out.println("Thread start fetch task:"+Thread.currentThread().getName()+",time: "+System.currentTimeMillis());

        while (true){
			// 6.1
            if (freeTaskIndexForFetch == 0) {
                System.out.println("thread:" + Thread.currentThread().getName() + ",get task failed,there is no task");
                return null;
            }

            /**
             * 6.2 獲取當前任務的集合的上界
             */
            int subTaskListEndIndex = this.freeTaskIndexForFetch;

            /**
             * 6.3 獲取當前任務的集合的下界
             */
            int subTaskListStartIndex = subTaskListEndIndex > TASK_COUNT_PER_FETCH ?
                    subTaskListEndIndex - TASK_COUNT_PER_FETCH : 0;

            /**
             * 6.4
             * 如今,咱們拿到了集合的上下界,即[subTaskListStartIndex,subTaskListEndIndex)
             * 該區間爲前開後閉,因此,實際的區間爲:
             * [subTaskListStartIndex,subTaskListEndIndex - 1]
             */

            /**
             * 6.5 使用cas,嘗試更新{@link freeTaskIndexForFetch} 爲 subTaskListStartIndex
             */
            if (U.compareAndSwapInt(this, FREE_TASK_INDEX_FOR_FETCH, subTaskListEndIndex, subTaskListStartIndex)) {
                // 6.6 
                FetchedTaskInfo info = new FetchedTaskInfo();
                info.setStartIndex(subTaskListStartIndex);
                info.setEndIndex(subTaskListEndIndex - 1);


                return info;
            }
        }

    }



    // Unsafe mechanics
    private static final sun.misc.Unsafe U;

    private static final long FREE_TASK_INDEX_FOR_FETCH;

    static {
        try {
//            U = sun.misc.Unsafe.getUnsafe();
            Field f = Unsafe.class.getDeclaredField("theUnsafe");
            f.setAccessible(true);
            U = (Unsafe) f.get(null);
            Class<?> k = ConcurrentTaskFetch.class;
            FREE_TASK_INDEX_FOR_FETCH = U.objectFieldOffset
                    (k.getDeclaredField("freeTaskIndexForFetch"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }


    static class FetchedTaskInfo{
        int startIndex;
        int endIndex;

        public int getStartIndex() {
            return startIndex;
        }

        public void setStartIndex(int startIndex) {
            this.startIndex = startIndex;
        }

        public int getEndIndex() {
            return endIndex;
        }

        public void setEndIndex(int endIndex) {
            this.endIndex = endIndex;
        }

        @Override
        public String toString() {
            return "FetchedTaskInfo{" +
                    "startIndex=" + startIndex +
                    ", endIndex=" + endIndex +
                    '}';
        }
    }
}
  • 0處,定義了一個field,相似於前面的transferIndex

    /**
         * 空閒任務索引,獲取任務時,從該下標開始,往前獲取。
         * 好比當前下標爲10,表示tasks數組中,0-10這個區間的任務,沒人領取
         */
        // 0
        private  volatile int freeTaskIndexForFetch;
  • 1,定義了每次取多少個任務,這裏也是16個

    private static final int TASK_COUNT_PER_FETCH = 16;
  • 2,定義任務列表,共128個任務

  • 3,main函數中,進行任務初始化

    public void init() {
        for (int i = 0; i < 128; i++) {
            tasks[i] = "task" + i;
        }
        freeTaskIndexForFetch = tasks.length;
    }

    主要初始化任務列表,其次,將freeTaskIndexForFetch 賦值爲128,後續取任務,從這個下標開始

  • 4處,啓動10個線程,每一個線程去執行取任務,按理說,咱們128個任務,每一個線程取16個,只能有8個線程取到任務,2個線程取不到

  • 5處,線程邏輯裏,去獲取任務

  • 6處,獲取任務的方法定義

  • 6.1 ,若是可獲取的任務索引爲0了,說明沒任務了,直接返回

  • 6.2,獲取當前任務的集合的上界

  • 6.3,獲取當前任務的集合的下界,減去16就好了

  • 6.4,拿到了集合的上下界,即[subTaskListStartIndex,subTaskListEndIndex)

  • 6.5, 使用cas,更新field爲:6.4中的任務下界。

執行效果演示:

能夠看到,8個線程取到任務,2個線程沒取到。

該思想在內存分配時的應用

其實jvm內存分配時,也是相似的思路,好比,設置堆內存爲200m,那這200m是啓動時立馬從操做系統分配了的。

接下來,就是每次new對象的時候,去這個大內存裏,找個小空間,這個過程,也是須要cas去競爭的,好比確定也有個全局的字段,來表示當前可用內存的索引,好比該索引爲100,表示,第100個字節後的空間是能夠用的,那我要new個對象,這個對象有3個字段,須要大概30個字節,那我是否是須要把這個索引更新爲130。

這中間是多線程的,因此也是要cas操做。

道理都是相似的。

總結

時間倉促,有問題在所不免,歡迎及時指出或加羣討論。

相關文章
相關標籤/搜索