Java併發集合的實現原理

AtomicInteger

能夠用原子方式更新int值。類 AtomicBoolean、AtomicInteger、AtomicLong 和 AtomicReference 的實例各自提供對相應類型單個變量的訪問和更新。基本的原理都是使用CAS操做:java

1node

boolean compareAndSet(expectedValue, updateValue);數組

若是此方法(在不一樣的類間參數類型也不一樣)當前保持expectedValue,則以原子方式將變量設置爲updateValue,並在成功時報告true。緩存

循環CAS,參考AtomicInteger中的實現:安全

1服務器

2多線程

3併發

4框架

5less

6

7

8

9

10

11

12

public final int getAndIncrement() {

        for (;;) {

            int current = get();

            int next = current + 1;

            if (compareAndSet(current, next))

                return current;

        }

    }

 

    public final boolean compareAndSet(int expect, int update) {

        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);

    }

ABA問題

由於CAS須要在操做值的時候檢查下值有沒有發生變化,若是沒有發生變化則更新,可是若是一個值原來是A,變成了B,又變成了A,那麼使用CAS進行檢查時會發現它的值沒有發生變化,可是實際上卻變化了。ABA問題的解決思路就是使用版本號。在變量前面追加上版本號,每次變量更新的時候把版本號加一,那麼A-B-A 就會變成1A-2B-3A。

從Java1.5開始JDK的atomic包裏提供了一個類AtomicStampedReference來解決ABA問題。這個類的compareAndSet方法做用是首先檢查當前引用是否等於預期引用,而且當前標誌是否等於預期標誌,若是所有相等,則以原子方式將該引用和該標誌的值設置爲給定的更新值。

1

2

3

4

5

public boolean compareAndSet(

        V      expectedReference,//預期引用

        V      newReference,//更新後的引用

        int    expectedStamp, //預期標誌

        int    newStamp) //更新後的標誌

ArrayBlockingQueue

一個由數組支持的有界阻塞隊列。此隊列按 FIFO(先進先出)原則對元素進行排序。隊列的頭部是在隊列中存在時間最長的元素。隊列的尾部是在隊列中存在時間最短的元素。新元素插入到隊列的尾部,隊列獲取操做則是從隊列頭部開始得到元素。這是一個典型的「有界緩存區」,固定大小的數組在其中保持生產者插入的元素和使用者提取的元素。一旦建立了這樣的緩存區,就不能再增長其容量。試圖向已滿隊列中放入元素會致使操做受阻塞;試圖從空隊列中提取元素將致使相似阻塞。

此類支持對等待的生產者線程和使用者線程進行排序的可選公平策略。默認狀況下,不保證是這種排序。然而,經過將公平性(fairness)設置爲true而構造的隊列容許按照 FIFO 順序訪問線程。公平性一般會下降吞吐量,但也減小了可變性和避免了「不平衡性」。

一些源代碼參考:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

/** Main lock guarding all access */

    final ReentrantLock lock;

 

    public void put(E e) throws InterruptedException {

        checkNotNull(e);

        final ReentrantLock lock = this.lock;

        lock.lockInterruptibly();

        try {

            while (count == items.length)

                notFull.await();

            insert(e);

        } finally {

            lock.unlock();

        }

    }

 

    private void insert(E x) {

        items[putIndex] = x;

        putIndex = inc(putIndex);

        ++count;

        notEmpty.signal();

    }

 

    final int inc(int i) {

        return (++i == items.length) ? 0 : i;

    }

 

    public E take() throws InterruptedException {

        final ReentrantLock lock = this.lock;

        lock.lockInterruptibly();

        try {

            while (count == 0)

                notEmpty.await();

            return extract();

        } finally {

            lock.unlock();

        }

    }

 

    private E extract() {

        final Object[] items = this.items;

        E x = this.<E>cast(items[takeIndex]);

        items[takeIndex] = null;

        takeIndex = inc(takeIndex);

        --count;

        notFull.signal();

        return x;

    }

 

    final int dec(int i) {

        return ((i == 0) ? items.length : i) - 1;

    }

 

    @SuppressWarnings("unchecked")

    static <E> E cast(Object item) {

        return (E) item;

    }

ArrayBlockingQueue只使用了一個lock來控制互斥訪問,全部的互斥訪問都在這個lock的try finally中實現。

LinkedBlockingQueue

一個基於已連接節點的、範圍任意的blocking queue。此隊列按 FIFO(先進先出)排序元素。隊列的頭部是在隊列中時間最長的元素。隊列的尾部是在隊列中時間最短的元素。新元素插入到隊列的尾部,而且隊列獲取操做會得到位於隊列頭部的元素。連接隊列的吞吐量一般要高於基於數組的隊列,可是在大多數併發應用程序中,其可預知的性能要低。

