Python GIL 多線程機制 (C source code)

最近閱讀《Python源碼剖析》對進程線程的封裝解釋:html

GIL,Global Interpreter Lock,對於python的多線程機制很是重要,其如何實現的?代碼中實現以下:python

指向一個void*,C語言中的空指針類型能夠指向任意類型。Python創建多線程環境的動做只會執行一次。bootstrap

PyEval_InitThreads--》PyThread_allocate_lock建立GIL以後,當前線程開始遵照python的多線程機制,即任何調用Python C API以前須要先得到GIL.多線程

也就是代碼中PyThread_acquire_lock嘗試獲取GIL。async

static PyMethodDef thread_methods[] = {
    {"start_new_thread",        (PyCFunction)thread_PyThread_start_new_thread,
                            METH_VARARGS,
                            start_new_doc},
    {"start_new",               (PyCFunction)thread_PyThread_start_new_thread,
                            METH_VARARGS,
                            start_new_doc},
    {"allocate_lock",           (PyCFunction)thread_PyThread_allocate_lock,
     METH_NOARGS, allocate_doc},
    {"allocate",                (PyCFunction)thread_PyThread_allocate_lock,
     METH_NOARGS, allocate_doc},
    {"exit_thread",             (PyCFunction)thread_PyThread_exit_thread,
     METH_NOARGS, exit_doc},
    {"exit",                    (PyCFunction)thread_PyThread_exit_thread,
     METH_NOARGS, exit_doc},
    {"interrupt_main",          (PyCFunction)thread_PyThread_interrupt_main,
     METH_NOARGS, interrupt_doc},
    {"get_ident",               (PyCFunction)thread_get_ident,
     METH_NOARGS, get_ident_doc},
    {"_count",                  (PyCFunction)thread__count,
     METH_NOARGS, _count_doc},
    {"stack_size",              (PyCFunction)thread_stack_size,
                            METH_VARARGS,
                            stack_size_doc},
    {NULL,                      NULL}           /* sentinel */
};

/*建立bootstate,並初始化,其保存關於線程的一切信息,如線程過程,和參數等,*/
static PyObject *
thread_PyThread_start_new_thread(PyObject *self, PyObject *fargs)
{
    PyObject *func, *args, *keyw = NULL;
    struct bootstate *boot;
    long ident;

    if (!PyArg_UnpackTuple(fargs, "start_new_thread", 2, 3,
                           &func, &args, &keyw))
        return NULL;
    if (!PyCallable_Check(func)) {
        PyErr_SetString(PyExc_TypeError,
                        "first arg must be callable");
        return NULL;
    }
    if (!PyTuple_Check(args)) {
        PyErr_SetString(PyExc_TypeError,
                        "2nd arg must be a tuple");
        return NULL;
    }
    if (keyw != NULL && !PyDict_Check(keyw)) {
        PyErr_SetString(PyExc_TypeError,
                        "optional 3rd arg must be a dictionary");
        return NULL;
    }
    boot = PyMem_NEW(struct bootstate, 1);
    if (boot == NULL)
        return PyErr_NoMemory();
    boot->interp = PyThreadState_GET()->interp;
    boot->func = func;
    boot->args = args;
    boot->keyw = keyw;
    boot->tstate = _PyThreadState_Prealloc(boot->interp);
    if (boot->tstate == NULL) {
        PyMem_DEL(boot);
        return PyErr_NoMemory();
    }
    Py_INCREF(func);
    Py_INCREF(args);
    Py_XINCREF(keyw);
    PyEval_InitThreads(); /* Start the interpreter's thread-awareness */
    ident = PyThread_start_new_thread(t_bootstrap, (void*) boot);
    if (ident == -1) {
        PyErr_SetString(ThreadError, "can't start new thread");
        Py_DECREF(func);
        Py_DECREF(args);
        Py_XDECREF(keyw);
        PyThreadState_Clear(boot->tstate);
        PyMem_DEL(boot);
        return NULL;
    }
    return PyInt_FromLong(ident);
}



/*以boot爲參數,建立一個原生線程*/
PyThreadState *
_PyThreadState_Prealloc(PyInterpreterState *interp)
{
    return new_threadstate(interp, 0);
}

static PyThreadState *
new_threadstate(PyInterpreterState *interp, int init)
{
    PyThreadState *tstate = (PyThreadState *)malloc(sizeof(PyThreadState));

    if (_PyThreadState_GetFrame == NULL)
        _PyThreadState_GetFrame = threadstate_getframe;

    if (tstate != NULL) {
        tstate->interp = interp;

        tstate->frame = NULL;
        tstate->recursion_depth = 0;
        tstate->tracing = 0;
        tstate->use_tracing = 0;
        tstate->tick_counter = 0;
        tstate->gilstate_counter = 0;
        tstate->async_exc = NULL;
#ifdef WITH_THREAD
        tstate->thread_id = PyThread_get_thread_ident();
#else
        tstate->thread_id = 0;
#endif

        tstate->dict = NULL;

        tstate->curexc_type = NULL;
        tstate->curexc_value = NULL;
        tstate->curexc_traceback = NULL;

        tstate->exc_type = NULL;
        tstate->exc_value = NULL;
        tstate->exc_traceback = NULL;

        tstate->c_profilefunc = NULL;
        tstate->c_tracefunc = NULL;
        tstate->c_profileobj = NULL;
        tstate->c_traceobj = NULL;

        tstate->trash_delete_nesting = 0;
        tstate->trash_delete_later = NULL;

        if (init)
            _PyThreadState_Init(tstate);

        HEAD_LOCK();
        tstate->next = interp->tstate_head;
        interp->tstate_head = tstate;
        HEAD_UNLOCK();
    }

    return tstate;
}
View Code

 GIL(NRMUTEX)對象,結構中有4個成員,其中hevent就是Win32平臺下的Event內核對象,而thread_id則記錄任意時刻獲取的GIL的線程ID。ide

 1 /*
 2  * Lock support. It has too be implemented as semaphores.
 3  * I [Dag] tried to implement it with mutex but I could find a way to
 4  * tell whether a thread already own the lock or not.
 5  */
 6 PyThread_type_lock
 7 PyThread_allocate_lock(void)
 8 {
 9     PNRMUTEX aLock;
10 
11     dprintf(("PyThread_allocate_lock called\n"));
12     if (!initialized)
13         PyThread_init_thread();
14 
15     aLock = AllocNonRecursiveMutex() ;
16 
17     dprintf(("%ld: PyThread_allocate_lock() -> %p\n", PyThread_get_thread_ident(), aLock));
18 
19     return (PyThread_type_lock) aLock;
20 }
21 
22 typedef struct NRMUTEX {
23     LONG   owned ;
24     DWORD  thread_id ;
25     HANDLE hevent ;
26 } NRMUTEX, *PNRMUTEX ;
27 
28 PNRMUTEX
29 AllocNonRecursiveMutex(void)
30 {
31     PNRMUTEX mutex = (PNRMUTEX)malloc(sizeof(NRMUTEX)) ;
32     if (mutex && !InitializeNonRecursiveMutex(mutex))
33     {
34         free(mutex) ;
35         mutex = NULL ;
36     }
37     return mutex ;
38 }
39 
40 BOOL
41 InitializeNonRecursiveMutex(PNRMUTEX mutex)
42 {
43     mutex->owned = -1 ;  /* No threads have entered NonRecursiveMutex */
44     mutex->thread_id = 0 ;
45     mutex->hevent = CreateEvent(NULL, FALSE, FALSE, NULL) ;
46     return mutex->hevent != NULL ;      /* TRUE if the mutex is created */
47 }
View Code

 PyThread_acquire_lock嘗試獲取GIL代碼以下:函數

void
PyEval_InitThreads(void)
{
    if (interpreter_lock)
        return;
    interpreter_lock = PyThread_allocate_lock();
    PyThread_acquire_lock(interpreter_lock, 1);
    main_thread = PyThread_get_thread_ident();
}
View Code
/*
 * Return 1 on success if the lock was acquired
 *
 * and 0 if the lock was not acquired. This means a 0 is returned
 * if the lock has already been acquired by this thread!
 */
int
PyThread_acquire_lock(PyThread_type_lock aLock, int waitflag)
{
    int success ;

    dprintf(("%ld: PyThread_acquire_lock(%p, %d) called\n", PyThread_get_thread_ident(),aLock, waitflag));

    success = aLock && EnterNonRecursiveMutex((PNRMUTEX) aLock, (waitflag ? INFINITE : 0)) == WAIT_OBJECT_0 ;

    dprintf(("%ld: PyThread_acquire_lock(%p, %d) -> %d\n", PyThread_get_thread_ident(),aLock, waitflag, success));

    return success;
}
View Code

 Windown下調用系統的WaitForSingleObjectui

DWORD
EnterNonRecursiveMutex(PNRMUTEX mutex, BOOL wait)
{
    /* Assume that the thread waits successfully */
    DWORD ret ;

    /* InterlockedIncrement(&mutex->owned) == 0 means that no thread currently owns the mutex */
    if (!wait)
    {
        if (InterlockedCompareExchange(&mutex->owned, 0, -1) != -1)
            return WAIT_TIMEOUT ;
        ret = WAIT_OBJECT_0 ;
    }
    else
        ret = InterlockedIncrement(&mutex->owned) ?
            /* Some thread owns the mutex, let's wait... */
            WaitForSingleObject(mutex->hevent, INFINITE) : WAIT_OBJECT_0 ;

    mutex->thread_id = GetCurrentThreadId() ; /* We own it */
    return ret ;
}
View Code

Linux下則使用互斥鎖metux和lock機制,條件等待機制一塊兒使用。this

先由本線程調用status = pthread_mutex_lock( &thelock->mut )鎖住,mutex保持鎖定狀態,並在線程掛起進入等待前解鎖。spa

