Swoole源代碼學習記錄(十二)——ReactorThread模塊

Swoole版本號:1.7.5-stable

Github地址:https://github.com/LinkedDestiny/swoole-src-analysis

這一章將分析Swoole的ReactorThread模塊。儘管叫Thread。但是實際上使用的是swFactoryProcess也就是多進程模式。但是。在ReactorThread中。所有的事件監聽是在線程中執行的(Rango僅僅是簡單提到了PHP不支持多線程安全,詳細緣由還有待請教……),比方在UDP模式下,是針對每一個監聽的host開闢一個線程執行reactor。在TCP模式下,則是開啓指定的reactor_num個線程用於執行reactor。node

那麼OK,先上swReactorThread結構體。該結構體封裝的事實上是一個執行着Reactor的線程Thread的相關信息,其聲明在Server.h文件的104 – 112行。例如如下:react

typedef struct _swReactorThread
{
    pthread_t thread_id;
    swReactor reactor;
    swUdpFd *udp_addrs;
    swMemoryPool *buffer_input;
    swArray *buffer_pipe;
    int c_udp_fd;
} swReactorThread;

當中thread_id爲ReactorThread的id,udp_addrs和c_udp_fd專門用於處理udp請求,buffer_input爲RingBuffer。用於開啓了RingBuffer選項的處理,buffer_pipe用於存放來自管道的數據git

還有一個結構體用來封裝需要傳遞給Thread的參數,其聲明在swoole.h的575 - 579行,例如如下:github

typedef struct _swThreadParam
{
    void *object;
    int pti;
} swThreadParam;

第一個void*指針指向了參數內容的地址。第二個參數標記線程的id(不是pid)緩存

ReactorThread在Server.h中一共同擁有……嗯……12個函數……聲明在Server.h的555 - 568行。 這裏分下類,當中3個函數用於建立、啓動、釋放。歸爲操做類函數。7個函數用於回調操做。歸爲回調函數,剩下2個用於發送數據,歸爲發送函數安全


首先看3個操做類函數,這三個函數的聲明例如如下:swoole

int swReactorThread_create(swServer *serv);
int swReactorThread_start(swServer *serv, swReactor *main_reactor_ptr);
void swReactorThread_free(swServer *serv);

首先是swReactorThread_create函數。該函數實質上並不是建立一個ReactorThread,而是初始化swServer中相關變量,並建立對應的Factory。如下上核心源代碼:多線程

    serv->reactor_threads = SwooleG.memory_pool->alloc(SwooleG.memory_pool, (serv->reactor_num * sizeof(swReactorThread)));
    if (serv->reactor_threads == NULL)
    {
        swError("calloc[reactor_threads] fail.alloc_size=%d", (int )(serv->reactor_num * sizeof(swReactorThread)));
        return SW_ERR;
    }

#ifdef SW_USE_RINGBUFFER
    int i;
    for (i = 0; i < serv->reactor_num; i++)
    {
        serv->reactor_threads[i].buffer_input = swRingBuffer_new(SwooleG.serv->buffer_input_size, 1);
        if (!serv->reactor_threads[i].buffer_input)
        {
            return SW_ERR;
        }
    }
#endif

    serv->connection_list = sw_shm_calloc(serv->max_connection, sizeof(swConnection));
    if (serv->connection_list == NULL)
    {
        swError("calloc[1] failed");
        return SW_ERR;
    }

源代碼解釋:初始化執行reactor的線程池,假設指定使用了RingBuffer,則將reactor_threads裏的輸入緩存區的類型設置爲RingBuffer。隨後。在共享內存中初始化connectoin_list鏈接列表的內存空間。app

    //create factry object
    if (serv->factory_mode == SW_MODE_THREAD)
    {
        if (serv->writer_num < 1)
        {
            swError("Fatal Error: serv->writer_num < 1");
            return SW_ERR;
        }
        ret = swFactoryThread_create(&(serv->factory), serv->writer_num);
    }
    else if (serv->factory_mode == SW_MODE_PROCESS)
    {
        if (serv->writer_num < 1 || serv->worker_num < 1)
        {
            swError("Fatal Error: serv->writer_num < 1 or serv->worker_num < 1");
            return SW_ERR;
        }
        ret = swFactoryProcess_create(&(serv->factory), serv->writer_num, serv->worker_num);
    }
    else
    {
        ret = swFactory_create(&(serv->factory));
    }

