Java併發——基石篇(上)

概要

並行是這個時代的主旋律,也是不少現代操做系統須要提供的必備功能,在過去摩爾定律催生下,單個CPU核心計算的速度愈來愈快。可是隨着產業的發展,單個CPU核心的計算上限已經難以突破,傳統的增強單核的思惟模式已經不能知足需求。在古代,人們須要強大的戰馬來驅動戰車,爲了可以使得戰鬥力愈來愈強,人們馴化了愈來愈強勁的戰馬,可是單匹馬的力量始終是有限的,所以人們發明了多馬並駕的戰車結構。一樣地,在現代計算機領域,人們在單個CPU核心能力有限的狀況下,使用多個核心的CPU進行並行計算以驅動強大的算力。
可是,多CPU和多戰馬是遠遠不一樣的,在現實世界中的計算任務大多須要相互協調,其根本緣由是人類的思惟方式是線性串行的,設計一個徹底並行的計算邏輯體系仍是有至關大難度的。java

如何設計一個高併發的程序,不只僅是工程界的難題,在計算機學術界也是一個須要不斷突破的研究領域。從學術理論提出,到算法設計,再到工程實施,再到長夜驗證調優,整個流程都須要比較長的時間來進行迭代,究其根本,並行計算自己救贖很是複雜,不肯定的,不可預測的邏輯系統。linux

多核系統中的一致性

Java號稱一次編寫,處處運行,其自己也是構建在不一樣的系統之上的,以其運行時JVM來屏蔽系統底層的差別。所以,在介紹Java併發體系以前,有必要簡要介紹依稀計算機系統層面上的併發,以及面對的問題。程序員

咱們的目的其實很簡單,就是讓計算機在同一時刻,可以運行更多的任務。而並行計算,提供了很是不錯的解決方案。雖然這看起來很天然,但實際上面臨着衆多的問題,其中一個重大的問題就是絕大多數的計算不只僅是CPU一我的的事,而是須要不少計算機系統部件共同參與。可是咱們知道,計算機系統中運行速度最快的就是CPU,其餘部件例如:內存、磁盤、網絡等等都是及其緩慢的,同時這些操做在目前的計算機體系中是很難消除的,由於咱們不可能僅僅靠寄存器就完成全部的計算任務。面對高速CPU和低速存儲之間的鴻溝,若是想要實現高效數據通信,一個良好的解決方案就是在他們之間臺南佳一個cache層,這個cache層的速度和總體的速度關係以下:算法

CPU --> cache --> 存儲

經過cache這個緩衝地帶,實現CPU和存儲之間的高效溝通,這是計算機和軟件領域通用的一個問題解決問題:增長中間層,若是一箇中間層解決不了,那就兩層。在運算的時候,CPU將須要使用到的數據複製到cache中,之後每次獲取數據都較爲快速的從cache中獲取,加快訪問速度。緩存

所謂理想很豐滿,現實很骨感。這種計算體系有一個重要的問題須要解決,那就是:緩存一致性(cache coherence)問題。在現代的計算機系統中,主要都是多核系統爲主。在這些計算機系統中,每個CPU都擁有本身獨立的高速緩存,可是由於主存只有一個,所以他們之間只能共享,這種系統也被稱爲:共享內存多核系統(Shared-Menory multiprocessors System)。服務器

同時爲了保證CPU數據存儲的一致性,須要定義一個統一的緩存一致性協議,這類協議有不少,例如:MSI、MESI、MOSI、Synapse、Firefly以及Dragon Protocol等等。因此,一般狀況下,共享內存多核系統的架構以下:
image網絡

除了使用高速cache來緩存CPU和存儲設備之間的速度鴻溝,爲了可以充分利用多核CPU的處理性能,處理在實際執行機器指令時並不必定會按照程序設定的指令順序執行,可能存在代碼亂序執行(Out-Of_Order Execution)優化。可是,僅僅只是在代碼層面上亂序執行,系統會保證執行的結果邏輯正確,從宏觀上看就好像是順序執行同樣。架構

Java內存模型

上面咱們探討了共享內存多核系統的內存模型,咱們提到了高速緩存以及緩存一致性問題,同時還介紹了指令亂序執行的問題。其實,這些概念在Java中也是存在的。由於Java的目標是:一次編寫,處處運行,所以必須在JVM層面上將系統之間的差別屏蔽掉。面對如此多的系統,最好的方式就是定義一套Java本身的內存訪問模型,而後在不一樣的硬件平臺和操做系統上分別利用本地接口來實現。這裏的思想其實和增長cache是同樣的,經過增長中間層來解決系統差別帶來的協做問題。併發