可選的容量範圍構造方法參數做爲防止隊列過分擴展的一種方法。若是未指定容量,則它等於Integer.MAX_VALUE。除非插入節點會使隊列超出容量,不然每次插入後會動態地建立連接節點。

若是構造一個LinkedBlockingQueue對象,而沒有指定其容量大小,LinkedBlockingQueue會默認一個相似無限大小的容量(Integer.MAX_VALUE),這樣的話,若是生產者的速度一旦大於消費者的速度,也許尚未等到隊列滿阻塞產生,系統內存就有可能已被消耗殆盡了。

一些實現代碼:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

/** The capacity bound, or Integer.MAX_VALUE if none */

    private final int capacity;

 

    /** Current number of elements */

    private final AtomicInteger count = new AtomicInteger(0);

 

    /** Lock held by take, poll, etc */

    private final ReentrantLock takeLock = new ReentrantLock();

 

    /** Wait queue for waiting takes */

    private final Condition notEmpty = takeLock.newCondition();

 

    /** Lock held by put, offer, etc */

    private final ReentrantLock putLock = new ReentrantLock();

 

    /** Wait queue for waiting puts */

    private final Condition notFull = putLock.newCondition();

 

    public void put(E e) throws InterruptedException {

        if (e == null) throw new NullPointerException();

        // Note: convention in all put/take/etc is to preset local var

        // holding count negative to indicate failure unless set.

        int c = -1;

        Node<E> node = new Node(e);

        final ReentrantLock putLock = this.putLock;

        final AtomicInteger count = this.count;

        putLock.lockInterruptibly();

        try {

            /*

             * Note that count is used in wait guard even though it is

             * not protected by lock. This works because count can

             * only decrease at this point (all other puts are shut

             * out by lock), and we (or some other waiting put) are

             * signalled if it ever changes from capacity. Similarly

             * for all other uses of count in other wait guards.

             */

            while (count.get() == capacity) {

                notFull.await();

            }

            enqueue(node);

            c = count.getAndIncrement();

            if (c + 1 < capacity)

                notFull.signal();

        } finally {

            putLock.unlock();

        }

        if (c == 0)

            signalNotEmpty();

    }

 

    public E take() throws InterruptedException {

        E x;

        int c = -1;

        final AtomicInteger count = this.count;

        final ReentrantLock takeLock = this.takeLock;

        takeLock.lockInterruptibly();

        try {

            while (count.get() == 0) {

                notEmpty.await();

            }

            x = dequeue();

            c = count.getAndDecrement();

            if (c > 1)

                notEmpty.signal();

        } finally {

            takeLock.unlock();

        }

        if (c == capacity)

            signalNotFull();

        return x;

    }

從源代碼實現來看,LinkedBlockingQueue使用了2個lock,一個takelock和一個putlock,讀和寫用不一樣的lock來控制,這樣併發效率更高。

ConcurrentLinkedQueue

ArrayBlockingQueue和LinkedBlockingQueue都是使用lock來實現的,也就是阻塞式的隊列,而ConcurrentLinkedQueue使用CAS來實現,是非阻塞式的「lock-free」實現。

ConcurrentLinkedQueue源代碼的實現有點複雜,具體的可看這篇文章的分析:

http://www.infoq.com/cn/articles/ConcurrentLinkedQueue

ConcurrentHashMap

HashMap不是線程安全的。

HashTable容器使用synchronized來保證線程安全,在線程競爭激烈的狀況下HashTable的效率很是低下。

ConcurrentHashMap採用了Segment分段技術,容器裏有多把鎖,每把鎖用於鎖容器其中一部分數據,那麼當多線程訪問容器裏不一樣數據段的數據時,線程間就不會存在鎖競爭,從而能夠有效的提升併發訪問效率。

ConcurrentHashMap結構:

ConcurrentHashMap的實現原理分析:

http://www.infoq.com/cn/articles/ConcurrentHashMap

CopyOnWriteArrayList

CopyOnWrite容器即寫時複製的容器。往一個容器添加元素的時候,不直接往當前容器添加,而是先將當前容器進行Copy,複製出一個新的容器,而後新的容器裏添加元素,添加完元素以後,再將原容器的引用指向新的容器。這樣作的好處是能夠對CopyOnWrite容器進行併發的讀,而不須要加鎖,由於當前容器不會添加任何元素。因此CopyOnWrite容器也是一種讀寫分離的思想,讀和寫不一樣的容器。相似的有CopyOnWriteArraySet。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

public boolean add(T e) {

    final ReentrantLock lock = this.lock;

    lock.lock();

    try {

        Object[] elements = getArray();

        int len = elements.length;

        // 複製出新數組

        Object[] newElements = Arrays.copyOf(elements, len + 1);

        // 把新元素添加到新數組裏

        newElements[len] = e;

        // 把原數組引用指向新數組

        setArray(newElements);

        return true;

    } finally {

        lock.unlock();

    }

}

  

final void setArray(Object[] a) {

    array = a;

}

讀的時候不須要加鎖,若是讀的時候有多個線程正在向ArrayList添加數據,讀仍是會讀到舊的數據,由於寫的時候不會鎖住舊的ArrayList。

1

2

3

public E get(int index) {

    return get(getArray(), index);

}

AbstractQueuedSynchronizer

爲實現依賴於先進先出 (FIFO) 等待隊列的阻塞鎖和相關同步器(信號量、事件,等等)提供一個框架。此類的設計目標是成爲依靠單個原子 int 值來表示狀態的大多數同步器的一個有用基礎。子類必須定義更改此狀態的受保護方法,並定義哪一種狀態對於此對象意味着被獲取或被釋放。假定這些條件以後,此類中的其餘方法就能夠實現全部排隊和阻塞機制。子類能夠維護其餘狀態字段,但只是爲了得到同步而只追蹤使用 getState()、setState(int) 和 compareAndSetState(int, int) 方法來操做以原子方式更新的 int 值。

使用示例
如下是一個非再進入的互斥鎖類,它使用值 0 表示未鎖定狀態,使用 1 表示鎖定狀態。當非重入鎖定不嚴格地須要當前擁有者線程的記錄時,此類使得使用監視器更加方便。它還支持一些條件並公開了一個檢測方法:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

class Mutex implements Lock, java.io.Serializable {

 

    // Our internal helper class

    private static class Sync extends AbstractQueuedSynchronizer {

      // Report whether in locked state

      protected boolean isHeldExclusively() {

        return getState() == 1;

      }

 

      // Acquire the lock if state is zero

      public boolean tryAcquire(int acquires) {

        assert acquires == 1; // Otherwise unused

       if (compareAndSetState(0, 1)) {

         setExclusiveOwnerThread(Thread.currentThread());

         return true;

       }

       return false;

      }

 

      // Release the lock by setting state to zero

      protected boolean tryRelease(int releases) {

        assert releases == 1; // Otherwise unused

        if (getState() == 0) throw new IllegalMonitorStateException();

        setExclusiveOwnerThread(null);

        setState(0);

        return true;

      }

        

      // Provide a Condition

      Condition newCondition() { return new ConditionObject(); }

 

      // Deserialize properly

      private void readObject(ObjectInputStream s)

        throws IOException, ClassNotFoundException {

        s.defaultReadObject();

        setState(0); // reset to unlocked state

      }

    }

 

    // The sync object does all the hard work. We just forward to it.

    private final Sync sync = new Sync();

 

    public void lock()                { sync.acquire(1); }

    public boolean tryLock()          { return sync.tryAcquire(1); }

    public void unlock()              { sync.release(1); }

    public Condition newCondition()   { return sync.newCondition(); }

    public boolean isLocked()         { return sync.isHeldExclusively(); }

    public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }

    public void lockInterruptibly() throws InterruptedException {

      sync.acquireInterruptibly(1);

    }

    public boolean tryLock(long timeout, TimeUnit unit)

        throws InterruptedException {

      return sync.tryAcquireNanos(1, unit.toNanos(timeout));

    }

 }

ThreadPoolExecutor

ThreadPoolExecutor 的內部工做原理,整個思路總結起來就是 5 句話:

1. 若是當前池大小 poolSize 小於 corePoolSize ,則建立新線程執行任務。

2. 若是當前池大小 poolSize 大於 corePoolSize ,且等待隊列未滿,則進入等待隊列

3. 若是當前池大小 poolSize 大於 corePoolSize 且小於 maximumPoolSize ,且等待隊列已滿,則建立新線程執行任務。

4. 若是當前池大小 poolSize 大於 corePoolSize 且大於 maximumPoolSize ,且等待隊列已滿,則調用拒絕策略來處理該任務。

5. 線程池裏的每一個線程執行完任務後不會馬上退出,而是會去檢查下等待隊列裏是否還有線程任務須要執行,若是在 keepAliveTime 裏等不到新的任務了,那麼線程就會退出。

排隊有三種通用策略:

直接提交。工做隊列的默認選項是SynchronousQueue,它將任務直接提交給線程而不保持它們。在此,若是不存在可用於當即運行任務的線程,則試圖把任務加入隊列將失敗,所以會構造一個新的線程。此策略能夠避免在處理可能具備內部依賴性的請求集時出現鎖。直接提交一般要求無界 maximumPoolSizes 以免拒絕新提交的任務。當命令以超過隊列所能處理的平均數連續到達時,此策略容許無界線程具備增加的可能性。