源代碼解釋:推斷swServer的factory_mode。socket

假設爲SW_MODE_THREAD(線程模式),則建立FactoryThread。假設爲SW_MODE_PROCESS(進程模式),則建立FactoryProcess。不然,爲SW_MODE_BASE(基礎模式)。建立Factory。

建立完後,就需要啓動了。swReactorThread_start函數卻是真的用於啓動ReactorThread了……核心源代碼例如如下:

    if (serv->have_udp_sock == 1)
    {
        if (swUDPThread_start(serv) < 0)
        {
            swError("udp thread start failed.");
            return SW_ERR;
        }
    }

    //listen TCP
    if (serv->have_tcp_sock == 1)
    {
        //listen server socket
        ret = swServer_listen(serv, main_reactor_ptr);
        if (ret < 0)
        {
            return SW_ERR;
        }
        //create reactor thread
        for (i = 0; i < serv->reactor_num; i++)
        {
            thread = &(serv->reactor_threads[i]);
            param = SwooleG.memory_pool->alloc(SwooleG.memory_pool, sizeof(swThreadParam));
            if (param == NULL)
            {
                swError("malloc failed");
                return SW_ERR;
            }

            param->object = serv;
            param->pti = i;

            if (pthread_create(&pidt, NULL, (void * (*)(void *)) swReactorThread_loop_tcp, (void *) param) < 0)
            {
                swError("pthread_create[tcp_reactor] failed. Error: %s[%d]", strerror(errno), errno);
            }
            thread->thread_id = pidt;
        }
    }

    //timer
    if (SwooleG.timer.fd > 0)
    {
        main_reactor_ptr->add(main_reactor_ptr, SwooleG.timer.fd, SW_FD_TIMER);
    }

源代碼解釋:假設swServer需要監聽UDP,則調用swUDPThread_start函數啓動UDP監聽線程;假設swServer需要監聽TCP,首先調用swServer_listen函數在main_reactor中註冊accept監聽,而後建立reactor_num個reactor執行線程用於監聽TCP鏈接的其它事件(讀、寫)。

最後。假設使用了Timer,則將Timer的fd監聽增長到main_reactor中。

swReactorThread_free函數用於釋放全部的正在執行的線程以及相關資源。核心源代碼例如如下:

    if (serv->have_tcp_sock == 1)
    {
        //create reactor thread
        for (i = 0; i < serv->reactor_num; i++)
        {
            thread = &(serv->reactor_threads[i]);
            if (pthread_join(thread->thread_id, NULL))
            {
                swWarn("pthread_join() failed. Error: %s[%d]", strerror(errno), errno);
            }

            for (j = 0; j < serv->worker_num; j++)
            {
                swWorker *worker = swServer_get_worker(serv, i);
                swBuffer *buffer = *(swBuffer **) swArray_fetch(thread->buffer_pipe, worker->pipe_master);
                swBuffer_free(buffer);
            }
            swArray_free(thread->buffer_pipe);

#ifdef SW_USE_RINGBUFFER
            thread->buffer_input->destroy(thread->buffer_input);
#endif
        }
    }

    if (serv->have_udp_sock == 1)
    {
        swListenList_node *listen_host;
        LL_FOREACH(serv->listen_list, listen_host)
        {
            shutdown(listen_host->sock, SHUT_RDWR);
            if (listen_host->type == SW_SOCK_UDP || listen_host->type == SW_SOCK_UDP6 || listen_host->type == SW_SOCK_UNIX_DGRAM)
            {
                if (pthread_join(listen_host->thread_id, NULL))
                {
                    swWarn("pthread_join() failed. Error: %s[%d]", strerror(errno), errno);
                }
            }
        }
    }

源代碼解釋:假設使用了TCP,則遍歷全部的reactor_thread,並調用pthread_join函數結束線程,並釋放線程中用於管道通訊的緩存區。假設使用了RingBuffer,還需要釋放buffer_input輸入緩存。假設使用了UDP。則首先遍歷監聽列表,使用shutdown終止鏈接,而後調用pthread_join函數結束線程。


接着先看發送函數。一共兩個發送函數,一個用於發送數據到client客戶端或者輸出buffer,一個用於發送數據到worker進程。兩個函數的聲明例如如下:

int swReactorThread_send(swSendData *_send);
int swReactorThread_send2worker(void *data, int len, uint16_t target_worker_id);

