Swoole Server 消息回調函數4倍性能提高

做者:黃漢韜 原文:https://mp.weixin.qq.com/s/A4...php

前言

在Swoole4.5版本中(目前還未發佈),咱們的Server有一個性能須要優化的地方,就是worker進程在收到master進程發來的包的時候,須要進行兩次的拷貝,才能夠把數據從PHP擴展層傳遞到PHP上層(也就是咱們事件回調函數須要拿到的data)。react

優化前

咱們先來分析一下爲何會有性能的問題。git

首先,咱們須要一份會有性能問題的代碼,咱們git clone下swoole-src代碼。web

而後git checkout到8235c82fea2130534a16fd20771dcab3408a763e這個commit位置:服務器

git checkout 8235c82fea2130534a16fd20771dcab3408a763e

咱們來分析一下代碼,首先看master進程是如何封裝數據而後發送給worker進程的。websocket

在函數process_send_packet裏面,咱們看核心的地方:swoole

static int process_send_packet(swServer *serv, swPipeBuffer *buf, swSendData *resp, send_func_t _send, void* private_data)
{
    const char* data = resp->data;
    uint32_t send_n = resp->info.len;
    off_t offset = 0;

    uint32_t max_length = serv->ipc_max_size - sizeof(buf->info);

    if (send_n <= max_length)
    {
        buf->info.flags = 0;
        buf->info.len = send_n;
        memcpy(buf->data, data, send_n);

        int retval = _send(serv, buf, sizeof(buf->info) + send_n, private_data);
        return retval;
    }

    buf->info.flags = SW_EVENT_DATA_CHUNK;

    while (send_n > 0)
    {
        if (send_n > max_length)
        {
            buf->info.len = max_length;
        }
        else
        {
            buf->info.flags |= SW_EVENT_DATA_END;
            buf->info.len = send_n;
        }

        memcpy(buf->data, data + offset, buf->info.len);

        if (_send(serv, buf, sizeof(buf->info) + buf->info.len, private_data) < 0)
        {
            return SW_ERR;
        }

        send_n -= buf->info.len;
        offset += buf->info.len;
    }

    return SW_OK;
}

首先,咱們來講一下process_send_packet這個函數的參數: app

其中,socket

  • swServer *serv就是咱們建立的那個Server。
  • swPipeBuffer *buf指向的內存裏面的數據須要發送給worker進程。
  • swSendData *resp裏面存放了master進程收到的客戶端數據和一個swDataHead info頭部。
  • _send是一個回調函數,這裏面的邏輯就是master進程把swPipeBuffer *buf裏面的數據發送給worker進程。
  • void* private_data這裏是一個swWorker *worker類型的指針轉換過來的。指定了master進程須要發送的那個worker進程。

說明一點,這裏咱們是以Server設置了eof選項爲例子講解的(假設設置了"\r\n")。由於TCP是面向字節流的,即便客戶端發送了一個很大的包過來,服務器一次read出來的數據也不見得很是大。若是不設置eof的話,是不會致使咱們這篇文章所說的性能問題。函數

介紹完了process_send_packet函數的參數以後,咱們來看看代碼是如何實現的:

const char* data = resp->data;

首先,讓data指向resp->data,也就是客戶端發來的實際數據。例如,客戶端發來了字符串hello world\r\n,那麼data裏面存放的就是hello world\r\n。

uint32_t send_n = resp->info.len;

標誌着resp->data數據的長度。例如,客戶端往服務器發送了1M的數據,那麼resp->info.len就是1048576。

off_t offset = 0;

用來標誌哪些數據master進程已經發送給了worker進程。

uint32_t max_length = serv->ipc_max_size - sizeof(buf->info);

max_length表示master進程一次往worker進程發送的包最大長度。

注意:master進程和worker進程是經過udg方式進行通訊的。因此,master進程發送多少,worker進程就直接收多少

if (send_n <= max_length)
{
    buf->info.flags = 0;
    buf->info.len = send_n;
    memcpy(buf->data, data, send_n);

    int retval = _send(serv, buf, sizeof(buf->info) + send_n, private_data);
    return retval;
}

若是master進程要發給worker進程的數據小於max_length,那麼就直接調用_send函數,直接把數據發給worker進程。

buf->info.flags = SW_EVENT_DATA_CHUNK;

當send_n大於max_length的時候,設置buf->info.flags爲CHUNK,也就意味着須要把客戶端發來的數據先拆分紅一小段一小段的數據,而後再發送給worker進程。

while (send_n > 0)
{
    if (send_n > max_length)
    {
        buf->info.len = max_length;
    }
    else
    {
        buf->info.flags |= SW_EVENT_DATA_END;
        buf->info.len = send_n;
    }

    memcpy(buf->data, data + offset, buf->info.len);

    if (_send(serv, buf, sizeof(buf->info) + buf->info.len, private_data) < 0)
    {
        return SW_ERR;
    }

    send_n -= buf->info.len;
    offset += buf->info.len;
}

邏輯比較簡單,就是一個分段發送的過程。這裏須要注意的兩點:

  • 一、buf->info.len的長度須要更新爲小段的chunk的長度,而不是大數據包的長度

    • 二、最後一個chunk的info.flags須要變成SW_EVENT_DATA_END,意味着一個完整的包已經發完了

OK,分析完了master進程發包的過程,咱們來分析一下worker進程收包的過程。

咱們先看一下函數swWorker_onPipeReceive:

static int swWorker_onPipeReceive(swReactor *reactor, swEvent *event)
{
    swServer *serv = (swServer *) reactor->ptr;
    swFactory *factory = &serv->factory;
    swPipeBuffer *buffer = serv->pipe_buffers[0];
    int ret;
    
    _read_from_pipe:
    
    if (read(event->fd, buffer, serv->ipc_max_size) > 0)
    {
        ret = swWorker_onTask(factory, (swEventData *) buffer);
        if (buffer->info.flags & SW_EVENT_DATA_CHUNK)
        {
            //no data
            if (ret < 0 && errno == EAGAIN)
            {
                return SW_OK;
            }
            else if (ret > 0)
            {
                goto _read_from_pipe;
            }
        }
        return ret;
    }
    
    return SW_ERR;
}

這個就是worker進程接收master進程發來的數據的代碼。咱們看到,worker進程會直接把數據先讀取到buffer內存裏面,而後調用swWorker_onTask。

咱們再來看看swWorker_onTask函數:

int swWorker_onTask(swFactory *factory, swEventData *task)
{
    swServer *serv = (swServer *) factory->ptr;
    swWorker *worker = SwooleWG.worker;
    
    //worker busy
    worker->status = SW_WORKER_BUSY;
    //packet chunk
    if (task->info.flags & SW_EVENT_DATA_CHUNK)
    {
        if (serv->merge_chunk(serv, task->info.reactor_id, task->data, task->info.len) < 0)
        {
            swoole_error_log(SW_LOG_WARNING, SW_ERROR_SESSION_DISCARD_DATA,
                    "cannot merge chunk to worker buffer, data[fd=%d, size=%d] lost", task->info.fd, task->info.len);
            return SW_OK;
        }
        //wait more data
        if (!(task->info.flags & SW_EVENT_DATA_END))
        {
            return SW_OK;
        }
    }
    
    switch (task->info.type)
    {
    case SW_SERVER_EVENT_SEND_DATA:
        //discard data
        if (swWorker_discard_data(serv, task) == SW_TRUE)
        {
            break;
        }
        swWorker_do_task(serv, worker, task, serv->onReceive);
        break;
    // 省略其餘的case
    default:
        swWarn("[Worker] error event[type=%d]", (int )task->info.type);
        break;
    }
    
    //worker idle
    worker->status = SW_WORKER_IDLE;
    
    //maximum number of requests, process will exit.
    if (!SwooleWG.run_always && worker->request_count >= SwooleWG.max_request)
    {
        swWorker_stop(worker);
    }
    return SW_OK;
}

咱們重點看看性能問題代碼:

if (task->info.flags & SW_EVENT_DATA_CHUNK)
{
    if (serv->merge_chunk(serv, task->info.reactor_id, task->data, task->info.len) < 0)
    {
        swoole_error_log(SW_LOG_WARNING, SW_ERROR_SESSION_DISCARD_DATA,
                "cannot merge chunk to worker buffer, data[fd=%d, size=%d] lost", task->info.fd, task->info.len);
        return SW_OK;
    }
    //wait more data
    if (!(task->info.flags & SW_EVENT_DATA_END))
    {
        return SW_OK;
    }
}

這裏,worker進程會先判斷master發來的數據是不是CHUNK數據,若是是,那麼會進行merge_chunk的操做。咱們看看merge_chunk對應的函數:

static int swServer_worker_merge_chunk(swServer *serv, int key, const char *data, size_t len)
{
    swString *package = swServer_worker_get_input_buffer(serv, key);
    //merge data to package buffer
    return swString_append_ptr(package, data, len);
}

咱們會先根據key的值(其實是reactor線程的id),獲取一塊全局的內存,而後把接收到的chunk數據,追加到這個全局內存上面,而swString_append_ptr執行的就是memcpy操做。

因此,這就是一個性能問題了。worker進程接收到的全部數據都會被完整的拷貝一遍。若是客戶端發來的數據很大,這個拷貝的開銷也是很大的。

咱們再看看Swoole內核是如何把data提供給PHP應用層的,主要函數是:

void php_swoole_get_recv_data(swServer *serv, zval *zdata, swEventData *req, char *header, uint32_t header_length)
{
    char *data = NULL;
    
    size_t length = serv->get_packet(serv, req, &data);
    if (header_length >= length)
    {
        ZVAL_EMPTY_STRING(zdata);
    }
    else
    {
        ZVAL_STRINGL(zdata, data + header_length, length - header_length);
    }
    if (header_length > 0)
    {
        memcpy(header, data, header_length);
    }
}

程序會進入如下代碼:

ZVAL_STRINGL(zdata, data + header_length, length - header_length);

這個地方是經過ZVAL_STRINGL來建立zend_string的,也就意味着PHP底層會把Swoole內核中存儲的data完整的拷貝一份到zend_string裏面。而後再讓zdata(也就是PHP應用層會用到的data)的zend_value指針指向這個zend_string。這裏,又多了一次完整的內存拷貝。

上述過程咱們能夠經過下面這幅圖總結:

所以,咱們對這部分合並的代碼進行了一個優化。咱們讓worker進程在接收master進程數據以前,就準備好一塊足夠大的內存,而後直接用這塊內存把master進程發來的數據接收便可。

優化一

咱們先更新一下swoole-src的源碼:

git checkout 529ad44d578930b3607abedcfc278364df34bc73

咱們依舊先看看process_send_packet函數的代碼:

static int process_send_packet(swServer *serv, swPipeBuffer *buf, swSendData *resp, send_func_t _send, void* private_data)
{
    const char* data = resp->data;
    uint32_t send_n = resp->info.len;
    off_t offset = 0;
    uint32_t copy_n;
    
    uint32_t max_length = serv->ipc_max_size - sizeof(buf->info);
    
    if (send_n <= max_length)
    {
        buf->info.flags = 0;
        buf->info.len = send_n;
        memcpy(buf->data, data, send_n);
        
        int retval = _send(serv, buf, sizeof(buf->info) + send_n, private_data);
        return retval;
    }
    
    buf->info.flags = SW_EVENT_DATA_CHUNK;
    buf->info.len = send_n;
    
    while (send_n > 0)
    {
        if (send_n > max_length)
        {
            copy_n = max_length;
        }
        else
        {
            buf->info.flags |= SW_EVENT_DATA_END;
            copy_n = send_n;
        }
        
        memcpy(buf->data, data + offset, copy_n);
        
        swTrace("finish, type=%d|len=%d", buf->info.type, copy_n);
        
        if (_send(serv, buf, sizeof(buf->info) + copy_n, private_data) < 0)
        {
            return SW_ERR;
        }
        
        send_n -= copy_n;
        offset += copy_n;
    }
    
    return SW_OK;
}

