Ruby 2.x 源代碼學習:線程

前言

Ruby 使用 pthread 線程庫來實現線程概念linux

本文涉及到的源代碼:bootstrap

  • vm.csegmentfault

  • thread.capi

  • thread_pthread.c(linux/unix 平臺)ruby

線程類 Thread

參考 Ruby 2.x 源代碼學習:bootstrap 這篇文章,Ruby 解釋器在 bootstrap 階段會調用一系列 Init_XXX 函數,和 線程相關的兩個函數是位於 vm.c 文件中的 Init_VM 和位於 thread.c 文件中的 Init_Thread數據結構

Init_VM 與 rb_cThread

參考 Ruby 2.x 源代碼學習:對象模型,每一個對象在 Ruby 虛擬機內部對應一個 RObject 結構體,每一個對象都有一個類結構 RClass,每一個類也是一個對象,下面這句代碼在虛擬機中建立了 Thread 類在虛擬機內部對應的 RClass rb_cThread,它以 rb_cObject 類爲父類,名字爲 Thread多線程

// vm.c

void Init_VM(void) {
    ...
    rb_cThread = rb_define_class("Thread", rb_cObject);
    ...
}

提示:Ruby 的不少內置類都以 rb_c 爲前綴函數

Init_Thread

介紹完 rb_cThread,如今能夠來看看 Init_Thread 函數了學習

// thread.c

void Init_Thread(void) {
    ...
    VALUE cThGroup;
    rb_thread_t *th = GET_THREAD();

    ...
    rb_define_singleton_method(rb_cThread, "new", thread_s_new, -1);
    rb_define_singleton_method(rb_cThread, "start", thread_start, -2);
    ...
    rb_define_method(rb_cThread, "initialize", thread_initialize, -2);
    rb_define_method(rb_cThread, "join", thread_join_m, -1);
    ...

    /* init thread core */
    /* main thread setting */
    /* acquire global vm lock */

    rb_thread_create_timer_thread();

    /* suppress warnings on cygwin, mingw and mswin.*/
    (void)native_mutex_trylock;

    Init_thread_sync();
}

經過調用 rb_define_xxx api 向 rb_cThread 添加 Thread 類支持的方法,這些方法咱們在後面選一些分析
底部 / init thread core / 註釋的代碼塊涉及到 線程調度,下文會詳細分析ui

啓動線程

Ruby 啓動線程有不少方式,咱們先來看看 使用 Thread::new 方法的啓動流程,根據上文的分析,Thread::new 方法對應的 C 函數爲 thread_s_new

// thread.c

static VALUE thread_s_new(int argc, VALUE *argv, VALUE klass)
{
    rb_thread_t *th;
    // 建立 thread 對象
    VALUE thread = rb_thread_alloc(klass);

    // 若是主線程被殺掉,直接拋出異常
    if (GET_VM()->main_thread->status == THREAD_KILLED)
        rb_raise(rb_eThreadError, "can't alloc thread");

    // 調用 thread 類的 initialize 方法(也是一個 C 語言實現的 native 函數)
    rb_obj_call_init(thread, argc, argv);
    GetThreadPtr(thread, th);
    // 若是子類沒有調用父類的 initialize 方法,直接拋出異常
    if (!threadptr_initialized(th)) {
           rb_raise(rb_eThreadError, "uninitialized thread - check `%"PRIsVALUE"#initialize'", klass);
    }
    return thread;
}

Thread 類的 initialize 方法定義在 thread.c 中

// thread.c

/* :nodoc: */
static VALUE thread_initialize(VALUE thread, VALUE args)
{
    rb_thread_t *th;
    // 必須傳遞 block 參數,不然拋出異常!!!
    if (!rb_block_given_p()) {
        rb_raise(rb_eThreadError, "must be called with a block");
    }
    GetThreadPtr(thread, th);
    if (th->first_args) {
        VALUE proc = th->first_proc, loc;
        if (!proc || !RTEST(loc = rb_proc_location(proc))) {
            rb_raise(rb_eThreadError, "already initialized thread");
        }
        rb_raise(rb_eThreadError,
         "already initialized thread - %"PRIsVALUE":%"PRIsVALUE,
                 RARRAY_AREF(loc, 0), RARRAY_AREF(loc, 1));
    }
    return thread_create_core(thread, args, 0);
}

咱們再來看 thread_create_core 函數,這個函數中會建立 pthread 線程並啓動執行

// thread.c