swReactorThread_send函數用於發送數據到client,也就是經過swConnection發送數據,其核心源代碼例如如下:

    volatile swBuffer_trunk *trunk;
    swConnection *conn = swServer_connection_get(serv, fd);

    if (conn == NULL || conn->active == 0)
    {
        swWarn("Connection[fd=%d] is not exists.", fd);
        return SW_ERR;
    }

#if SW_REACTOR_SCHEDULE == 2
    reactor_id = fd % serv->reactor_num;
#else
    reactor_id = conn->from_id;
#endif

    swTraceLog(SW_TRACE_EVENT, "send-data. fd=%d|reactor_id=%d", fd, reactor_id);
    swReactor *reactor = &(serv->reactor_threads[reactor_id].reactor);

源代碼解釋:假設不是直傳。則需要現將數據放入緩存。首先建立connection的out_buffer輸出緩存,假設發送數據長度爲0,則指定緩存的trunk類型爲SW_TRUNK_CLOSE(關閉鏈接),假設發送數據的類型爲sendfile,則調用swConnection_sendfile函數,不然調用swBuffer_append函數將發送數據增長緩存中。最後,在reactor中設置fd爲可寫狀態。

swReactorThread_send2worker函數在此再也不貼源代碼分析。基本思路就是消息隊列模式就扔隊列不是消息隊列模式就扔緩存或者直接扔管道……應該都看得懂了。

這裏直接上數量最多的回調函數的分析。這幾個回調式用於處理接收數據的,從名字上你們基本能看出,Swoole提供的一些特性比方包長檢測、eof檢測還有UDP的報文接收都是經過這些不一樣的回調來實現的。

首先來看swReactorThread_onReceive_no_buffer函數,這個是最主要的接收函數,沒有緩存,沒有檢測,收到多少數據就發給worker多少數據。如下上核心源代碼:

#ifdef SW_USE_EPOLLET
    n = swRead(event->fd, task.data.data, SW_BUFFER_SIZE);
#else
    //非ET模式會持續通知
    n = swConnection_recv(conn, task.data.data, SW_BUFFER_SIZE, 0);
#endif

    if (n < 0)
    {
        switch (swConnection_error(errno))
        {
        case SW_ERROR:
            swWarn("recv from connection[fd=%d] failed. Error: %s[%d]", event->fd, strerror(errno), errno);
            return SW_OK;
        case SW_CLOSE:
            goto close_fd;
        default:
            return SW_OK;
        }
    }
    //需要檢測errno來區分是EAGAIN仍是ECONNRESET
    else if (n == 0)
    {
        close_fd:
        swTrace("Close Event.FD=%d|From=%d|errno=%d", event->fd, event->from_id, errno);
        swServer_connection_close(serv, event->fd, 1);
        /**
         * skip EPOLLERR
         */
        event->fd = 0;
        return SW_OK;
    }

源代碼解釋:假設使用epoll的ET模式。則調用swRead函數直接從fd中讀取數據。不然,在非ET模式下,調用swConnection_recv函數接收數據。假設接收數據失敗,則依據errno運行相應的操做,假設接收數據爲0,需要關閉鏈接,調用swServer_connection_close函數關閉fd。

        conn->last_time =  SwooleGS->now;

        //heartbeat ping package
        if (serv->heartbeat_ping_length == n)
        {
            if (serv->heartbeat_pong_length > 0)
            {
                send(event->fd, serv->heartbeat_pong, serv->heartbeat_pong_length, 0);
            }
            return SW_OK;
        }

        task.data.info.fd = event->fd;
        task.data.info.from_id = event->from_id;
        task.data.info.len = n;

#ifdef SW_USE_RINGBUFFER

        uint16_t target_worker_id = swServer_worker_schedule(serv, conn->fd);
        swPackage package;

        package.length = task.data.info.len;
        package.data = swReactorThread_alloc(&serv->reactor_threads[SwooleTG.id], package.length);
        task.data.info.type = SW_EVENT_PACKAGE;

        memcpy(package.data, task.data.data, task.data.info.len);
        task.data.info.len = sizeof(package);
        task.target_worker_id = target_worker_id;
        memcpy(task.data.data, &package, sizeof(package));

#else
        task.data.info.type = SW_EVENT_TCP;
        task.target_worker_id = -1;
#endif

        //dispatch to worker process
        ret = factory->dispatch(factory, &task);