咱們聚焦修改的地方,主要是對CHUNK的處理:

buf->info.flags = SW_EVENT_DATA_CHUNK;
buf->info.len = send_n;

咱們發現,buf->info.len的長度不是每一個小段chunk的長度了,而是整個大包的長度了。爲何能夠這樣作呢?由於master進程與worker進程是經過udg進行通訊的,因此,worker進程在調用recv的時候,返回值實際上就是chunk的長度了,因此buf->info.len裏面存儲chunk的長度沒有必要。

其餘地方的邏輯和以前的代碼沒有區別。

咱們再來看看worker進程是如何接收master進程發來的數據的。在函數swWorker_onPipeReceive裏面:

static int swWorker_onPipeReceive(swReactor *reactor, swEvent *event)
{
    int ret;
    ssize_t recv_n = 0;
    swServer *serv = (swServer *) reactor->ptr;
    swFactory *factory = &serv->factory;
    swPipeBuffer *pipe_buffer = serv->pipe_buffers[0];
    void *buffer;
    struct iovec buffers[2];
    
    // peek
    recv_n = recv(event->fd, &pipe_buffer->info, sizeof(pipe_buffer->info), MSG_PEEK);
    if (recv_n < 0 && errno == EAGAIN)
    {
        return SW_OK;
    }
    else if (recv_n < 0)
    {
        return SW_ERR;
    }
    
    if (pipe_buffer->info.flags & SW_EVENT_DATA_CHUNK)
    {
        buffer = serv->get_buffer(serv, &pipe_buffer->info);
        _read_from_pipe:
        
        buffers[0].iov_base = &pipe_buffer->info;
        buffers[0].iov_len = sizeof(pipe_buffer->info);
        buffers[1].iov_base = buffer;
        buffers[1].iov_len = serv->ipc_max_size - sizeof(pipe_buffer->info);
       
        recv_n = readv(event->fd, buffers, 2);
        if (recv_n < 0 && errno == EAGAIN)
        {
            return SW_OK;
        }
        if (recv_n > 0)
        {
            serv->add_buffer_len(serv, &pipe_buffer->info, recv_n - sizeof(pipe_buffer->info));
        }
        
        if (pipe_buffer->info.flags & SW_EVENT_DATA_CHUNK)
        {
            //wait more chunk data
            if (!(pipe_buffer->info.flags & SW_EVENT_DATA_END))
            {
                goto _read_from_pipe;
            }
            else
            {
                pipe_buffer->info.flags |= SW_EVENT_DATA_OBJ_PTR;
                /**
                 * Because we don't want to split the swEventData parameters into swDataHead and data,
                 * we store the value of the worker_buffer pointer in swEventData.data.
                 * The value of this pointer will be fetched in the swServer_worker_get_packet function.
                 */
                serv->copy_buffer_addr(serv, pipe_buffer);
            }
        }
    }
    else
    {
        recv_n = read(event->fd, pipe_buffer, serv->ipc_max_size);
    }
    
    if (recv_n > 0)
    {
        ret = swWorker_onTask(factory, (swEventData *) pipe_buffer, recv_n - sizeof(pipe_buffer->info));
        return ret;
    }
    
    return SW_ERR;
}

其中,

recv_n = recv(event->fd, &pipe_buffer->info, sizeof(pipe_buffer->info), MSG_PEEK);
if (recv_n < 0 && errno == EAGAIN)
{
    return SW_OK;
}
else if (recv_n < 0)
{
    return SW_ERR;
}

咱們先對內核緩衝區裏面的數據進行一次peek操做,來獲取到head部分。這樣咱們就知道數據是不是以CHUNK方式發來的了。