static VALUE thread_create_core(VALUE thval, VALUE args, VALUE (*fn)(ANYARGS)) {
    ...
    // 這三句話和線程要執行的 block 相關,先有個大體印象
    th->first_func = fn;
    th->first_proc = fn ? Qfalse : rb_block_proc();
    th->first_args = args; /* GC: shouldn't put before above line */
    ...

    ...
    // 建立 native pthread
    err = native_thread_create(th);
    if (err) {
        // 拋出異常
        ...
    }
    // 將線程 th 添加到 vm 的線程列表裏頭,供線程調度使用
    rb_vm_living_threads_insert(th->vm, th);
    return thval;
}

根據不一樣的平臺,native_thread_create 有不一樣的實現,對於 linux 操做系統,具體實如今 thread_pthread.c 文件中,通過一些展轉以後又會回到 thread.c 中的 thread_start_func_2 函數:

static int
thread_start_func_2(rb_thread_t *th, VALUE *stack_start, VALUE *register_stack_start)
{
    ...
    // global virtual lock 全局虛擬機鎖
    gvl_acquire(th->vm, th);
    {
        rb_thread_set_current(th);

        TH_PUSH_TAG(th);
        if ((state = EXEC_TAG()) == 0) {
            ...
            th->value = rb_vm_invoke_proc(th, proc, (int)RARRAY_LEN(args), RARRAY_CONST_PTR(args), VM_BLOCK_HANDLER_NONE);
            ...
        } else {
            th->value = (*th->first_func)((void *)args);
        }
    }
    else {
        // 異常處理
    }
    ...
    return 0;
}

終於看到傳說中的 GVL 了!從網上看 Ruby 多線程一直有爭議,將 Ruby 多線程戲稱爲 "僞多線程",因爲 GVL 的存在,Ruby 多線程並不能正真在多核上並行執行,對 CPU 密集型的應用來講這多是硬傷~,但多線程對 IO 密集型仍是有改進的,一個線程在作耗時的 IO 操做時能夠經過 gvl_release 讓出 GVL 供其它線程使用!
ruby 自帶的 C 語言擴展已經考慮到這種狀況,可是對於歷史遺留的數量龐大的第三方擴展可能就悲劇了

中止線程

線程調度

GVL

Ruby 線程調度和 GVL 密切相關,因此咱們先來看看 GVL 相關的數據結構和方法

rb_global_vm_lock_t

thread_pthread.h 文件定義了 rb_global_vm_lock_t 結構體

// thread.c

typedef struct rb_global_vm_lock_struct {
    /* fast path */
    unsigned long acquired;
    rb_nativethread_lock_t lock;

    /* slow path */
    volatile unsigned long waiting;
    rb_nativethread_cond_t cond;

    /* yield */
    rb_nativethread_cond_t switch_cond;
    rb_nativethread_cond_t switch_wait_cond;
    int need_yield;
    int wait_yield;
} rb_global_vm_lock_t;

acquire

當前是否有線程獲取到全局鎖,初始值爲 0,每次獲取 GVL 時設置爲 1,釋放時設置爲 0

lock

全局鎖,用於保護 acquire 等字段的讀寫以及和 各類 condition(條件)配合使用

waiting

當前處於 lock 狀態的線程個數,waiting > 0 代表當前有線程正在等待 GVL

cond

switch_cons

條件變量,用於告知釋放全局鎖的線程已經有線程獲取到全局鎖

switch_wait_cond

條件變量,因爲存在競態條件,兩個線程有可能同時 yield,這時只有一個線程能夠執行 yeild,其它線程必須等待,當獲取 yield 執行權限的線程執行完操做以後使用該變量通知其它線程

gvl_init

解釋器在初始化 Thread 類時會同時初始化 GVL,並獲取 GVL。這也容易理解,由於解釋器啓動時只有一個(主)線程,該線程確定要獲取 GVL

// thread.c

void Init_Thread(void) {
    ...
    /* main thread setting */
    {
        gvl_init(th->vm);
        gvl_acquire(th->vm, th);
        ...
    }
}

gvl_init 具體實現和平臺相關,linux/unix 平臺下在 thread_pthread.c 文件裏能夠找到相關代碼(下同)

// thread_pthread.c

static void
gvl_init(rb_vm_t *vm)
{
    native_mutex_initialize(&vm->gvl.lock);
    native_cond_initialize(&vm->gvl.cond, RB_CONDATTR_CLOCK_MONOTONIC);
    native_cond_initialize(&vm->gvl.switch_cond, RB_CONDATTR_CLOCK_MONOTONIC);
    native_cond_initialize(&vm->gvl.switch_wait_cond, RB_CONDATTR_CLOCK_MONOTONIC);
    vm->gvl.acquired = 0;
    vm->gvl.waiting = 0;
    vm->gvl.need_yield = 0;
    vm->gvl.wait_yield = 0;
}

