併發系列(1)之 Thread 詳解

本文主要結合 java.lang.Thread 源碼,梳理 Java 線程的總體脈絡;java

1、線程概述

對於 Java 中的線程主要是依賴於系統的 API 實現的,這一點能夠從 java.lang.Thread;源碼中關鍵的方法都是 native 方法看出,也能夠直接查看 OpenJDK 源碼看出來,這一點後面還會講到;對於 JDK1.8 而言,他的 Windows 版和 Linux 版使用的都是 1:1 線程模型,即系統內核線程和輕量級進程的比是 1:1編程

  • 內核線程(Kernel-Level Thread,KLT):是由操做系統內核(Kernel)直接支持的線程,這種線程由內核來完成切換,內核經過調度器(Schedule)進行調度,並負責將線程的任務映射到各個處理器上;
  • 輕量級進程(Light Weight Process,LWP):程序能夠直接使用的一種內核線程高級接口,也就是咱們一般意義上的線程;

如圖所示:安全

thread11

優勢:多線程

  • 由內核線程的支持,每一個線程都成爲一個獨立的調度單元,即線程之間不會相互阻塞影響;使用內核提供的線程調度功能及處理器映射,能夠完成線程的切換,並將線程的任務映射到其餘處理器上,充分利用多核處理器的優點,實現真正的並行。

缺點:併發

  • 同時因爲基於內核線程實現,線程的建立、關閉、同步等操做都須要系統調用;須要在用戶態和內核態之間切換,代價相對較高;
  • 另外每一個線程都須要消耗必定的內核資源,如內核線程的棧空間,因此係統支持的輕量級進程是有限的;

2、線程狀態

Java 線程的整個生命週期可能會經歷如下5中狀態,如圖所示:jvm

threadstates

  • 新建(New):新建後未啓動的線程;
  • 運行(Runnable):包括運行中(Running)和就緒(Ready)兩種狀態;也就是正在運行,或者等待 CPU 分配執行時間;
  • 等待(Waiting):無限期的等待其餘線程顯示喚醒;
  • 超時等待(Timed_Waiting):必定時間內沒有被其餘線程喚醒,則由系統自動喚醒;
  • 阻塞(Blocked):等待獲取排它鎖;
  • 終止(Terminated):運行終止;

3、源碼分析

1. native註冊

/* Make sure registerNatives is the first thing <clinit> does. */
private static native void registerNatives();
static {
  registerNatives();
}

這段代碼在不少地方都出現過,好比:ide

java.lang.System
java.lang.Object
java.lang.Class

其做用就是在使用 JNI 時須要向 JVM 註冊,其方法名默認爲 Java_<fully qualified class name>_method;可是若是以爲這樣的名字太長,這是就可使用 registerNatives() 向 JVM 註冊任意的函數名;函數

Thread 中的 native 方法有:源碼分析

private native void start0();
private native void stop0(Object o);
public final native boolean isAlive();
private native void suspend0();
private native void resume0();
private native void setPriority0(int newPriority);
public static native void yield();
public static native void sleep(long millis) throws InterruptedException;
public static native Thread currentThread();
public native int countStackFrames();
private native void interrupt0();
private native boolean isInterrupted(boolean ClearInterrupted);
public static native boolean holdsLock(Object obj);
private native static Thread[] getThreads();
private native static StackTraceElement[][] dumpThreads(Thread[] threads);
private native void setNativeName(String name);

其對應 JVM 源碼this

