如下代碼基於swoole4.4.5-alpha, php7.1.26
咱們按照執行流程去逐步分析swoole協程的實現, php程序是這樣的:php
<?php go(function (){ Co::sleep(1); echo "a"; }); echo "c";
go其實是swoole_coroutine_create的別名:html
PHP_FALIAS(go, swoole_coroutine_create, arginfo_swoole_coroutine_create);
首先會執行zif_swoole_coroutine_create去建立協程:node
// 真正執行的函數 PHP_FUNCTION(swoole_coroutine_create) { ... // 解析參數 ZEND_PARSE_PARAMETERS_START(1, -1) Z_PARAM_FUNC(fci, fci_cache) Z_PARAM_VARIADIC('*', fci.params, fci.param_count) ZEND_PARSE_PARAMETERS_END_EX(RETURN_FALSE); ... long cid = PHPCoroutine::create(&fci_cache, fci.param_count, fci.params); if (sw_likely(cid > 0)) { RETURN_LONG(cid); } else { RETURN_FALSE; } } long PHPCoroutine::create(zend_fcall_info_cache *fci_cache, uint32_t argc, zval *argv) { ... // 保存匿名函數參數和執行結構 php_coro_args php_coro_args; php_coro_args.fci_cache = fci_cache; php_coro_args.argv = argv; php_coro_args.argc = argc; save_task(get_task()); // 保存php棧到當前task // 建立coroutine return Coroutine::create(main_func, (void*) &php_coro_args); }
php_coro_args是用來保存回調函數信息的結構:react
// 保存go()回調的結構體 struct php_coro_args { zend_fcall_info_cache *fci_cache; // 匿名函數信息 zval *argv; // 參數 uint32_t argc; // 參數數量 };
php_corutine::get_task()用來獲取當前正在執行的任務, 第一次執行時, 獲取的是初始化好的main_task:api
php_coro_task PHPCoroutine::main_task = {0}; // 獲取當前的task, 沒有則是主task static inline php_coro_task* get_task() { php_coro_task *task = (php_coro_task *) Coroutine::get_current_task(); return task ? task : &main_task; } static inline void* get_current_task() { return sw_likely(current) ? current->get_task() : nullptr; } inline void* get_task() { return task; }
save_task會將當前php棧信息保存到當前使用的task上, 當前使用的是main_task, 因此這些信息會被保存在main_task上:swoole
void PHPCoroutine::save_task(php_coro_task *task) { save_vm_stack(task); // 保存php棧 ... } inline void PHPCoroutine::save_vm_stack(php_coro_task *task) { task->bailout = EG(bailout); task->vm_stack_top = EG(vm_stack_top); // 當前棧頂 task->vm_stack_end = EG(vm_stack_end); // 棧底 task->vm_stack = EG(vm_stack); // 整個棧結構 task->vm_stack_page_size = EG(vm_stack_page_size); task->error_handling = EG(error_handling); task->exception_class = EG(exception_class); task->exception = EG(exception); }
php_coro_task這個結構用來保存當前任務的php棧:php7
struct php_coro_task { JMP_BUF *bailout; // 內部異常使用 zval *vm_stack_top; // 棧頂 zval *vm_stack_end; // 棧底 zend_vm_stack vm_stack; // 執行棧 size_t vm_stack_page_size; zend_execute_data *execute_data; zend_error_handling_t error_handling; zend_class_entry *exception_class; zend_object *exception; zend_output_globals *output_ptr; /* for array_walk non-reentrancy */ php_swoole_fci *array_walk_fci; swoole::Coroutine *co; // 屬於哪一個coroutine std::stack<php_swoole_fci *> *defer_tasks; long pcid; zend_object *context; int64_t last_msec; zend_bool enable_scheduler; };
保存完當前php棧就能夠開始建立coroutine了:併發
static inline long create(coroutine_func_t fn, void* args = nullptr) { return (new Coroutine(fn, args))->run(); } Coroutine(coroutine_func_t fn, void *private_data) : ctx(stack_size, fn, private_data) // 默認stack size 2M { cid = ++last_cid; // 分配協程id coroutines[cid] = this; // 當前對象指針存儲在全局的corutines靜態屬性上 if (sw_unlikely(count() > peak_num)) // 更新峯值 { peak_num = count(); } }
首先, 會建立一個ctx對象, context對象主要用來管理c棧app
#define SW_DEFAULT_C_STACK_SIZE (2 *1024 * 1024) size_t Coroutine::stack_size = SW_DEFAULT_C_STACK_SIZE; ctx(stack_size, fn, private_data) Context::Context(size_t stack_size, coroutine_func_t fn, void* private_data) : fn_(fn), stack_size_(stack_size), private_data_(private_data) { end_ = false; // 標記協程是否已經執行完成 swap_ctx_ = nullptr; stack_ = (char*) sw_malloc(stack_size_); // 分配一塊內存儲存c棧, 默認2M ... void* sp = (void*) ((char*) stack_ + stack_size_); // 計算出棧頂地址即最高地址 ctx_ = make_fcontext(sp, stack_size_, (void (*)(intptr_t))&context_func); // 構建上下文 }
make_fcontext函數是boost.context庫中提供的,由彙編編寫,不一樣平臺有不一樣實現,咱們這裏使用的是make_x86_64_sysv_elf_gas.S這個文件:函數
傳參使用的寄存器依次是rdi、rsi、rdx、rcx、r八、r9
make_fcontext: /* first arg of make_fcontext() == top of context-stack */ /* rax = sp */ movq %rdi, %rax /* shift address in RAX to lower 16 byte boundary */ /* rax = rax & -16 => rax = rax & (~0x10000 + 1) => rax = rax - rax%16, 其實就是按16對齊*/ andq $-16, %rax /* reserve space for context-data on context-stack */ /* size for fc_mxcsr .. RIP + return-address for context-function */ /* on context-function entry: (RSP -0x8) % 16 == 0 */ /*lea是「load effective address」的縮寫, 簡單的說,lea指令能夠用來將一個內存地址直接賦給目的操做數, 例如:lea eax,[ebx+8]就是將ebx+8這個值直接賦給eax,而不是把ebx+8處的內存地址裏的數據賦給eax。 而mov指令則偏偏相反,例如:mov eax,[ebx+8]則是把內存地址爲ebx+8處的數據賦給eax。*/ /* rax = rax - 0x48, 預留0x48個字節 */ leaq -0x48(%rax), %rax /* third arg of make_fcontext() == address of context-function */ /* context_func函數地址放在rax+0x38處*/ movq %rdx, 0x38(%rax) /* save MMX control- and status-word */ stmxcsr (%rax) /* save x87 control-word */ fnstcw 0x4(%rax) /* compute abs address of label finish */ /* https://sourceware.org/binutils/docs/as/i386_002dMemory.html The x86-64 architecture adds an RIP (instruction pointer relative) addressing. This addressing mode is specified by using ‘rip’ as a base register. Only constant offsets are valid. For example: AT&T: ‘1234(%rip)’, Intel: ‘[rip + 1234]’ Points to the address 1234 bytes past the end of the current instruction. AT&T: ‘symbol(%rip)’, Intel: ‘[rip + symbol]’ Points to the symbol in RIP relative way, this is shorter than the default absolute addressing. */ /* rcx = finish */ leaq finish(%rip), %rcx /* save address of finish as return-address for context-function */ /* will be entered after context-function returns */ /* finish函數地址放在rax+0x40處 */ movq %rcx, 0x40(%rax) /*return rax*/ ret /* return pointer to context-data */ finish: /* exit code is zero */ xorq %rdi, %rdi /* exit application */ call _exit@PLT hlt
make_fcontext函數執行完以後, 用來保存上下文的內存佈局是這樣:
/**************************************************************************************** * |<- ctx_ ---------------------------------------------------------------------------------- * * | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | * * ---------------------------------------------------------------------------------- * * | 0x0 | 0x4 | 0x8 | 0xc | 0x10 | 0x14 | 0x18 | 0x1c | * * ---------------------------------------------------------------------------------- * * | fc_mxcsr|fc_x87_cw| | | | * * ---------------------------------------------------------------------------------- * * ---------------------------------------------------------------------------------- * * | 8 | 9 | 10 | 11 | 12 | 13 | 14 | 15 | * * ---------------------------------------------------------------------------------- * * | 0x20 | 0x24 | 0x28 | 0x2c | 0x30 | 0x34 | 0x38 | 0x3c | * * ---------------------------------------------------------------------------------- * * | | | | context_func | * * ---------------------------------------------------------------------------------- * * ---------------------------------------------------------------------------------- * * | 16 | 17 | | * * ---------------------------------------------------------------------------------- * * | 0x40 | 0x44 | | * * ---------------------------------------------------------------------------------- * * | finish | | * * ---------------------------------------------------------------------------------- * * * ****************************************************************************************/
Coroutine對象被實例化完以後開始執行run方法, run方法會將上一個執行了相關方法的Coroutine對象存入origin中, 並把current置爲當前對象:
static sw_co_thread_local Coroutine* current; Coroutine *origin; inline long run() { long cid = this->cid; origin = current; // orign保存原來的對象 current = this; // current置爲當前對象 ctx.swap_in(); // 換入 ... }
接下來是切換c棧的核心方法, swap_in和swap_out, 底層也是由boost.context庫提供的, 先來看換入:
bool Context::swap_in() { jump_fcontext(&swap_ctx_, ctx_, (intptr_t) this, true); return true; } // jump_x86_64_sysv_elf_gas.S jump_fcontext: /* 當前寄存器壓入棧, 注意, rbp上面實際上還有一個rip, 由於call jump_fcontext 等價於 push rip, jmp jump_fcontext. */ /* rip保存着下一條要執行的指令, 在這裏就是jump_fcontext以後的return true */ pushq %rbp /* save RBP */ pushq %rbx /* save RBX */ pushq %r15 /* save R15 */ pushq %r14 /* save R14 */ pushq %r13 /* save R13 */ pushq %r12 /* save R12 */ /* prepare stack for FPU */ leaq -0x8(%rsp), %rsp /* test for flag preserve_fpu */ cmp $0, %rcx je 1f /* save MMX control- and status-word */ stmxcsr (%rsp) /* save x87 control-word */ fnstcw 0x4(%rsp) 1: /* store RSP (pointing to context-data) in RDI */ /* *swap_ctx_ = rsp, 保存棧頂位置 */ movq %rsp, (%rdi) /* restore RSP (pointing to context-data) from RSI */ /* rsp = ctx_, 這裏將當前執行棧指向了剛剛經過make_fcontext構建出來的棧 */ movq %rsi, %rsp /* test for flag preserve_fpu */ cmp $0, %rcx je 2f /* restore MMX control- and status-word */ ldmxcsr (%rsp) /* restore x87 control-word */ fldcw 0x4(%rsp) 2: /* prepare stack for FPU */ leaq 0x8(%rsp), %rsp /* 將寄存器恢復重新棧上壓入的值, 此次執行時這裏還都是空的 */ popq %r12 /* restrore R12 */ popq %r13 /* restrore R13 */ popq %r14 /* restrore R14 */ popq %r15 /* restrore R15 */ popq %rbx /* restrore RBX */ popq %rbp /* restrore RBP */ /* restore return-address */ /* r8 = make_fcontext(往上看看make_fcontext結束後的內存佈局圖) */ popq %r8 /* use third arg as return-value after jump */ /* rax = this */ movq %rdx, %rax /* use third arg as first arg in context function */ /* rdi = this */ movq %rdx, %rdi /* indirect jump to context */ /* 執行context_func */ jmp *%r8
jump_fcontext執行完以後原來的棧內存佈局是這樣:
/**************************************************************************************** * |<-swap_ctx_ * * ---------------------------------------------------------------------------------- * * | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | * * ---------------------------------------------------------------------------------- * * | 0x0 | 0x4 | 0x8 | 0xc | 0x10 | 0x14 | 0x18 | 0x1c | * * ---------------------------------------------------------------------------------- * * | fc_mxcsr|fc_x87_cw| R12 | R13 | R14 | * * ---------------------------------------------------------------------------------- * * ---------------------------------------------------------------------------------- * * | 8 | 9 | 10 | 11 | 12 | 13 | 14 | 15 | * * ---------------------------------------------------------------------------------- * * | 0x20 | 0x24 | 0x28 | 0x2c | 0x30 | 0x34 | 0x38 | 0x3c | * * ---------------------------------------------------------------------------------- * * | R15 | RBX | RBP | RIP/return true | * * ---------------------------------------------------------------------------------- * * * ****************************************************************************************/
context_func有一個參數, jump_fcontext執行完後往rdi寫入的this將做爲參數給context_func使用, fn_, private_data_是構造ctx時傳入的參數:
void Context::context_func(void *arg) { Context *_this = (Context *) arg; _this->fn_(_this->private_data_); // main_func(php_coro_args) _this->end_ = true; _this->swap_out(); }
main_func會爲當前協程分配一個新的執行棧, 並將其與剛剛實例化好的Coroutine綁定, 而後執行協程的回調函數:
void PHPCoroutine::main_func(void *arg) { ... // 在EG上建立一個新的vmstack, 用於執行go()裏的回調函數, 以前的執行棧已經被保存在main_task上了 vm_stack_init(); call = (zend_execute_data *) (EG(vm_stack_top)); task = (php_coro_task *) EG(vm_stack_top); EG(vm_stack_top) = (zval *) ((char *) call + PHP_CORO_TASK_SLOT * sizeof(zval)); // 爲task預留位置 call = zend_vm_stack_push_call_frame(call_info, func, argc, object_or_called_scope); // 爲參數分配棧空間 EG(bailout) = NULL; EG(current_execute_data) = call; EG(error_handling) = EH_NORMAL; EG(exception_class) = NULL; EG(exception) = NULL; save_vm_stack(task); // 保存vmstack到當前task上 record_last_msec(task); // 記錄時間 task->output_ptr = NULL; task->array_walk_fci = NULL; task->co = Coroutine::get_current(); // 記錄當前coroutine task->co->set_task((void *) task); // coroutine與當前task綁定 task->defer_tasks = nullptr; task->pcid = task->co->get_origin_cid(); // 記錄上一個協程id task->context = nullptr; task->enable_scheduler = 1; if (EXPECTED(func->type == ZEND_USER_FUNCTION)) { ... // 初始化execute_data zend_init_func_execute_data(call, &func->op_array, retval); // 執行協程裏的用戶函數 zend_execute_ex(EG(current_execute_data)); } ... }
接下來就是執行用戶回調函數生成的opcode了, 執行到Co::sleep(1)時會調用System::sleep(seconds), 這裏面會爲當前coroutine註冊一個定時事件, 回調函數是sleep_timeout:
int System::sleep(double sec) { Coroutine* co = Coroutine::get_current_safe(); // 獲取當前coroutine if (swoole_timer_add((long) (sec * 1000), SW_FALSE, sleep_timeout, co) == NULL) // 爲當前couroutine添加一個定時事件 { return -1; } co->yield(); // 切換 return 0; } // 定時事件註冊的回調 static void sleep_timeout(swTimer *timer, swTimer_node *tnode) { ((Coroutine *) tnode->data)->resume(); }
yield函數負責php棧和c棧的切換
void Coroutine::yield() { SW_ASSERT(current == this || on_bailout != nullptr); state = SW_CORO_WAITING; // 協程狀態變爲waiting if (sw_likely(on_yield)) { on_yield(task); // php棧切換 } current = origin; // 切換當前協程到上一個 ctx.swap_out(); // c棧切換 }
先來看php棧的切換, on_yield是初始化時已經註冊好的函數
void PHPCoroutine::init() { Coroutine::set_on_yield(on_yield); Coroutine::set_on_resume(on_resume); Coroutine::set_on_close(on_close); } void PHPCoroutine::on_yield(void *arg) { php_coro_task *task = (php_coro_task *) arg; // 當前task php_coro_task *origin_task = get_origin_task(task); // 獲取上一個task save_task(task); // 保存當前任務 restore_task(origin_task); // 恢復上一個任務 }
拿到上一個task就能夠經過上面保存的執行信息恢復EG了, 程序很簡單, 只要把vmstack和current_execute_data換回來就能夠了:
void PHPCoroutine::restore_task(php_coro_task *task) { restore_vm_stack(task); ... } inline void PHPCoroutine::restore_vm_stack(php_coro_task *task) { EG(bailout) = task->bailout; EG(vm_stack_top) = task->vm_stack_top; EG(vm_stack_end) = task->vm_stack_end; EG(vm_stack) = task->vm_stack; EG(vm_stack_page_size) = task->vm_stack_page_size; EG(current_execute_data) = task->execute_data; EG(error_handling) = task->error_handling; EG(exception_class) = task->exception_class; EG(exception) = task->exception; ... }
這個時候php棧執行狀態已經恢復到剛剛調用go()函數時的狀態了(main_task), 再看看c棧切換是怎麼處理的:
bool Context::swap_out() { jump_fcontext(&ctx_, swap_ctx_, (intptr_t) this, true); return true; }
回憶一下swap_in函數, swap_ctx_保存着執行swap_in時的rsp, ctx_保存着經過make_fcontext初始化好的棧頂位置, 再來看一遍jump_fcontext執行:
// jump_x86_64_sysv_elf_gas.S jump_fcontext: /* 當前寄存器壓入棧, 注意, rbp上面實際上還有一個rip, 由於call jump_fcontext 等價於 push rip, jmp jump_fcontext. */ /* rip保存着下一條要執行的指令, 在這裏就是swap_out裏jump_fcontext以後的return true */ pushq %rbp /* save RBP */ pushq %rbx /* save RBX */ pushq %r15 /* save R15 */ pushq %r14 /* save R14 */ pushq %r13 /* save R13 */ pushq %r12 /* save R12 */ /* prepare stack for FPU */ leaq -0x8(%rsp), %rsp /* test for flag preserve_fpu */ cmp $0, %rcx je 1f /* save MMX control- and status-word */ stmxcsr (%rsp) /* save x87 control-word */ fnstcw 0x4(%rsp) 1: /* store RSP (pointing to context-data) in RDI */ /* *ctx_ = rsp, 保存棧頂位置 */ movq %rsp, (%rdi) /* restore RSP (pointing to context-data) from RSI */ /* rsp = swap_ctx_, 這裏將當前執行棧指向了以前執行swap_in時的rsp */ movq %rsi, %rsp /* test for flag preserve_fpu */ cmp $0, %rcx je 2f /* restore MMX control- and status-word */ ldmxcsr (%rsp) /* restore x87 control-word */ fldcw 0x4(%rsp) 2: /* prepare stack for FPU */ leaq 0x8(%rsp), %rsp /* 將寄存器恢復到執行swap_in時的狀態 */ popq %r12 /* restrore R12 */ popq %r13 /* restrore R13 */ popq %r14 /* restrore R14 */ popq %r15 /* restrore R15 */ popq %rbx /* restrore RBX */ popq %rbp /* restrore RBP */ /* restore return-address */ /* r8 = Context::swap_in::return true */ popq %r8 /* use third arg as return-value after jump */ /* rax = this */ movq %rdx, %rax /* use third arg as first arg in context function */ /* rdi = this */ movq %rdx, %rdi /* indirect jump to context */ /* 接着上一次swap_in的位置繼續執行 */ jmp *%r8
這個時候php和c棧都已經恢復到執行swap_in的狀態, 代碼一路返回到zif_swoole_coroutine_create執行完畢:
bool Context::swap_in() { jump_fcontext(&swap_ctx_, ctx_, (intptr_t) this, true); return true; // 從這裏開始繼續執行, 回到以前調用它的函數 } inline long run() { ... ctx.swap_in(); // 返回 check_end(); // 檢查協程是否已經執行完畢, 執行完畢須要作清理 return cid; } static inline long create(coroutine_func_t fn, void* args = nullptr) { return (new Coroutine(fn, args))->run(); } long PHPCoroutine::create(zend_fcall_info_cache *fci_cache, uint32_t argc, zval *argv) { ... return Coroutine::create(main_func, (void*) &php_coro_args); } PHP_FUNCTION(swoole_coroutine_create) { ... long cid = PHPCoroutine::create(&fci_cache, fci.param_count, fci.params); ... RETURN_LONG(cid); // 返回協程id }
由於execute_data已經切換回main_task上的主協程opcode了, 因此下一條opcode是 'echo "a"', 至關於把sleep後面的代碼跳過了
<?php go(function (){ Co::sleep(1); echo "a"; }); echo "c"; // 從這裏開始繼續執行
等到必定時機, 定時器會調用sleep函數註冊的回調函數sleep_timeout(調用時機後面會介紹), 喚醒協程繼續運轉:
// 定時事件註冊的回調 static void sleep_timeout(swTimer *timer, swTimer_node *tnode) { ((Coroutine *) tnode->data)->resume(); } // 恢復整個執行環境 void Coroutine::resume() { ... state = SW_CORO_RUNNING; // 協程狀態改成進行中 if (sw_likely(on_resume)) { on_resume(task); // 恢復php執行狀態 } origin = current; current = this; ctx.swap_in(); // 恢復c棧 ... } // 恢復task void PHPCoroutine::on_resume(void *arg) { php_coro_task *task = (php_coro_task *) arg; php_coro_task *current_task = get_task(); save_task(current_task); // 保存當前任務 restore_task(task); // 恢復任務 record_last_msec(task); // 記錄時間 }
zend_vm會讀取到以後的opcode 'echo "a"', 繼續執行
<?php go(function (){ Co::sleep(1); echo "a"; // 從這裏開始繼續執行 }); echo "c";
當前回調中的opcode被所有執行完畢以後, PHPCoroutine::main_func還會把以前註冊的defer執行一遍, 順序是FILO, 而後清理資源
void PHPCoroutine::main_func(void *arg) { ... if (EXPECTED(func->type == ZEND_USER_FUNCTION)) { ... // 協程回調函數執行完畢, 返回 zend_execute_ex(EG(current_execute_data)); } if (task->defer_tasks) { std::stack<php_swoole_fci *> *tasks = task->defer_tasks; while (!tasks->empty()) { php_swoole_fci *defer_fci = tasks->top(); tasks->pop(); // FILO // 調用defer註冊的函數 if (UNEXPECTED(sw_zend_call_function_anyway(&defer_fci->fci, &defer_fci->fci_cache) != SUCCESS)) { ... } } } // resources release ... }
main_func執行完回到Context::context_func方法, 把當前協程標記爲已結束, 再作一次swap_out回到剛剛swap_in的地方, 也就是resume方法, 以後去檢查喚醒的協程有沒有執行完畢, 檢查只須要判斷end_屬性
void Context::context_func(void *arg) { Context *_this = (Context *) arg; _this->fn_(_this->private_data_); // main_func(closure)返回 _this->end_ = true; // 當前協程標記爲已結束 _this->swap_out(); // 切換回main c棧 } void Coroutine::resume() { ... ctx.swap_in(); // 切換回這裏 check_end(); // 檢查協程是否已經結束 } inline void check_end() { if (ctx.is_end()) { close(); } } inline bool is_end() { return end_; }
close方法會清理爲這個協程建立的vm_stack, 同時切回到main_task, 這時c棧和php棧都已經切換回主協程
void Coroutine::close() { ... state = SW_CORO_END; // 狀態改成已結束 if (on_close) { on_close(task); } current = origin; coroutines.erase(cid); // 移除當前協程 delete this; } void PHPCoroutine::on_close(void *arg) { php_coro_task *task = (php_coro_task *) arg; php_coro_task *origin_task = get_origin_task(task); vm_stack_destroy(); // 銷燬vm_stack restore_task(origin_task); // 還原main_task }
那麼定時事件何時會被執行呢? 這是經過內部的Reactor事件循環去實現的, 下面來看具體實現:
建立協程時會判斷reactor是否已經初始化, 沒有初始化則會調用activate函數初始化reactor, activate函數大概有這幾個步驟:
1.初始化reactor結構, 註冊各類回調函數(讀寫事件採用對應平臺效率最高的多路複用api, 封裝成統一的回調函數有助於屏蔽不一樣api實現細節)
2.經過php_swoole_register_shutdown_function("Swoole\Event::rshutdown")註冊一個在request_shutdown階段調用的函數(回憶一下php的生命週期, 腳本結束的時候會調用此函數), 實際上事件循環就在這個階段執行
3.開啓搶佔式調度線程(這個後面會說)
long PHPCoroutine::create(zend_fcall_info_cache *fci_cache, uint32_t argc, zval *argv) { ... if (sw_unlikely(!active)) { activate(); } ... } inline void PHPCoroutine::activate() { ... /* init reactor and register event wait */ php_swoole_check_reactor(); /* replace interrupt function */ orig_interrupt_function = zend_interrupt_function; // 保存原來的中斷回調函數 zend_interrupt_function = coro_interrupt_function; // 替換中斷函數 // 開啓搶佔式調度 if (SWOOLE_G(enable_preemptive_scheduler) || config.enable_preemptive_scheduler) { /* create a thread to interrupt the coroutine that takes up too much time */ interrupt_thread_start(); } ... active = true; } static sw_inline int php_swoole_check_reactor() { ... if (sw_unlikely(!SwooleG.main_reactor)) { return php_swoole_reactor_init() == SW_OK ? 1 : -1; } ... } int php_swoole_reactor_init() { ... if (!SwooleG.main_reactor) { swoole_event_init(); SwooleG.main_reactor->wait_exit = 1; // 註冊rshutdown函數 php_swoole_register_shutdown_function("Swoole\\Event::rshutdown"); } ... } #define sw_reactor() (SwooleG.main_reactor) #define SW_REACTOR_MAXEVENTS 4096 int swoole_event_init() { SwooleG.main_reactor = (swReactor *) sw_malloc(sizeof(swReactor)); if (swReactor_create(sw_reactor(), SW_REACTOR_MAXEVENTS) < 0) { ... } ... } int swReactor_create(swReactor *reactor, int max_event) { int ret; bzero(reactor, sizeof(swReactor)); #ifdef HAVE_EPOLL ret = swReactorEpoll_create(reactor, max_event); #elif defined(HAVE_KQUEUE) ret = swReactorKqueue_create(reactor, max_event); #elif defined(HAVE_POLL) ret = swReactorPoll_create(reactor, max_event); #else ret = swReactorSelect_create(reactor); #endif ... reactor->onTimeout = reactor_timeout; // 有定時器超時時觸發的回調 ... Socket::init_reactor(reactor); ... } int swReactorEpoll_create(swReactor *reactor, int max_event_num) { ... //binding method reactor->add = swReactorEpoll_add; reactor->set = swReactorEpoll_set; reactor->del = swReactorEpoll_del; reactor->wait = swReactorEpoll_wait; reactor->free = swReactorEpoll_free; }
request_shutdown階段會執行註冊的Swoole\Event::rshutdown函數, swoole_event_rshutdown會執行以前註冊的wait函數:
static PHP_FUNCTION(swoole_event_rshutdown) { /* prevent the program from jumping out of the rshutdown */ zend_try { PHP_FN(swoole_event_wait)(INTERNAL_FUNCTION_PARAM_PASSTHRU); } zend_end_try(); } int swoole_event_wait() { int retval = sw_reactor()->wait(sw_reactor(), NULL); swoole_event_free(); return retval; }
咱們再來看看定時事件的註冊, 首先會初始化timer:
int System::sleep(double sec) { Coroutine* co = Coroutine::get_current_safe(); // 獲取當前coroutine if (swoole_timer_add((long) (sec * 1000), SW_FALSE, sleep_timeout, co) == NULL) { ... } } swTimer_node* swoole_timer_add(long ms, uchar persistent, swTimerCallback callback, void *private_data) { return swTimer_add(sw_timer(), ms, persistent, private_data, callback); } swTimer_node* swTimer_add(swTimer *timer, long _msec, int interval, void *data, swTimerCallback callback) { if (sw_unlikely(!timer->initialized)) { if (sw_unlikely(swTimer_init(timer, _msec) != SW_OK)) // 初始化timer { return NULL; } } ... } static int swTimer_init(swTimer *timer, long msec) { ... timer->heap = swHeap_new(1024, SW_MIN_HEAP); // 初始化最小堆 timer->map = swHashMap_new(SW_HASHMAP_INIT_BUCKET_N, NULL); timer->_current_id = -1; // 當前定時器id timer->_next_msec = msec; // 定時器裏最短的超時時間 timer->_next_id = 1; timer->round = 0; ret = swReactorTimer_init(SwooleG.main_reactor, timer, msec); ... } static int swReactorTimer_init(swReactor *reactor, swTimer *timer, long exec_msec) { reactor->check_timer = SW_TRUE; reactor->timeout_msec = exec_msec; // 定時器裏最短的超時時間 reactor->timer = timer; timer->reactor = reactor; timer->set = swReactorTimer_set; timer->close = swReactorTimer_close; ... }
接着是添加事件, 須要注意的是:
1.time._next_msec和reactor.timeout_msec一直保持全部計時器裏最短的超時時間(相對值)
2.tnode.exec_msec和tnode用最小堆來保存, 這樣一來堆頂的元素就是最先超時的元素
swTimer_node* swTimer_add(swTimer *timer, long _msec, int interval, void *data, swTimerCallback callback) { swTimer_node *tnode = sw_malloc(sizeof(swTimer_node)); int64_t now_msec = swTimer_get_relative_msec(); tnode->data = data; tnode->type = SW_TIMER_TYPE_KERNEL; tnode->exec_msec = now_msec + _msec; // 絕對時間 tnode->interval = interval ? _msec : 0; // 是否須要一直調用 tnode->removed = 0; tnode->callback = callback; tnode->round = timer->round; tnode->dtor = NULL; if (timer->_next_msec < 0 || timer->_next_msec > _msec) // 必要時更新, 始終保持最小超時時間 { timer->set(timer, _msec); timer->_next_msec = _msec; } tnode->id = timer->_next_id++; tnode->heap_node = swHeap_push(timer->heap, tnode->exec_msec, tnode); // 放入堆, priority = tnode->exec_msec if (sw_unlikely(swHashMap_add_int(timer->map, tnode->id, tnode) != SW_OK)) // hashmap保存tnodeid和tnode映射關係 { ... } ... }
定時時間註冊完就能夠等待被事件循環執行了, 咱們以epoll爲例:
使用epoll_wait等待fd讀寫事件, 傳入reactor->timeout_msec, 等待fd事件到來
1.若是epoll_wait超時時還未獲取到任何fd讀寫事件, 執行onTimeout函數, 處理定時事件
2.有fd事件則處理fd讀寫事件, 處理完此次因此觸發的事件後, 進入下一次循環
static int swReactorEpoll_wait(swReactor *reactor, struct timeval *timeo) { ... reactor->running = 1; reactor->start = 1; while (reactor->running > 0) { ... n = epoll_wait(epoll_fd, events, max_event_num, reactor->timeout_msec); if (n < 0) { ... // 錯誤處理 } else if (n == 0) { reactor->onTimeout(reactor); } for (i = 0; i < n; i++) { ... // fd讀寫事件處理 } ... } return 0; }
若是這期間沒有任何fd事件, 定時事件會被執行, onTimeout是以前已經註冊過的函數reactor_timeout, swTimer_select函數會把當前因此已經到期的事件執行完再退出循環, 執行到上文咱們註冊的sleep_timeout函數時, 就會喚醒由於sleep休眠的協程繼續執行:
static void reactor_timeout(swReactor *reactor) { reactor_finish(reactor); ... } static void reactor_finish(swReactor *reactor) { //check timer if (reactor->check_timer) { swTimer_select(reactor->timer); } ... //the event loop is empty if (reactor->wait_exit && reactor->is_empty(reactor)) // 沒有任務了, 退出循環 { reactor->running = 0; } } int swTimer_select(swTimer *timer) { int64_t now_msec = swTimer_get_relative_msec(); // 當前時間 while ((tmp = swHeap_top(timer->heap))) // 獲取最先到期的事件 { tnode = tmp->data; if (tnode->exec_msec > now_msec) // 未到時間 { break; } if (!tnode->removed) { tnode->callback(timer, tnode); // 執行定時事件註冊的回調函數 } timer->num--; swHeap_pop(timer->heap); swHashMap_del_int(timer->map, tnode->id); } ... }
到這裏, 整個流程都已經介紹完了, 總結一下:
經過上面咱們能夠知道, 若是協程裏沒有任何IO/定時事件, 實際上協程是沒有切換時機的, 對於CPU密集型的場景,一些協程會由於得不到CPU時間片被餓死, Swoole 4.4引入了搶佔式調度就是爲了解決這個問題.
vm interrupt是php7.1.0後引入的執行機制, swoole就是使用這個特性實現的搶佔式調度:
1.ZEND_VM_INTERRUPT_CHECK會在指令是jump和call的時候執行
2.ZEND_VM_INTERRUPT_CHECK會檢查EG(vm_interrupt)這個標誌位, 若是爲1, 則觸發zend_interrupt_function的執行
// php 7.1.26 src #define ZEND_VM_INTERRUPT_CHECK() do { \ if (UNEXPECTED(EG(vm_interrupt))) { \ ZEND_VM_INTERRUPT(); \ } \ } while (0) #define ZEND_VM_INTERRUPT() ZEND_VM_TAIL_CALL(zend_interrupt_helper_SPEC(ZEND_OPCODE_HANDLER_ARGS_PASSTHRU)); static ZEND_OPCODE_HANDLER_RET ZEND_FASTCALL zend_interrupt_helper_SPEC(ZEND_OPCODE_HANDLER_ARGS) { ... EG(vm_interrupt) = 0; if (zend_interrupt_function) { zend_interrupt_function(execute_data); } }
下面來看具體實現:
初始化:
1.保存原來的中斷函數, zend_interrupt_function替換成新的中斷函數
2.開啓線程執行interrupt_thread_loop
3.interrupt_thread_loop裏每隔5ms將EG(vm_interrupt)設置爲1
inline void PHPCoroutine::activate() { ... /* replace interrupt function */ orig_interrupt_function = zend_interrupt_function; // 保存原來的中斷回調函數 zend_interrupt_function = coro_interrupt_function; // 替換中斷函數 // 開啓搶佔式調度 if (SWOOLE_G(enable_preemptive_scheduler) || config.enable_preemptive_scheduler) // 配置要開啓enable_preemptive_scheduler選項 { /* create a thread to interrupt the coroutine that takes up too much time */ interrupt_thread_start(); } } void PHPCoroutine::interrupt_thread_start() { zend_vm_interrupt = &EG(vm_interrupt); interrupt_thread_running = true; if (pthread_create(&interrupt_thread_id, NULL, (void * (*)(void *)) interrupt_thread_loop, NULL) < 0) { ... } } static const uint8_t MAX_EXEC_MSEC = 10; void PHPCoroutine::interrupt_thread_loop() { static const useconds_t interval = (MAX_EXEC_MSEC / 2) * 1000; while (interrupt_thread_running) { *zend_vm_interrupt = 1; // EG(vm_interrupt) = 1 usleep(interval); // 休眠5ms } pthread_exit(0); }
中斷函數coro_interrupt_function會檢查當前的協程是否可調度(距離上一次切換時間超過10ms), 若是能夠, 直接讓出當前協程, 完成搶佔調度
static void coro_interrupt_function(zend_execute_data *execute_data) { php_coro_task *task = PHPCoroutine::get_task(); if (task && task->co && PHPCoroutine::is_schedulable(task)) { task->co->yield(); // 讓出當前協程 } if (orig_interrupt_function) { orig_interrupt_function(execute_data); // 執行原有的中斷函數 } } static const uint8_t MAX_EXEC_MSEC = 10; static inline bool is_schedulable(php_coro_task *task) { // enable_scheduler屬性爲1而且已經連續執行超過10ms了 return task->enable_scheduler && (swTimer_get_absolute_msec() - task->last_msec > MAX_EXEC_MSEC); }