J.U.C併發包誕生的那些事兒

前言

J.U.C是java包java.util.concurrent的簡寫,中文簡稱併發包,是jdk1.5新增用來編寫併發相關的基礎api。java從事者必定不陌生,同時,流量時代的今天,併發包也成爲了高級開發面試時必問的一塊內容,本篇內容主要聊聊J.U.C背後的哪些事兒,而後結合LockSupport和Unsafe探祕下併發包更底層的哪些代碼,有多是系列博文的一個開篇html

關於JCP和JSR

JCP是Java Community Process的簡寫,是一種開發和修訂Java技術規範的流程,同時,咱們提到JCP通常是指維護管理這套流程的組織,這個組織主要由Java開發者以及被受權的非盈利組織組成,他們掌管着Java的發展方向。JCP是Sun公司1998年12月8日提出的,旨在經過java社區的力量推動Java的發展。截止目前,已經從1.0版本發展到了最新的2019年7月21日頒佈的2.11版本。JCP流程中有四個主要階段,分別是啓動、發佈草案、最終版本、維護,一個Java的新功能從啓動階段提出,到順利走完整套流程後,就會出如今下個版本的JDK中了。java

JSR是Java Specification Requests的簡寫,是服務JCP啓動階段提出草案的規範,任何人註冊成爲JCP的會員後,均可以向JCP提交JSR。好比,你以爲JDK中String的操做方法沒有guava中的實用,你提個JSR加強String中的方法,只要可以經過JCP的審覈,就能夠在下個版本的JDK中看到了。咱們熟知的提案有,Java緩存api的JSR-10七、Bean屬性校驗JSR-303等,固然還有本篇要講的Java併發包JSR-166.程序員

Doug Lea和他的JSR-166

Doug Lea,中文名爲道格·利。是美國的一個大學教師,大神級的人物,J.U.C就是出自他之手。JDK1.5以前,咱們控制程序併發訪問同步代碼只能使用synchronized,那個時候synchronized的性能還沒優化好,性能並很差,控制線程也只能使用Object的wait和notify方法。這個時候Doug Lea給JCP提交了JSR-166的提案,在提交JSR-166以前,Doug Lea已經使用了相似J.U.C包功能的代碼已經三年多了,這些代碼就是J.U.C的原型,下面簡單看下這些具備歷史味道的代碼,同時也能引起咱們的一些思考,若是JDK中沒有,那麼就本身造呀!面試

Lock接口的原型

public class Mutex implements Sync  {
  /** The lock status **/
  protected boolean inuse_ = false;
  @Override
  public void acquire() throws InterruptedException {
    if (Thread.interrupted()) {
      throw new InterruptedException();
    }
    synchronized(this) {
      try {
        //若是inuse_爲true就wait住線程
        while (inuse_) {
          wait();
        }
        inuse_ = true;
      }
      catch (InterruptedException ex) {
        notify();
        throw ex;
      }
    }
  }
  /**
   * 釋放鎖,通知線程繼續執行
   */
  @Override
  public synchronized void release()  {
    inuse_ = false;
    notify(); 
  }
  @Override
  public boolean attempt(long msecs) throws InterruptedException {
    if (Thread.interrupted()) {
      throw new InterruptedException();
    }
    synchronized(this) {
      if (!inuse_) {
        inuse_ = true;
        return true;
      }
      else if (msecs <= 0) {
        return false;
      } else {
        long waitTime = msecs;
        long start = System.currentTimeMillis();
        try {
          for (;;) {
            wait(waitTime);
            if (!inuse_) {
              inuse_ = true;
              return true;
            }
            else {
              waitTime = msecs - (System.currentTimeMillis() - start);
              if (waitTime <= 0) 
                return false;
            }
          }
        }
        catch (InterruptedException ex) {
          notify();
          throw ex;
        }
      }
    }  
  }
}

CountDownLatch的原型