gvl_acquire

gvl_acquire 用於獲取 GVL,在調用 gvl_acquire_common 進行實際獲取動做以前須要先鎖定 gvl.lock

static void
gvl_acquire(rb_vm_t *vm, rb_thread_t *th)
{
    native_mutex_lock(&vm->gvl.lock);
    gvl_acquire_common(vm);
    native_mutex_unlock(&vm->gvl.lock);
}

gvl_acquire_common

獲取 GVL 的核心代碼

static void gvl_acquire_common(rb_vm_t *vm)
{
    // 若是 GVL 的狀態是 acquired,才須要等待其它線程釋放 GVL,不然直接設置 GVL 爲 acquired
    if (vm->gvl.acquired) {
        // 上文提到過,waiting 是當前等待 GVL 的線程個數
        vm->gvl.waiting++;
        if (vm->gvl.waiting == 1) {
            // 定時線程相關,先略過
            rb_thread_wakeup_timer_thread_low();
        }
        // 經典的 while wait 循環,等待 GVL 被釋放,gvl_release 函數會 signal gvl.cond
        while (vm->gvl.acquired) {
            native_cond_wait(&vm->gvl.cond, &vm->gvl.lock);
        }
        // 當前等待 GVL 的線程個數 --
        vm->gvl.waiting--;
        // 上文提到過,釋放 GVL 的線程會等待 新線程獲取 GVL,這裏發送一個通知信號告訴原來持有 GVL 的線程
        if (vm->gvl.need_yield) {
            vm->gvl.need_yield = 0;
            native_cond_signal(&vm->gvl.switch_cond);
        }
    }

    vm->gvl.acquired = 1;
}

gvl_release

gvl_release 相對簡單一些:

// thread_pthread.c

static void gvl_release_common(rb_vm_t *vm)
{
    vm->gvl.acquired = 0;
    if (vm->gvl.waiting > 0)
        // 喚醒 gvl_acquire 線程
        native_cond_signal(&vm->gvl.cond);
}

static void gvl_release(rb_vm_t *vm)
{
    native_mutex_lock(&vm->gvl.lock);
    gvl_release_common(vm);
    native_mutex_unlock(&vm->gvl.lock);
}

gvl_destroy

gvl_atfork

gvl_yield

gvl_yield 函數用於釋放 GVL並從新 acquire,有點相似操做系統裏面 進程 在內核態從新請求調度器進行進程調度
若是去掉 yield 核心代碼,gvl_yield 和 gvl_release 幾乎沒啥區別,只是在最後從新嘗試獲取 GVL

static void gvl_yield(rb_vm_t *vm, rb_thread_t *th)
{   
    // 和其它 gvl_ 系函數同樣,代碼必須包裹在 native_mutex_lock 和 native_mutex_unlock 內
    native_mutex_lock(&vm->gvl.lock);
    gvl_release_common(vm);

    // yield 核心代碼

    acquire:
    // 從新嘗試獲取 GVL
    gvl_acquire_common(vm);
    native_mutex_unlock(&vm->gvl.lock);
}

咱們來看看 yield 核心代碼:

// thread_pthread.c gvl_yield

    /* An another thread is processing GVL yield. */
    // 若是其它線程也正在 yield,則進入等待
    if (UNLIKELY(vm->gvl.wait_yield)) {
        while (vm->gvl.wait_yield)
            native_cond_wait(&vm->gvl.switch_wait_cond, &vm->gvl.lock);
        goto acquire;
    }
    // 若是當前有線程阻塞在 gvl_acquire 須要等待線程獲取 GVL
    if (vm->gvl.waiting > 0) {
        /* Wait until another thread task take GVL. */
        vm->gvl.need_yield = 1;
        vm->gvl.wait_yield = 1;
        while (vm->gvl.need_yield)
            native_cond_wait(&vm->gvl.switch_cond, &vm->gvl.lock);
        vm->gvl.wait_yield = 0;
    } else {
        native_mutex_unlock(&vm->gvl.lock);
        sched_yield();
        native_mutex_lock(&vm->gvl.lock);
    }

    native_cond_broadcast(&vm->gvl.switch_wait_cond);

Thread 線程調度相關方法

有了上文關於 GVL 的基礎知識,咱們來看一些 Thread 線程調度的方法,它們基本上就是對 gvl_xxx 函數簡單封裝

Thread::pass

doc

Give the thread scheduler a hint to pass execution to another thread. A running thread may or may not switch, it depends on OS and processor.

Init_Thread 函數中定義了 Thread::pass 的入口,順着入口最終找到 rb_thread_schedule_limits 函數

// thread.c