#ifdef SW_USE_EPOLLET
        //緩存區還有數據沒讀完,繼續讀。EPOLL的ET模式
        if (sw_errno == EAGAIN)
        {
            swWarn("sw_errno == EAGAIN");
            ret = swReactorThread_onReceive_no_buffer(reactor, event);
        }
#endif

源代碼解釋:首先更新近期收包的時間。隨後,檢測是不是心跳包。假設接收長度等於心跳包的長度並且指定了發送心跳回應,則發送心跳包並返回。假設不是心跳包,則設置接收數據的fd、reactor_id以及長度。假設指定使用了RingBuffer,則需要將數據封裝到swPackage中而後放進ReactorThread的input_buffer中。隨後調用factory的dispatch方法將數據投遞到相應的worker中。最後。假設是LT模式,並且緩存區的數據還沒讀完。則繼續調用swReactorThread_onReceive_no_buffer函數讀取數據。

接下來是swReactorThread_onReceive_buffer_check_length函數。該函數用於接收開啓了包長檢測的數據包。包長檢測是Swoole用於支持固定包頭+包體的本身定義協議的特性,固然有很多小夥伴不理解怎麼使用這個特性……如下上核心源代碼:

        //new package
        if (conn->object == NULL)
        {
            do_parse_package:
            do
            {
                package_total_length = swReactorThread_get_package_length(serv, (void *)tmp_ptr, (uint32_t) tmp_n);

                //Invalid package, close connection
                if (package_total_length < 0)
                {
                    goto close_fd;
                }
                //no package_length
                else if(package_total_length == 0)
                {
                    char recv_buf_again[SW_BUFFER_SIZE];
                    memcpy(recv_buf_again, (void *) tmp_ptr, (uint32_t) tmp_n);
                    do
                    {
                        //前tmp_n個字節存放不完整包頭
                        n = recv(event->fd, (void *)recv_buf_again + tmp_n, SW_BUFFER_SIZE, 0);
                        try_count ++;

                        //連續5次嘗試補齊包頭,認定爲惡意請求
                        if (try_count > 5)
                        {
                            swWarn("No package head. Close connection.");
                            goto close_fd;
                        }
                    }
                    while(n < 0 && errno == EINTR);

                    if (n == 0)
                    {
                        goto close_fd;
                    }
                    tmp_ptr = recv_buf_again;
                    tmp_n = tmp_n + n;

                    goto do_parse_package;
                }
                //complete package
                if (package_total_length <= tmp_n)
                {
                    tmp_package.size = package_total_length;
                    tmp_package.length = package_total_length;
                    tmp_package.str = (void *) tmp_ptr;

                    //swoole_dump_bin(buffer.str, 's', buffer.length);
                    swReactorThread_send_string_buffer(swServer_get_thread(serv, SwooleTG.id), conn, &tmp_package);

                    tmp_ptr += package_total_length;
                    tmp_n -= package_total_length;
                    continue;
                }
                //wait more data
                else
                {
                    if (package_total_length >= serv->package_max_length)
                    {
                        swWarn("Package length more than the maximum size[%d], Close connection.", serv->package_max_length);
                        goto close_fd;
                    }
                    package = swString_new(package_total_length);
                    if (package == NULL)
                    {
                        return SW_ERR;
                    }
                    memcpy(package->str, (void *)tmp_ptr, (uint32_t) tmp_n);
                    package->length += tmp_n;
                    conn->object = (void *) package;
                    break;
                }
            }
            while(tmp_n > 0);
            return SW_OK;
        }

源代碼解釋:假設connection鏈接中沒有緩存數據,則斷定爲新的數據包,進入接收循環。在接收循環中。首先從數據緩存中嘗試獲取包體的長度(這個長度存在包頭中),假設長度小於0,說明這個數據包不合法,丟棄並關閉鏈接。假設長度等於0,說明包頭還沒接收完整,繼續接收數據。假設連續5次補全包頭失敗,則認定爲惡意請求,關閉鏈接;假設長度大於0並且已經接收的數據長度超過或等於包體長度,則說明已經收到一個完整的數據包。經過swReactorThread_send_string_buffer函數將數據包放入緩存;假設已接收數據長度小於包體長度。則將不完整的數據包放入connection的object域。等待下一批數據。

        else
        {
            package = conn->object;
            //swTraceLog(40, "wait_data, size=%d, length=%d", buffer->size, buffer->length);

            /**
             * Also on the require_n byte data is complete.
             */
            int require_n = package->size - package->length;

            /**
             * Data is not complete, continue to wait
             */
            if (require_n > n)
            {
                memcpy(package->str + package->length, recv_buf, n);
                package->length += n;
                return SW_OK;
            }
            else
            {
                memcpy(package->str + package->length, recv_buf, require_n);
                package->length += require_n;
                swReactorThread_send_string_buffer(swServer_get_thread(serv, SwooleTG.id), conn, package);
                swString_free((swString *) package);
                conn->object = NULL;

                /**
                 * Still have the data, to parse.
                 */
                if (n - require_n > 0)
                {
                    tmp_n = n - require_n;
                    tmp_ptr = recv_buf + require_n;
                    goto do_parse_package;
                }
            }
        }