無界隊列。使用無界隊列(例如,不具備預約義容量的LinkedBlockingQueue)將致使在全部 corePoolSize 線程都忙時新任務在隊列中等待。這樣,建立的線程就不會超過 corePoolSize。(所以,maximumPoolSize 的值也就無效了。)當每一個任務徹底獨立於其餘任務,即任務執行互不影響時,適合於使用無界隊列;例如,在 Web 頁服務器中。這種排隊可用於處理瞬態突發請求,當命令以超過隊列所能處理的平均數連續到達時,此策略容許無界線程具備增加的可能性。

有界隊列。當使用有限的 maximumPoolSizes 時,有界隊列(如ArrayBlockingQueue)有助於防止資源耗盡,可是可能較難調整和控制。隊列大小和最大池大小可能須要相互折衷:使用大型隊列和小型池能夠最大限度地下降 CPU 使用率、操做系統資源和上下文切換開銷,可是可能致使人工下降吞吐量。若是任務頻繁阻塞(例如,若是它們是 I/O 邊界),則系統可能爲超過您許可的更多線程安排時間。使用小型隊列一般要求較大的池大小,CPU 使用率較高,可是可能遇到不可接受的調度開銷,這樣也會下降吞吐量。

ThreadFactory 和 RejectedExecutionHandler是ThreadPoolExecutor的兩個屬性,也 能夠認爲是兩個簡單的擴展點. ThreadFactory 是建立線程的工廠。默認的線程工廠會建立一個帶有「 pool-poolNumber-thread-threadNumber 」爲名字的線程,若是咱們有特別的須要,如線程組命名、優先級等,能夠定製本身的ThreadFactory 。

RejectedExecutionHandler 是拒絕的策略。常見有如下幾種:

  • AbortPolicy :不執行,會拋出 RejectedExecutionException 異常。
  • CallerRunsPolicy :由調用者(調用線程池的主線程)執行。
  • DiscardOldestPolicy :拋棄等待隊列中最老的。
  • DiscardPolicy: 不作任何處理,即拋棄當前任務。

ScheduleThreadPoolExecutor 是對ThreadPoolExecutor的集成。增長了定時觸發線程任務的功能。須要注意從內部實現看,ScheduleThreadPoolExecutor 使用的是 corePoolSize 線程和一個無界隊列的固定大小的池,因此調整 maximumPoolSize 沒有效果。無界隊列是一個內部自定義的 DelayedWorkQueue 。

FixedThreadPool

1

2

3

4

5

public static ExecutorService newFixedThreadPool(int nThreads) { 

    return new ThreadPoolExecutor(nThreads, nThreads, 

                                  0L, TimeUnit.MILLISECONDS, 

                                  new LinkedBlockingQueue<Runnable>()); 

}

實際上就是個不支持keepalivetime,且corePoolSize和maximumPoolSize相等的線程池。

SingleThreadExecutor

1

2

3

4

5

6

public static ExecutorService newSingleThreadExecutor() { 

    return new FinalizableDelegatedExecutorService 

        (new ThreadPoolExecutor(1, 1

                                0L, TimeUnit.MILLISECONDS, 

                                new LinkedBlockingQueue<Runnable>())); 

}

實際上就是個不支持keepalivetime,且corePoolSize和maximumPoolSize都等1的線程池。

CachedThreadPool

1

2

3

4

5

public static ExecutorService newCachedThreadPool() { 

      return new ThreadPoolExecutor(0, Integer.MAX_VALUE,

                                  60L, TimeUnit.SECONDS, 

                                  new SynchronousQueue<Runnable>()); 

}

實際上就是個支持keepalivetime時間是60秒(線程空閒存活時間),且corePoolSize爲0,maximumPoolSize無窮大的線程池。

SingleThreadScheduledExecutor

1

2

3

4

public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) { 

    return new DelegatedScheduledExecutorService 

        (new ScheduledThreadPoolExecutor(1, threadFactory)); 

}

其實是個corePoolSize爲1的ScheduledExecutor。上文說過ScheduledExecutor採用無界等待隊列,因此maximumPoolSize沒有做用。

ScheduledThreadPool

1

2

3

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { 

    return new ScheduledThreadPoolExecutor(corePoolSize); 

}

其實是corePoolSize課設定的ScheduledExecutor。上文說過ScheduledExecutor採用無界等待隊列,因此maximumPoolSize沒有做用。

相關文章
相關標籤/搜索