而後status = pthread_cond_wait(&thelock->lock_released,&thelock->mut);

以後status = pthread_mutex_unlock( &thelock->mut );

條件知足從而離開pthread_cond_wait()以前,mutex加鎖,以加鎖動做對應。
激發條件有兩種形式,pthread_cond_signal()激活一個等待該條件的線程,存在多個等待線程時按入隊順序激活其中一個;而pthread_cond_broadcast()則激活全部等待線程。
pthread_cond_wait解釋:
int
PyThread_acquire_lock(PyThread_type_lock lock, int waitflag)
{
    int success;
    pthread_lock *thelock = (pthread_lock *)lock;
    int status, error = 0;

    dprintf(("PyThread_acquire_lock(%p, %d) called\n", lock, waitflag));

    status = pthread_mutex_lock( &thelock->mut );
    CHECK_STATUS("pthread_mutex_lock[1]");
    success = thelock->locked == 0;

    if ( !success && waitflag ) {
        /* continue trying until we get the lock */

        /* mut must be locked by me -- part of the condition
         * protocol */
        while ( thelock->locked ) {
            status = pthread_cond_wait(&thelock->lock_released,
                                       &thelock->mut);
            CHECK_STATUS("pthread_cond_wait");
        }
        success = 1;
    }
    if (success) thelock->locked = 1;
    status = pthread_mutex_unlock( &thelock->mut );
    CHECK_STATUS("pthread_mutex_unlock[1]");

    if (error) success = 0;
    dprintf(("PyThread_acquire_lock(%p, %d) -> %d\n", lock, waitflag, success));
    return success;
}
View Code

 

       python建立子線程過程:

        多線程環境初始化以後,python開始建立底層平臺的原生線程。主線程經過調用 thread_PyThread_start_new_thread-》PyThread_start_new_thread完成子線程的工做,返回子線程的ID。子線程的ID只有被激活才能從子線程中獲取,所以主線程等待這個子線程的ID,一旦子線程設置好ID,就會設法喚醒主線程。至此,主線程和子線程開始分道揚鑣。主線程在返回子線程ID以後,繼續執行後續的字節碼。

        PyThread_start_new_thread傳入的func是函數t_bootstrap,而arg則是bootstate結構體boot。而boot中保存着程序中所定義的線程信息。PyThread_start_new_thread首先將func和arg都打包到一個類型爲callobj結構體中。

        建立好子線程以後,其開始與主線程對GIL競爭。在t_bootstrap中調用PyEval_AcquireThread申請GIL,成功以後就申請到GIL,接下來子線程調用PyEval_CallObjectWithKeywords並最終調用咱們熟悉的函數PyEval_EvalFrameEx,也就是python的字節碼執行引擎。以後執行完畢,進行清理掃尾工做PyThreadState_DeleteCurrent釋放GIL。 

    t_bootstrap 看上去彷佛子線程一直執行到釋放GIL,他們是如何激活多線程機制的呢?答案在於函數PyEval_EvalFrameEx中,python內部維護的模擬中斷時鐘不斷激活線程的調度機制,從而實現子線程和主線程的切換。

執行秩序: thread_PyThread_start_new_thread-》PyThread_start_new_thread-》bootstrap--》t_bootstrap  

t_bootstrap 代碼:

static void
t_bootstrap(void *boot_raw)
{
    struct bootstate *boot = (struct bootstate *) boot_raw;
    PyThreadState *tstate;
    PyObject *res;

    tstate = boot->tstate;
    tstate->thread_id = PyThread_get_thread_ident();
    _PyThreadState_Init(tstate);
    PyEval_AcquireThread(tstate);
    nb_threads++;
    res = PyEval_CallObjectWithKeywords(
        boot->func, boot->args, boot->keyw);
    if (res == NULL) {
        if (PyErr_ExceptionMatches(PyExc_SystemExit))
            PyErr_Clear();
        else {
            PyObject *file;
            PyObject *exc, *value, *tb;
            PyErr_Fetch(&exc, &value, &tb);
            PySys_WriteStderr(
                "Unhandled exception in thread started by ");
            file = PySys_GetObject("stderr");
            if (file)
                PyFile_WriteObject(boot->func, file, 0);
            else
                PyObject_Print(boot->func, stderr, 0);
            PySys_WriteStderr("\n");
            PyErr_Restore(exc, value, tb);
            PyErr_PrintEx(0);
        }
    }
    else
        Py_DECREF(res);
    Py_DECREF(boot->func);
    Py_DECREF(boot->args);
    Py_XDECREF(boot->keyw);
    PyMem_DEL(boot_raw);
    nb_threads--;
    PyThreadState_Clear(tstate);
    PyThreadState_DeleteCurrent();
    PyThread_exit_thread();
}
View Code

完成打包以後,調用Win32下的建立thread API 函數CreateThread或者_beginthreadex ,而後經過bootstrap調用咱們定義的函數(例如本身的test.py中的def testThread 函數)

函數打包,調用代碼:

相關文章
相關標籤/搜索