JUC鎖框架——LockSupport應用以及源碼分析

LockSupport用法

在沒有LockSupport以前,線程的掛起和喚醒我們都是經過Object的wait和notify/notifyAll方法實現,而Object的wait和notify/notifyAll方法只能在同步代碼塊裏用。而LockSupport的park和unpark無需同步代碼塊裏,直接調用就能夠了。html

public static void main(String[] args)throws Exception {
        Thread t1 = new Thread(()->{
            int sum = 0;
            for(int i=0;i<10;i++){
                sum+=i;
            }
            LockSupport.park();
            System.out.println(sum);
        });
        t1.start();
        Thread.sleep(1000);//等待t1中的求和計算完成,當咱們註釋此行時,則會致使unpark方法先於park方法。可是程序依然可以正確執行。
        LockSupport.unpark(t1);
    }

注意:咱們在調用Object的notify和wait方法時若是notify先於wait方法那麼程序將會無限制的等下去,而若是LockSupport的unpark方法先於park方法則不會無限制的等待。程序依然可以正確的執行java

LockSupport比Object的wait/notify有兩大優點:node

  • LockSupport不須要在同步代碼塊裏 。因此線程間也不須要維護一個共享的同步對象了,實現了線程間的解耦。
  • unpark函數能夠先於park調用,因此不須要擔憂線程間的執行的前後順序。

LockSupport在ThreadPoolExecutor中的應用1

一、首先咱們建立一個線程池並執行FutureTask任務。c++

public static void main(String[] args)throws Exception {
        ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(1000);
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5,5,1000, TimeUnit.SECONDS,queue);
        Future<String> future = poolExecutor.submit(()->{//此任務實現了Callable接口
            TimeUnit.SECONDS.sleep(5);
            return "hello";
        });
        String result = future.get();//同步阻塞等待線程池的執行結果。
        System.out.println(result);
    }

二、future.get()是如何阻塞當前線程並獲取線程池的執行結果?安全

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);//查看源碼實際上是建立了一個FutureTask對象
    execute(ftask);
    return ftask;
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

經過上面的代碼咱們能夠看出,其實當咱們調用future.get()方法實際上是調用FutureTask的get()方法。函數

//判斷當前任務是否執行完畢,若是執行完畢直接返回任務結果,不然進入awaitDone方法阻塞等待。
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

關於FutureTask源碼分析具體能夠查看https://my.oschina.net/cqqcqqok/blog/1982375工具

private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);//調用parkNanos方法阻塞線程
            }
            else
                LockSupport.park(this);//調用park方法組塞住當前線程。
        }
    }

三、從前面的分析咱們知道了當調用future.get()方法是如何阻塞線程,那麼當任務執行完成以後如何喚醒線程的呢?源碼分析

因爲咱們提交的任務已經被封裝爲了FutureTask對象,那麼任務的執行就是FutureTask的run方法執行。學習

public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    //c.call()就是執行咱們提交的任務,任務執行完後調用了set方法,進入set方法發現set方法調用了finishCompletion方法
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            runner = null;
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}
private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);//經過cas操做將全部等待的線程拿出來,而後便使用LockSupport的unpark喚醒每一個線程。
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }
        done();
        callable = null;        // to reduce footprint
    }

LockSupport在ThreadPoolExecutor中的應用2

您有沒有這樣的一個疑問,當線程池裏沒有任務時,線程池裏的線程在幹嗎呢?若是讀過線程池源碼(https://my.oschina.net/cqqcqqok/blog/2049249 )那麼你必定知道,在線程getTask()獲取任務的時候,若是沒有新的任務那麼線程會調用隊列的take方法阻塞等待新任務。那隊列的take方法是否是也跟Future的get方法實現同樣呢?咱們以ArrayBlockingQueue爲例,來看一下源碼實現。ui

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();//此處調用了Condition的await方法
        return dequeue();
    } finally {
        lock.unlock();
    }
}

繼續看源碼

public final void await() throws InterruptedException {
    if (Thread.interrupted()) throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);//調用了LockSupport的park方法
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

LockSupport的實現

park和unpark簡單介紹

LockSupport實際上是一個簡單的代理類,它裏面的代碼都是使用Unsafe類裏面的native方法,這裏能夠簡單看看sun.misc.Unsafe 本文主要學習裏面的park和unpark方法。

在Unsafe裏面,park和unpark分別以下定義:

//isAbsolute表明傳入的time是絕對時間仍是相對時間
public native void park(boolean isAbsolute, long time);
public native void unpark(Object thread);

unpark函數爲線程提供「許可(permit)」,線程調用park函數則等待「許可」。這個有點像信號量,可是這個「許可」是不能疊加的,「許可」是一次性的。能夠理解爲設置一個變量0,1之間的切換。

若是線程B連續調用了屢次unpark函數,當線程A調用park函數就使用掉這個「許可」,若是線程A第二次調用park,則進入等待狀態。

注意,unpark函數能夠先於park調用。好比線程B調用unpark函數,給線程A發了一個「許可」,那麼當線程A調用park時,它發現已經有「許可」了,那麼它會立刻再繼續運行,也就是不會阻塞。

