深刻理解Java中的底層阻塞原理及實現

談到阻塞,相信你們都不會陌生了。阻塞的應用場景真的多得不要不要的,好比 生產-消費模式,限流統計等等。什麼 ArrayBlockingQueue、 LinkedBlockingQueue、DelayQueue 等等,都是阻塞隊列的實現啊,多簡單!java

阻塞,通常有兩個特性很亮眼:1. 不耗 CPU 等待;2. 線程安全;node

額,要這麼說也 OK 的。畢竟,咱們遇到的問題,到這裏就夠解決了。可是有沒有想過,這容器的阻塞又是如何實現的呢?linux

好吧,翻開源碼,也很簡單了:(好比 ArrayBlockingQueue 的 take、put….)安全

1架構

2app

3less

4分佈式

5ide

6工具

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

// ArrayBlockingQueue

 

/**

 * Inserts the specified element at the tail of this queue, waiting

 * for space to become available if the queue is full.

 *

 * @throws InterruptedException {@inheritDoc}

 * @throws NullPointerException {@inheritDoc}

 */

public void put(E e) throws InterruptedException {

    checkNotNull(e);

    final ReentrantLock lock = this.lock;

    lock.lockInterruptibly();

    try {

        while (count == items.length)

            // 阻塞的點

            notFull.await();

        enqueue(e);

    } finally {

        lock.unlock();

    }

}

 

/**

 * Inserts the specified element at the tail of this queue, waiting

 * up to the specified wait time for space to become available if

 * the queue is full.

 *

 * @throws InterruptedException {@inheritDoc}

 * @throws NullPointerException {@inheritDoc}

 */

public boolean offer(E e, long timeout, TimeUnit unit)

    throws InterruptedException {

 

    checkNotNull(e);

    long nanos = unit.toNanos(timeout);

    final ReentrantLock lock = this.lock;

    lock.lockInterruptibly();

    try {

        while (count == items.length) {

            if (nanos <= 0)

                return false;

            // 阻塞的點

            nanos = notFull.awaitNanos(nanos);

        }

        enqueue(e);

        return true;

    } finally {

        lock.unlock();

    }

}

 

public E take() throws InterruptedException {

    final ReentrantLock lock = this.lock;

    lock.lockInterruptibly();

    try {

        while (count == 0)

            // 阻塞的點

            notEmpty.await();

        return dequeue();

    } finally {

        lock.unlock();

    }

}

看來,最終都是依賴了 AbstractQueuedSynchronizer 類(著名的AQS)的 await 方法,看起來像那麼回事。那麼這個同步器的阻塞又是如何實現的呢?

Java的代碼老是好跟蹤的:

// AbstractQueuedSynchronizer.await()

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

/**

 * Implements interruptible condition wait.

 * <ol>

 * <li> If current thread is interrupted, throw InterruptedException.

 * <li> Save lock state returned by {@link #getState}.

 * <li> Invoke {@link #release} with saved state as argument,

 *      throwing IllegalMonitorStateException if it fails.

 * <li> Block until signalled or interrupted.

 * <li> Reacquire by invoking specialized version of

 *      {@link #acquire} with saved state as argument.

 * <li> If interrupted while blocked in step 4, throw InterruptedException.

 * </ol>

 */

public final void await() throws InterruptedException {

    if (Thread.interrupted())

        throw new InterruptedException();

    Node node = addConditionWaiter();

    int savedState = fullyRelease(node);

    int interruptMode = 0;

    while (!isOnSyncQueue(node)) {

        // 此處進行真正的阻塞

        LockSupport.park(this);

        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)

            break;

    }

    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)

        interruptMode = REINTERRUPT;

    if (node.nextWaiter != null) // clean up if cancelled

        unlinkCancelledWaiters();

    if (interruptMode != 0)

        reportInterruptAfterWait(interruptMode);

}

如上,能夠看到,真正的阻塞工做又轉交給了另外一個工具類: LockSupport 的 park 方法了,這回跟鎖扯上了關係,看起來已經愈來愈接近事實了:

// LockSupport.park()

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

/**

 * Disables the current thread for thread scheduling purposes unless the

 * permit is available.

 *

 * <p>If the permit is available then it is consumed and the call returns

 * immediately; otherwise

 * the current thread becomes disabled for thread scheduling

 * purposes and lies dormant until one of three things happens:

 *

 * <ul>

 * <li>Some other thread invokes {@link #unpark unpark} with the

 * current thread as the target; or

 *

 * <li>Some other thread {@linkplain Thread#interrupt interrupts}

 * the current thread; or

 *

 * <li>The call spuriously (that is, for no reason) returns.

 * </ul>

 *

 * <p>This method does <em>not</em> report which of these caused the

 * method to return. Callers should re-check the conditions which caused

 * the thread to park in the first place. Callers may also determine,

 * for example, the interrupt status of the thread upon return.

 *

 * @param blocker the synchronization object responsible for this

 *        thread parking

 * @since 1.6

 */

public static void park(Object blocker) {

    Thread t = Thread.currentThread();

    setBlocker(t, blocker);

    UNSAFE.park(false, 0L);

    setBlocker(t, null);

}

看得出來,這裏的實現就比較簡潔了,先獲取當前線程,設置阻塞對象,阻塞,而後解除阻塞。

好吧,到底什麼是真正的阻塞,咱們仍是不得而知!

UNSAFE.park(false, 0L); 是個什麼東西? 看起來就是這一句起到了最關鍵的做用呢!但因爲這裏已是 native 代碼,咱們已經沒法再簡單的查看源碼了!那咋整呢?

那不行就看C/C++的源碼唄,看一下 parker 的定義(park.hpp):

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

class Parker : public os::PlatformParker {

private:

  volatile int _counter ;

  Parker * FreeNext ;

  JavaThread * AssociatedWith ; // Current association

 

public:

  Parker() : PlatformParker() {

    _counter       = 0 ;

    FreeNext       = NULL ;

    AssociatedWith = NULL ;

  }

protected:

  ~Parker() { ShouldNotReachHere(); }

public:

  // For simplicity of interface with Java, all forms of park (indefinite,

  // relative, and absolute) are multiplexed into one call.  c中暴露出兩個方法給java調用

  void park(bool isAbsolute, jlong time);

  void unpark();

 

  // Lifecycle operators

  static Parker * Allocate (JavaThread * t) ;

  static void Release (Parker * e) ;

private:

  static Parker * volatile FreeList ;

  static volatile int ListLock ;

 

};

那 park() 方法究竟是如何實現的呢? 實際上是繼承的 os::PlatformParker 的功能,也就是平臺相關的私有實現,以 Linux 平臺實現爲例(os_linux.hpp):

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

// Linux中的parker定義

class PlatformParker : public CHeapObj<mtInternal> {

  protected:

    enum {

        REL_INDEX = 0,

        ABS_INDEX = 1

    };

    int _cur_index;  // which cond is in use: -1, 0, 1

    pthread_mutex_t _mutex [1] ;

    pthread_cond_t  _cond  [2] ; // one for relative times and one for abs.

 

  public:       // TODO-FIXME: make dtor private

    ~PlatformParker() { guarantee (0, "invariant") ; }

 

  public:

    PlatformParker() {

      int status;

      status = pthread_cond_init (&_cond[REL_INDEX], os::Linux::condAttr());

      assert_status(status == 0, status, "cond_init rel");

      status = pthread_cond_init (&_cond[ABS_INDEX], NULL);

      assert_status(status == 0, status, "cond_init abs");

      status = pthread_mutex_init (_mutex, NULL);

      assert_status(status == 0, status, "mutex_init");

      _cur_index = -1; // mark as unused

    }

};

看到 park.cpp 中沒有重寫 park() 和 unpark() 方法,也就是說阻塞實現徹底交由特定平臺代碼處理了(os_linux.cpp):

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