Java工做內存和主存之間的一致性保證主要經過如下4種操做完成:app

  1. read:Java執行引擎訪問引擎訪問本地工做內存中的變量副本,若是變量副本無效(變量副本不存在也是無效的一種),那就去主存中獲取,同時在本地工做內存中緩存一份
  2. write:Java執行引擎將最新的變量值賦值給工做內存中的變量副本,同時須要判斷是否須要將這個新的值當即同步到主內存,若是須要同步的話,還須要配合lock操做
  3. lock:Java執行引擎將主內存中的變量鎖定,鎖定的含義有:其餘的線程在此以後不能訪問這個變量直到本線程unlock;一旦鎖定,其餘線程對這個變量的操做必須等待
  4. unlock:Java執行引擎將主內存中的變量解鎖,解鎖以後各個線程才能從新併發訪問這個變量,直到變量被某個線程再次鎖定

Java Thread建立

在Java中,咱們都知道,一個線程直接對應了一個Thread對象。建立和啓動一個線程是比較容易的,咱們只須要建立一個Thread對象,而後調用對象的start方法便可。可是在建立一個Thread對象和啓動線程JVM中究竟發生了什麼?本節咱們就來看下。

在建立一個Thread對象的時候,除了一些初始化設置以外就沒有其餘實質性的操做,真正的工做實際上是在start方法調用中產生的。

Java經過registerNatives方法將Thread類中的java方法和一個本地的C/C++函數進行對應,同時registerNatives方法是類加載的時候調用的,所以在類首次加載的時候(Bootstarp類加載)就會註冊這些native方法。

/* Make sure registerNatives is the first thing <clinit> does. */
private static native void registerNatives();
static {
    registerNatives();
}
static JNINativeMethod methods[] = {
    {"start0",           "()V",        (void *)&JVM_StartThread},
    {"stop0",            "(" OBJ ")V", (void *)&JVM_StopThread},
    {"isAlive",          "()Z",        (void *)&JVM_IsThreadAlive},
    {"suspend0",         "()V",        (void *)&JVM_SuspendThread},
    {"resume0",          "()V",        (void *)&JVM_ResumeThread},
    {"setPriority0",     "(I)V",       (void *)&JVM_SetThreadPriority},
    {"yield",            "()V",        (void *)&JVM_Yield},
    {"sleep",            "(J)V",       (void *)&JVM_Sleep},
    {"currentThread",    "()" THD,     (void *)&JVM_CurrentThread},
    {"countStackFrames", "()I",        (void *)&JVM_CountStackFrames},
    {"interrupt0",       "()V",        (void *)&JVM_Interrupt},
    {"isInterrupted",    "(Z)Z",       (void *)&JVM_IsInterrupted},
    {"holdsLock",        "(" OBJ ")Z", (void *)&JVM_HoldsLock},
    {"getThreads",        "()[" THD,   (void *)&JVM_GetAllThreads},
    {"dumpThreads",      "([" THD ")[[" STE, (void *)&JVM_DumpThreads},
    {"setNativeName",    "(" STR ")V", (void *)&JVM_SetNativeThreadName},
};
JNIEXPORT void JNICALL
Java_java_lang_Thread_registerNatives(JNIEnv *env, jclass cls)
{
    (*env)->RegisterNatives(env, cls, methods, ARRAY_LENGTH(methods));
}

再看看對應JNI的結構體

/*
 * used in RegisterNatives to describe native method name, signature, * and function pointer. */
typedef struct {
    char *name;
    char *signature;
    void *fnPtr;
} JNINativeMethod;

即第一列是Java中定義的native方法名稱,第二列是Java方法簽名,第三列是本地方法對應函數。所以,Java中的start方法就是對應native的JVM——StartThread函數:

JVM_ENTRY(void, JVM_StartThread(JNIEnv* env, jobject jthread))
  JVMWrapper("JVM_StartThread");
  JavaThread *native_thread = NULL;

  // We cannot hold the Threads_lock when we throw an exception,   // due to rank ordering issues. Example:  we might need to grab the   // Heap_lock while we construct the exception.   bool throw_illegal_thread_state = false;

  // We must release the Threads_lock before we can post a jvmti event   // in Thread::start.   {
    // Ensure that the C++ Thread and OSThread structures aren't freed before     // we operate.     MutexLocker mu(Threads_lock);

    // Since JDK 5 the java.lang.Thread threadStatus is used to prevent     // re-starting an already started thread, so we should usually find     // that the JavaThread is null. However for a JNI attached thread     // there is a small window between the Thread object being created     // (with its JavaThread set) and the update to its threadStatus, so we     // have to check for this     if (java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread)) != NULL) {
      throw_illegal_thread_state = true;
    } else {
      // We could also check the stillborn flag to see if this thread was already stopped, but       // for historical reasons we let the thread detect that itself when it starts running 
      jlong size =
             java_lang_Thread::stackSize(JNIHandles::resolve_non_null(jthread));
      // Allocate the C++ Thread structure and create the native thread.  The       // stack size retrieved from java is 64-bit signed, but the constructor takes       // size_t (an unsigned type), which may be 32 or 64-bit depending on the platform.       //  - Avoid truncating on 32-bit platforms if size is greater than UINT_MAX.       //  - Avoid passing negative values which would result in really large stacks.       NOT_LP64(if (size > SIZE_MAX) size = SIZE_MAX;)
      size_t sz = size > 0 ? (size_t) size : 0;
      // 重點看這裏!!!       native_thread = new JavaThread(&thread_entry, sz);

      // At this point it may be possible that no osthread was created for the       // JavaThread due to lack of memory. Check for this situation and throw       // an exception if necessary. Eventually we may want to change this so       // that we only grab the lock if the thread was created successfully -       // then we can also do this check and throw the exception in the       // JavaThread constructor.       if (native_thread->osthread() != NULL) {
        // Note: the current thread is not being used within "prepare".         native_thread->prepare(jthread);
      }
    }
  }

  if (throw_illegal_thread_state) {
    THROW(vmSymbols::java_lang_IllegalThreadStateException());
  }

  assert(native_thread != NULL, "Starting null thread?");

  if (native_thread->osthread() == NULL) {
    // No one should hold a reference to the 'native_thread'.     native_thread->smr_delete();
    if (JvmtiExport::should_post_resource_exhausted()) {
      JvmtiExport::post_resource_exhausted(
        JVMTI_RESOURCE_EXHAUSTED_OOM_ERROR | JVMTI_RESOURCE_EXHAUSTED_THREADS,
        os::native_thread_creation_failed_msg());
    }
    THROW_MSG(vmSymbols::java_lang_OutOfMemoryError(),
              os::native_thread_creation_failed_msg());
  }

  Thread::start(native_thread);

JVM_END

這段代碼的主要做用是建立一個JavaThread對象並啓動。咱們進入建立JavaThread構造函數

