做者:李樂
本文基於Swoole-4.3.2和PHP-7.1.0版本php
Swoole4爲PHP語言提供了強大的CSP協程編程模式,用戶能夠經過go函數建立一個協程,以達到併發執行的效果,以下面代碼所示:node
<?php //Co::sleep()是Swoole提供的API,並不會阻塞當前進程,只會阻塞協程觸發協程切換。 go(function (){ Co::sleep(1); echo "a"; }); go(function (){ Co::sleep(2); echo "b"; }); echo "c"; //輸出結果:cab //程序總執行時間2秒
其實在Swoole4以前就實現了多協程編程模式,在協程建立、切換以及結束的時候,相應的操做php棧便可(建立、切換以及回收php棧)。react
此時的協程實現沒法完美的支持php語法,其根本緣由在於沒有保存c棧信息。(vm內部或者某些擴展提供的API是經過c函數實現的,調用這些函數時若是發生協程切換,c棧該如何處理?)編程
Swoole4新增了c棧的管理,在協程建立、切換以及結束的同時會伴隨着c棧的建立、切換以及回收。數組
Swoole4協程實現方案以下圖所示:swoole
其中:數據結構
對應c棧的建立以及換入換出。併發
函數是對代碼的封裝,對外暴露的只是一組指定的參數和一個可選的返回值;假設函數P調用函數Q,Q執行後返回函數P,實現該函數調用須要考慮如下三點:socket
大多數語言的函數調用都採用了棧結構實現,函數的調用與返回即對應的是一系列的入棧與出棧操做,咱們一般稱之爲函數棧幀(stack frame)。示意圖以下:函數
上面提到的程序計數器即寄存器%rip,另外還有兩個寄存器須要重點關注:%rbp指向棧幀底部,%rsp指向棧幀頂部。
下面將經過具體的代碼事例,爲讀者講解函數棧幀。c代碼與彙編代碼以下:
int add(int x, int y) { int a, b; a = 10; b = 5; return x+y; } int main() { int sum = add(1,2); }
main: pushq %rbp movq %rsp, %rbp subq $16, %rsp movl $2, %esi movl $1, %edi call add movl %eax, -4(%rbp) leave ret add: pushq %rbp movq %rsp, %rbp movl %edi, -20(%rbp) movl %esi, -24(%rbp) movl $10, -4(%rbp) movl $5, -8(%rbp) movl -24(%rbp), %eax movl -20(%rbp), %edx addl %edx, %eax popq %rbp ret
分析彙編代碼:
問題:觀察上面的彙編代碼,輸入參數分別使用的是寄存器%edi和%esi,返回值使用的是寄存器%eax,輸入輸出參數不該該保存在棧上嗎?寄存器比內存訪問要快的多,現代處理器寄存器數目也比較多,所以傾向於將參數優先保存在寄存器。好比%rdi, %rsi, %rdx, %rcx, %r8d, %r9d 六個寄存器用於存儲函數調用時的前6個參數,那麼當輸入參數數目超過6個時,如何處理?這些輸入參數只能存儲在棧上了。
(%rdi等表示64位寄存器,%edi等表示32位寄存器)
//add函數須要9個參數 add(1,2,3,4,5,6,7,8,9); //參數7,8,9存儲在棧上 movl $9, 16(%rsp) movl $8, 8(%rsp) movl $7, (%rsp) movl $6, %r9d movl $5, %r8d movl $4, %ecx movl $3, %edx movl $2, %esi movl $1, %edi
經過學習c棧基本知識,咱們知道最主要有三個寄存器:%rip程序計數器指向下一條須要執行的指令,%rbp指向函數棧幀底部,%rsp指向函數棧幀頂部。這三個寄存器能夠肯定一個c棧執行上下文,c棧的管理其實就是這些寄存器的管理。
第一節咱們提到Swoole在管理c棧時,用到了 boost.context庫,其中make_fcontext()和jump_fcontext()函數均使用匯編語言編寫,實現了c棧執行上下文的建立以及切換;函聲明命以下:
fcontext_t make_fcontext(void *sp, size_t size, void (*fn)(intptr_t)); intptr_t jump_fcontext(fcontext_t *ofc, fcontext_t nfc, intptr_t vp, bool preserve_fpu = false);
make_fcontext函數用於建立一個執行上下文,其中參數sp指向內存最高地址處(在堆中分配一塊內存做爲該執行上下文的c棧),參數size爲棧大小,參數fn是一個函數指針,指向該執行上下文的入口函數;代碼主要邏輯以下:
/*%rdi表示第一個參數sp,指向棧頂*/ movq %rdi, %rax //保證%rax指向的地址按照16字節對齊 andq $-16, %rax //將%rax向低地址處偏移0x48字節 leaq -0x48(%rax), %rax /* %rdx表示第三個參數fn,保存在%rax偏移0x38位置處 */ movq %rdx, 0x38(%rax) stmxcsr (%rax) fnstcw 0x4(%rax) leaq finish(%rip), %rcx movq %rcx, 0x40(%rax) //返回值保存在%rax寄存器 ret
make_fcontext函數建立的執行上下文示意圖以下(能夠看到預留了若干字節用於保存上下文信息):
Swoole協程實現的Context層封裝了上下文的建立,建立上下文函數實現以下:
Context::Context(size_t stack_size, coroutine_func_t fn, void* private_data) : fn_(fn), stack_size_(stack_size), private_data_(private_data) { stack_ = (char*) sw_malloc(stack_size_); void* sp = (void*) ((char*) stack_ + stack_size_); ctx_ = make_fcontext(sp, stack_size_, (void (*)(intptr_t))&context_func); }
能夠看到c棧執行上下文是經過sw_malloc函數在堆上分配的一塊內存,默認大小爲2M字節;參數sp指向的是內存最高地址處;執行上下文的入口函數爲Context::context_func()。
jump_fcontext函數用於切換c棧上下文:1)函數會將當前上下文(寄存器)保存在當前棧頂(push),同時將%rsp寄存器內容保存在ofc地址;2)函數從nfc地址處恢復%rsp寄存器內容,同時從棧頂恢復上下文信息(pop)。代碼主要邏輯以下:
//-------------------保存當前c棧上下文------------------- pushq %rbp /* save RBP */ pushq %rbx /* save RBX */ pushq %r15 /* save R15 */ pushq %r14 /* save R14 */ pushq %r13 /* save R13 */ pushq %r12 /* save R12 */ leaq -0x8(%rsp), %rsp stmxcsr (%rsp) fnstcw 0x4(%rsp) //%rdi表示第一個參數,即ofc,保存%rsp到ofc地址處 movq %rsp, (%rdi) //-------------------從nfc中恢復上下文------------------- //%rsi表示第二個參數,即nfc,從nfc地址處恢復%rsp movq %rsi, %rsp ldmxcsr (%rsp) fldcw 0x4(%rsp) leaq 0x8(%rsp), %rsp popq %r12 /* restrore R12 */ popq %r13 /* restrore R13 */ popq %r14 /* restrore R14 */ popq %r15 /* restrore R15 */ popq %rbx /* restrore RBX */ popq %rbp /* restrore RBP */ //這裏彈出的實際上是以前保存的%rip popq %r8 //%rdx表示第三個參數,%rax用於存儲函數返回值; movq %rdx, %rax //%rdi用於存儲第一個參數 movq %rdx, %rdi //跳轉到%r8指向的地址 jmp *%r8
觀察jump_fcontext函數的彙編代碼,能夠看到保存上下文與恢復上下文的代碼基本是對稱的。恢復上下文時"popq %r8"用於彈出上一次保存的程序計數器%rip的內容,然而並無看到保存寄存器%rip的代碼。這是由於調用jump_fcontext函數時,底層call指令已經將%rip入棧了。
Swoole協程實現的Context層封裝了上下文的換入換出,能夠在上下文swap_ctx_和ctx_之間隨時換入換出,代碼實現以下:
bool Context::SwapIn() { jump_fcontext(&swap_ctx_, ctx_, (intptr_t) this, true); return true; } bool Context::SwapOut() { jump_fcontext(&ctx_, swap_ctx_, (intptr_t) this, true); return true; }
上下文示意圖以下所示:
php代碼在執行時,一樣存在函數棧幀的分配與回收。php將此抽象爲兩個結構,php棧zend_vm_stack,與執行數據(函數棧幀)zend_execute_data。
php棧結構與c棧結構基本相似,定義以下:
struct _zend_vm_stack { zval *top; zval *end; zend_vm_stack prev; };
其中top字段指向棧頂位置,end字段指向棧底位置;prev指向上一個棧,造成鏈表,當棧空間不夠時,能夠進行擴容。php虛擬機申請棧空間時默認大小爲256K,Swoole建立棧空間時默認大小爲8K。
執行數據結構體,咱們須要重點關注這幾個字段:當前函數編譯後的指令集(opline指向指令集數組中的某一個元素,虛擬機只須要遍歷該數組並執行全部指令便可),函數返回值,以及調用該函數的執行數據;結構定義以下:
struct _zend_execute_data { //當前執行指令 const zend_op *opline; zend_execute_data *call; //函數返回值 zval *return_value; zend_function *func; zval This; /* this + call_info + num_args */ //調用當前函數的棧幀 zend_execute_data *prev_execute_data; //符號表 zend_array *symbol_table; #if ZEND_EX_USE_RUN_TIME_CACHE void **run_time_cache; #endif #if ZEND_EX_USE_LITERALS //常量數組 zval *literals; #endif };
php棧初始化函數爲zend_vm_stack_init;當執行用戶函數調用時,虛擬機經過函數zend_vm_stack_push_call_frame在php棧上分配新的執行數據,並執行該函數代碼;函數執行完成後,釋放該執行數據。代碼邏輯以下:
ZEND_API void zend_execute(zend_op_array *op_array, zval *return_value) { //分配新的執行數據 execute_data = zend_vm_stack_push_call_frame(ZEND_CALL_TOP_CODE | ZEND_CALL_HAS_SYMBOL_TABLE, (zend_function*)op_array, 0, zend_get_called_scope(EG(current_execute_data)), zend_get_this_object(EG(current_execute_data))); //設置prev execute_data->prev_execute_data = EG(current_execute_data); //初始化當前執行數據,op_array即爲當前函數編譯獲得的指令集 i_init_execute_data(execute_data, op_array, return_value); //執行函數代碼 zend_execute_ex(execute_data); //釋放執行數據 zend_vm_stack_free_call_frame(execute_data); }
php棧幀結構示意圖以下:
Swoole協程實現,須要本身管理php棧,在發生協程建立以及切換時,對應的建立新的php棧,切換php棧,同時保存和恢復php棧上下文信息。這裏涉及到一個很重要的結構體php_coro_task:
struct php_coro_task { zval *vm_stack_top; zval *vm_stack_end; zend_vm_stack vm_stack; zend_execute_data *execute_data; };
這裏列出了php_coro_task結構體的若干關鍵字段,這些字段用於保存和恢復php上下文信息。
協程建立時,底層經過函數PHPCoroutine::create_func實現了php棧的建立:
void PHPCoroutine::create_func(void *arg) { //建立並初始化php棧 vm_stack_init(); call = (zend_execute_data *) (EG(vm_stack_top)); //爲結構php_coro_task分配空間 task = (php_coro_task *) EG(vm_stack_top); EG(vm_stack_top) = (zval *) ((char *) call + PHP_CORO_TASK_SLOT * sizeof(zval)); //建立新的執行數據結構 call = zend_vm_stack_push_call_frame( ZEND_CALL_TOP_FUNCTION | ZEND_CALL_ALLOCATED, func, argc, fci_cache.called_scope, fci_cache.object ); }
從代碼中能夠看到結構php_coro_task是直接存儲在php棧的底部。
當經過yield函數讓出CPU時,底層會調用函數PHPCoroutine::on_yield切換php棧:
void PHPCoroutine::on_yield(void *arg) { php_coro_task *task = (php_coro_task *) arg; php_coro_task *origin_task = get_origin_task(task); //保存當前php棧上下文信息到php_coro_task結構 save_task(task); //從php_coro_task結構中恢復php棧上下文信息 restore_task(origin_task); }
前面咱們簡單介紹了Swoole協程的實現方案,以及Swoole對c棧與php棧的管理,接下來將結合前面的知識,系統性的介紹Swoole協程的實現原理。
話很少說,先看一張圖:
Swoole建立協程可使用go()函數,底層實現對應的是PHP_FUNCTION(swoole_coroutine_create),其函數實現以下:
PHP_FUNCTION(swoole_coroutine_create) { …… long cid = PHPCoroutine::create(&fci_cache, fci.param_count, fci.params); } long PHPCoroutine::create(zend_fcall_info_cache *fci_cache, uint32_t argc, zval *argv) { …… save_task(get_task()); return Coroutine::create(create_func, (void*) &php_coro_args); } class Coroutine { public: static inline long create(coroutine_func_t fn, void* args = nullptr) { return (new Coroutine(fn, args))->run(); } }
class PHPCoroutine { public: static inline php_coro_task* get_task() { php_coro_task *task = (php_coro_task *) Coroutine::get_current_task(); return task ? task : &main_task; } }
//全局協程map std::unordered_map<long, Coroutine*> Coroutine::coroutines; class Coroutine { protected: Coroutine(coroutine_func_t fn, void *private_data) : ctx(stack_size, fn, private_data) { cid = ++last_cid; coroutines[cid] = this; } inline long run() { long cid = this->cid; origin = current; current = this; ctx.SwapIn(); if (ctx.end) { close(); } return cid; } }
Context::Context(size_t stack_size, coroutine_func_t fn, void* private_data) : fn_(fn), stack_size_(stack_size), private_data_(private_data) { …… ctx_ = make_fcontext(sp, stack_size_, (void (*)(intptr_t))&context_func); }
void Context::context_func(void *arg) { Context *_this = (Context *) arg; _this->fn_(_this->private_data_); _this->end = true; _this->SwapOut(); }
jump_fcontext(&swap_ctx_, ctx_, (intptr_t) this, true); jump_fcontext: movq %rdx, %rdi
調用jump_fcontext函數時,第三個參數傳遞的是this,即當前Context對象;而函數jump_fcontext彙編實現時,將第三個參數的內容拷貝到%rdi寄存器中,當協程換入執行函數context_func時,寄存器%rdi存儲的就是第一個參數,即Context對象。
在Swoole模塊初始化時,會調用函數swoole_coroutine_util_init(該函數同時聲明瞭"Co"等短名稱),該函數進一步的調用PHPCoroutine::init()方法,該方法完成了靜態屬性的賦值操做。
void PHPCoroutine::init() { Coroutine::set_on_yield(on_yield); Coroutine::set_on_resume(on_resume); Coroutine::set_on_close(on_close); }
用戶能夠經過Co::yield()和Co::resume()實現協程的讓出和恢復,
Co::yield()的底層實現函數爲PHP_METHOD(swoole_coroutine_util, yield),Co::resume()的底層實現函數爲PHP_METHOD(swoole_coroutine_util, resume)。本節將爲讀者講述協程切換的實現原理。
static unordered_map<int, Coroutine *> user_yield_coros; static PHP_METHOD(swoole_coroutine_util, yield) { Coroutine* co = Coroutine::get_current_safe(); user_yield_coros[co->get_cid()] = co; co->yield(); RETURN_TRUE; } static PHP_METHOD(swoole_coroutine_util, resume) { …… auto coroutine_iterator = user_yield_coros.find(cid); if (coroutine_iterator == user_yield_coros.end()) { swoole_php_fatal_error(E_WARNING, "you can not resume the coroutine which is in IO operation"); RETURN_FALSE; } user_yield_coros.erase(cid); co->resume(); }
void Coroutine::yield() { state = SW_CORO_WAITING; if (on_yield) { on_yield(task); } current = origin; ctx.SwapOut(); }
void Coroutine::resume() { state = SW_CORO_RUNNING; if (on_resume) { on_resume(task); } origin = current; current = this; ctx.SwapIn(); if (ctx.end) { close(); } }
typedef enum { SW_CORO_INIT = 0, SW_CORO_WAITING, SW_CORO_RUNNING, SW_CORO_END, } sw_coro_state;
協程切換過程比較簡單,這裏不作過多詳述。
當咱們調用Co::sleep()讓協程休眠時,會換出當前協程;或者調用CoroutineSocket->recv()從socket接收數據,但socket數據尚未準備好時,會阻塞當前協程,從而使得協程換出。那麼問題來了,何時再換入執行這個協程呢?
Swoole的socket讀寫使用的成熟的IO多路複用模型:epoll/kqueue/select/poll等,而且將其封裝在結構體_swReactor中,其定義以下:
struct _swReactor { //超時時間 int32_t timeout_msec; //fd的讀寫事件處理函數 swReactor_handle handle[SW_MAX_FDTYPE]; swReactor_handle write_handle[SW_MAX_FDTYPE]; swReactor_handle error_handle[SW_MAX_FDTYPE]; //fd事件的註冊修改刪除以及wait //函數指針,(以epoll爲例)指向的是epoll_ctl、epoll_wait int (*add)(swReactor *, int fd, int fdtype); int (*set)(swReactor *, int fd, int fdtype); int (*del)(swReactor *, int fd); int (*wait)(swReactor *, struct timeval *); void (*free)(swReactor *); //超時回調函數,結束、開始回調函數 void (*onTimeout)(swReactor *); void (*onFinish)(swReactor *); void (*onBegin)(swReactor *); }
在調用函數PHPCoroutine::create建立協程時,會校驗是否已經初始化_swReactor對象,若是沒有則會調用php_swoole_reactor_init函數建立並初始化main_reactor對象;
void php_swoole_reactor_init() { if (SwooleG.main_reactor == NULL) { SwooleG.main_reactor = (swReactor *) sw_malloc(sizeof(swReactor)); if (swReactor_create(SwooleG.main_reactor, SW_REACTOR_MAXEVENTS) < 0) { } …… php_swoole_register_shutdown_function_prepend("swoole_event_wait"); } }
咱們以epoll爲例,main_reactor各回調函數以下:
reactor->onFinish = swReactor_onFinish; reactor->onTimeout = swReactor_onTimeout; reactor->add = swReactorEpoll_add; reactor->set = swReactorEpoll_set; reactor->del = swReactorEpoll_del; reactor->wait = swReactorEpoll_wait; reactor->free = swReactorEpoll_free;
類Socket封裝了socket讀寫相關的全部操做以及數據結構,其定義以下:
class Socket { public: swConnection *socket = nullptr; //讀寫函數 ssize_t recv(void *__buf, size_t __n); ssize_t send(const void *__buf, size_t __n); …… private: swReactor *reactor = nullptr; Coroutine *read_co = nullptr; Coroutine *write_co = nullptr; //鏈接超時時間,接收數據、發送數據超時時間 double connect_timeout = default_connect_timeout; double read_timeout = default_read_timeout; double write_timeout = default_write_timeout; }
void Socket::init_sock(int _fd) { reactor = SwooleG.main_reactor; //設置協程類型fd(SW_FD_CORO_SOCKET)的讀寫事件處理函數 if (!swReactor_handle_isset(reactor, SW_FD_CORO_SOCKET)) { reactor->setHandle(reactor, SW_FD_CORO_SOCKET | SW_EVENT_READ, readable_event_callback); reactor->setHandle(reactor, SW_FD_CORO_SOCKET | SW_EVENT_WRITE, writable_event_callback); reactor->setHandle(reactor, SW_FD_CORO_SOCKET | SW_EVENT_ERROR, error_event_callback); } }
當咱們調用CoroutineSocket->recv接收數據時,底層實現以下:
Socket::timeout_setter ts(sock->socket, timeout, SW_TIMEOUT_READ); ssize_t bytes = all ? sock->socket->recv_all(ZSTR_VAL(buf), length) : sock->socket->recv(ZSTR_VAL(buf), length);
類timeout_setter會設置socket的接收數據超時時間read_timeout爲timeout。
函數socket->recv_all會循環讀取數據,直到讀取到指定長度的數據,或者底層返回等待標識阻塞當前協程:
ssize_t Socket::recv_all(void *__buf, size_t __n) { timer_controller timer(&read_timer, read_timeout, this, timer_callback); while (true) { do { retval = swConnection_recv(socket, (char *) __buf + total_bytes, __n - total_bytes, 0); } while (retval < 0 && swConnection_error(errno) == SW_WAIT && timer.start() && wait_event(SW_EVENT_READ)); if (unlikely(retval <= 0)) { break; } total_bytes += retval; if ((size_t) total_bytes == __n) { break; } } }
class timer_controller { public: bool start() { if (timeout > 0) { *timer_pp = swTimer_add(&SwooleG.timer, (long) (timeout * 1000), 0, data, callback); } } }
struct _swTimer { swHeap *heap; //最小堆 swHashMap *map; //map,定時器ID做爲key //最先的定時任務觸發時間 long _next_msec; //函數指針,指向swReactorTimer_set int (*set)(swTimer *timer, long exec_msec); //函數指針,指向swReactorTimer_free void (*free)(swTimer *timer); };
if (timer->_next_msec < 0 || timer->_next_msec > _msec) { timer->set(timer, _msec); timer->_next_msec = _msec; } static int swReactorTimer_set(swTimer *timer, long exec_msec) { SwooleG.main_reactor->timeout_msec = exec_msec; return SW_OK; }
bool Socket::wait_event(const enum swEvent_type event, const void **__buf, size_t __n) { if (unlikely(!add_event(event))) { return false; } if (likely(event == SW_EVENT_READ)) { read_co = co; read_co->yield(); read_co = nullptr; } else // if (event == SW_EVENT_WRITE) { write_co = co; write_co->yield(); write_co = nullptr; } }
上面提到,建立協程時,註冊了一個函數swoole_event_wait,在生命週期register_shutdown階段會執行該函數,開始Swoole的事件循環,阻擋了php生命週期的結束。函數swoole_event_wait底層就是調用main_reactor->wait等待fd讀寫事件的產生;咱們以epoll爲例講述事件循環的邏輯:
static int swReactorEpoll_wait(swReactor *reactor, struct timeval *timeo) { while (reactor->running > 0) { n = epoll_wait(epoll_fd, events, max_event_num, swReactor_get_timeout_msec(reactor)); if (n == 0) { if (reactor->onTimeout != NULL) { reactor->onTimeout(reactor); } SW_REACTOR_CONTINUE; } for (i = 0; i < n; i++) { if ((events[i].events & EPOLLIN) && !event.socket->removed) { handle = swReactor_getHandle(reactor, SW_EVENT_READ, event.type); ret = handle(reactor, &event); } if ((events[i].events & EPOLLOUT) && !event.socket->removed) { handle = swReactor_getHandle(reactor, SW_EVENT_WRITE, event.type); ret = handle(reactor, &event); } } } }
int Socket::readable_event_callback(swReactor *reactor, swEvent *event) { Socket *socket = (Socket *) event->socket->object; socket->read_co->resume(); }
while ((tmp = swHeap_top(timer->heap))) { tnode = tmp->data; if (tnode->exec_msec > now_msec || tnode->round == timer->round) { break; } timer->_current_id = tnode->id; if (!tnode->remove) { tnode->callback(timer, tnode); } …… } //該定時任務沒有超時,須要更新須要更新_swTimer中最先的定時任務觸發時間_next_msec long next_msec = tnode->exec_msec - now_msec; if (next_msec <= 0) { next_msec = 1; } //同時更新main_reactor對象的超時時間,實現函數爲swReactorTimer_set timer->set(timer, next_msec);
void Socket::timer_callback(swTimer *timer, swTimer_node *tnode) { Socket *socket = (Socket *) tnode->data; socket->set_err(ETIMEDOUT); if (likely(tnode == socket->read_timer)) { socket->read_timer = nullptr; socket->read_co->resume(); } else if (tnode == socket->write_timer) { socket->write_timer = nullptr; socket->write_co->resume(); } }
Co::sleep()的實現函數爲PHP_METHOD(swoole_coroutine_util, sleep),該函數經過調用Coroutine::sleep實現了協程休眠的功能:
int Coroutine::sleep(double sec) { Coroutine* co = Coroutine::get_current_safe(); if (swTimer_add(&SwooleG.timer, (long) (sec * 1000), 0, co, sleep_timeout) == NULL) { return -1; } co->yield(); return 0; }
能夠看到,與socket讀寫事件超時處理相同,sleep內部實現時經過swTimer_add添加定時任務,同時換出當前協程實現的。該定時任務會致使main_reactor對象的超時時間的改變,即修改了epoll_wait的超時時間。
sleep的超時處理函數爲sleep_timeout,只須要換入該阻塞協程對象便可,實現以下:
static void sleep_timeout(swTimer *timer, swTimer_node *tnode) { ((Coroutine *) tnode->data)->resume(); }