// park方法的實現,依賴於 _counter, _mutex[1], _cond[2]

void Parker::park(bool isAbsolute, jlong time) {

  // Ideally we'd do something useful while spinning, such

  // as calling unpackTime().

 

  // Optional fast-path check:

  // Return immediately if a permit is available.

  // We depend on Atomic::xchg() having full barrier semantics

  // since we are doing a lock-free update to _counter.

  if (Atomic::xchg(0, &_counter) > 0) return;

 

  Thread* thread = Thread::current();

  assert(thread->is_Java_thread(), "Must be JavaThread");

  JavaThread *jt = (JavaThread *)thread;

 

  // Optional optimization -- avoid state transitions if there's an interrupt pending.

  // Check interrupt before trying to wait

  if (Thread::is_interrupted(thread, false)) {

    return;

  }

 

  // Next, demultiplex/decode time arguments

  timespec absTime;

  if (time < 0 || (isAbsolute && time == 0) ) { // don't wait at all

    return;

  }

  if (time > 0) {

    unpackTime(&absTime, isAbsolute, time);

  }

 

  // Enter safepoint region

  // Beware of deadlocks such as 6317397.

  // The per-thread Parker:: mutex is a classic leaf-lock.

  // In particular a thread must never block on the Threads_lock while

  // holding the Parker:: mutex.  If safepoints are pending both the

  // the ThreadBlockInVM() CTOR and DTOR may grab Threads_lock.

  ThreadBlockInVM tbivm(jt);

 

  // Don't wait if cannot get lock since interference arises from

  // unblocking.  Also. check interrupt before trying wait

  if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {

    return;

  }

 

  int status ;

  if (_counter > 0)  { // no wait needed

    _counter = 0;

    status = pthread_mutex_unlock(_mutex);

    assert (status == 0, "invariant") ;

    // Paranoia to ensure our locked and lock-free paths interact

    // correctly with each other and Java-level accesses.

    OrderAccess::fence();

    return;

  }

 

#ifdef ASSERT

  // Don't catch signals while blocked; let the running threads have the signals.

  // (This allows a debugger to break into the running thread.)

  sigset_t oldsigs;

  sigset_t* allowdebug_blocked = os::Linux::allowdebug_blocked_signals();

  pthread_sigmask(SIG_BLOCK, allowdebug_blocked, &oldsigs);

#endif

 

  OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);

  jt->set_suspend_equivalent();

  // cleared by handle_special_suspend_equivalent_condition() or java_suspend_self()

 

  assert(_cur_index == -1, "invariant");

  if (time == 0) {

    _cur_index = REL_INDEX; // arbitrary choice when not timed

    status = pthread_cond_wait (&_cond[_cur_index], _mutex) ;

  } else {

    _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;

    status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ;

    if (status != 0 && WorkAroundNPTLTimedWaitHang) {

      pthread_cond_destroy (&_cond[_cur_index]) ;

      pthread_cond_init    (&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr());

    }

  }

  _cur_index = -1;

  assert_status(status == 0 || status == EINTR ||

                status == ETIME || status == ETIMEDOUT,

                status, "cond_timedwait");

 

#ifdef ASSERT

  pthread_sigmask(SIG_SETMASK, &oldsigs, NULL);

#endif

 

  _counter = 0 ;

  status = pthread_mutex_unlock(_mutex) ;

  assert_status(status == 0, status, "invariant") ;

  // Paranoia to ensure our locked and lock-free paths interact

  // correctly with each other and Java-level accesses.

  OrderAccess::fence();

 

  // If externally suspended while waiting, re-suspend

  if (jt->handle_special_suspend_equivalent_condition()) {

    jt->java_suspend_self();

  }

}

 

// unpark 實現,相對簡單些