JavaThread::JavaThread(ThreadFunction entry_point, size_t stack_sz) :
                       Thread() {
  initialize();
  _jni_attach_state = _not_attaching_via_jni;
  set_entry_point(entry_point);
  // Create the native thread itself.   // %note runtime_23   os::ThreadType thr_type = os::java_thread;
  thr_type = entry_point == &compiler_thread_entry ? os::compiler_thread :
                                                     os::java_thread;
  // 經過 os 類的 create_thread 函數來建立一個線程   os::create_thread(this, thr_type, stack_sz);
  // The _osthread may be NULL here because we ran out of memory (too many threads active).   // We need to throw and OutOfMemoryError - however we cannot do this here because the caller   // may hold a lock and all locks must be unlocked before throwing the exception (throwing   // the exception consists of creating the exception object & initializing it, initialization   // will leave the VM via a JavaCall and then all locks must be unlocked).   //   // The thread is still suspended when we reach here. Thread must be explicit started   // by creator! Furthermore, the thread must also explicitly be added to the Threads list   // by calling Threads:add. The reason why this is not done here, is because the thread   // object must be fully initialized (take a look at JVM_Start) }

能夠看到,重點是經過os類的create_thread函數來建立一個線程,由於JVM是跨平臺的,而且不一樣操做系統上的線程實現機制可能不太同樣,所以這裏的create_thread確定會有多個針對不一樣平臺的實現,咱們查看這個函數的實現就知道了:
image
能夠看到,HotSpot提供了主要的操做系統上的實現,由於在服務器上,linux的佔比是很高的,所以咱們這裏就看下linux上的實現:

bool os::create_thread(Thread* thread, ThreadType thr_type,
                       size_t req_stack_size) {
  ...
  // init thread attributes   pthread_attr_t attr;
  pthread_attr_init(&attr);
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
  // Calculate stack size if it's not specified by caller.   size_t stack_size = os::Posix::get_initial_stack_size(thr_type, req_stack_size);
  // In the Linux NPTL pthread implementation the guard size mechanism   // is not implemented properly. The posix standard requires adding   // the size of the guard pages to the stack size, instead Linux   // takes the space out of 'stacksize'. Thus we adapt the requested   // stack_size by the size of the guard pages to mimick proper   // behaviour. However, be careful not to end up with a size   // of zero due to overflow. Don't add the guard page in that case.   size_t guard_size = os::Linux::default_guard_size(thr_type);
  if (stack_size <= SIZE_MAX - guard_size) {
    stack_size += guard_size;
  }
  assert(is_aligned(stack_size, os::vm_page_size()), "stack_size not aligned");

  int status = pthread_attr_setstacksize(&attr, stack_size);
  assert_status(status == 0, status, "pthread_attr_setstacksize");

  // Configure glibc guard page.   pthread_attr_setguardsize(&attr, os::Linux::default_guard_size(thr_type));
  ...
  pthread_t tid;
  // 建立並啓動線程   int ret = pthread_create(&tid, &attr, (void* (*)(void*)) thread_native_entry, thread);
  ...
}

這個函數比較長,這裏就省略部分,只保留和線程建立啓動相關的部分,能夠看到,在linux平臺上,JVM的線程是經過大名鼎鼎的pthread庫來建立啓動線程的,這裏須要注意的是,在指定線程棧大小的時候,並非程序員指定多少就是多少,而是要根據系統平臺的限制來綜合決定的。咱們也能夠得出結論,Java Thread在底層對應一個pthread線程。咱們看下pthread建立並啓動線程的接口:

int thread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void *), void *arg);

第一個是pthread_t結構體數據指針,存放線程信息,第二個是線程的屬性,第三個是線程體,也就是線程實際執行的函數,第四個是線程體的參數列表。
上面調用這個接口的地方,咱們指定了線程體函數是thread_native_entry,參數是thread指針。咱們先看下thread_native_entry這個函數的定義:

// Thread start routine for all newly created threads static void *thread_native_entry(Thread *thread) {
  ...
  // call one more level start routine   thread->run();
  ...
}

一樣,這裏只保留了重點代碼,經過註釋咱們能夠知道,thread->run()這一行是最可能執行咱們run方法的地方。咱們看一下代碼:

// The first routine called by a new Java thread void JavaThread::run() {
  ...
  // We call another function to do the rest so we are sure that the stack addresses used   // from there will be lower than the stack base just computed   thread_main_inner();
}

這裏重點是調用了thread_main_inner函數:

void JavaThread::thread_main_inner() {
  assert(JavaThread::current() == this, "sanity check");
  assert(this->threadObj() != NULL, "just checking");

  // Execute thread entry point unless this thread has a pending exception   // or has been stopped before starting.   // Note: Due to JVM_StopThread we can have pending exceptions already!   if (!this->has_pending_exception() &&
      !java_lang_Thread::is_stillborn(this->threadObj())) {
    {
      ResourceMark rm(this);
      this->set_native_thread_name(this->get_thread_name());
    }
    HandleMark hm(this);
    // 這裏開始調用 java thread 的 run 方法啦~~~     this->entry_point()(this, this);
  }

  DTRACE_THREAD_PROBE(stop, this);

  // java 中的 run 方法執行完畢了,這裏須要退出線程並清理資源   this->exit(false);
  // delete cpp 的對象   this->smr_delete();
}

能夠看到,Java Thread中的run方法就是在this->entry_point()(this,this);這裏調用的。看這裏的調用方式就知道,entry_point()返回的是一個函數指針,而後直接調用,entry_point函數實現以下:

ThreadFunction entry_point() const             { return _entry_point; }

那麼_entry_point是哪裏來的?咱們再看上面JavaThread的構造函數,咱們發現了一個方法set_entry_point(entry_point),_entry_point就是咱們建立JavaThread對象時傳入的函數指針。

相關文章
相關標籤/搜索