最近閱讀《Python源碼剖析》中對進程、線程封裝的解釋:
GIL(Global Interpreter Lock)對於Python的多線程機制非常重要,它是如何實現的?代碼中的實現如下:
GIL(interpreter_lock)的類型實際上是void*;C語言中的void*(無類型指針)可以指向任意類型的數據。Python創建多線程環境的動作只會執行一次。
PyEval_InitThreads → PyThread_allocate_lock 創建GIL之後,當前線程就開始遵守Python的多線程機制,即在調用任何Python C API之前都需要先獲得GIL。
也就是代碼中由PyThread_acquire_lock嘗試獲取GIL。
static PyMethodDef thread_methods[] = { {"start_new_thread", (PyCFunction)thread_PyThread_start_new_thread, METH_VARARGS, start_new_doc}, {"start_new", (PyCFunction)thread_PyThread_start_new_thread, METH_VARARGS, start_new_doc}, {"allocate_lock", (PyCFunction)thread_PyThread_allocate_lock, METH_NOARGS, allocate_doc}, {"allocate", (PyCFunction)thread_PyThread_allocate_lock, METH_NOARGS, allocate_doc}, {"exit_thread", (PyCFunction)thread_PyThread_exit_thread, METH_NOARGS, exit_doc}, {"exit", (PyCFunction)thread_PyThread_exit_thread, METH_NOARGS, exit_doc}, {"interrupt_main", (PyCFunction)thread_PyThread_interrupt_main, METH_NOARGS, interrupt_doc}, {"get_ident", (PyCFunction)thread_get_ident, METH_NOARGS, get_ident_doc}, {"_count", (PyCFunction)thread__count, METH_NOARGS, _count_doc}, {"stack_size", (PyCFunction)thread_stack_size, METH_VARARGS, stack_size_doc}, {NULL, NULL} /* sentinel */ }; /*建立bootstate,並初始化,其保存關於線程的一切信息,如線程過程,和參數等,*/ static PyObject * thread_PyThread_start_new_thread(PyObject *self, PyObject *fargs) { PyObject *func, *args, *keyw = NULL; struct bootstate *boot; long ident; if (!PyArg_UnpackTuple(fargs, "start_new_thread", 2, 3, &func, &args, &keyw)) return NULL; if (!PyCallable_Check(func)) { PyErr_SetString(PyExc_TypeError, "first arg must be callable"); return NULL; } if (!PyTuple_Check(args)) { PyErr_SetString(PyExc_TypeError, "2nd arg must be a tuple"); return NULL; } if (keyw != NULL && !PyDict_Check(keyw)) { PyErr_SetString(PyExc_TypeError, "optional 3rd arg must be a dictionary"); return NULL; } boot = PyMem_NEW(struct bootstate, 1); if (boot == NULL) return PyErr_NoMemory(); boot->interp = PyThreadState_GET()->interp; boot->func = func; boot->args = args; boot->keyw = keyw; boot->tstate = _PyThreadState_Prealloc(boot->interp); if (boot->tstate == NULL) { PyMem_DEL(boot); return PyErr_NoMemory(); } Py_INCREF(func); Py_INCREF(args); Py_XINCREF(keyw); PyEval_InitThreads(); /* Start the interpreter's thread-awareness */ ident = 
PyThread_start_new_thread(t_bootstrap, (void*) boot); if (ident == -1) { PyErr_SetString(ThreadError, "can't start new thread"); Py_DECREF(func); Py_DECREF(args); Py_XDECREF(keyw); PyThreadState_Clear(boot->tstate); PyMem_DEL(boot); return NULL; } return PyInt_FromLong(ident); } /*以boot爲參數,建立一個原生線程*/ PyThreadState * _PyThreadState_Prealloc(PyInterpreterState *interp) { return new_threadstate(interp, 0); } static PyThreadState * new_threadstate(PyInterpreterState *interp, int init) { PyThreadState *tstate = (PyThreadState *)malloc(sizeof(PyThreadState)); if (_PyThreadState_GetFrame == NULL) _PyThreadState_GetFrame = threadstate_getframe; if (tstate != NULL) { tstate->interp = interp; tstate->frame = NULL; tstate->recursion_depth = 0; tstate->tracing = 0; tstate->use_tracing = 0; tstate->tick_counter = 0; tstate->gilstate_counter = 0; tstate->async_exc = NULL; #ifdef WITH_THREAD tstate->thread_id = PyThread_get_thread_ident(); #else tstate->thread_id = 0; #endif tstate->dict = NULL; tstate->curexc_type = NULL; tstate->curexc_value = NULL; tstate->curexc_traceback = NULL; tstate->exc_type = NULL; tstate->exc_value = NULL; tstate->exc_traceback = NULL; tstate->c_profilefunc = NULL; tstate->c_tracefunc = NULL; tstate->c_profileobj = NULL; tstate->c_traceobj = NULL; tstate->trash_delete_nesting = 0; tstate->trash_delete_later = NULL; if (init) _PyThreadState_Init(tstate); HEAD_LOCK(); tstate->next = interp->tstate_head; interp->tstate_head = tstate; HEAD_UNLOCK(); } return tstate; }
GIL(NRMUTEX)對象的結構中有3個成員(owned、thread_id、hevent),其中hevent就是Win32平臺下的Event內核對象,而thread_id則記錄任意時刻持有GIL的線程ID。
1 /* 2 * Lock support. It has too be implemented as semaphores. 3 * I [Dag] tried to implement it with mutex but I could find a way to 4 * tell whether a thread already own the lock or not. 5 */ 6 PyThread_type_lock 7 PyThread_allocate_lock(void) 8 { 9 PNRMUTEX aLock; 10 11 dprintf(("PyThread_allocate_lock called\n")); 12 if (!initialized) 13 PyThread_init_thread(); 14 15 aLock = AllocNonRecursiveMutex() ; 16 17 dprintf(("%ld: PyThread_allocate_lock() -> %p\n", PyThread_get_thread_ident(), aLock)); 18 19 return (PyThread_type_lock) aLock; 20 } 21 22 typedef struct NRMUTEX { 23 LONG owned ; 24 DWORD thread_id ; 25 HANDLE hevent ; 26 } NRMUTEX, *PNRMUTEX ; 27 28 PNRMUTEX 29 AllocNonRecursiveMutex(void) 30 { 31 PNRMUTEX mutex = (PNRMUTEX)malloc(sizeof(NRMUTEX)) ; 32 if (mutex && !InitializeNonRecursiveMutex(mutex)) 33 { 34 free(mutex) ; 35 mutex = NULL ; 36 } 37 return mutex ; 38 } 39 40 BOOL 41 InitializeNonRecursiveMutex(PNRMUTEX mutex) 42 { 43 mutex->owned = -1 ; /* No threads have entered NonRecursiveMutex */ 44 mutex->thread_id = 0 ; 45 mutex->hevent = CreateEvent(NULL, FALSE, FALSE, NULL) ; 46 return mutex->hevent != NULL ; /* TRUE if the mutex is created */ 47 }
PyThread_acquire_lock嘗試獲取GIL的代碼如下:
/*
 * Set up the interpreter lock.
 *
 * While interpreter_lock is still unset, this allocates it, acquires it
 * (blocking) on behalf of the calling thread, and records the calling
 * thread's identifier in main_thread.  Once interpreter_lock is set the
 * function does nothing.
 */
void
PyEval_InitThreads(void)
{
    if (!interpreter_lock) {
        interpreter_lock = PyThread_allocate_lock();
        PyThread_acquire_lock(interpreter_lock, 1);
        main_thread = PyThread_get_thread_ident();
    }
}
/*
 * Try to acquire aLock (Win32 implementation).
 *
 * waitflag: non-zero passes INFINITE to EnterNonRecursiveMutex(), zero
 *           passes 0.
 *
 * Returns 1 if the lock was acquired and 0 if it was not.  Note that 0 is
 * therefore also returned when the lock has already been acquired by this
 * very thread.  A NULL aLock yields 0 without touching the mutex.
 */
int
PyThread_acquire_lock(PyThread_type_lock aLock, int waitflag)
{
    int success = 0;

    dprintf(("%ld: PyThread_acquire_lock(%p, %d) called\n", PyThread_get_thread_ident(),aLock, waitflag));

    if (aLock) {
        DWORD outcome = EnterNonRecursiveMutex((PNRMUTEX) aLock,
                                               (waitflag ? INFINITE : 0));
        success = (outcome == WAIT_OBJECT_0);
    }

    dprintf(("%ld: PyThread_acquire_lock(%p, %d) -> %d\n", PyThread_get_thread_ident(),aLock, waitflag, success));

    return success;
}
Windows下調用系統的WaitForSingleObject:
/*
 * Enter the non-recursive mutex.
 *
 * wait: when zero, make a single attempt and return WAIT_TIMEOUT if the
 *       mutex is not free; when non-zero, block on the mutex's event until
 *       it can be taken.
 *
 * Returns WAIT_OBJECT_0 when the mutex was free, WAIT_TIMEOUT for a failed
 * non-blocking attempt, or otherwise whatever WaitForSingleObject() returned.
 * Except on the WAIT_TIMEOUT path, mutex->thread_id is set to the calling
 * thread before returning.
 */
DWORD
EnterNonRecursiveMutex(PNRMUTEX mutex, BOOL wait)
{
    DWORD ret;

    if (wait) {
        /* InterlockedIncrement(&mutex->owned) == 0 means that no thread
         * currently owns the mutex. */
        if (InterlockedIncrement(&mutex->owned) != 0) {
            /* Some thread owns the mutex, let's wait... */
            ret = WaitForSingleObject(mutex->hevent, INFINITE);
        }
        else {
            ret = WAIT_OBJECT_0;
        }
    }
    else {
        /* Swap owned from -1 to 0; any other previous value means the
         * mutex is not free, so give up at once. */
        if (InterlockedCompareExchange(&mutex->owned, 0, -1) != -1) {
            return WAIT_TIMEOUT;
        }
        ret = WAIT_OBJECT_0;
    }

    mutex->thread_id = GetCurrentThreadId();    /* We own it */
    return ret;
}
Linux下則將互斥鎖(mutex)的加鎖機制與條件變量的等待機制一起使用。
先由本線程調用status = pthread_mutex_lock( &thelock->mut )加鎖;mutex會一直保持鎖定狀態,直到線程掛起進入等待之前才被解鎖。
然後status = pthread_cond_wait(&thelock->lock_released,&thelock->mut);
之後status = pthread_mutex_unlock( &thelock->mut );
/*
 * Try to acquire lock (pthread implementation).
 *
 * The lock is a "locked" flag guarded by thelock->mut.  If the flag is
 * already set and waitflag is non-zero, the caller waits on the
 * lock_released condition until the flag is clear.
 *
 * Returns 1 when the lock was acquired.  Returns 0 when the lock is held and
 * waitflag is zero, or when error ended up non-zero.
 *
 * NOTE(review): status and error are not written or read anywhere else in
 * this function, so CHECK_STATUS presumably inspects status and sets error;
 * the two locals must therefore keep these exact names.
 */
int
PyThread_acquire_lock(PyThread_type_lock lock, int waitflag)
{
    pthread_lock *thelock = (pthread_lock *)lock;
    int status, error = 0;
    int success;

    dprintf(("PyThread_acquire_lock(%p, %d) called\n", lock, waitflag));

    status = pthread_mutex_lock( &thelock->mut );
    CHECK_STATUS("pthread_mutex_lock[1]");

    if (thelock->locked == 0) {
        /* The lock is free: take it straight away. */
        success = 1;
    }
    else if (waitflag) {
        /* continue trying until we get the lock */

        /* mut must be locked by me -- part of the condition
         * protocol */
        while ( thelock->locked ) {
            status = pthread_cond_wait(&thelock->lock_released,
                                       &thelock->mut);
            CHECK_STATUS("pthread_cond_wait");
        }
        success = 1;
    }
    else {
        /* The lock is held and the caller does not want to wait. */
        success = 0;
    }

    if (success) {
        thelock->locked = 1;
    }

    status = pthread_mutex_unlock( &thelock->mut );
    CHECK_STATUS("pthread_mutex_unlock[1]");

    if (error) {
        success = 0;
    }
    dprintf(("PyThread_acquire_lock(%p, %d) -> %d\n", lock, waitflag, success));
    return success;
}
Python創建子線程的過程:
多線程環境初始化之後,Python開始創建底層平臺的原生線程。主線程通過調用 thread_PyThread_start_new_thread → PyThread_start_new_thread 完成創建子線程的工作,並返回子線程的ID。子線程的ID只有在子線程被激活之後才能從子線程中獲取,因此主線程要等待這個子線程的ID;一旦子線程設置好ID,就會設法喚醒主線程。至此,主線程和子線程開始分道揚鑣:主線程在返回子線程ID之後,繼續執行後續的字節碼。
PyThread_start_new_thread傳入的func是函數t_bootstrap,而arg則是bootstate結構體boot,boot中保存着程序中所定義的線程信息。PyThread_start_new_thread首先將func和arg都打包到一個類型爲callobj的結構體中。
創建好子線程之後,子線程開始與主線程競爭GIL。t_bootstrap中調用PyEval_AcquireThread申請GIL;申請成功之後,子線程調用PyEval_CallObjectWithKeywords,並最終調用我們熟悉的函數PyEval_EvalFrameEx,也就是Python的字節碼執行引擎。執行完畢之後,進行清理掃尾工作,由PyThreadState_DeleteCurrent釋放GIL。
從t_bootstrap看上去,子線程似乎會一直執行到釋放GIL爲止,那麼多線程機制是如何被激活的呢?答案在於函數PyEval_EvalFrameEx:Python內部維護的模擬中斷時鐘會不斷激活線程的調度機制,從而實現子線程和主線程的切換。
執行順序: thread_PyThread_start_new_thread → PyThread_start_new_thread → bootstrap → t_bootstrap
t_bootstrap 代碼:
static void t_bootstrap(void *boot_raw) { struct bootstate *boot = (struct bootstate *) boot_raw; PyThreadState *tstate; PyObject *res; tstate = boot->tstate; tstate->thread_id = PyThread_get_thread_ident(); _PyThreadState_Init(tstate); PyEval_AcquireThread(tstate); nb_threads++; res = PyEval_CallObjectWithKeywords( boot->func, boot->args, boot->keyw); if (res == NULL) { if (PyErr_ExceptionMatches(PyExc_SystemExit)) PyErr_Clear(); else { PyObject *file; PyObject *exc, *value, *tb; PyErr_Fetch(&exc, &value, &tb); PySys_WriteStderr( "Unhandled exception in thread started by "); file = PySys_GetObject("stderr"); if (file) PyFile_WriteObject(boot->func, file, 0); else PyObject_Print(boot->func, stderr, 0); PySys_WriteStderr("\n"); PyErr_Restore(exc, value, tb); PyErr_PrintEx(0); } } else Py_DECREF(res); Py_DECREF(boot->func); Py_DECREF(boot->args); Py_XDECREF(boot->keyw); PyMem_DEL(boot_raw); nb_threads--; PyThreadState_Clear(tstate); PyThreadState_DeleteCurrent(); PyThread_exit_thread(); }
完成打包之後,調用Win32下創建線程的API函數CreateThread或者_beginthreadex,然後通過bootstrap調用我們定義的函數(例如自己的test.py中的def testThread函數)。
函數打包,調用代碼: