Java併發包源碼分析 Java併發基礎總結

  併發是一種能並行運行多個程序或並行運行一個程序中多個部分的能力。若是程序中一個耗時的任務能以異步或並行的方式運行,那麼整個程序的吞吐量和可交互性將大大改善。現代的PC都有多個CPU或一個CPU中有多個核,是否能合理運用多核的能力將成爲一個大規模應用程序的關鍵。html

  Java基礎部分知識總結點擊Java併發基礎總結。Java多線程相關類的實現都在Java的併發包concurrent,concurrent包主要包含3部份內容,第一個是atomic包,裏面主要是一些原子類,好比AtomicInteger、AtomicIntegerArray等;第二個是locks包,裏面主要是鎖相關的類,好比ReentrantLock、Condition等;第三個就是屬於concurrent包的內容,主要包括線程池相關類(Executors)、阻塞集合類(BlockingQueue)、併發Map類(ConcurrentHashMap)、線程相關類(Thread、Runnable、Callable)等。java

atomic包源碼分析

  atomic包是專門爲線程安全設計的Java包,包含多個原子操做類。其基本思想就是在多線程環境下,當有多個線程同時執行這些類的實例的方法時,具備排他性,一個線程進入方法執行指令時,不會被其餘的線程打斷,而別的線程就像自旋鎖同樣,一直等待該方法執行完成。node

  原子變量的底層使用了處理器提供的原子指令,可是不一樣的CPU架構可能提供的原子指令不同,也有可能須要某種形式的內部鎖,因此該方法不能絕對保證線程不被阻塞。數組

  atomic包一共有12個類,四種原子更新方式,分別是原子更新基本類型、原子更新數組、原子更新引用和原子更新字段。JDK1.5中引入了底層的支持,在int、long和對象的引用等類型上都公開了CAS的操做,而且JVM把它們編譯爲底層硬件提供的最有效的方法,在運行CAS的平臺上,運行時把它們編譯爲相應的機器指令。在java.util.concurrent.atomic包下面的全部的原子變量類型中,好比AtomicInteger,都使用了這些底層的JVM支持爲數字類型的引用類型提供一種高效的CAS操做。安全

  Unsafe中的操做通常都是基於CAS來實現的,CAS就是Compare and Swap的意思,比較並操做。不少的cpu直接支持CAS指令。CAS是一項樂觀鎖技術,當多個線程嘗試使用CAS同時更新同一個變量時,只有其中一個線程能更新變量的值,而其它線程都失敗,失敗的線程並不會被掛起,而是被告知此次競爭中失敗,並能夠再次嘗試。CAS有3個操做數,內存值V,舊的預期值A,要修改的新值B。當且僅當預期值A和內存值V相同時,將內存值V修改成B,不然什麼都不作。數據結構

/**
 * AtomicMain
 * atomic class test
 */
public class AtomicMain {

    public static void main(String[] args) throws InterruptedException {
        ExecutorService   executor  = Executors.newCachedThreadPool();
        AtomicInteger     data      = new AtomicInteger(0);
        AtomicIntegerArray array    = new AtomicIntegerArray(10);
        AtomicReference   reference = new AtomicReference();

        /* AtomicInteger測試 */
        executor.execute(new AtomicIntegerTask(data));
        executor.execute(new AtomicIntegerTask(data));

        /* AtomicIntegerArray測試 */
        executor.execute(new AtomicIntegerArrayTask(array));
        executor.execute(new AtomicIntegerArrayTask(array));

        User user = new User("xxx", 18);
        reference.set(user);
        executor.execute(new AtomicReferenceTask(reference));

        /**
         * shutdown表示線程池再也不接收新的任務了,
         * 而不是阻塞到線程池任務執行完成以後再返回
         */
        executor.shutdown();
        /* 延時保證線程池任務執行完畢 */
        Thread.sleep(100);

        System.out.println(data);

        for (int i = 0; i < 10; i++) {
            System.out.print(array.get(i) + " ");
        }
        System.out.println();

        System.out.println(user);
    }

    /**
     * AtomicInteger
     */
    static class AtomicIntegerTask implements Runnable {
        private AtomicInteger data;

        public AtomicIntegerTask(AtomicInteger data) {
            this.data = data;
        }

        public void run() {
            int cnt = 10;

            while (cnt-- > 0) {
                data.incrementAndGet();
            }
        }
    }

    /**
     * 傳進來的Array大小至少爲10
     * AtomicIntegerArray是原子性的,保證對該array整個內存操做的原子性,
     * 也就是說不可能同時有A線程對array[0]操做,而B線程對array[1]操做
     */
    static class AtomicIntegerArrayTask implements Runnable {
        private AtomicIntegerArray array;

