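Author: 黃漢韜 Original: https://mp.weixin.qq.com/s/A4...php
In Swoole 4.5 (not yet released at the time of writing), our Server had a performance problem that needed fixing: when a worker process receives a packet from the master process, the data has to be copied twice before it can be passed from the PHP extension layer up to the PHP layer (that is, the data our event callbacks receive).
Let's first analyze why this is a performance problem.
First we need a copy of the code that has the problem, so we git clone the swoole-src repository.
Then we git checkout the commit 8235c82fea2130534a16fd20771dcab3408a763e: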
git checkout 8235c82fea2130534a16fd20771dcab3408a763e
Now let's analyze the code, starting with how the master process packs the data and sends it to the worker process.
The core of it is in the function process_send_packet:
static int process_send_packet(swServer *serv, swPipeBuffer *buf, swSendData *resp, send_func_t _send, void* private_data)
{
    const char* data = resp->data;
    uint32_t send_n = resp->info.len;
    off_t offset = 0;
    uint32_t max_length = serv->ipc_max_size - sizeof(buf->info);

    if (send_n <= max_length) {
        buf->info.flags = 0;
        buf->info.len = send_n;
        memcpy(buf->data, data, send_n);
        int retval = _send(serv, buf, sizeof(buf->info) + send_n, private_data);
        return retval;
    }

    buf->info.flags = SW_EVENT_DATA_CHUNK;
    while (send_n > 0) {
        if (send_n > max_length) {
            buf->info.len = max_length;
        } else {
            buf->info.flags |= SW_EVENT_DATA_END;
            buf->info.len = send_n;
        }
        memcpy(buf->data, data + offset, buf->info.len);
        if (_send(serv, buf, sizeof(buf->info) + buf->info.len, private_data) < 0) {
            return SW_ERR;
        }
        send_n -= buf->info.len;
        offset += buf->info.len;
    }
    return SW_OK;
}
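First, the parameters of process_send_packet. As the code above shows: serv is the Server object; buf is the pipe buffer that gets filled and sent to the worker process, with buf->info as the header and buf->data as the payload; resp carries what the master process received from the client, with resp->data as the data and resp->info.len as its length; _send is the callback that actually delivers buf to the worker process; private_data is passed through to _send unchanged.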
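One note: this walkthrough uses a Server with the eof option set (assume it is set to "\r\n"). TCP is a byte stream, so even if the client sends a very large packet, a single read on the server does not necessarily return much data. With eof set, the server accumulates data until the delimiter arrives and only then dispatches the complete packet, which can therefore be large. Without the eof option, the performance problem described in this article does not arise.
With the parameters of process_send_packet covered, let's see how the code is implemented: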
const char* data = resp->data;
First, data is pointed at resp->data, which is the actual data sent by the client. For example, if the client sent the string hello world\r\n, then data holds hello world\r\n.
uint32_t send_n = resp->info.len;
send_n is the length of resp->data. For example, if the client sent 1 MB of data to the server, resp->info.len is 1048576.
off_t offset = 0;
offset tracks how much of the data the master process has already sent to the worker process.
uint32_t max_length = serv->ipc_max_size - sizeof(buf->info);
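max_length is the largest payload the master process can send to the worker process in one message: ipc_max_size minus the size of the header.
Note: the master and worker processes communicate over Unix domain datagram sockets ("udg" in Swoole's terminology). Datagrams preserve message boundaries, so each receive on the worker side returns exactly one message, as large as the master sent it.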
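The boundary-preserving behaviour can be checked in isolation. The following is a minimal sketch, not Swoole code: the helper name first_datagram_size is made up for illustration, and it assumes a POSIX system where socketpair(AF_UNIX, SOCK_DGRAM, ...) is available. It sends two datagrams and shows that one recv() with a large buffer returns only the first.

```c
#include <stddef.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

/*
 * Hypothetical helper: sends two datagrams of len_a and len_b bytes over an
 * AF_UNIX/SOCK_DGRAM socket pair, then calls recv() once with a buffer large
 * enough for both. Returns the byte count of that single recv(), or -1 on error.
 */
ssize_t first_datagram_size(size_t len_a, size_t len_b)
{
    int fds[2];
    char out[256] = {0};
    char in[512];
    ssize_t n = -1;

    if (len_a > sizeof(out) || len_b > sizeof(out)) {
        return -1;
    }
    if (socketpair(AF_UNIX, SOCK_DGRAM, 0, fds) < 0) {
        return -1;
    }
    if (send(fds[0], out, len_a, 0) == (ssize_t) len_a &&
        send(fds[0], out, len_b, 0) == (ssize_t) len_b) {
        /* Only the first message is delivered, even though both would fit. */
        n = recv(fds[1], in, sizeof(in), 0);
    }
    close(fds[0]);
    close(fds[1]);
    return n;
}
```

Calling first_datagram_size(10, 20) should yield 10 rather than 30, which is the property the chunked protocol relies on.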
if (send_n <= max_length) {
    buf->info.flags = 0;
    buf->info.len = send_n;
    memcpy(buf->data, data, send_n);
    int retval = _send(serv, buf, sizeof(buf->info) + send_n, private_data);
    return retval;
}
If the data the master process needs to send does not exceed max_length, it calls _send once and the whole payload goes to the worker process in a single message.
buf->info.flags = SW_EVENT_DATA_CHUNK;
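When send_n is greater than max_length, buf->info.flags is set to CHUNK, which means the client's data is first split into small segments that are then sent to the worker process one by one.
while (send_n > 0) {
    if (send_n > max_length) {
        buf->info.len = max_length;
    } else {
        buf->info.flags |= SW_EVENT_DATA_END;
        buf->info.len = send_n;
    }
    memcpy(buf->data, data + offset, buf->info.len);
    if (_send(serv, buf, sizeof(buf->info) + buf->info.len, private_data) < 0) {
        return SW_ERR;
    }
    send_n -= buf->info.len;
    offset += buf->info.len;
}
The logic is simple: the data is sent segment by segment. Two points to note:
1. buf->info.len has to be updated to the length of the small chunk, not the length of the whole packet.
2. As the code shows, the last chunk additionally sets SW_EVENT_DATA_END in buf->info.flags, which marks the end of the packet.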
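To make the splitting rule concrete, here is a small self-contained sketch of the same loop outside Swoole. Everything prefixed demo_ or DEMO_ is a stand-in invented for this example (the flags mimic SW_EVENT_DATA_CHUNK and SW_EVENT_DATA_END, and the 4-byte limit mimics max_length); the send callback only records what it is given.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define DEMO_FLAG_CHUNK 1u /* stand-in for SW_EVENT_DATA_CHUNK */
#define DEMO_FLAG_END   2u /* stand-in for SW_EVENT_DATA_END */
#define DEMO_MAX_CHUNK  4u /* stand-in for max_length */

typedef struct {
    uint32_t len;   /* length of this chunk */
    uint32_t flags;
    char data[DEMO_MAX_CHUNK];
} demo_chunk;

typedef int (*demo_send_fn)(const demo_chunk *chunk, void *ctx);

/* Sends data in one piece if it fits, otherwise as a sequence of chunks. */
int demo_send_packet(const char *data, uint32_t send_n, demo_send_fn send_cb, void *ctx)
{
    demo_chunk chunk;
    uint32_t offset = 0;

    if (send_n <= DEMO_MAX_CHUNK) {
        chunk.flags = 0;
        chunk.len = send_n;
        memcpy(chunk.data, data, send_n);
        return send_cb(&chunk, ctx);
    }
    chunk.flags = DEMO_FLAG_CHUNK;
    while (send_n > 0) {
        if (send_n > DEMO_MAX_CHUNK) {
            chunk.len = DEMO_MAX_CHUNK;
        } else {
            chunk.flags |= DEMO_FLAG_END; /* last chunk of the packet */
            chunk.len = send_n;
        }
        memcpy(chunk.data, data + offset, chunk.len);
        if (send_cb(&chunk, ctx) < 0) {
            return -1;
        }
        send_n -= chunk.len;
        offset += chunk.len;
    }
    return 0;
}

/* Recorder callback: counts chunks and remembers the last header seen. */
typedef struct {
    int count;
    uint32_t last_len;
    uint32_t last_flags;
} demo_record;

int demo_record_send(const demo_chunk *chunk, void *ctx)
{
    demo_record *rec = (demo_record *) ctx;
    rec->count++;
    rec->last_len = chunk->len;
    rec->last_flags = chunk->flags;
    return 0;
}
```

With a 10-byte payload and a 4-byte limit, the recorder sees three chunks of 4, 4 and 2 bytes, and only the last one carries the END flag.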
OK, that covers how the master process sends a packet. Now let's analyze how the worker process receives it.
Let's look at the function swWorker_onPipeReceive first:
static int swWorker_onPipeReceive(swReactor *reactor, swEvent *event)
{
    swServer *serv = (swServer *) reactor->ptr;
    swFactory *factory = &serv->factory;
    swPipeBuffer *buffer = serv->pipe_buffers[0];
    int ret;

    _read_from_pipe:
    if (read(event->fd, buffer, serv->ipc_max_size) > 0) {
        ret = swWorker_onTask(factory, (swEventData *) buffer);
        if (buffer->info.flags & SW_EVENT_DATA_CHUNK) {
            //no data
            if (ret < 0 && errno == EAGAIN) {
                return SW_OK;
            } else if (ret > 0) {
                goto _read_from_pipe;
            }
        }
        return ret;
    }
    return SW_ERR;
}
This is the code with which the worker process receives the data sent by the master process. As we can see, the worker process first reads the data straight into the buffer memory and then calls swWorker_onTask.
Next, the swWorker_onTask function:
int swWorker_onTask(swFactory *factory, swEventData *task)
{
    swServer *serv = (swServer *) factory->ptr;
    swWorker *worker = SwooleWG.worker;

    //worker busy
    worker->status = SW_WORKER_BUSY;

    //packet chunk
    if (task->info.flags & SW_EVENT_DATA_CHUNK) {
        if (serv->merge_chunk(serv, task->info.reactor_id, task->data, task->info.len) < 0) {
            swoole_error_log(SW_LOG_WARNING, SW_ERROR_SESSION_DISCARD_DATA,
                    "cannot merge chunk to worker buffer, data[fd=%d, size=%d] lost", task->info.fd, task->info.len);
            return SW_OK;
        }
        //wait more data
        if (!(task->info.flags & SW_EVENT_DATA_END)) {
            return SW_OK;
        }
    }

    switch (task->info.type) {
    case SW_SERVER_EVENT_SEND_DATA:
        //discard data
        if (swWorker_discard_data(serv, task) == SW_TRUE) {
            break;
        }
        swWorker_do_task(serv, worker, task, serv->onReceive);
        break;
    // other cases omitted
    default:
        swWarn("[Worker] error event[type=%d]", (int )task->info.type);
        break;
    }

    //worker idle
    worker->status = SW_WORKER_IDLE;

    //maximum number of requests, process will exit.
    if (!SwooleWG.run_always && worker->request_count >= SwooleWG.max_request) {
        swWorker_stop(worker);
    }
    return SW_OK;
}
Let's focus on the code with the performance problem:
if (task->info.flags & SW_EVENT_DATA_CHUNK) {
    if (serv->merge_chunk(serv, task->info.reactor_id, task->data, task->info.len) < 0) {
        swoole_error_log(SW_LOG_WARNING, SW_ERROR_SESSION_DISCARD_DATA,
                "cannot merge chunk to worker buffer, data[fd=%d, size=%d] lost", task->info.fd, task->info.len);
        return SW_OK;
    }
    //wait more data
    if (!(task->info.flags & SW_EVENT_DATA_END)) {
        return SW_OK;
    }
}
Here the worker process first checks whether the data from the master is CHUNK data, and if so performs merge_chunk. The function behind merge_chunk is:
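static int swServer_worker_merge_chunk(swServer *serv, int key, const char *data, size_t len)
{
    swString *package = swServer_worker_get_input_buffer(serv, key);
    //merge data to package buffer
    return swString_append_ptr(package, data, len);
}
We first use key (which is actually the id of the reactor thread) to get a block of global memory, then append the received chunk to that global memory. swString_append_ptr does this with a memcpy.
So this is a performance problem: all the data the worker process receives gets copied once in full. If the client sends a lot of data, the cost of this copy is high as well.
Now let's see how the Swoole core provides data to the PHP application layer. The main function is: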
void php_swoole_get_recv_data(swServer *serv, zval *zdata, swEventData *req, char *header, uint32_t header_length)
{
    char *data = NULL;
    size_t length = serv->get_packet(serv, req, &data);

    if (header_length >= length) {
        ZVAL_EMPTY_STRING(zdata);
    } else {
        ZVAL_STRINGL(zdata, data + header_length, length - header_length);
    }
    if (header_length > 0) {
        memcpy(header, data, header_length);
    }
}
Execution takes the following branch:
ZVAL_STRINGL(zdata, data + header_length, length - header_length);
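ZVAL_STRINGL creates a new zend_string here, which means PHP copies the data stored in the Swoole core in full into that zend_string, and then points the zend_value of zdata (the data the PHP application layer will use) at it. That is one more full memory copy.
The process above is summarized in the following figure:
We therefore optimized this merging code. Before receiving the master process's data, the worker process prepares a block of memory that is large enough, and then receives the data from the master process directly into that block.
Let's update the swoole-src source first: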
git checkout 529ad44d578930b3607abedcfc278364df34bc73
As before, we start with the code of process_send_packet:
static int process_send_packet(swServer *serv, swPipeBuffer *buf, swSendData *resp, send_func_t _send, void* private_data)
{
    const char* data = resp->data;
    uint32_t send_n = resp->info.len;
    off_t offset = 0;
    uint32_t copy_n;
    uint32_t max_length = serv->ipc_max_size - sizeof(buf->info);

    if (send_n <= max_length) {
        buf->info.flags = 0;
        buf->info.len = send_n;
        memcpy(buf->data, data, send_n);
        int retval = _send(serv, buf, sizeof(buf->info) + send_n, private_data);
        return retval;
    }

    buf->info.flags = SW_EVENT_DATA_CHUNK;
    buf->info.len = send_n;
    while (send_n > 0) {
        if (send_n > max_length) {
            copy_n = max_length;
        } else {
            buf->info.flags |= SW_EVENT_DATA_END;
            copy_n = send_n;
        }
        memcpy(buf->data, data + offset, copy_n);
        swTrace("finish, type=%d|len=%d", buf->info.type, copy_n);
        if (_send(serv, buf, sizeof(buf->info) + copy_n, private_data) < 0) {
            return SW_ERR;
        }
        send_n -= copy_n;
        offset += copy_n;
    }
    return SW_OK;
}
Let's focus on what changed, which is mainly the handling of CHUNK:
buf->info.flags = SW_EVENT_DATA_CHUNK;
buf->info.len = send_n;
Notice that buf->info.len is no longer the length of each small chunk but the length of the whole packet. Why is this possible? Because the master and worker processes communicate over Unix domain datagram sockets, the return value of the worker's receive call is already the length of the chunk, so there is no need to store the chunk length in buf->info.len.
The rest of the logic is the same as before.
Now let's see how the worker process receives the data sent by the master process, in the function swWorker_onPipeReceive:
static int swWorker_onPipeReceive(swReactor *reactor, swEvent *event)
{
    int ret;
    ssize_t recv_n = 0;
    swServer *serv = (swServer *) reactor->ptr;
    swFactory *factory = &serv->factory;
    swPipeBuffer *pipe_buffer = serv->pipe_buffers[0];
    void *buffer;
    struct iovec buffers[2];

    // peek
    recv_n = recv(event->fd, &pipe_buffer->info, sizeof(pipe_buffer->info), MSG_PEEK);
    if (recv_n < 0 && errno == EAGAIN) {
        return SW_OK;
    } else if (recv_n < 0) {
        return SW_ERR;
    }

    if (pipe_buffer->info.flags & SW_EVENT_DATA_CHUNK) {
        buffer = serv->get_buffer(serv, &pipe_buffer->info);
        _read_from_pipe:
        buffers[0].iov_base = &pipe_buffer->info;
        buffers[0].iov_len = sizeof(pipe_buffer->info);
        buffers[1].iov_base = buffer;
        buffers[1].iov_len = serv->ipc_max_size - sizeof(pipe_buffer->info);

        recv_n = readv(event->fd, buffers, 2);
        if (recv_n < 0 && errno == EAGAIN) {
            return SW_OK;
        }
        if (recv_n > 0) {
            serv->add_buffer_len(serv, &pipe_buffer->info, recv_n - sizeof(pipe_buffer->info));
        }

        if (pipe_buffer->info.flags & SW_EVENT_DATA_CHUNK) {
            //wait more chunk data
            if (!(pipe_buffer->info.flags & SW_EVENT_DATA_END)) {
                goto _read_from_pipe;
            } else {
                pipe_buffer->info.flags |= SW_EVENT_DATA_OBJ_PTR;
                /**
                 * Because we don't want to split the swEventData parameters into swDataHead and data,
                 * we store the value of the worker_buffer pointer in swEventData.data.
                 * The value of this pointer will be fetched in the swServer_worker_get_packet function.
                 */
                serv->copy_buffer_addr(serv, pipe_buffer);
            }
        }
    } else {
        recv_n = read(event->fd, pipe_buffer, serv->ipc_max_size);
    }

    if (recv_n > 0) {
        ret = swWorker_onTask(factory, (swEventData *) pipe_buffer, recv_n - sizeof(pipe_buffer->info));
        return ret;
    }
    return SW_ERR;
}
In this function:
recv_n = recv(event->fd, &pipe_buffer->info, sizeof(pipe_buffer->info), MSG_PEEK);
if (recv_n < 0 && errno == EAGAIN) {
    return SW_OK;
} else if (recv_n < 0) {
    return SW_ERR;
}
We first peek at the data in the kernel buffer to obtain the head. This tells us whether the data was sent in CHUNK mode, without removing the message from the queue.
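The effect of MSG_PEEK can be demonstrated on its own. The sketch below is an illustration under assumptions, not Swoole code: peek_head and demo_peek_then_read are hypothetical names, and the socket pair stands in for the master/worker pipe. It peeks at just the header of a datagram and then reads the full datagram, showing that the peek did not consume it.

```c
#include <stdint.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical header, standing in for swDataHead. */
typedef struct {
    uint32_t len;
    uint32_t flags;
} peek_head;

/*
 * Sends one datagram (header + 5 payload bytes), peeks at the header only,
 * then reads the whole datagram. Returns 0 when the peek saw the header and
 * the later read still received the complete message, -1 otherwise.
 */
int demo_peek_then_read(uint32_t flags)
{
    int fds[2];
    struct { peek_head head; char data[5]; } msg, full;
    peek_head peeked;
    int ok = -1;

    if (socketpair(AF_UNIX, SOCK_DGRAM, 0, fds) < 0) {
        return -1;
    }
    memset(&msg, 0, sizeof(msg));
    msg.head.len = 5;
    msg.head.flags = flags;
    memcpy(msg.data, "hello", 5);

    if (send(fds[0], &msg, sizeof(msg), 0) == (ssize_t) sizeof(msg)) {
        /* MSG_PEEK copies the first bytes but leaves the datagram queued. */
        ssize_t p = recv(fds[1], &peeked, sizeof(peeked), MSG_PEEK);
        ssize_t r = recv(fds[1], &full, sizeof(full), 0);
        if (p == (ssize_t) sizeof(peeked) && r == (ssize_t) sizeof(full) &&
            peeked.flags == flags && full.head.flags == flags &&
            memcmp(full.data, "hello", 5) == 0) {
            ok = 0;
        }
    }
    close(fds[0]);
    close(fds[1]);
    return ok;
}
```

The cost of this approach is one extra system call per message, which the worker accepts in exchange for knowing where the payload should land before reading it.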
if (pipe_buffer->info.flags & SW_EVENT_DATA_CHUNK) {
    buffer = serv->get_buffer(serv, &pipe_buffer->info);
    _read_from_pipe:
    buffers[0].iov_base = &pipe_buffer->info;
    buffers[0].iov_len = sizeof(pipe_buffer->info);
    buffers[1].iov_base = buffer;
    buffers[1].iov_len = serv->ipc_max_size - sizeof(pipe_buffer->info);

    recv_n = readv(event->fd, buffers, 2);
    if (recv_n < 0 && errno == EAGAIN) {
        return SW_OK;
    }
    if (recv_n > 0) {
        serv->add_buffer_len(serv, &pipe_buffer->info, recv_n - sizeof(pipe_buffer->info));
    }

    if (pipe_buffer->info.flags & SW_EVENT_DATA_CHUNK) {
        //wait more chunk data
        if (!(pipe_buffer->info.flags & SW_EVENT_DATA_END)) {
            goto _read_from_pipe;
        } else {
            pipe_buffer->info.flags |= SW_EVENT_DATA_OBJ_PTR;
            /**
             * Because we don't want to split the swEventData parameters into swDataHead and data,
             * we store the value of the worker_buffer pointer in swEventData.data.
             * The value of this pointer will be fetched in the swServer_worker_get_packet function.
             */
            serv->copy_buffer_addr(serv, pipe_buffer);
        }
    }
}
If the data was sent in CHUNK mode, we do the following:
buffer = serv->get_buffer(serv, &pipe_buffer->info);
get_buffer is a callback, which maps to:
static void* swServer_worker_get_buffer(swServer *serv, swDataHead *info)
{
    swString *worker_buffer = swServer_worker_get_input_buffer(serv, info->reactor_id);

    if (worker_buffer->size < info->len) {
        swString_extend(worker_buffer, info->len);
    }
    return worker_buffer->str + worker_buffer->length;
}
Here we first check whether this global buffer is large enough to receive the whole packet. If it is not, we grow it until it is.
_read_from_pipe:
buffers[0].iov_base = &pipe_buffer->info;
buffers[0].iov_len = sizeof(pipe_buffer->info);
buffers[1].iov_base = buffer;
buffers[1].iov_len = serv->ipc_max_size - sizeof(pipe_buffer->info);
recv_n = readv(event->fd, buffers, 2);
Then we call readv, which stores the head and the actual data in two separate places. This avoids the memory copy that would otherwise be needed to split the head from the data.
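As a standalone illustration of this scatter read, the following sketch sends a header and a payload as one datagram and lets readv() place them in two unrelated buffers. The names sg_head and demo_scatter_read are invented for the example, and the socket pair is an assumption standing in for the real pipe between master and worker.

```c
#include <stdint.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>

/* Hypothetical header, standing in for swDataHead. */
typedef struct {
    uint32_t len;
    uint32_t flags;
} sg_head;

/*
 * Sends header + payload as a single datagram, then receives it with readv():
 * the header lands in *head and the payload lands directly in dest.
 * Returns the number of payload bytes stored in dest, or -1 on error.
 */
ssize_t demo_scatter_read(const char *payload, size_t payload_len,
                          char *dest, size_t dest_size, sg_head *head)
{
    int fds[2];
    struct iovec out[2], in[2];
    sg_head out_head;
    ssize_t n = -1;

    if (socketpair(AF_UNIX, SOCK_DGRAM, 0, fds) < 0) {
        return -1;
    }
    out_head.len = (uint32_t) payload_len;
    out_head.flags = 0;
    out[0].iov_base = &out_head;
    out[0].iov_len = sizeof(out_head);
    out[1].iov_base = (void *) payload;
    out[1].iov_len = payload_len;

    in[0].iov_base = head;           /* first sizeof(*head) bytes go here */
    in[0].iov_len = sizeof(*head);
    in[1].iov_base = dest;           /* the remainder goes straight to dest */
    in[1].iov_len = dest_size;

    if (writev(fds[0], out, 2) == (ssize_t) (sizeof(out_head) + payload_len)) {
        ssize_t r = readv(fds[1], in, 2);
        if (r >= (ssize_t) sizeof(*head)) {
            n = r - (ssize_t) sizeof(*head);
        }
    }
    close(fds[0]);
    close(fds[1]);
    return n;
}
```

Because the payload is written to dest by the kernel, no user-space memcpy is needed to separate it from the header, and the return value minus the header size gives the chunk length.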
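In this way, Swoole Server eliminates one memory copy.
The process above is summarized in the following figure:
As the figure shows, there is still one full copy between step 2 and step 3. We optimized that away as well, so let's look at the optimized code.
Let's update the swoole-src source first: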
git checkout 5278bb30c9b6b84753fa1950cef3226f1cfb515c
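The code with which the master process sends data to the worker process is unchanged; what changed is how the worker process handles the receive buffer. The function swWorker_onPipeReceive itself was not modified at all. The changes are in the callbacks that manage the buffer, so let's go through them one by one.
First, the function pointer swServer::get_buffer now maps to php_swoole_server_worker_get_buffer:
static void* php_swoole_server_worker_get_buffer(swServer *serv, swDataHead *info)
{
    zend_string *worker_buffer = php_swoole_server_worker_get_input_buffer(serv, info->reactor_id);

    if (worker_buffer == NULL) {
        worker_buffer = zend_string_alloc(info->len, 0);
        worker_buffer->len = 0;
        php_swoole_server_worker_set_buffer(serv, info, worker_buffer);
    }
    return worker_buffer->val + worker_buffer->len;
}
This first calls php_swoole_server_worker_get_input_buffer to get the buffer that receives the data sent by the master process. If nothing is returned, the worker_buffer either has not been created yet or was destroyed after all the data had been received, so we allocate a block of memory with zend_string_alloc. One thing to note: info->len is the total length the master process sends to the worker process, and zend_string_alloc sets the string's len to the requested size, so we have to reset the zend_string's len to 0 by hand. zend_string has no offset member, so len is the only field we can use as the write offset.
Once we have this zend_string worker buffer, we can use readv to read the data the master process sends to the worker process. After each read, we call php_swoole_server_worker_add_buffer_len, the function behind the swServer::add_buffer_len pointer, to advance the offset:
static void php_swoole_server_worker_add_buffer_len(swServer *serv, swDataHead *info, size_t len)
{
    zend_string *worker_buffer = php_swoole_server_worker_get_input_buffer(serv, info->reactor_id);
    worker_buffer->len += len;
}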
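The idea of using the length field as a write cursor can be sketched without the Zend API. In the example below, demo_string is a simplified stand-in for zend_string (it is not the real layout), and the three helpers loosely mirror the roles of zend_string_alloc, get_buffer and add_buffer_len; all names are hypothetical.

```c
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Simplified stand-in for zend_string: a header followed by the bytes. */
typedef struct {
    size_t len;  /* bytes written so far; doubles as the write offset */
    char val[];  /* storage allocated together with the header */
} demo_string;

/* Allocates room for `total` bytes up front and starts with len == 0. */
demo_string *demo_string_alloc(size_t total)
{
    demo_string *s = malloc(sizeof(demo_string) + total + 1);
    if (s != NULL) {
        s->len = 0;
        s->val[total] = '\0';
    }
    return s;
}

/* Like get_buffer: the address where the next chunk should be written. */
char *demo_write_pos(demo_string *s)
{
    return s->val + s->len;
}

/* Like add_buffer_len: advance the cursor after a chunk has been written. */
void demo_add_len(demo_string *s, size_t n)
{
    s->len += n;
}
```

After every chunk has been written at demo_write_pos() and accounted for with demo_add_len(), len equals the total packet length again, so the buffer is a valid length-prefixed string with no further fix-up.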
Once all the data sent by the master process has been received, we call php_swoole_server_worker_copy_buffer_addr, the function behind the swServer::copy_buffer_addr pointer, to copy the address of the zend_string into swPipeBuffer::data.
So, simply by swapping function pointers, we switched from a C-level buffer to a PHP-level buffer.
Finally, we need to hand the received data, that is, the contents of the zend_string, to the PHP application layer. Let's look at the function swoole_websocket_onMessage:
int swoole_websocket_onMessage(swServer *serv, swEventData *req)
{
    int fd = req->info.fd;
    uchar flags = 0;
    zend_long opcode = 0;
    zval zdata;
    char frame_header[2];

    memcpy(frame_header, &req->info.ext_flags, sizeof(frame_header));
    php_swoole_get_recv_data(serv, &zdata, req);

    // frame info has already decoded in swWebSocket_dispatch_frame
    flags = frame_header[0];
    opcode = frame_header[1];

    if (opcode == WEBSOCKET_OPCODE_CLOSE && !serv->listen_list->open_websocket_close_frame) {
        zval_ptr_dtor(&zdata);
        return SW_OK;
    }

#ifdef SW_HAVE_ZLIB
    /**
     * RFC 7692
     */
    if (serv->websocket_compression && (flags & SW_WEBSOCKET_FLAG_RSV1)) {
        swString_clear(swoole_zlib_buffer);
        if (!websocket_message_uncompress(swoole_zlib_buffer, Z_STRVAL(zdata), Z_STRLEN(zdata))) {
            zval_ptr_dtor(&zdata);
            return SW_OK;
        }
        zval_ptr_dtor(&zdata);
        ZVAL_STRINGL(&zdata, swoole_zlib_buffer->str, swoole_zlib_buffer->length);
        flags ^= (SW_WEBSOCKET_FLAG_RSV1 | SW_WEBSOCKET_FLAG_COMPRESS);
    }
#endif

    zend_fcall_info_cache *fci_cache = php_swoole_server_get_fci_cache(serv, req->info.server_fd, SW_SERVER_CB_onMessage);
    zval args[2];

    args[0] = *(zval *) serv->ptr2;
    php_swoole_websocket_construct_frame(&args[1], opcode, Z_STRVAL(zdata), Z_STRLEN(zdata), flags);
    zend_update_property_long(swoole_websocket_frame_ce, &args[1], ZEND_STRL("fd"), fd);

    if (UNEXPECTED(!zend::function::call(fci_cache, 2, args, NULL, SwooleG.enable_coroutine))) {
        php_swoole_error(E_WARNING, "%s->onMessage handler error", ZSTR_VAL(swoole_websocket_server_ce->name));
        serv->close(serv, fd, 0);
    }

    zval_ptr_dtor(&zdata);
    zval_ptr_dtor(&args[1]);
    return SW_OK;
}
Here php_swoole_get_recv_data is the function that obtains the zend_string data. Let's go through its code:
void php_swoole_get_recv_data(swServer *serv, zval *zdata, swEventData *req)
{
    char *data = NULL;
    zend_string *worker_buffer;
    size_t length = serv->get_packet(serv, req, &data);

    if (length == 0) {
        ZVAL_EMPTY_STRING(zdata);
    } else {
        if (req->info.flags & SW_EVENT_DATA_OBJ_PTR) {
            worker_buffer = (zend_string *) (data - XtOffsetOf(zend_string, val));
            ZVAL_STR(zdata, worker_buffer);
        } else {
            ZVAL_STRINGL(zdata, data, length);
        }
    }
}
Because swWorker_onPipeReceive set the SW_EVENT_DATA_OBJ_PTR flag in req->info.flags, the function executes the following branch:
if (req->info.flags & SW_EVENT_DATA_OBJ_PTR) {
    worker_buffer = (zend_string *) (data - XtOffsetOf(zend_string, val));
    ZVAL_STR(zdata, worker_buffer);
}
Here data points at the val member of the buffer, so subtracting the offset of val recovers the address of the zend_string itself. zdata is the data the PHP application layer will use, and ZVAL_STR makes the zend_value inside zdata point directly at worker_buffer, with no memory copy at all.
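The pointer arithmetic in that branch is the common "recover the container from a member pointer" idiom. A minimal sketch in plain C, using the standard offsetof macro in place of XtOffsetOf and a made-up demo_zstr type whose layout is only an assumption for illustration (it is not the real zend_string):

```c
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical string type: bookkeeping fields followed by the bytes. */
typedef struct {
    unsigned refcount;
    size_t len;
    char val[];
} demo_zstr;

/* Given a pointer to the val member, step back to the enclosing struct. */
demo_zstr *demo_zstr_from_val(char *val)
{
    return (demo_zstr *) (val - offsetof(demo_zstr, val));
}
```

This only works when the pointer really does point at the val member of a live demo_zstr, which is why the real code guards the cast with the SW_EVENT_DATA_OBJ_PTR flag.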
In this way, Swoole Server eliminates one more memory copy.
The process above is summarized in the following figure:
In the end, we reduced the number of memory copies from 4 to 1, and the performance of the onMessage callback improved about 4 times as a result.
Our benchmark program is as follows. Server code:
<?php
use Swoole\WebSocket\Server;

$start = microtime(true);
$server = new Server("0.0.0.0", 9501);
$server->set([
    "worker_num" => 1,
    'package_max_length' => 1024 * 1024 * 4,
]);
$server->on('open', function (Server $server, $request) {
});
$server->on('message', function (Server $server, $frame) use ($start) {
    if ($frame->data === "end") {
        $end = microtime(true);
        var_dump(($end - $start) * 1000);
    }
});
$server->on('close', function ($ser, $fd) {
    echo "client {$fd} closed\n";
});
$server->start();
Client code:
<?php
use Swoole\Coroutine;
use Swoole\Coroutine\Http\Client;
use function Co\run;

run(function () {
    $cli = new Client("127.0.0.1", 9501);
    $ret = $cli->upgrade("/websocket");
    if (!$ret) {
        echo "ERROR\n";
        return;
    }
    for ($i = 0; $i < 2000; $i++) {
        $cli->push(str_repeat('a', 0.5 * 1024 * 1024));
    }
    $cli->push("end");
    sleep(100000);
});
Memory copies:
CPU usage:
You are welcome to follow the Xueersi Online School (學而思網校) technical team ;)