public class CountDown implements Sync {
    protected final int initialCount_;
    protected int count_;
    /**
     * Create a new CountDown with given count value
     **/
    public CountDown(int count) {
        count_ = initialCount_ = count;
    }
    @Override
    public void acquire() throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        synchronized (this) {
            while (count_ > 0) {
                wait();
            }
        }
    }
    @Override
    public boolean attempt(long msecs) throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        synchronized (this) {
            if (count_ <= 0) {
                return true;
            } else if (msecs <= 0) {
                return false;
            } else {
                long waitTime = msecs;
                long start = System.currentTimeMillis();
                for (; ; ) {
                    wait(waitTime);
                    if (count_ <= 0) {
                        return true;
                    } else {
                        waitTime = msecs - (System.currentTimeMillis() - start);
                        if (waitTime <= 0) {
                            return false;
                        }
                    }
                }
            }
        }
    }
    /**
     * Decrement the count.
     * After the initialCount'th release, all current and future
     * acquires will pass
     **/
    @Override
    public synchronized void release() {
        if (--count_ == 0) {
            notifyAll();
        }
    }

    /**
     * Return the initial count value
     **/
    public int initialCount() {
        return initialCount_;
    }

    public synchronized int currentCount() {
        return count_;
    }
}

AbstractQueuedSynchronizer抽象類的原型

瞭解J.U.C的都知道,在這個包裏面AbstractQueuedSynchronizer是精髓所在,這就是咱們在聊併發包時俗稱的AQS,這個框架設計爲同步狀態的原子性管理、線程的阻塞和解除阻塞以及排隊提供一種通用的機制。併發包下ReentrantLock、CountDownLatch等都是基於AQS來實現的,在看下上面的原型實現都是實現的Sync接口,是否是似曾相識,下面的Sync就是AbstractQueuedSynchronizer的原型了編程

public interface Sync {

  public void acquire() throws InterruptedException;

  public boolean attempt(long msecs) throws InterruptedException;

  public void release();

  /**  One second, in milliseconds; convenient as a time-out value **/
  public static final long ONE_SECOND = 1000;
  /**  One minute, in milliseconds; convenient as a time-out value **/
  public static final long ONE_MINUTE = 60 * ONE_SECOND;
  /**  One hour, in milliseconds; convenient as a time-out value **/
  public static final long ONE_HOUR = 60 * ONE_MINUTE;
  /**  One day, in milliseconds; convenient as a time-out value **/
  public static final long ONE_DAY = 24 * ONE_HOUR;
  /**  One week, in milliseconds; convenient as a time-out value **/
  public static final long ONE_WEEK = 7 * ONE_DAY;
  /**  One year in milliseconds; convenient as a time-out value  **/
  public static final long ONE_YEAR = (long)(365.2425 * ONE_DAY);
  /**  One century in milliseconds; convenient as a time-out value **/
  public static final long ONE_CENTURY = 100 * ONE_YEAR;
}

JSR-166的詳細內容

一、請描述擬議的規範:

這個JSR的目標相似於JDK1.2 Collections包的目標:api

  • 1.標準化一個簡單,可擴展的框架,該框架將經常使用的實用程序組織成一個足夠小的包,以便用戶能夠輕鬆學習並由開發人員維護。
  • 2.提供一些高質量的實現。

該包將包含接口和類,這些接口和類在各類編程樣式和應用程序中都頗有用。這些類包括:緩存

  • 原子變量。
  • 專用鎖,屏障,信號量和條件變量。
  • 爲多線程使用而設計的隊列和相關集合。
  • 線程池和自定義執行框架。

咱們還將研究核心語言和庫中的相關支持。
請注意,這些與J2EE中使用的事務併發控制框架徹底不一樣。(可是,對於那些建立此類框架的人來講,它們會頗有用。)安全

二、什麼是目標Java平臺?

J2SE多線程

三、擬議規範將解決Java社區的哪些需求?

底層線程原語(例如synchronized塊,Object.wait和Object.notify)不足以用於許多編程任務。所以,應用程序員常常被迫實現本身的更高級別的併發工具。這致使了巨大的重複工做。此外,衆所周知,這些設施難以正確,甚至更難以優化。應用程序員編寫的併發工具一般不正確或效率低下。提供一組標準的併發實用程序將簡化編寫各類多線程應用程序的任務,並一般能夠提升使用它們的應用程序的質量。併發

四、爲何現有規範不知足這種需求?

目前,開發人員只能使用Java語言自己提供的併發控制結構。對於某些應用程序來講,這些級別過低,而對其餘應用程序則不完整

五、請簡要介紹基礎技術或技術:

絕大多數軟件包將在低級Java構造之上實現。可是,有一些關於原子性和監視器的關鍵JVM /語言加強功能是得到高效和正確語義所必需的。