void Init_Thread(void) {
    ...
    rb_define_singleton_method(rb_cThread, "pass", thread_s_pass, 0);
    ...
}

static VALUE thread_s_pass(VALUE klass) {
    rb_thread_shedule();
    return Qnil;
}

static void rb_thread_schedule(void) {
    rb_thread_t *cur_th = GET_THREAD();
    rb_thread_schedule_limits(0);
    RUBY_VM_CHECK_INTS(cur_th);
}

rb_thread_schedule_limits 調用 gvl_yield 釋放 GVL 請求調度

static void rb_thread_schedule_limits(unsigned long limits_us)
{
    thread_debug("rb_thread_schedule\n");
    // 若是隻有一個線程,顯然啥也不須要幹
    if (!rb_thread_alone()) {
        rb_thread_t *th = GET_THREAD();

        if (th->running_time_us >= limits_us) {
            thread_debug("rb_thread_schedule/switch start\n");
            RB_GC_SAVE_MACHINE_CONTEXT(th);
            // 調用 gvl_yield
            gvl_yield(th->vm, th);
            rb_thread_set_current(th);
            thread_debug("rb_thread_schedule/switch done\n");
        }
    }
}

Thread::stop

doc

Stops execution of the current thread, putting it into a 'sleep' state, and schedules execution of another thread.

doc 提供了一個有意思的例子:

1 a = Thread.new { print "a"; Thread.stop; print "c" }
 2 sleep 0.1 while a.status!='sleep'
 3 print "b"
 4 a.run
 5 a.join
 
#=> "abc"

最終輸出結果爲 abc:

  • 語句 1 new 了一個 Thread 對象,傳入一個 block

  • 語句 2 使主線程 sleep 一段時間,這時線程 a 得到執行機會,輸出 'a', 此後 線程 a 被 stop,釋放 GVL

  • 語句 3 主線程從 sleep 中返回輸出 'b'

  • 語句 4 從新激活 線程 a

  • 語句 5 主線程等待線程 a 執行完畢

咱們可能會有一些疑問:

  • Thread.stop 是如何使線程進入 'sleep' 狀態的?

  • 全局函數 sleep 是如何實現的?

  • Thread::run 是如何從新激活線程 a?

  • Thread::join 是如何實現的?

rb_thread_stop

Thread::stop 對應的 C 函數爲 rb_thread_stop,它首先檢查當前線程是不是虛擬機內惟一的線程,若是是則禁止 stop,接着調用 rb_thread_sleep_deadly

VALUE rb_thread_stop(void) {
    // 若是 線程 是 Ruby 虛擬機內惟一線程,不容許 stop !!!
    if (rb_thread_alone()) {
        rb_raise(rb_eThreadError,
             "stopping only thread\n\tnote: use sleep to stop forever");
    }
    rb_thread_sleep_deadly();
    return Qnil;
}


void rb_thread_sleep_deadly(void)
{
    thread_debug("rb_thread_sleep_deadly\n");
    sleep_forever(GET_THREAD(), 1, 1);
}

sleep_forever 函數的定義以下,從函數命名來看該函數實現"永久休眠"

static void sleep_forever(rb_thread_t *th, int deadlockable, int spurious_check)
{
    enum rb_thread_status prev_status = th->status;
    enum rb_thread_status status = deadlockable ? THREAD_STOPPED_FOREVER : THREAD_STOPPED;

    th->status = status;
    RUBY_VM_CHECK_INTS_BLOCKING(th);
    // 只有當線程狀態發生變化才退出循環 !!!
    while (th->status == status) {
        if (deadlockable) {
            th->vm->sleeper++;
            rb_check_deadlock(th->vm);
        }
        // 平臺相關 sleep 實現,傳入的超時時間爲 0,sleep forever
        native_sleep(th, 0);

        if (deadlockable) {
            th->vm->sleeper--;
        }
        RUBY_VM_CHECK_INTS_BLOCKING(th);
        if (!spurious_check)
            break;
    }
    th->status = prev_status;
}

咱們來看看 thread_pthread.c 中 native_sleep 是如何實現的:

// thread_pthread.c

static void
native_sleep(rb_thread_t *th, struct timeval *timeout_tv)
{
    struct timespec timeout;
    rb_nativethread_lock_t *lock = &th->interrupt_lock;
    rb_nativethread_cond_t *cond = &th->native_thread_data.sleep_cond;

    // 計算 sleep 超時時間
    ...

    GVL_UNLOCK_BEGIN();
    {
        // 核心代碼 
    }
    GVL_UNLOCK_END();

    thread_debug("native_sleep done\n");
}
相關文章
相關標籤/搜索