源代碼解釋:假設connecton的object已經存有數據,則先推斷這個數據包還剩下多少個字節未接受,並用當前接收的數據補全數據包。假設數據不夠。則繼續等待下一批數據;假設數據夠。則補全數據包並將數據包發送到緩存中,並清空connection的object域。

假設在補全數據包後仍有剩餘數據,則開始下一次數據包的解析。

接下來分析swReactorThread_onReceive_buffer_check_eof函數。這個函數用於檢測用戶定義的數據包的切割符,用於解決TCP長鏈接發送數據的粘包問題,保證onReceive回調每次拿到的是一個完整的數據包。

如下是核心源代碼:

//讀滿buffer了,可能還有數據
 if ((buffer->trunk_size - trunk->length) == n)
 {
     recv_again = SW_TRUE;
 }

 trunk->length += n;
 buffer->length += n;

 //over max length, will discard
 //TODO write to tmp file.
 if (buffer->length > serv->package_max_length)
 {
     swWarn("Package is too big. package_length=%d", buffer->length);
     goto close_fd;
 }

//        printf("buffer[len=%d][n=%d]-----------------\n", trunk->length, n);
 //((char *)trunk->data)[trunk->length] = 0; //for printf
//        printf("buffer-----------------: %s|fd=%d|len=%d\n", (char *) trunk->data, event->fd, trunk->length);

 //EOF_Check
 isEOF = memcmp(trunk->store.ptr + trunk->length - serv->package_eof_len, serv->package_eof, serv->package_eof_len);
//        printf("buffer ok. EOF=%s|Len=%d|RecvEOF=%s|isEOF=%d\n", serv->package_eof, serv->package_eof_len, (char *)trunk->data + trunk->length - serv->package_eof_len, isEOF);

 //received EOF, will send package to worker
 if (isEOF == 0)
 {
     swReactorThread_send_in_buffer(swServer_get_thread(serv, SwooleTG.id), conn);
     return SW_OK;
 }
 else if (recv_again)
 {
     trunk = swBuffer_new_trunk(buffer, SW_TRUNK_DATA, buffer->trunk_size);
     if (trunk)
     {
         goto recv_data;
     }
 }

源代碼解釋:這裏使用了connection的in_buffer輸入緩存。首先斷定trunk的剩餘空間,假設trunk已經滿了,此時可能還有數據,則需要新開一個trunk接收數據。所以設定recv_again標籤爲true。隨後斷定已經接收的數據長度是否超過了最大包長。假設超過,則丟棄數據包(這裏可以看到TODO標籤,Rango將在後期把超過包長的數據寫入tmp文件裏)。

隨後斷定EOF。將數據末尾的長度爲eof_len的字符串和指定的eof字符串對照,假設相等,則將數據包發送到緩存區,假設不相等,而recv_again標籤爲true。則新開trunk用於接收數據。

這裏需要說明,Rango僅僅是簡單斷定了數據包末尾是否爲eof。而不是在已經接收到的字符串中去匹配eof,所以並不能很是準確的依據eof來拆分數據包。因此各位假設但願能準確解決粘包問題,仍是使用固定包頭+包體這樣的協議格式較好。

剩下的幾個回調函數都較爲簡單。有興趣的讀者可以依據以前的分析本身嘗試解讀一下這幾個函數。

至此,ReactorThread模塊已全部解析完畢。可以看出。ReactorThread模塊主要在處理鏈接接收到的數據,並把這些數據投遞到相應的緩存中交由Worker去處理。所以可以理出一個主要的結構: Reactor響應fd請求->ReactorThread接收數據並放入緩存->ReactorFactory將數據從緩存取出發送給Worker->Worker處理數據。

相關文章
相關標籤/搜索