void Parker::unpark() {

  int s, status ;

  status = pthread_mutex_lock(_mutex);

  assert (status == 0, "invariant") ;

  s = _counter;

  _counter = 1;

  if (s < 1) {

    // thread might be parked

    if (_cur_index != -1) {

      // thread is definitely parked

      if (WorkAroundNPTLTimedWaitHang) {

        status = pthread_cond_signal (&_cond[_cur_index]);

        assert (status == 0, "invariant");

        status = pthread_mutex_unlock(_mutex);

        assert (status == 0, "invariant");

      } else {

        // must capture correct index before unlocking

        int index = _cur_index;

        status = pthread_mutex_unlock(_mutex);

        assert (status == 0, "invariant");

        status = pthread_cond_signal (&_cond[index]);

        assert (status == 0, "invariant");

      }

    } else {

      pthread_mutex_unlock(_mutex);

      assert (status == 0, "invariant") ;

    }

  } else {

    pthread_mutex_unlock(_mutex);

    assert (status == 0, "invariant") ;

  }

}

從上面代碼能夠看出,阻塞主要藉助於三個變量,_cond、_mutex、_counter, 調用 Linux 系統的 pthread_cond_wait、pthread_mutex_lock、pthread_mutex_unlock (一組 POSIX 標準的阻塞接口)等平臺相關的方法進行阻塞了!

而 park.cpp 中,則只有  Allocate、Release 等的一些常規操做!

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

// 6399321 As a temporary measure we copied & modified the ParkEvent::

// allocate() and release() code for use by Parkers.  The Parker:: forms

// will eventually be removed as we consolide and shift over to ParkEvents

// for both builtin synchronization and JSR166 operations.

 

volatile int Parker::ListLock = 0 ;

Parker * volatile Parker::FreeList = NULL ;

 

Parker * Parker::Allocate (JavaThread * t) {

  guarantee (t != NULL, "invariant") ;

  Parker * p ;

 

  // Start by trying to recycle an existing but unassociated

  // Parker from the global free list.

  // 8028280: using concurrent free list without memory management can leak

  // pretty badly it turns out.

  Thread::SpinAcquire(&ListLock, "ParkerFreeListAllocate");

  {

    p = FreeList;

    if (p != NULL) {

      FreeList = p->FreeNext;

    }

  }

  Thread::SpinRelease(&ListLock);

 

  if (p != NULL) {

    guarantee (p->AssociatedWith == NULL, "invariant") ;

  } else {

    // Do this the hard way -- materialize a new Parker..

    p = new Parker() ;

  }

  p->AssociatedWith = t ;          // Associate p with t

  p->FreeNext       = NULL ;

  return p ;

}

 

void Parker::Release (Parker * p) {

  if (p == NULL) return ;

  guarantee (p->AssociatedWith != NULL, "invariant") ;

  guarantee (p->FreeNext == NULL      , "invariant") ;

  p->AssociatedWith = NULL ;

 

  Thread::SpinAcquire(&ListLock, "ParkerFreeListRelease");

  {

    p->FreeNext = FreeList;

    FreeList = p;

  }

  Thread::SpinRelease(&ListLock);

}

綜上源碼,在進行阻塞的時候,底層並無(並不必定)要用 while 死循環來阻塞,更多的是藉助於操做系統的實現來進行阻塞的。固然,這也更符合你們的猜測!

從上的代碼咱們也發現一點,底層在作許多事的時候,都不忘考慮線程中斷,也就是說,即便在阻塞狀態也是能夠接收中斷信號的,這爲上層語言打開了方便之門。

若是要細說阻塞,其實還遠沒完,不過再往操做系統層面如何實現,就得再下點功夫,去翻翻資料了,把底線壓在操做系統層面,大多數狀況下也夠用了!

歡迎學Java和大數據的朋友們加入java架構交流: 855835163

加羣連接:https://jq.qq.com/?_wv=1027&amp;k=5dPqXGI

羣內提供免費的架構資料還有:Java工程化、高性能及分佈式、高性能、深刻淺出。高架構。性能調優、Spring,MyBatis,Netty源碼分析和大數據等多個知識點高級進階乾貨的免費直播講解  能夠進來一塊兒學習交流哦

相關文章
相關標籤/搜索