if (pipe_buffer->info.flags & SW_EVENT_DATA_CHUNK)
{
    buffer = serv->get_buffer(serv, &pipe_buffer->info);
    _read_from_pipe:
    
    buffers[0].iov_base = &pipe_buffer->info;
    buffers[0].iov_len = sizeof(pipe_buffer->info);
    buffers[1].iov_base = buffer;
    buffers[1].iov_len = serv->ipc_max_size - sizeof(pipe_buffer->info);
    
    recv_n = readv(event->fd, buffers, 2);
    if (recv_n < 0 && errno == EAGAIN)
    {
        return SW_OK;
    }
    if (recv_n > 0)
    {
        serv->add_buffer_len(serv, &pipe_buffer->info, recv_n - sizeof(pipe_buffer->info));
    }
    
    if (pipe_buffer->info.flags & SW_EVENT_DATA_CHUNK)
    {
        //wait more chunk data
        if (!(pipe_buffer->info.flags & SW_EVENT_DATA_END))
        {
            goto _read_from_pipe;
        }
        else
        {
            pipe_buffer->info.flags |= SW_EVENT_DATA_OBJ_PTR;
            /**
                * Because we don't want to split the swEventData parameters into swDataHead and data,
                * we store the value of the worker_buffer pointer in swEventData.data.
                * The value of this pointer will be fetched in the swServer_worker_get_packet function.
                */
            serv->copy_buffer_addr(serv, pipe_buffer);
        }
    }
}

若是是CHUNK方式發來的數據,那麼咱們執行以下的操做:

buffer = serv->get_buffer(serv, &pipe_buffer->info);

get_buffer是一個回調函數,對應:

static void* swServer_worker_get_buffer(swServer *serv, swDataHead *info)
{
    swString *worker_buffer = swServer_worker_get_input_buffer(serv, info->reactor_id);
    
    if (worker_buffer->size < info->len)
    {
        swString_extend(worker_buffer, info->len);
    }
    
    return worker_buffer->str + worker_buffer->length;
}

這裏咱們先判斷這塊全局的buffer是否足夠的大,能夠接收完整個大包。若不夠大,咱們擴容到足夠的大。

_read_from_pipe:

buffers[0].iov_base = &pipe_buffer->info;
buffers[0].iov_len = sizeof(pipe_buffer->info);
buffers[1].iov_base = buffer;
buffers[1].iov_len = serv->ipc_max_size - sizeof(pipe_buffer->info);

recv_n = readv(event->fd, buffers, 2);

而後,咱們調用readv,把head和實際的數據分別存在了兩個地方。這麼作是避免爲了把head和實際的數據作拆分而致使的內存拷貝。

經過以上方式,Swoole Server減小了一次內存拷貝。

上述過程咱們能夠經過下面這幅圖總結:

從圖中咱們能夠看出,步驟2到步驟3這裏仍是會有一次完整的拷貝,咱們也把它給優化掉了。咱們來看優化後的代碼。

優化二

咱們先更新一下swoole-src的源碼:

git checkout 5278bb30c9b6b84753fa1950cef3226f1cfb515c

master進程發送數據到worker進程的代碼沒有變化,主要是worker進程這邊對接收buffer處理的變化。咱們會發現函數swWorker_onPipeReceive沒有任何改動,改動的是處理buffer的幾個回調函數。咱們一一來看下。

首先是函數指針swServer::get_buffer對應了函數php_swoole_server_worker_get_buffer:

static void* php_swoole_server_worker_get_buffer(swServer *serv, swDataHead *info)
{
    zend_string *worker_buffer = php_swoole_server_worker_get_input_buffer(serv, info->reactor_id);
   
    if (worker_buffer == NULL)
    {
        worker_buffer = zend_string_alloc(info->len, 0);
        worker_buffer->len = 0;
        php_swoole_server_worker_set_buffer(serv, info, worker_buffer);
    }
  
    return worker_buffer->val + worker_buffer->len;
}

這裏先調用函數php_swoole_server_worker_get_input_buffer來獲取接收master進程發來數據的buffer。若是說沒有獲取到,那麼說明咱們以前的worker_buffer沒有建立或者接收完全部的數據以後被銷燬了,此時咱們須要經過函數zend_string_alloc分配一塊內存。這裏須要注意的一個地方就是,info->len是master進程發送給worker進程的總長度,也就意味着咱們須要把zend_string的len手動初始化爲0。畢竟zend_string沒有offest這個成員,因此這裏咱們只可以把len看成offset來用了。