而若是線程A處於等待許可狀態,再次調用park,則會永遠等待下去,調用unpark也沒法喚醒。

下面先看LockSupport裏面的park和unpark定義:

public static void park(Object blocker) {
    //獲取當前運行線程
    Thread t = Thread.currentThread();
    //設置t中的parkBlockerOffset(掛起線程對象的偏移地址)的值爲blocker
    //這個字段能夠理解爲專門爲LockSupport而設計的,它被用來記錄線程是被誰堵塞的,當程序出現問題時候,經過線程監控分析工具能夠找出問題所在。註釋說該對象被LockSupport的getBlocker和setBlocker來獲取和設置,且都是經過地址偏移量方式獲取和修改的。因爲這個變量LockSupport提供了park(Object parkblocker)方法。
    setBlocker(t, blocker);
    //阻塞線程
    UNSAFE.park(false, 0L);
    //線程被釋放,則將parkBlockerOffset設爲null
    setBlocker(t, null);
}

public static void unpark(Thread thread) {
    if (thread != null)
        UNSAFE.unpark(thread);
}

park和unpark的c++實現

首先,包含park和unpark的頭函數在:http://hg.openjdk.java.net/jdk8u/jdk8u40/hotspot/file/68577993c7db/src/share/vm/runtime/park.hpp 而其具體實現函數則在:http://hg.openjdk.java.net/jdk8u/jdk8u40/hotspot/file/95e9083cf4a7/src/os/solaris/vm/os_solaris.cpp

void Parker::park(bool isAbsolute, jlong time) {
 //_counter字段,就是用來記錄「許可」的。
 //先嚐試可否直接拿到「許可」,即_counter>0時,若是成功,則把_counter設置爲0
  if (_counter > 0) {
      _counter = 0 ;
      OrderAccess::fence();
      //直接返回
      return ;
  }
  //獲取當前線程
  Thread* thread = Thread::current();
  assert(thread->is_Java_thread(), "Must be JavaThread");
  JavaThread *jt = (JavaThread *)thread;
  //若是中途已是interrupt了,那麼馬上返回,不阻塞
  if (Thread::is_interrupted(thread, false)) {
    return;
  }
  //記錄當前絕對時間戳
  timespec absTime;
  //若是park的超時時間已到,則返回
  if (time < 0) { // don't wait at all
    return;
  }
  if (time > 0) {
  //更換時間戳
    unpackTime(&absTime, isAbsolute, time);
  }
   //進入安全點,利用該thread構造一個ThreadBlockInVM。
  ThreadBlockInVM tbivm(jt);
  if (Thread::is_interrupted(thread, false) ||
      os::Solaris::mutex_trylock(_mutex) != 0) {
    return;
  }
  //記錄等待狀態
  int status ;
  //中途再次給予了許可,則直接返回不等帶。
  if (_counter > 0)  { // no wait needed
    _counter = 0;
    status = os::Solaris::mutex_unlock(_mutex);
    assert (status == 0, "invariant") ;
    OrderAccess::fence();
    return;
  }
#ifdef ASSERT
  sigset_t oldsigs;
  sigset_t* allowdebug_blocked = os::Solaris::allowdebug_blocked_signals();
  thr_sigsetmask(SIG_BLOCK, allowdebug_blocked, &oldsigs);
#endif
  OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
  jt->set_suspend_equivalent();
#if defined(__sparc) && defined(COMPILER2)
  if (ClearFPUAtPark) { _mark_fpu_nosave() ; }
#endif
  if (time == 0) {
    //等待時間爲0時,直接等待
    status = os::Solaris::cond_wait (_cond, _mutex) ;
  } else {
    //time不爲0,則繼續等待。
    status = os::Solaris::cond_timedwait (_cond, _mutex, &absTime);
  }
   assert_status(status == 0 || status == EINTR ||
                status == ETIME || status == ETIMEDOUT,
                status, "cond_timedwait");
#ifdef ASSERT
  thr_sigsetmask(SIG_SETMASK, &oldsigs, NULL);
#endif
  _counter = 0 ;
  status = os::Solaris::mutex_unlock(_mutex);
  assert_status(status == 0, status, "mutex_unlock") ;
  if (jt->handle_special_suspend_equivalent_condition()) {
    jt->java_suspend_self();
  }
  OrderAccess::fence();
}
void Parker::unpark() {
//定義兩個變量,staus用於判斷是否獲取鎖
  int s, status ;
  //獲取鎖
  status = os::Solaris::mutex_lock (_mutex) ;
  //判斷是否成功
  assert (status == 0, "invariant") ;
  //存儲原先變量_counter
  s = _counter;
  //把_counter設爲1
  _counter = 1;
  //釋放鎖
  status = os::Solaris::mutex_unlock (_mutex) ;
  assert (status == 0, "invariant") ;
  if (s < 1) {
  //若是原先_counter信號量小於1,即爲0,則進行signal操做,喚醒操做
    status = os::Solaris::cond_signal (_cond) ;
    assert (status == 0, "invariant") ;
  }
}

參考地址

相關文章
相關標籤/搜索