        public AtomicIntegerArrayTask(AtomicIntegerArray array) {
            this.array = array;
        }

        public void run() {
            int cnt = 10;

            while (cnt-- > 0) {
                for (int i = 0; i < 10; i++) {
                    array.getAndAdd(i, 1);
                }
            }
        }
    }

    static class AtomicReferenceTask implements Runnable {
        private AtomicReference reference;

        public AtomicReferenceTask(AtomicReference reference) {
            this.reference = reference;
        }

        public void run() {
            reference.set(new User("luoxn28", 23));
        }
    }

    static class User {
        public String name;
        public int    age;

        public User(String name, int age) {
            this.name = name;
            this.age  = age;
        }

        @Override
        public String toString() {
            return "User{" +
                    "name='" + name + '\'' +
                    ", age=" + age +
                    '}';
        }
    }
}

AtomicInteger. incrementAndGet流程

/**
 * 原子自增1
 * this表示AtomicInteger實例
 * valueOffset表示value數據域相對於this的內存地址的偏移位置
 */
public final int incrementAndGet() {
    return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        /* 獲取value在內存中的值,而後進行CAS操做 */
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
}

lock包源碼分析

  lock包裏面主要是鎖相關的類,好比ReentrantLock、Condition等。多線程

  Lock接口主要有lock、lockInterruptibly、tryLock、unlock、newCondition等方法:架構

public interface Lock {

    /**
     * 獲取鎖,獲取不到時該線程一直處於休眠狀態
     */
    void lock();

    /**
     * 若是所可用則獲取鎖;不然線程處理休眠狀態,若是此時發生中斷,則拋出InterruptException異常
     */
    void lockInterruptibly() throws InterruptedException;

    /**
     * 若是鎖可用則獲取鎖並返回true,不然返回false
     */
    boolean tryLock();

    /**
     * tryLock的待超時時間版本
     */
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

    /**
     * 釋放鎖
     */
    void unlock();

    /**
     * 返回用來與此 Lock 實例一塊兒使用的 Condition 實例
     */
    Condition newCondition();
}

使用Lock示例:併發

public class LockMain {
    public static void main(String[] args) throws InterruptedException {
        Lock lock = new ReentrantLock();
        AtomicInteger data = new AtomicInteger(0);

        ExecutorService executor = Executors.newCachedThreadPool();
        executor.execute(new LockAddTask(lock, data));
        executor.execute(new LockAddTask(lock, data));

        executor.shutdown();
        Thread.sleep(10);

        System.out.println(data.get());
    }

    static class LockAddTask implements Runnable {
        private Lock lock;
        private AtomicInteger data;

        public LockAddTask(Lock lock, AtomicInteger data) {
            this.lock = lock;
            this.data = data;
        }

        public void run() {
            int cnt = 10;

            while (cnt-- > 0) {
                try {
                    lock.lockInterruptibly();
                    data.getAndIncrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
        }
    }
}

concurrent包源碼分析

BlockingQueue

public interface BlockingQueue<E> extends Queue<E> {

    /**
     * 底層調用的是offer,若是滿了拋出異常
     */
    boolean add(E e);

    /**
     * 當集合爲滿時,一直等待
     */
    void put(E e) throws InterruptedException;

    /**
     * 當集合爲滿時,一直等待到超時
     */
    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;
    boolean offer(E e);

    /**
     * 當集合爲空時,始終等待
     */
    E take() throws InterruptedException;

    /**
     * 當集合爲空時,一直等到超時,若是還爲空則返回null
     */
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * 底層調用的是poll,若是空了拋出異常
     */
    boolean remove(Object o);

    //...
}

ArrayBlockingQueue

  ArrayBlockingQueue是一個基於數組的有界阻塞隊列,按照FIFO(先進先出)原則對元素進行排序,在構造方法中會new一個數組,而且new ReentrantLock,而且初始化notEmpty和notFull兩個Condition。框架

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

  執行put操做時,首先獲取lock,若是數組已經滿了,則調用notFull.await等待;不然調用enqueue插入元素,插入成功後把count計數值加1,調用notEmpty.signal。判斷數組是否滿了是根據count是否等於數組長度來肯定的,由於往數組中插入元素時,首先從下標爲0位置開始插入,插到下標爲array.length-1時,若是count小於array.length,則下一次從下標爲0位置插入。

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}
private void enqueue(E x) {
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal();
}

  執行take操做時,首先獲取lock,若是數組爲空,則調用notEmpty.await等待;不然調用dequeue取出元素,取出成功後把count計數值減1,調用notFull.signal。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}
