In Swoole 4.5 (not yet released at the time of writing), our Server had a spot worth optimizing for performance: when the worker process receives a packet from the master process, the data has to be copied twice before it can be passed from the PHP extension layer up to PHP userland (that is, the `data` parameter of our event callback).
Let's first analyze why there is a performance problem. To do that, we need a copy of the code that exhibits it. Clone the swoole-src repository with `git clone`, then `git checkout` the commit 8235c82fea2130534a16fd20771dcab3408a763e:
```shell
git checkout 8235c82fea2130534a16fd20771dcab3408a763e
```
Let's walk through the code. First, look at how the master process packages data and sends it to the worker process. In the function `process_send_packet`, the core part is:
```c
static int process_send_packet(swServer *serv, swPipeBuffer *buf, swSendData *resp, send_func_t _send, void* private_data)
{
    const char* data = resp->data;
    uint32_t send_n = resp->info.len;
    off_t offset = 0;

    uint32_t max_length = serv->ipc_max_size - sizeof(buf->info);

    if (send_n <= max_length)
    {
        buf->info.flags = 0;
        buf->info.len = send_n;
        memcpy(buf->data, data, send_n);

        int retval = _send(serv, buf, sizeof(buf->info) + send_n, private_data);
        return retval;
    }

    buf->info.flags = SW_EVENT_DATA_CHUNK;

    while (send_n > 0)
    {
        if (send_n > max_length)
        {
            buf->info.len = max_length;
        }
        else
        {
            buf->info.flags |= SW_EVENT_DATA_END;
            buf->info.len = send_n;
        }

        memcpy(buf->data, data + offset, buf->info.len);

        if (_send(serv, buf, sizeof(buf->info) + buf->info.len, private_data) < 0)
        {
            return SW_ERR;
        }

        send_n -= buf->info.len;
        offset += buf->info.len;
    }

    return SW_OK;
}
```
First, let's go over the parameters of `process_send_packet`:
`swServer *serv` is the Server we created.
`swPipeBuffer *buf` points to the memory whose contents will be sent to the worker process.
`swSendData *resp` holds the client data received by the master process, plus a `swDataHead info` header.
`_send` is a callback; its job is to send the data in `swPipeBuffer *buf` from the master process to the worker process.
`void* private_data` is actually a `swWorker *worker` pointer cast to `void*`; it identifies which worker process the master should send to.
One note: throughout this article we assume the Server has the eof option enabled (say, set to `"\r\n"`). Because TCP is byte-stream oriented, even if the client sends a very large packet, a single `read` on the server side will not necessarily return a large amount of data. Without the eof setting, the performance problem described in this article would not occur.
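As a rough, Swoole-independent sketch of this point (the function name and buffer sizes here are hypothetical; `\r\n` mirrors the eof setting assumed above), a server reading from a TCP socket has to keep accumulating bytes until the terminator shows up, because one `read` may return only a fragment of the logical packet:

```c
#include <stdio.h>
#include <string.h>
#include <unistd.h>

/* Generic sketch: read from a TCP fd until the "\r\n" eof terminator appears.
 * One read() may return only part of the logical packet, so the bytes have to
 * be accumulated until the terminator is found. */
static ssize_t read_until_eof(int fd, char *buf, size_t cap)
{
    size_t used = 0;
    while (used < cap) {
        ssize_t n = read(fd, buf + used, cap - used);
        if (n <= 0) {
            return -1;                       /* error or peer closed */
        }
        used += (size_t) n;
        if (used >= 2 && memcmp(buf + used - 2, "\r\n", 2) == 0) {
            return (ssize_t) used;           /* one complete packet assembled */
        }
    }
    return -1;                               /* packet larger than cap */
}
```

With eof enabled, it is the master process that ends up holding the complete (possibly very large) packet, which then has to be forwarded to the worker.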
With the parameters of `process_send_packet` covered, let's see how the code is implemented:
```c
const char* data = resp->data;
```
First, `data` is pointed at `resp->data`, i.e. the actual data sent by the client. For example, if the client sends the string `hello world\r\n`, then that is exactly what `data` holds.
```c
uint32_t send_n = resp->info.len;
```
`send_n` records the length of the data in `resp->data`. For example, if the client sends 1 MB of data to the server, `resp->info.len` is 1048576.
```c
off_t offset = 0;
```
`offset` tracks how much of the data the master process has already sent to the worker process.
```c
uint32_t max_length = serv->ipc_max_size - sizeof(buf->info);
```
`max_length` is the maximum size of a single packet that the master process sends to the worker process at a time.
Note: the master process and the worker process communicate over a datagram-style Unix domain socket (udg). Message boundaries are preserved, so however many bytes the master sends in one packet, the worker receives exactly that many in one read.
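A minimal standalone sketch (not Swoole code) of this datagram behaviour: on a `SOCK_DGRAM` socketpair, each `read` returns exactly one message, no matter how the writes and reads interleave:

```c
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/* Demo: each read() on a SOCK_DGRAM socketpair returns exactly one message. */
int main(void)
{
    int fds[2];
    if (socketpair(AF_UNIX, SOCK_DGRAM, 0, fds) < 0) {
        perror("socketpair");
        return 1;
    }

    const char *msgs[] = { "chunk-1", "a-longer-chunk-2", "x" };
    for (int i = 0; i < 3; i++) {
        write(fds[0], msgs[i], strlen(msgs[i]));
    }

    char buf[128];
    for (int i = 0; i < 3; i++) {
        ssize_t n = read(fds[1], buf, sizeof(buf));   /* one datagram per read */
        printf("read %zd bytes: %.*s\n", n, (int) n, buf);
    }

    close(fds[0]);
    close(fds[1]);
    return 0;
}
```

This is exactly why the master can rely on the worker receiving each chunk as a whole packet.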
```c
if (send_n <= max_length)
{
    buf->info.flags = 0;
    buf->info.len = send_n;
    memcpy(buf->data, data, send_n);

    int retval = _send(serv, buf, sizeof(buf->info) + send_n, private_data);
    return retval;
}
```
If the data the master process wants to send to the worker process is no larger than `max_length`, it simply calls `_send` and ships the data to the worker process in one go.
```c
buf->info.flags = SW_EVENT_DATA_CHUNK;
```
When `send_n` is larger than `max_length`, `buf->info.flags` is set to `SW_EVENT_DATA_CHUNK`, meaning the client data has to be split into small chunks and sent to the worker process piece by piece.
```c
while (send_n > 0)
{
    if (send_n > max_length)
    {
        buf->info.len = max_length;
    }
    else
    {
        buf->info.flags |= SW_EVENT_DATA_END;
        buf->info.len = send_n;
    }

    memcpy(buf->data, data + offset, buf->info.len);

    if (_send(serv, buf, sizeof(buf->info) + buf->info.len, private_data) < 0)
    {
        return SW_ERR;
    }

    send_n -= buf->info.len;
    offset += buf->info.len;
}
```
The logic is straightforward: it is simply a segmented send. Two things are worth noting here (a small walkthrough with hypothetical sizes follows below):

1. `buf->info.len` must be updated to the length of each small chunk, not the length of the whole packet.
2. The last chunk's `info.flags` must include `SW_EVENT_DATA_END`, signalling that a complete packet has been fully sent.
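To make the split concrete, here is a small standalone walkthrough that mimics the loop above; `FAKE_IPC_MAX_SIZE` and `FAKE_HEADER_SIZE` are made-up numbers for illustration, not Swoole's actual values:

```c
#include <stdio.h>
#include <stdint.h>

/* Hypothetical sizes, only to illustrate how a large payload is chunked. */
#define FAKE_HEADER_SIZE   32u       /* stand-in for sizeof(buf->info)  */
#define FAKE_IPC_MAX_SIZE  8192u     /* stand-in for serv->ipc_max_size */

int main(void)
{
    uint32_t max_length = FAKE_IPC_MAX_SIZE - FAKE_HEADER_SIZE;  /* payload per chunk */
    uint32_t send_n = 1048576;       /* pretend the client sent 1 MB */
    uint32_t offset = 0;
    int chunk = 0;

    while (send_n > 0) {
        uint32_t len = send_n > max_length ? max_length : send_n;
        int is_last = (len == send_n);
        printf("chunk %d: offset=%u len=%u%s\n",
               ++chunk, offset, len, is_last ? " [SW_EVENT_DATA_END]" : "");
        send_n -= len;
        offset += len;
    }
    return 0;
}
```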
OK, having analyzed how the master process sends packets, let's now analyze how the worker process receives them.
Let's start with the function `swWorker_onPipeReceive`:
```c
static int swWorker_onPipeReceive(swReactor *reactor, swEvent *event)
{
    swServer *serv = (swServer *) reactor->ptr;
    swFactory *factory = &serv->factory;
    swPipeBuffer *buffer = serv->pipe_buffers[0];
    int ret;

    _read_from_pipe:
    if (read(event->fd, buffer, serv->ipc_max_size) > 0)
    {
        ret = swWorker_onTask(factory, (swEventData *) buffer);
        if (buffer->info.flags & SW_EVENT_DATA_CHUNK)
        {
            //no data
            if (ret < 0 && errno == EAGAIN)
            {
                return SW_OK;
            }
            else if (ret > 0)
            {
                goto _read_from_pipe;
            }
        }
        return ret;
    }

    return SW_ERR;
}
```
This is the code where the worker process receives the data sent by the master process.
As we can see, the worker process first reads the data directly into the `buffer` memory and then calls `swWorker_onTask`. Let's look at the `swWorker_onTask` function:
```c
int swWorker_onTask(swFactory *factory, swEventData *task)
{
    swServer *serv = (swServer *) factory->ptr;
    swWorker *worker = SwooleWG.worker;

    //worker busy
    worker->status = SW_WORKER_BUSY;

    //packet chunk
    if (task->info.flags & SW_EVENT_DATA_CHUNK)
    {
        if (serv->merge_chunk(serv, task->info.reactor_id, task->data, task->info.len) < 0)
        {
            swoole_error_log(SW_LOG_WARNING, SW_ERROR_SESSION_DISCARD_DATA,
                    "cannot merge chunk to worker buffer, data[fd=%d, size=%d] lost",
                    task->info.fd, task->info.len);
            return SW_OK;
        }
        //wait more data
        if (!(task->info.flags & SW_EVENT_DATA_END))
        {
            return SW_OK;
        }
    }

    switch (task->info.type)
    {
    case SW_SERVER_EVENT_SEND_DATA:
        //discard data
        if (swWorker_discard_data(serv, task) == SW_TRUE)
        {
            break;
        }
        swWorker_do_task(serv, worker, task, serv->onReceive);
        break;
    // other cases omitted
    default:
        swWarn("[Worker] error event[type=%d]", (int )task->info.type);
        break;
    }

    //worker idle
    worker->status = SW_WORKER_IDLE;

    //maximum number of requests, process will exit.
    if (!SwooleWG.run_always && worker->request_count >= SwooleWG.max_request)
    {
        swWorker_stop(worker);
    }

    return SW_OK;
}
```
Let's focus on the code responsible for the performance problem:
```c
if (task->info.flags & SW_EVENT_DATA_CHUNK)
{
    if (serv->merge_chunk(serv, task->info.reactor_id, task->data, task->info.len) < 0)
    {
        swoole_error_log(SW_LOG_WARNING, SW_ERROR_SESSION_DISCARD_DATA,
                "cannot merge chunk to worker buffer, data[fd=%d, size=%d] lost",
                task->info.fd, task->info.len);
        return SW_OK;
    }
    //wait more data
    if (!(task->info.flags & SW_EVENT_DATA_END))
    {
        return SW_OK;
    }
}
```
Here, the worker process first checks whether the data sent by the master is CHUNK data. If it is, it performs a `merge_chunk` operation. Let's look at the function behind `merge_chunk`:
```c
static int swServer_worker_merge_chunk(swServer *serv, int key, const char *data, size_t len)
{
    swString *package = swServer_worker_get_input_buffer(serv, key);
    //merge data to package buffer
    return swString_append_ptr(package, data, len);
}
```
It first uses the value of `key` (which is actually the reactor thread's id) to fetch a piece of global memory, then appends the received chunk data onto that global buffer. `swString_append_ptr` is essentially a `memcpy`.
So this is the performance problem: every byte the worker process receives gets copied one more time in full. If the client sends a lot of data, the cost of this extra copy is significant.

Therefore, we optimized this merging code. We let the worker process prepare a sufficiently large buffer before it receives data from the master process, and then read the data sent by the master directly into that buffer.
First, update the swoole-src source code:
```shell
git checkout 529ad44d578930b3607abedcfc278364df34bc73
```
As before, let's start with the `process_send_packet` function:
```c
static int process_send_packet(swServer *serv, swPipeBuffer *buf, swSendData *resp, send_func_t _send, void* private_data)
{
    const char* data = resp->data;
    uint32_t send_n = resp->info.len;
    off_t offset = 0;
    uint32_t copy_n;

    uint32_t max_length = serv->ipc_max_size - sizeof(buf->info);

    if (send_n <= max_length)
    {
        buf->info.flags = 0;
        buf->info.len = send_n;
        memcpy(buf->data, data, send_n);

        int retval = _send(serv, buf, sizeof(buf->info) + send_n, private_data);
        return retval;
    }

    buf->info.flags = SW_EVENT_DATA_CHUNK;
    buf->info.len = send_n;

    while (send_n > 0)
    {
        if (send_n > max_length)
        {
            copy_n = max_length;
        }
        else
        {
            buf->info.flags |= SW_EVENT_DATA_END;
            copy_n = send_n;
        }

        memcpy(buf->data, data + offset, copy_n);

        swTrace("finish, type=%d|len=%d", buf->info.type, copy_n);

        if (_send(serv, buf, sizeof(buf->info) + copy_n, private_data) < 0)
        {
            return SW_ERR;
        }

        send_n -= copy_n;
        offset += copy_n;
    }

    return SW_OK;
}
```
Let's focus on what changed, mainly the handling of CHUNK data:
```c
buf->info.flags = SW_EVENT_DATA_CHUNK;
buf->info.len = send_n;
```
We can see that `buf->info.len` no longer stores the length of each small chunk but the length of the entire packet. Why is this possible? Because the master process and the worker process communicate over a datagram-style Unix domain socket (udg), the return value of `recv` on the worker side already tells us the chunk length, so there is no need to store the chunk length in `buf->info.len`.
The logic elsewhere is unchanged from the previous code.
Now let's see how the worker process receives the data sent by the master process, in the function `swWorker_onPipeReceive`:
```c
static int swWorker_onPipeReceive(swReactor *reactor, swEvent *event)
{
    int ret;
    ssize_t recv_n = 0;
    swServer *serv = (swServer *) reactor->ptr;
    swFactory *factory = &serv->factory;
    swPipeBuffer *pipe_buffer = serv->pipe_buffers[0];
    void *buffer;
    struct iovec buffers[2];

    // peek
    recv_n = recv(event->fd, &pipe_buffer->info, sizeof(pipe_buffer->info), MSG_PEEK);
    if (recv_n < 0 && errno == EAGAIN)
    {
        return SW_OK;
    }
    else if (recv_n < 0)
    {
        return SW_ERR;
    }

    if (pipe_buffer->info.flags & SW_EVENT_DATA_CHUNK)
    {
        buffer = serv->get_buffer(serv, &pipe_buffer->info);

        _read_from_pipe:
        buffers[0].iov_base = &pipe_buffer->info;
        buffers[0].iov_len = sizeof(pipe_buffer->info);
        buffers[1].iov_base = buffer;
        buffers[1].iov_len = serv->ipc_max_size - sizeof(pipe_buffer->info);

        recv_n = readv(event->fd, buffers, 2);
        if (recv_n < 0 && errno == EAGAIN)
        {
            return SW_OK;
        }
        if (recv_n > 0)
        {
            serv->add_buffer_len(serv, &pipe_buffer->info, recv_n - sizeof(pipe_buffer->info));
        }

        if (pipe_buffer->info.flags & SW_EVENT_DATA_CHUNK)
        {
            //wait more chunk data
            if (!(pipe_buffer->info.flags & SW_EVENT_DATA_END))
            {
                goto _read_from_pipe;
            }
            else
            {
                pipe_buffer->info.flags |= SW_EVENT_DATA_OBJ_PTR;
                /**
                 * Because we don't want to split the swEventData parameters into swDataHead and data,
                 * we store the value of the worker_buffer pointer in swEventData.data.
                 * The value of this pointer will be fetched in the swServer_worker_get_packet function.
                 */
                serv->copy_buffer_addr(serv, pipe_buffer);
            }
        }
    }
    else
    {
        recv_n = read(event->fd, pipe_buffer, serv->ipc_max_size);
    }

    if (recv_n > 0)
    {
        ret = swWorker_onTask(factory, (swEventData *) pipe_buffer, recv_n - sizeof(pipe_buffer->info));
        return ret;
    }

    return SW_ERR;
}
```
In particular:
```c
recv_n = recv(event->fd, &pipe_buffer->info, sizeof(pipe_buffer->info), MSG_PEEK);
if (recv_n < 0 && errno == EAGAIN)
{
    return SW_OK;
}
else if (recv_n < 0)
{
    return SW_ERR;
}
```
We first peek at the data in the kernel buffer to grab the head. That tells us whether the data was sent in CHUNK form.
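Here is a minimal, Swoole-independent sketch of this peek-then-read pattern on a datagram socketpair; `demo_head` is a made-up stand-in for the real header:

```c
#include <stdio.h>
#include <stdint.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/* Hypothetical fixed-size header, standing in for swDataHead. */
typedef struct {
    uint32_t len;
    uint8_t  flags;
} demo_head;

int main(void)
{
    int fds[2];
    socketpair(AF_UNIX, SOCK_DGRAM, 0, fds);

    /* sender: header + payload in one datagram */
    char msg[64];
    demo_head head = { .len = 5, .flags = 1 };
    memcpy(msg, &head, sizeof(head));
    memcpy(msg + sizeof(head), "hello", 5);
    write(fds[0], msg, sizeof(head) + 5);

    /* receiver: MSG_PEEK reads the header without consuming the datagram */
    demo_head peeked;
    recv(fds[1], &peeked, sizeof(peeked), MSG_PEEK);
    printf("peeked: len=%u flags=%u\n", peeked.len, (unsigned) peeked.flags);

    /* the full datagram is still in the kernel buffer and can now be read
     * into whatever buffer the flags tell us to use */
    char buf[64];
    ssize_t n = read(fds[1], buf, sizeof(buf));
    printf("then read %zd bytes\n", n);

    close(fds[0]);
    close(fds[1]);
    return 0;
}
```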
```c
if (pipe_buffer->info.flags & SW_EVENT_DATA_CHUNK)
{
    buffer = serv->get_buffer(serv, &pipe_buffer->info);

    _read_from_pipe:
    buffers[0].iov_base = &pipe_buffer->info;
    buffers[0].iov_len = sizeof(pipe_buffer->info);
    buffers[1].iov_base = buffer;
    buffers[1].iov_len = serv->ipc_max_size - sizeof(pipe_buffer->info);

    recv_n = readv(event->fd, buffers, 2);
    if (recv_n < 0 && errno == EAGAIN)
    {
        return SW_OK;
    }
    if (recv_n > 0)
    {
        serv->add_buffer_len(serv, &pipe_buffer->info, recv_n - sizeof(pipe_buffer->info));
    }

    if (pipe_buffer->info.flags & SW_EVENT_DATA_CHUNK)
    {
        //wait more chunk data
        if (!(pipe_buffer->info.flags & SW_EVENT_DATA_END))
        {
            goto _read_from_pipe;
        }
        else
        {
            pipe_buffer->info.flags |= SW_EVENT_DATA_OBJ_PTR;
            /**
             * Because we don't want to split the swEventData parameters into swDataHead and data,
             * we store the value of the worker_buffer pointer in swEventData.data.
             * The value of this pointer will be fetched in the swServer_worker_get_packet function.
             */
            serv->copy_buffer_addr(serv, pipe_buffer);
        }
    }
}
```
若是是CHUNK
方式發來的數據,那麼咱們執行以下的操做:
```c
buffer = serv->get_buffer(serv, &pipe_buffer->info);
```
`get_buffer` is a callback, which corresponds to:
```c
static void* swServer_worker_get_buffer(swServer *serv, swDataHead *info)
{
    swString *worker_buffer = swServer_worker_get_input_buffer(serv, info->reactor_id);

    if (worker_buffer->size < info->len)
    {
        swString_extend(worker_buffer, info->len);
    }

    return worker_buffer->str + worker_buffer->length;
}
```
Here, we first check whether this global buffer is large enough to hold the entire packet; if not, we grow it until it is.
```c
_read_from_pipe:
buffers[0].iov_base = &pipe_buffer->info;
buffers[0].iov_len = sizeof(pipe_buffer->info);
buffers[1].iov_base = buffer;
buffers[1].iov_len = serv->ipc_max_size - sizeof(pipe_buffer->info);

recv_n = readv(event->fd, buffers, 2);
```
Then we call `readv` to store the head and the actual data in two separate places. This avoids the memory copy that would otherwise be needed to split the head from the actual data.
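A small self-contained sketch of this scatter-read idea (again independent of Swoole, with a made-up `demo_head`): the header lands in a small struct while the payload goes straight into its final buffer, so no later `memcpy` is needed to separate them:

```c
#include <stdio.h>
#include <stdint.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <unistd.h>

/* Hypothetical fixed-size header, standing in for swDataHead. */
typedef struct {
    uint32_t len;
    uint8_t  flags;
} demo_head;

int main(void)
{
    int fds[2];
    socketpair(AF_UNIX, SOCK_DGRAM, 0, fds);

    /* sender: one datagram = header followed by payload */
    char out[64];
    demo_head head = { .len = 11, .flags = 0 };
    memcpy(out, &head, sizeof(head));
    memcpy(out + sizeof(head), "hello world", 11);
    write(fds[0], out, sizeof(head) + 11);

    /* receiver: readv splits the datagram on the fly -- header into in_head,
     * payload directly into payload, with no extra copy afterwards */
    demo_head in_head;
    char payload[32];
    struct iovec iov[2] = {
        { .iov_base = &in_head, .iov_len = sizeof(in_head) },
        { .iov_base = payload,  .iov_len = sizeof(payload) },
    };
    ssize_t n = readv(fds[1], iov, 2);
    printf("readv=%zd, head.len=%u, payload=%.*s\n",
           n, in_head.len, (int) (n - sizeof(in_head)), payload);

    close(fds[0]);
    close(fds[1]);
    return 0;
}
```

Swoole applies the same idea, except that the payload iovec points into the pre-grown global worker buffer returned by `get_buffer`.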
With this approach, the Swoole Server eliminates one memory copy.