獲取到zend_string這塊worker buffer以後,咱們就能夠經過readv來讀取master進程發送給worker進程的數據了。獲取完數據以後,咱們調用swServer::add_buffer_len函數指針對應的php_swoole_server_worker_add_buffer_len這個函數來增長偏移量:

static void php_swoole_server_worker_add_buffer_len(swServer *serv, swDataHead *info, size_t len)
{
    zend_string *worker_buffer = php_swoole_server_worker_get_input_buffer(serv, info->reactor_id);
    worker_buffer->len += len;
}

當咱們接收完master進程發送過來的全部數據以後,咱們調用swServer::copy_buffer_addr函數指針對應的php_swoole_server_worker_copy_buffer_addr來把zend_string的地址拷貝到swPipeBuffer::data裏面。

這樣,咱們經過函數指針的簡單替換,實現了C層面的buffer到PHP層面的buffer切換。

最後,咱們須要把接收到的數據,也就是zend_string裏面的數據提供給PHP應用層。咱們來看看swoole_websocket_onMessage這個函數:

int swoole_websocket_onMessage(swServer *serv, swEventData *req)
{
    int fd = req->info.fd;
    uchar flags = 0;
    zend_long opcode = 0;
    
    zval zdata;
    char frame_header[2];
    memcpy(frame_header, &req->info.ext_flags, sizeof(frame_header));
   
    php_swoole_get_recv_data(serv, &zdata, req);
   
    // frame info has already decoded in swWebSocket_dispatch_frame
    flags  = frame_header[0];
    opcode = frame_header[1];
  
    if (opcode == WEBSOCKET_OPCODE_CLOSE && !serv->listen_list->open_websocket_close_frame)
    {
        zval_ptr_dtor(&zdata);
        return SW_OK;
    }

#ifdef SW_HAVE_ZLIB
    /**
     * RFC 7692
     */
    if (serv->websocket_compression && (flags & SW_WEBSOCKET_FLAG_RSV1))
    {
        swString_clear(swoole_zlib_buffer);
        if (!websocket_message_uncompress(swoole_zlib_buffer, Z_STRVAL(zdata), Z_STRLEN(zdata)))
        {
            zval_ptr_dtor(&zdata);
            return SW_OK;
        }
        zval_ptr_dtor(&zdata);
        ZVAL_STRINGL(&zdata, swoole_zlib_buffer->str, swoole_zlib_buffer->length);
        flags ^= (SW_WEBSOCKET_FLAG_RSV1 | SW_WEBSOCKET_FLAG_COMPRESS);
    }
#endif
   
    zend_fcall_info_cache *fci_cache = php_swoole_server_get_fci_cache(serv, req->info.server_fd, SW_SERVER_CB_onMessage);
    zval args[2];
    
    args[0] = *(zval *) serv->ptr2;
    php_swoole_websocket_construct_frame(&args[1], opcode, Z_STRVAL(zdata), Z_STRLEN(zdata), flags);
    zend_update_property_long(swoole_websocket_frame_ce, &args[1], ZEND_STRL("fd"), fd);
   
    if (UNEXPECTED(!zend::function::call(fci_cache, 2, args, NULL, SwooleG.enable_coroutine)))
    {
        php_swoole_error(E_WARNING, "%s->onMessage handler error", ZSTR_VAL(swoole_websocket_server_ce->name));
        serv->close(serv, fd, 0);
    }
    
    zval_ptr_dtor(&zdata);
    zval_ptr_dtor(&args[1]);
    
    return SW_OK;
}

其中,php_swoole_get_recv_data函數是用來獲取zend_string數據的,咱們分析代碼:

void php_swoole_get_recv_data(swServer *serv, zval *zdata, swEventData *req)
{
    char *data = NULL;
    zend_string *worker_buffer;
    
    size_t length = serv->get_packet(serv, req, &data);
    if (length == 0)
    {
        ZVAL_EMPTY_STRING(zdata);
    }
    else
    {
        if (req->info.flags & SW_EVENT_DATA_OBJ_PTR)
        {
            worker_buffer = (zend_string *) (data - XtOffsetOf(zend_string, val));
            ZVAL_STR(zdata, worker_buffer);
        }
        else
        {
            ZVAL_STRINGL(zdata, data, length);
        }
    }
}

由於在swWorker_onPipeReceive函數裏面把req->info.flags設置爲了SW_EVENT_DATA_OBJ_PTR,因此函數會執行如下代碼:

if (req->info.flags & SW_EVENT_DATA_OBJ_PTR)
{
    worker_buffer = (zend_string *) (data - XtOffsetOf(zend_string, val));
    ZVAL_STR(zdata, worker_buffer);
}

其中,zdata就是PHP應用層會使用到的data,而ZVAL_STR是讓zdata裏面的zend_value直接指向了worker_buffer,沒有任何的內存拷貝。

經過以上方式,Swoole Server再次減小了一次內存拷貝。

上述過程咱們能夠經過下面這幅圖總結:

最終,咱們把4次內存拷貝下降到了1次,所以onMessage回調函數性能提高了4倍。

性能對比

咱們的壓測程序以下,Server代碼:

<?php

use Swoole\WebSocket\Server;

$start = microtime(true);

$server = new Server("0.0.0.0", 9501);

$server->set([
    "worker_num" => 1,
    'package_max_length' => 1024 * 1024 * 4,
]);

$server->on('open', function (Server $server, $request) {
});

$server->on('message', function (Server $server, $frame) use ($start) {
    if ($frame->data === "end") {
        $end = microtime(true);
        var_dump(($end - $start) * 1000);
    }
});

$server->on('close', function ($ser, $fd) {
    echo "client {$fd} closed\n";
});

$server->start();

Client代碼:

<?php

use Swoole\Coroutine;
use Swoole\Coroutine\Http\Client;

use function Co\run;

run(function () {
    $cli = new Client("127.0.0.1", 9501);
    $ret = $cli->upgrade("/websocket");
    
    if (!$ret) {
        echo "ERROR\n";
        return;
    }
    
    for ($i = 0; $i < 2000; $i++) {
        $cli->push(str_repeat('a', 0.5 * 1024 * 1024));
    }
   
    $cli->push("end");
    sleep(100000);
});

內存拷貝:

CPU使用率:

總結

    1. 本次優化主要是經過減小內存拷貝來提高Server的性能,這也是服務器優化的重點。
    1. 本次優化咱們經過設計Buffer的接口,來達到切換Swoole內核層Buffer和PHP層Buffer的目的。首先,經過調用get_buffer來獲取到接收數據的zend_string(接收第一個Chunk前,都會分配一個新的zend_string,由於PHP底層會經過內存池來管理,因此分配zend_string的性能開銷不用太擔憂);當接收完數據的時候,調用add_buffer_len來更新zend_string的len,也就是咱們的offset;當咱們接收完全部的Chunk以後,調用copy_buffer_addr來保存zend_string的地址。最後,咱們經過ZVAL_STR來設置PHP應用層要用的data。
    1. 本次優化咱們使用了readv代替read。這麼作的目的是,Swoole進程間通訊的時候,使用了一個header來保存data的信息,例如數據是哪一個reactor線程發來的,發送的數據大小是多少。可是,這個header對於PHP應用層來講是無需關心的,因此,咱們就須要把原本連續的header:data內存單獨分開了,把全部的data拼接起來。可是,這樣就避免不了對data的拷貝了。因此咱們須要經過readv來在接收數據的時候就分離header和data。可是,由於readv是須要指定每一段buffer的長度的,因此咱們在接收第一個Chunk以前,須要調用read的peek方法來獲取到header的長度,可是這個系統調用的開銷是很是小的。

歡迎關注學而思網校技術團隊;)
image.png

相關文章
相關標籤/搜索