private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}

  lock的wait/signal更多知識:http://www.cnblogs.com/alphablox/archive/2013/01/20/2868479.html

LinkedBlockingQueue

  LinkedBlockingQueue是基於鏈表結構的阻塞隊列,按照FIFO(先進先出)原則對元組進行排序,新元素是尾部插入,吞吐量一般高於ArrayBlockingQueue。該類中包含一個takeLock和基於takeLock的Condition對象notEmpty,一個putLock鎖,和基於putLock的Condition對象notFull。在構造方法中會新new一個Node,last和head都指向該Node節點。

public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}

  執行put操做時,首先獲取putLock,若是鏈表節點數已經達到上限,則調用notFull.await等待;不然調用enqueue插入元素,插入成功後把count值原子加1,若是鏈表節點數未達到上限,則調用notFull.signal。而後獲取takeLock,再調用notEmpty.signal通知。

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) {
            notFull.await();
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}
private void enqueue(Node<E> node) {
    last = last.next = node;
}
private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

  執行take操做時,首先獲取takeLock,若是鏈表爲空,則調用notEmpty.await等待;不然調用dequeue取出元素,而後把count值原子減1,若是此時鏈表非空,則調用notEmpty.signal。而後獲取putLock,再調用putLock.signal通知。

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}
private E dequeue() {
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}
private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}

ConcurrentHashMap

       ConcurrentHashMap是concurrent包中一個重要的類,其高效支併發操做,被普遍使用,Spring框架的底層數據結構就是使用ConcurrentHashMap實現的。同HashTable相比,它的鎖粒度更細,而不是像HashTable同樣爲每一個方法都添加了synchronized鎖。

       Java8中的ConcurrentHashMap廢棄了Segment(鎖段)的概念,而是用CAS和synchronized方法來實現。利用CAS來獲取table數組中的單個Node節點,獲取成功進行更新操做時,再使用synchronized處理對應Node節點所對應鏈表(或紅黑樹)中的數據。

使用ConcurrentHashMap程序示例

/**
 * HashMapMain test
 */
public class HashMapMain {
    public static void main(String[] args) throws InterruptedException {
        ConcurrentHashMap<String, String> hashMap = new ConcurrentHashMap<String, String>();
        ExecutorService executorService = Executors.newCachedThreadPool();

        executorService.execute(new HashMapPutTask(hashMap));
        executorService.execute(new HashMapPutTask(hashMap));
        executorService.execute(new HashMapPutTask(hashMap));

        executorService.shutdown();
        /**
         * Main thread wait for other thread over.
         */
        Thread.sleep(2000);

        Set<Map.Entry<String, String>> set = hashMap.entrySet();
        Iterator<Map.Entry<String, String>> iter = set.iterator();
        int i = 0;
        while (iter.hasNext()) {
            Map.Entry<String, String> keyValue = iter.next();
            System.out.println(++i + " -> " + keyValue.getKey() + ": " + keyValue.getValue());
        }
    }

    static class HashMapPutTask implements Runnable {
        private ConcurrentHashMap<String, String> hashMap;

        public HashMapPutTask(ConcurrentHashMap<String, String> hashMap) {
            this.hashMap = hashMap;
        }

        public void run() {
            int cnt = 10;

            while (cnt-- > 0) {
                String key   = UUID.randomUUID().toString();
                String value = UUID.randomUUID().toString();
                hashMap.put(key, value);
            }
        }
    }
}

幾個核心的內部類:

Node

  Node是最核心的內部類,它包裝了key-value鍵值對,全部插入ConcurrentHashMap的數據都包裝在這裏面。它與HashMap中的定義很類似,可是可是有一些差異它對value和next屬性設置了volatile同步鎖,它不容許調用setValue方法直接改變Node的value域,它增長了find方法輔助map.get()方法。

static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;
    // ...
}

TreeNode

  樹節點類,另一個核心的數據結構。當鏈表長度過長的時候,會轉換爲TreeNode。可是與HashMap不相同的是,它並非直接轉換爲紅黑樹,而是把這些結點包裝成TreeNode放在TreeBin對象中,由TreeBin完成對紅黑樹的操做。並且TreeNode在ConcurrentHashMap繼承自Node類,而並不是HashMap中的繼承自LinkedHashMap.Entry<K,V>類,也就是說TreeNode帶有next指針,這樣作的目的是方便基於TreeBin的訪問。

TreeBin

       這個類並不負責包裝用戶的key、value信息,而是包裝的不少TreeNode節點。它代替了TreeNode的根節點,也就是說在實際的ConcurrentHashMap「數組」中,存放的是TreeBin對象,而不是TreeNode對象,這是與HashMap的區別。另外這個類還帶有了讀寫鎖。

put操做

  ConcurrentHashMap最經常使用的就是put和get兩個方法。如今來介紹put方法,這個put方法依然沿用HashMap的put方法的思想,根據hash值計算這個新插入的點在table中的位置i,若是i位置是空的,直接放進去,不然進行判斷,若是i位置是樹節點,按照樹的方式插入新的節點,不然把i插入到鏈表的末尾。ConcurrentHashMap中依然沿用這個思想,有一個最重要的不一樣點就是ConcurrentHashMap不容許key或value爲null值。另外因爲涉及到多線程,put方法就要複雜一點。在多線程中可能有如下兩個狀況

  • 若是一個或多個線程正在對ConcurrentHashMap進行擴容操做,當前線程也要進入擴容的操做中。這個擴容的操做之因此能被檢測到,是由於transfer方法中在空結點上插入forward節點,若是檢測到須要插入的位置被forward節點佔有,就幫助進行擴容;
  • 若是檢測到要插入的節點是非空且不是forward節點,就對這個節點加鎖,這樣就保證了線程安全。儘管這個有一些影響效率,可是仍是會比hashTable的synchronized要好得多。

  總體流程就是首先定義不容許key或value爲null的狀況放入  對於每個放入的值,首先利用spread方法對key的hashcode進行一次hash計算,由此來肯定這個值在table中的位置。若是這個位置是空的,那麼直接放入,並且不須要加鎖操做。

  若是這個位置存在結點,說明發生了hash碰撞,首先進入sychnorized同步代碼塊,而後判斷這個節點的類型。若是是鏈表節點(fh>0),則獲得的結點就是hash值相同的節點組成的鏈表的頭節點。須要依次向後遍歷肯定這個新加入的值所在位置。若是遇到hash值與key值都與新加入節點是一致的狀況,則只須要更新value值便可。不然依次向後遍歷,直到鏈表尾插入這個結點。  若是加入這個節點之後鏈表長度大於8,就把這個鏈表轉換成紅黑樹。若是這個節點的類型已是樹節點的話,直接調用樹節點的插入方法進行插入新的值。

public V put(K key, V value) {
    return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
        //不容許 key或value爲null
    if (key == null || value == null) throw new NullPointerException();
    //計算hash值
    int hash = spread(key.hashCode());
    int binCount = 0;
    //死循環 什麼時候插入成功 什麼時候跳出
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        //若是table爲空的話,初始化table
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        //根據hash值計算出在table裏面的位置 
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            //若是這個位置沒有值 ,直接放進去,不須要加鎖
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        //當遇到錶鏈接點時,須要進行整合表的操做
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            //結點上鎖  這裏的結點能夠理解爲hash值相同組成的鏈表的頭結點
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    //fh〉0 說明這個節點是一個鏈表的節點 不是樹的節點
                    if (fh >= 0) {
                        binCount = 1;
                        //在這裏遍歷鏈表全部的結點
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            //若是hash值和key值相同  則修改對應結點的value值
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            //若是遍歷到了最後一個結點,那麼就證實新的節點須要插入 就把它插入在鏈表尾部
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    //若是這個節點是樹節點,就按照樹的方式插入值
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                //若是鏈表長度已經達到臨界值8 就須要把鏈表轉換爲樹結構
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    //將當前ConcurrentHashMap的元素數量+1
    addCount(1L, binCount);
    return null;
}

get方法

  get方法比較簡單,給定一個key來肯定value的時候,必須知足兩個條件  key相同  hash值相同,對於節點可能在鏈表或樹上的狀況,須要分別去查找。

public V get(Object key) {  
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;  
    //計算hash值  
    int h = spread(key.hashCode());  
    //根據hash值肯定節點位置  
    if ((tab = table) != null && (n = tab.length) > 0 &&  
        (e = tabAt(tab, (n - 1) & h)) != null) {  
        //若是搜索到的節點key與傳入的key相同且不爲null,直接返回這個節點    
        if ((eh = e.hash) == h) {  
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))  
                return e.val;  
        }  
        //若是eh<0 說明這個節點在樹上 直接尋找  
        else if (eh < 0)  
            return (p = e.find(h, key)) != null ? p.val : null;  
         //不然遍歷鏈表 找到對應的值並返回  
        while ((e = e.next) != null) {  
            if (e.hash == h &&  
                ((ek = e.key) == key || (ek != null && key.equals(ek))))  
                return e.val;  
        }  
    }  
    return null;  
}  

 

參考:

  一、ConcurrentHashMap源碼分析(JDK8版本)

  二、Java併發基礎總結

相關文章
相關標籤/搜索