// openjdk\jdk\src\share\native\java\lang\Thread.c
static JNINativeMethod methods[] = {
    {"start0",           "()V",              (void *)&JVM_StartThread},
    {"stop0",            "(" OBJ ")V",       (void *)&JVM_StopThread},
    {"isAlive",          "()Z",              (void *)&JVM_IsThreadAlive},
    {"suspend0",         "()V",              (void *)&JVM_SuspendThread},
    {"resume0",          "()V",              (void *)&JVM_ResumeThread},
    {"setPriority0",     "(I)V",             (void *)&JVM_SetThreadPriority},
    {"yield",            "()V",              (void *)&JVM_Yield},
    {"sleep",            "(J)V",             (void *)&JVM_Sleep},
    {"currentThread",    "()" THD,           (void *)&JVM_CurrentThread},
    {"countStackFrames", "()I",              (void *)&JVM_CountStackFrames},
    {"interrupt0",       "()V",              (void *)&JVM_Interrupt},
    {"isInterrupted",    "(Z)Z",             (void *)&JVM_IsInterrupted},
    {"holdsLock",        "(" OBJ ")Z",       (void *)&JVM_HoldsLock},
    {"getThreads",        "()[" THD,         (void *)&JVM_GetAllThreads},
    {"dumpThreads",      "([" THD ")[[" STE, (void *)&JVM_DumpThreads},
    {"setNativeName",    "(" STR ")V",       (void *)&JVM_SetNativeThreadName},
};

其具體實現能夠查看

openjdk\hotspot\src\share\vm\prims\jvm.h
openjdk\hotspot\src\share\vm\prims\jvm.cpp

2. 構造方法和成員變量

public class Thread implements Runnable {
  private volatile String name;        // 線程名稱,若是沒有指定,就經過 Thread-線程序列號 命名
  private int priority;                // 線程優先級,1-10 默認與父線程優先級相同(main 線程優先級爲 5)
  private boolean daemon = false;      // 是不是守護線程
  private Runnable target;             // Runnable 對象
  private ThreadGroup group;           // 所屬線程組
  ThreadLocal.ThreadLocalMap threadLocals = null;             // 線程本地變量
  ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;  // 可繼承的線程本地變量
  private long tid;                                           // 線程 tid
  
  ...

  public Thread() {
    init(null, null, "Thread-" + nextThreadNum(), 0);
  }

  public Thread(Runnable target) {
    init(null, target, "Thread-" + nextThreadNum(), 0);
  }

  public Thread(ThreadGroup group, Runnable target) {
    init(group, target, "Thread-" + nextThreadNum(), 0);
  }

  public Thread(String name) {
    init(null, null, name, 0);
  }

  public Thread(ThreadGroup group, String name) {
    init(group, null, name, 0);
  }

  public Thread(Runnable target, String name) {
    init(null, target, name, 0);
  }

  public Thread(ThreadGroup group, Runnable target, String name) {
    init(group, target, name, 0);
  }

  public Thread(ThreadGroup group, Runnable target, String name, long stackSize) {
    init(group, target, name, stackSize);
  }
  
  private void init(ThreadGroup g, Runnable target, String name, long stackSize, AccessControlContext acc, boolean inheritThreadLocals) {
    if (name == null) {
      throw new NullPointerException("name cannot be null");
    }

    this.name = name;

    Thread parent = currentThread();
    SecurityManager security = System.getSecurityManager();
    if (g == null) {
      if (security != null) {
        g = security.getThreadGroup();
      }
      if (g == null) {
        g = parent.getThreadGroup();
      }
    }
    
    g.checkAccess();

    if (security != null) {
      if (isCCLOverridden(getClass())) {
        security.checkPermission(SUBCLASS_IMPLEMENTATION_PERMISSION);
      }
    }

    g.addUnstarted();

    this.group = g;
    this.daemon = parent.isDaemon();
    this.priority = parent.getPriority();
    if (security == null || isCCLOverridden(parent.getClass()))
      this.contextClassLoader = parent.getContextClassLoader();
    else
      this.contextClassLoader = parent.contextClassLoader;
    this.inheritedAccessControlContext = acc != null ? acc : AccessController.getContext();
    this.target = target;
    setPriority(priority);
    if (inheritThreadLocals && parent.inheritableThreadLocals != null)
      this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
    this.stackSize = stackSize;
    tid = nextThreadID();
  }

  ...
  
}

能夠看到左右的構造方法最終都會調用 init();並初始化所屬線程組、名字、 Runnable、棧大小等信息;整個過程至關於配置了一個線程工廠,此時只是初始化了全部的配置,線程尚未真正建立,固然資源一樣也尚未分配,只有在調用 start() 的時候線程纔會真正建立;

此外能夠看到線程建立過程當中會有不少的權限檢查,例如:

SecurityManager security = System.getSecurityManager();
if (security != null) {
  if (isCCLOverridden(getClass())) {
    security.checkPermission(SUBCLASS_IMPLEMENTATION_PERMISSION);
  }
}

一般狀況下權限的檢查默認是沒有開啓的,因此 security 一直都是 null ;這裏須要在啓動 JVM 的時候指定 -Djava.security.manager ;固然也能夠指定特定的 SecurityManager;可是在開啓的時候極可能會遇到相似:java.security.AccessControlException: access denied ;權限檢查失敗的錯誤;

此時能夠在 jre\lib\security\java.policy 中添加相應的權限;或者直接開啓全部權限 permission java.security.AllPermission;

// jre\lib\security\java.policy

grant {
  permission java.lang.RuntimePermission "stopThread";
  permission java.net.SocketPermission "localhost:0", "listen";
  permission java.util.PropertyPermission "java.version", "read";

  ...

  permission java.security.AllPermission;
};

3. start 方法

public synchronized void start() {
  if (threadStatus != 0) throw new IllegalThreadStateException();
  group.add(this);
  boolean started = false;
  try {
    start0();
    started = true;
  } finally {
    try {
      if (!started) {
        group.threadStartFailed(this);
      }
    } catch (Throwable ignore) {
      //
    }
  }
}

private native void start0();

能夠看到這是一個同步方法,而且同一個線程不能啓動兩次;這裏首先將線程加入對應的線程組,再真正建立線程,若是建立失敗就在線程組中標記;對應的這個 native 方法 start0 ,的源碼一樣能夠查看 openjdk\hotspot\src\share\vm\prims\jvm.cpp,這裏就不詳細介紹了;

4. exit 方法

private void exit() {
  if (group != null) {
    group.threadTerminated(this);
    group = null;
  }
  /* Aggressively null out all reference fields: see bug 4006245 */
  target = null;
  /* Speed the release of some of these resources */
  threadLocals = null;
  inheritableThreadLocals = null;
  inheritedAccessControlContext = null;
  blocker = null;
  uncaughtExceptionHandler = null;
}

exit 方法則是由系統調用,在 Thread 銷燬前釋放資源;

5. 棄用方法

在源碼裏面還有幾個棄用的方法:

public final void stop() { }            // 中止線程
public final void suspend() { }         // 暫停線程
public final void resume() { }          // 恢復線程
  • stop:中止線程會致使它解鎖已鎖定的全部監視器,從而產生同步問題,由於它本質上是不安全的。
  • suspend:暫停線程容易出現死鎖,若是目標線程在監視器上保持鎖定,那麼在恢復目標線程以前,任何線程都沒法訪問此資源。
  • resume:恢復線程一樣容易出現死鎖, 若是 A 線程在恢復 B 線程以前鎖定監視器,而後在調用 resume 恢復 B,此時 B 會嘗試再次獲取鎖,這樣就會致使死鎖。


4、線程通信

其實全部的多線程問題,其本質都是線程之間的通信問題,也有的說是通信和同步兩個問題(線程間操做的順序);但我以爲同步仍然是線程之間經過某種方式進行通信,肯定各自執行的相對順序;因此仍然能夠算做是一種通信問題;這裏線程之間的通信問題能夠分紅兩種:

  • 共享變量,相似鎖對象、volatile、中斷等操做均可以算是共享變量通信;
  • 消息傳遞,相似 wait\notify、管道等則能夠算是經過消息直接傳遞通信;

下面咱們將介紹和 Thread 類直接相關的幾種通信,關於鎖的部分以後的博客還會詳細介紹;

1. wait\notify 機制

@Slf4j public class WaitNotify {
  private static boolean flag = true;
  private static final Object LOCK = new Object();

  public static void main(String[] args) throws Exception {
    Thread waitThread = new Thread(new Wait(), "WaitThread");
    waitThread.start();
    TimeUnit.SECONDS.sleep(1);

    Thread notifyThread = new Thread(new Notify(), "NotifyThread");
    notifyThread.start();
  }

  private static class Wait implements Runnable {
    @Override
    public void run() {
      // 加鎖,擁有lock的Monitor
      synchronized (LOCK) {
        // 當條件不知足時,繼續wait,同時釋放了lock的鎖
        while (flag) {
          try {
            log.info("flag is true. wait");
            LOCK.wait();
          } catch (InterruptedException e) {
          }
        }
        // 條件知足時,完成工做
        log.info("flag is false. running");
      }
    }
  }

  private static class Notify implements Runnable {
    @Override
    public void run() {
      // 加鎖,擁有lock的Monitor
      synchronized (LOCK) {
        // 獲取lock的鎖,而後進行通知,通知時不會釋放lock的鎖,
        // 直到當前線程釋放了lock後,WaitThread才能從wait方法中返回
        log.info("hold lock. notify");
        LOCK.notify();
        flag = false;
        SleepUtils.second(5);
      }
      // 再次加鎖
      synchronized (LOCK) {
        log.info("hold lock again. sleep");
        SleepUtils.second(5);
      }
    }
  }
}

// 打印:

[13 21:18:18,533 INFO ] [WaitThread]   WaitNotify - flag is true. wait
[13 21:18:19,533 INFO ] [NotifyThread] WaitNotify - hold lock. notify
[13 21:18:24,535 INFO ] [NotifyThread] WaitNotify - hold lock again. sleep
[13 21:18:29,536 INFO ] [WaitThread]   WaitNotify - flag is false. running

2. join

@Slf4j public class Join {
  public static void main(String[] args) throws Exception {
    Thread previous = Thread.currentThread();
    for (int i = 0; i < 5; i++) {
      // 每一個線程擁有前一個線程的引用,須要等待前一個線程終止,才能從等待中返回
      Thread thread = new Thread(new Domino(previous), String.valueOf(i));
      thread.start();
      previous = thread;
    }

    TimeUnit.SECONDS.sleep(5);
    log.info("terminate.");
  }

  private static class Domino implements Runnable {
    private Thread thread;

    public Domino(Thread thread) {
      this.thread = thread;
    }

    @Override
    public void run() {
      try {
        thread.join();
      } catch (InterruptedException e) {
      }
      log.info("terminate.");
    }
  }
}

// 打印:

[13 21:27:27,573 INFO ] [main] Join - terminate.
[13 21:27:27,574 INFO ] [0]    Join - terminate.
[13 21:27:27,574 INFO ] [1]    Join - terminate.
[13 21:27:27,574 INFO ] [2]    Join - terminate.
[13 21:27:27,574 INFO ] [3]    Join - terminate.
[13 21:27:27,574 INFO ] [4]    Join - terminate.

3. interrupt

以上 wait\notify、join 都比較簡單,你們直接看代碼應該就能理解;可是 interrupt 機制 則比較複雜一點,咱們先從源碼分析;

interrupt 方法:

private volatile Interruptible blocker;
private final Object blockerLock = new Object();

// Set the blocker field; invoked via sun.misc.SharedSecrets from java.nio code
void blockedOn(Interruptible b) {
  synchronized (blockerLock) {
    blocker = b;
  }
}

public void interrupt() {
  if (this != Thread.currentThread()) checkAccess();

  synchronized (blockerLock) {
    Interruptible b = blocker;
    if (b != null) {
      interrupt0();       // Just to set the interrupt flag
      b.interrupt(this);
      return;
    }
  }
  interrupt0();
}

private native void interrupt0();

在 Thread 的源碼上有詳細的註釋,如下我簡單翻譯:

  • 若是線程處於 WAITINF、TIMED_WAITING (正阻塞於 Object 類的 wait()、wait(long)、wait(long, int)方法,或者 Thread 類的 join()、join(long)、join(long, int)、sleep(long)、sleep(long, int)方法),則該線程的中斷狀態將被清除,並收到一個 java.lang.InterruptedException
  • 若是線程正阻塞於 InterruptibleChannel 上的 I/O 操做,則該通道將被關閉,同時該線程的中斷狀態被設置,並收到一個java.nio.channels.ClosedByInterruptException
  • 若是線程正阻塞於 java.nio.channels.Selector 操做,則該線程的中斷狀態被設置,同時它將當即從選擇操做返回,並可能帶有一個非零值,其效果同 java.nio.channels.Selector.wakeup() 方法同樣;
  • 若是上述條件都不成立,則該線程的中斷狀態將被設置;

其中 Interruptible blocker 就是在 NIO 操做的時候經過 sun.misc.SharedSecrets 設置的(其效果同反射,可是不會生成其餘對象,也就是不會觸發 OOM);


interrupted 、isInterrupted 方法:

public static boolean interrupted() {
  return currentThread().isInterrupted(true);
}

public boolean isInterrupted() {
  return isInterrupted(false);
}

private native boolean isInterrupted(boolean ClearInterrupted);

能夠很清楚的看到他們都是經過 isInterrupted(boolean ClearInterrupted) 方法實現的,可是 interrupted 會清除中斷狀態,而 isInterrupted 則不會清除;

以上 interrupt 機制 就經過設置 interrupt flag,查詢中斷狀態,以及中斷異常構成了一套完整的通信機制;也能夠看做是經過 interrupt flag 共享變量實現的,下面咱們簡單舉例:

@Slf4j public class Interrupted {
  public static void main(String[] args) throws Exception {
    // sleepThread不停的嘗試睡眠
    Thread sleepThread = new Thread(new SleepRunner(), "SleepThread");
    sleepThread.setDaemon(true);
    
    // busyThread不停的運行
    Thread busyThread = new Thread(new BusyRunner(), "BusyThread");
    busyThread.setDaemon(true);
    sleepThread.start();
    busyThread.start();
    
    // 休眠5秒,讓sleepThread和busyThread充分運行
    TimeUnit.SECONDS.sleep(5);
    
    sleepThread.interrupt();
    busyThread.interrupt();
    log.info("SleepThread interrupted is {}", sleepThread.isInterrupted());
    log.info("BusyThread interrupted is {}", busyThread.isInterrupted());

    // 防止sleepThread和busyThread馬上退出
    TimeUnit.SECONDS.sleep(5);
    log.info("exit");
  }

  static class SleepRunner implements Runnable {
    @Override
    public void run() {
      try {
        while (true) {
          Thread.sleep(2000);
        }
      } catch (InterruptedException e) {
        log.error("SleepThread interrupted is {}", Thread.currentThread().isInterrupted());
        Thread.currentThread().interrupt();
        log.error("SleepThread interrupted is {}", Thread.currentThread().isInterrupted());
      }
      log.info("exit");
    }
  }

  static class BusyRunner implements Runnable {
    @Override
    public void run() {
      if (1 == 1) {
        while (true) {
        }
      }
      log.info("exit");
    }
  }
}

// 打印:

[14 10:20:55,269 INFO ] [main]        Interrupted - SleepThread interrupted is false
[14 10:20:55,269 ERROR] [SleepThread] Interrupted - SleepThread interrupted is false
[14 10:20:55,270 INFO ] [main]        Interrupted - BusyThread interrupted is true
[14 10:20:55,270 ERROR] [SleepThread] Interrupted - SleepThread interrupted is true
[14 10:20:55,271 INFO ] [SleepThread] Interrupted - exit
[14 10:21:00,271 INFO ] [main]        Interrupted - exit

從日誌中能夠看到:

  • 打斷的確是一個標記,對於未處於可打斷狀態的線程,或者沒有處理打斷狀態的線程是沒有影響的,就像 BusyThread;
  • 使用 interrupt 打斷睡眠線程,也的確符合上面的狀況,可是由於收到 InterruptedException 的時候會清楚中斷標記,因此這裏能夠再次設置中斷標記;

固然以上只是簡單的舉例,中斷機制如何使用仍是要根據具體的業務邏輯來肯定;另外以上的實例代碼是出自《Java 併發編程的藝術》,有興趣的也能夠找書來看一下;

總結

  • 以上的內容其主要目的是爲了幫助你構建一個相對完善的線程知識體系,其中還有不少的細節沒有講到,具體內容還須要結合實際場景分析;
相關文章
相關標籤/搜索