六、API規範是否有建議的包名?(即javapi.something,org.something等)

java.util.concurrent中

七、建議的規範是否與您知道的特定操做系統,CPU或I/O設備有任何依賴關係?

只是間接地,由於在不一樣平臺上運行的JVM可能可以以不一樣方式優化某些構造。

八、當前的安全模型是否存在沒法解決的安全問題?

沒有

九、是否存在國際化或本地化問題?

沒有

十、是否有任何現有規範可能所以工做而過期,棄用或須要修訂?

沒有

十一、請描述制定本規範的預期時間表。

目標是將此規範包含在J2SE 1.5(Tiger)的JSR中。

十二、請描述致力於制定本規範的專家組的預期工做模式。

電子郵件,電話會議和不常見的會議。咱們還將使用或建立一個開放的郵件列表,供專家組之外的其餘感興趣的人討論。

解密LockSupport和Unsafe

前面說到AQS是併發包下的精髓所在,那麼LockSupport和Unsafe就是整個JSR-166併發包的全部功能實現的靈魂,縱觀整個併發包下的代碼,無處不見LockSupport和Unsafe的身影。LockSupport提供了兩個關鍵方法,park和unpark,用來操做線程的阻塞和放行,功能能夠類比Object的wait和notify,可是比這兩個api更靈活。下面是博主簡化了的實現(JDK中不是這樣的)

/**
   用於建立鎖和其餘同步類的基本線程阻塞基礎類,提供基礎的線程控制功能。
 */
public class LockSupport {
    private LockSupport() {}
    /**
      解除park的阻塞,若是還沒阻塞,它對{@code park}的下一次調用將保證不會阻塞
     */
    public static void unpark(Thread thread) {
        if (thread != null) {
            UNSAFE.unpark(thread);
        }
    }
    /**
      阻塞當前線程,除非先調用了unpark()方法。
     */
    public static void park() {
        UNSAFE.park(false, 0L);
    }

    //Hotspot implementation via intrinsics API
    private static final Unsafe UNSAFE;
    static {
        try {
            try {
                final PrivilegedExceptionAction<Unsafe> action = new PrivilegedExceptionAction<Unsafe>() {
                    @Override
                    public Unsafe run() throws Exception {
                        Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
                        theUnsafe.setAccessible(true);
                        return (Unsafe) theUnsafe.get(null);
                    }
                };
                UNSAFE = AccessController.doPrivileged(action);
            } catch (Exception e) {
                throw new RuntimeException("Unable to load unsafe", e);
            }
        } catch (Exception ex) { throw new Error(ex); }
    }
}

有了park和unpark後,咱們也能夠這樣來實現Lock的功能,代碼以下,有了ConcurrentLinkedQueue加持後,就能夠在基本的鎖的功能上,實現公平鎖的語義了。

/**
 * 簡版先進先出的公平鎖實現
 * @author: kl @kailing.pub
 * @date: 2019/9/2
 */
public class FIFOMutex {
    private final AtomicBoolean locked = new AtomicBoolean(false);
    private final Queue<Thread> waiters = new ConcurrentLinkedQueue<>();
    public void lock() {
        boolean wasInterrupted = false;
        Thread current = Thread.currentThread();
        waiters.add(current);
        // 在隊列中不是第一個或沒法獲取鎖時阻塞
        while (waiters.peek() != current || !locked.compareAndSet(false, true)) {
            LockSupport.park(this);
            if (Thread.interrupted()) {
                wasInterrupted = true;
            }
        }
        waiters.remove();
        if (wasInterrupted) {
            current.interrupt();
        }
    }
    public void unlock() {
        locked.set(false);
        LockSupport.unpark(waiters.peek());
    }
}

神祕的Unsafe,JSR166增長了哪些內容

心細的你可能發現了LockSupport最終仍是基於Unsafe的park和unpark來實現的,Unsafe在JDK1.5以前就存在的,那JSR166後增長了哪些內容呢?先來看下Unsafe是什麼來頭。JDK源碼中是這樣描述的:一組用於執行低層、不安全操做的方法。儘管該類和全部方法都是公共的,可是該類的使用受到限制,由於只有受信任的代碼才能得到該類的實例。如其名,不安全的,因此在JDK1.8後直接不提供源碼了,JDK中其餘的代碼均可以在IDE中直接看到.java的文件,而Unsafe只有.class編譯後的代碼。由於Unsafe是真的有黑魔法,能夠直接操做系統級的資源,好比系統內存、線程等。JDK不直接對外暴露Unsafe的api,若是直接在本身的應用程序中像JDK中那麼獲取Unsafe的實例,如:

private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();

會直接拋異常SecurityException("Unsafe"),正確的獲取方式以下:

private static final Unsafe UNSAFE;
    static {
        try {
            try {
                final PrivilegedExceptionAction<Unsafe> action = new PrivilegedExceptionAction<Unsafe>() {
                    @Override
                    public Unsafe run() throws Exception {
                        Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
                        theUnsafe.setAccessible(true);
                        return (Unsafe) theUnsafe.get(null);
                    }
                };
                UNSAFE = AccessController.doPrivileged(action);
            } catch (Exception e) {
                throw new RuntimeException("Unable to load unsafe", e);
            }
        } catch (Exception ex) { throw new Error(ex); }
    }

其實,深刻到Unsafe後,會發現深刻不下去了,Unsafe中的方法是都是native標記的本地方法,沒有實現,如:

public native void unpark(Object var1);

    public native void park(boolean var1, long var2);

若是是在Windows下,最終調用的就是使用C++開發的最終編譯成.dll的包,因此只要看到C++相關的代碼就知道怎麼回事了

首先定位到Unsafe.cpp,文件位置在:openjdkhotspotsrcsharevmprimsUnsafe.cpp,會發現和JSR166相關的都有註釋,如:// These are the methods prior to the JSR 166 changes in 1.6.0。根據這些信息,得知JSR166在Unsafe中新增了五個方法,分別是compareAndSwapObject、compareAndSwapInt、compareAndSwapLong、park、unpark,這就是併發包中CAS原子操做和線程控制的核心所在了,併發包中的大部分功能都是基於他們來實現的。最後咱們看下park和unpark的具體實現,在學校學的C語言丟的差很差多了,可是下面的代碼還語義仍是很清晰的

// JSR166
// -------------------------------------------------------

/*
 * The Windows implementation of Park is very straightforward: Basic
 * operations on Win32 Events turn out to have the right semantics to
 * use them directly. We opportunistically resuse the event inherited
 * from Monitor.
 *
void Parker::park(bool isAbsolute, jlong time) {
  guarantee (_ParkEvent != NULL, "invariant") ;
  // First, demultiplex/decode time arguments
  if (time < 0) { // don't wait
    return;
  }
  else if (time == 0 && !isAbsolute) {
    time = INFINITE;
  }
  else if  (isAbsolute) {
    time -= os::javaTimeMillis(); // convert to relative time
    if (time <= 0) // already elapsed
      return;
  }
  else { // relative
    time /= 1000000; // Must coarsen from nanos to millis
    if (time == 0)   // Wait for the minimal time unit if zero
      time = 1;
  }

  JavaThread* thread = (JavaThread*)(Thread::current());
  assert(thread->is_Java_thread(), "Must be JavaThread");
  JavaThread *jt = (JavaThread *)thread;

  // Don't wait if interrupted or already triggered
  if (Thread::is_interrupted(thread, false) ||
    WaitForSingleObject(_ParkEvent, 0) == WAIT_OBJECT_0) {
    ResetEvent(_ParkEvent);
    return;
  }
  else {
    ThreadBlockInVM tbivm(jt);
    OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
    jt->set_suspend_equivalent();

    WaitForSingleObject(_ParkEvent,  time);
    ResetEvent(_ParkEvent);

    // If externally suspended while waiting, re-suspend
    if (jt->handle_special_suspend_equivalent_condition()) {
      jt->java_suspend_self();
    }
  }
}

void Parker::unpark() {
  guarantee (_ParkEvent != NULL, "invariant") ;
  SetEvent(_ParkEvent);
}

結語

咱們一直受益於J.U.C的代碼,網上也不乏大量的解讀分析J.U.C源碼的文章,可是不多有講J.U.C背後的關於J.U.C誕生的那些事兒,在深刻了解併發包的代碼同時,發現了不少值的分享的事情,整個J.U.C的技術脈絡也無比的清晰,故記錄下來了。今後,博主在技術界,又多了一位崇拜的偶像Doug Lea,但願,在讀完本文後也能成爲你的偶像。

相關文章
相關標籤/搜索