Swoole Source Code Analysis: Server Module, ReactorThread Event Loop (Part 1)

Preface

After php_swoole_server_before_start called swReactorThread_create to build the serv->reactor_threads objects, swServer_start calls swReactorThread_start to spawn the reactor threads. As soon as each thread is created, it calls swReactorThread_loop to start its reactor event loop.

swServer_master_onAccept: Accepting Connection Requests

  • swServer_start_proxy sets the event callbacks for the sockets monitored by the main_reactor. After main_reactor calls wait, whenever a TCP connect request arrives on a port in listen_list, the reactor invokes swServer_master_onAccept
  • The only difference between accept4 and accept is the last argument: accept4 can set file attributes (here SOCK_NONBLOCK and SOCK_CLOEXEC) on the returned socket directly
  • If the returned file descriptor is invalid:

    • If errno is EAGAIN, there are no pending connections to accept, so the function returns success and the event loop continues
    • If errno is EINTR, accept was interrupted by a signal and is simply called again
    • If errno is EMFILE or ENFILE, the file descriptor limit has been reached and the server should stop accepting new connections
  • connect_notify is set to 1 to tell the reactor thread that it must notify a worker process about the new connection
  • new_fd determines which reactor thread handles this connection, and the descriptor is added to that reactor for monitoring. Note that only the write event is registered at this point, which is used to tell the client that the accept has completed; the read event is not registered yet
  • swServer_connection_new updates the attributes of serv->connection_list[new_fd]
int swServer_master_onAccept(swReactor *reactor, swEvent *event)
{
    swServer *serv = reactor->ptr;
    swReactor *sub_reactor;
    swSocketAddress client_addr;
    socklen_t client_addrlen = sizeof(client_addr);
    swListenPort *listen_host = serv->connection_list[event->fd].object;

    int new_fd = 0, reactor_id = 0, i;

    //SW_ACCEPT_AGAIN
    for (i = 0; i < SW_ACCEPT_MAX_COUNT; i++)
    {
#ifdef HAVE_ACCEPT4
        new_fd = accept4(event->fd, (struct sockaddr *) &client_addr, &client_addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC);
#else
        new_fd = accept(event->fd, (struct sockaddr *) &client_addr, &client_addrlen);
#endif
        if (new_fd < 0)
        {
            switch (errno)
            {
            case EAGAIN:
                return SW_OK;
            case EINTR:
                continue;
            default:
                if (errno == EMFILE || errno == ENFILE)
                {
                    swServer_disable_accept(reactor);
                    reactor->disable_accept = 1;
                }
                swoole_error_log(SW_LOG_ERROR, SW_ERROR_SYSTEM_CALL_FAIL, "accept() failed. Error: %s[%d]", strerror(errno), errno);
                return SW_OK;
            }
        }
#ifndef HAVE_ACCEPT4
        else
        {
            swoole_fcntl_set_option(new_fd, 1, 1);
        }
#endif

        swTrace("[Master] Accept new connection. maxfd=%d|reactor_id=%d|conn=%d", swServer_get_maxfd(serv), reactor->id, new_fd);

        //too many connection
        if (new_fd >= serv->max_connection)
        {
            swoole_error_log(SW_LOG_WARNING, SW_ERROR_SERVER_TOO_MANY_SOCKET, "Too many connections [now: %d].", new_fd);
            close(new_fd);
            return SW_OK;
        }

        if (serv->factory_mode == SW_MODE_SINGLE)
        {
            reactor_id = 0;
        }
        else
        {
            reactor_id = new_fd % serv->reactor_num;
        }

        //add to connection_list
        swConnection *conn = swServer_connection_new(serv, listen_host, new_fd, event->fd, reactor_id);
        memcpy(&conn->info.addr, &client_addr, sizeof(client_addr));
        sub_reactor = &serv->reactor_threads[reactor_id].reactor;
        conn->socket_type = listen_host->type;

#ifdef SW_USE_OPENSSL
        if (listen_host->ssl)
        {
            if (swSSL_create(conn, listen_host->ssl_context, 0) < 0)
            {
                bzero(conn, sizeof(swConnection));
                close(new_fd);
                return SW_OK;
            }
        }
        else
        {
            conn->ssl = NULL;
        }
#endif
        /*
         * [!!!] new_connection function must before reactor->add
         */
        conn->connect_notify = 1;
        if (sub_reactor->add(sub_reactor, new_fd, SW_FD_TCP | SW_EVENT_WRITE) < 0)
        {
            bzero(conn, sizeof(swConnection));
            close(new_fd);
            return SW_OK;
        }

#ifdef SW_ACCEPT_AGAIN
        continue;
#else
        break;
#endif
    }
    return SW_OK;
}

swServer_connection_new: Creating a New Connection Object

  • ls is the swListenPort object listening for connections, fd is the file descriptor of the newly established connection, from_fd is the listening file descriptor, and reactor_id is the reactor assigned to the connected file descriptor
  • If ls has open_tcp_nodelay enabled, TCP_NODELAY is set on fd; if receive/send buffer sizes were configured, SO_RCVBUF and SO_SNDBUF are set accordingly
  • The swConnection fields fd, from_id, from_fd, connect_time, last_time, and so on are filled in
  • The connection's session_id is assigned (a sketch of the session lookup follows the function below)
static swConnection* swServer_connection_new(swServer *serv, swListenPort *ls, int fd, int from_fd, int reactor_id)
{
    swConnection* connection = NULL;

    serv->stats->accept_count++;
    sw_atomic_fetch_add(&serv->stats->connection_num, 1);
    sw_atomic_fetch_add(&ls->connection_num, 1);

    if (fd > swServer_get_maxfd(serv))
    {
        swServer_set_maxfd(serv, fd);
    }

    connection = &(serv->connection_list[fd]);
    bzero(connection, sizeof(swConnection));

    //TCP Nodelay
    if (ls->open_tcp_nodelay)
    {
        int sockopt = 1;
        if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &sockopt, sizeof(sockopt)) < 0)
        {
            swSysError("setsockopt(TCP_NODELAY) failed.");
        }
        connection->tcp_nodelay = 1;
    }

    //socket recv buffer size
    if (ls->kernel_socket_recv_buffer_size > 0)
    {
        if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &ls->kernel_socket_recv_buffer_size, sizeof(int)))
        {
            swSysError("setsockopt(SO_RCVBUF, %d) failed.", ls->kernel_socket_recv_buffer_size);
        }
    }

    //socket send buffer size
    if (ls->kernel_socket_send_buffer_size > 0)
    {
        if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &ls->kernel_socket_send_buffer_size, sizeof(int)) < 0)
        {
            swSysError("setsockopt(SO_SNDBUF, %d) failed.", ls->kernel_socket_send_buffer_size);
        }
    }

    connection->fd = fd;
    connection->from_id = serv->factory_mode == SW_MODE_SINGLE ? SwooleWG.id : reactor_id;
    connection->from_fd = (sw_atomic_t) from_fd;
    connection->connect_time = serv->gs->now;
    connection->last_time = serv->gs->now;
    connection->active = 1;
    connection->buffer_size = ls->socket_buffer_size;

#ifdef SW_REACTOR_SYNC_SEND
    if (serv->factory_mode != SW_MODE_THREAD && !ls->ssl)
    {
        connection->direct_send = 1;
    }
#endif

#ifdef SW_REACTOR_USE_SESSION
    swSession *session;
    sw_spinlock(&serv->gs->spinlock);
    int i;
    uint32_t session_id = serv->gs->session_round;
    //get session id
    for (i = 0; i < serv->max_connection; i++)
    {
        session_id++;
        //SwooleGS->session_round just has 24 bits size;
        if (unlikely(session_id == 1 << 24))
        {
            session_id = 1;
        }
        session = swServer_get_session(serv, session_id);
        //vacancy
        if (session->fd == 0)
        {
            session->fd = fd;
            session->id = session_id;
            session->reactor_id = connection->from_id;
            break;
        }
    }
    serv->gs->session_round = session_id;
    sw_spinlock_release(&serv->gs->spinlock);
    connection->session_id = session_id;
#endif

    return connection;
}
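
swServer_get_session, used in the allocation loop above, is not listed in this article. Since session_round only has 24 bits and the session table has a fixed size, the lookup presumably reduces to indexing the table with the session_id modulo the table size. The following is a minimal, self-contained sketch of that idea; the names, table size, and modulo mapping are illustrative assumptions, not the actual Swoole implementation.

#include <stdint.h>
#include <stdio.h>

#define MAX_CONNECTION 1024   /* illustrative table size */

typedef struct {
    int fd;            /* 0 means the slot is vacant */
    uint32_t id;       /* the session_id stored in this slot */
    int reactor_id;
} demo_session;

static demo_session session_list[MAX_CONNECTION];

/* Map a session_id onto a slot of the fixed-size table. */
static demo_session *demo_get_session(uint32_t session_id)
{
    return &session_list[session_id % MAX_CONNECTION];
}

int main(void)
{
    /* allocate: find a vacant slot, mimicking the loop in swServer_connection_new */
    uint32_t session_round = 0, session_id = session_round;
    for (int i = 0; i < MAX_CONNECTION; i++)
    {
        session_id++;
        if (session_id == 1 << 24)   /* session_round only has 24 bits */
        {
            session_id = 1;
        }
        demo_session *s = demo_get_session(session_id);
        if (s->fd == 0)              /* vacancy found */
        {
            s->fd = 42;              /* the accepted fd */
            s->id = session_id;
            break;
        }
    }

    /* lookup: a worker later resolves session_id back to the fd */
    printf("session %u -> fd %d\n", (unsigned) session_id, demo_get_session(session_id)->fd);
    return 0;
}

A worker that later receives a packet labelled with this session_id can resolve it back to the fd (and detect fd reuse by comparing session->id) without sharing any pointer with the reactor thread.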

swReactorThread_loop: The Event Loop

  • Each reactor thread calls swReactorThread_loop as soon as it is created, starting its reactor event loop.
  • The id of the current reactor thread is taken from the thread parameter
  • The thread-local fields SwooleTG.factory_lock_target and SwooleTG.factory_target_worker are initialized; they are used later when data has to be sent to a worker process in several pieces, so that the following pieces stay locked to the same worker process.
  • swServer_get_thread uses reactor_id to fetch the corresponding swReactorThread object
  • If the CPU_AFFINITY option is enabled (it binds Swoole's reactor threads and the corresponding worker processes to fixed CPU cores, avoiding migration between cores at run time and improving CPU cache hit rates), the current thread is bound to a core derived from reactor_id (worker processes are bound the same way, so each reactor thread and its worker end up pinned to the same core).
  • If cpu_affinity_ignore is configured (it takes an array, e.g. array(0, 1) means CPU0 and CPU1 are excluded and left free to handle network interrupts; if the kernel and NIC support multi-queue, interrupts are already spread across cores and this option is unnecessary), the core to bind to is picked from the serv->cpu_affinity_available array
  • swReactor_create creates this thread's reactor object and registers the SW_FD_PIPE read/write callbacks swReactorThread_onPipeReceive and swReactorThread_onPipeWrite, which are used to communicate with worker processes
  • If the server has UDP listening ports and a listening socket maps to this reactor_id, its file descriptor is added to the reactor
  • swReactorThread_set_protocol sets the TCP and UDP read/write callbacks: swReactorThread_onPackage, swReactorThread_onWrite, and swReactorThread_onRead receive data sent by clients, and each listening socket gets its onRead and onPackage functions
  • pipe_read_list is allocated to store the pipes read by this thread
  • serv->workers is traversed to find the workers mapped to the current reactor; each such worker's pipe_master descriptor is added to the reactor for monitoring, its serv->connection_list[pipe_fd] entry gets in_buffer, from_id, and object (a pipe lock) set, and the thread's notify_pipe and pipe_read_list are updated
  • If the time wheel is enabled, the reactor->timewheel object is created, reactor->heartbeat_interval is computed, and the onFinish and onTimeout callbacks are replaced.
static int swReactorThread_loop(swThreadParam *param)
{
    swServer *serv = SwooleG.serv;
    int ret;
    int reactor_id = param->pti;

    pthread_t thread_id = pthread_self();

    SwooleTG.factory_lock_target = 0;
    SwooleTG.factory_target_worker = -1;
    SwooleTG.id = reactor_id;
    SwooleTG.type = SW_THREAD_REACTOR;

    SwooleTG.buffer_stack = swString_new(8192);
    if (SwooleTG.buffer_stack == NULL)
    {
        return SW_ERR;
    }

    swReactorThread *thread = swServer_get_thread(serv, reactor_id);
    swReactor *reactor = &thread->reactor;

    SwooleTG.reactor = reactor;

#ifdef HAVE_CPU_AFFINITY
    //cpu affinity setting
    if (serv->open_cpu_affinity)
    {
        cpu_set_t cpu_set;
        CPU_ZERO(&cpu_set);

        if (serv->cpu_affinity_available_num)
        {
            CPU_SET(serv->cpu_affinity_available[reactor_id % serv->cpu_affinity_available_num], &cpu_set);
        }
        else
        {
            CPU_SET(reactor_id % SW_CPU_NUM, &cpu_set);
        }

        if (0 != pthread_setaffinity_np(thread_id, sizeof(cpu_set), &cpu_set))
        {
            swSysError("pthread_setaffinity_np() failed.");
        }
    }
#endif

    ret = swReactor_create(reactor, SW_REACTOR_MAXEVENTS);
    if (ret < 0)
    {
        return SW_ERR;
    }

    swSignal_none();

    reactor->ptr = serv;
    reactor->id = reactor_id;
    reactor->thread = 1;
    reactor->socket_list = serv->connection_list;
    reactor->max_socket = serv->max_connection;

    reactor->onFinish = NULL;
    reactor->onTimeout = NULL;
    reactor->close = swReactorThread_close;

    reactor->setHandle(reactor, SW_FD_CLOSE, swReactorThread_onClose);
    reactor->setHandle(reactor, SW_FD_PIPE | SW_EVENT_READ, swReactorThread_onPipeReceive);
    reactor->setHandle(reactor, SW_FD_PIPE | SW_EVENT_WRITE, swReactorThread_onPipeWrite);

    //listen UDP
    if (serv->have_udp_sock == 1)
    {
        swListenPort *ls;
        LL_FOREACH(serv->listen_list, ls)
        {
            if (ls->type == SW_SOCK_UDP || ls->type == SW_SOCK_UDP6 || ls->type == SW_SOCK_UNIX_DGRAM)
            {
                if (ls->sock % serv->reactor_num != reactor_id)
                {
                    continue;
                }
                if (ls->type == SW_SOCK_UDP)
                {
                    serv->connection_list[ls->sock].info.addr.inet_v4.sin_port = htons(ls->port);
                }
                else
                {
                    serv->connection_list[ls->sock].info.addr.inet_v6.sin6_port = htons(ls->port);
                }
                serv->connection_list[ls->sock].fd = ls->sock;
                serv->connection_list[ls->sock].socket_type = ls->type;
                serv->connection_list[ls->sock].object = ls;
                ls->thread_id = thread_id;
                reactor->add(reactor, ls->sock, SW_FD_UDP);
            }
        }
    }

    //set protocol function point
    swReactorThread_set_protocol(serv, reactor);

    int i = 0, pipe_fd;
#ifdef SW_USE_RINGBUFFER
    int j = 0;
#endif

    if (serv->factory_mode == SW_MODE_PROCESS)
    {
#ifdef SW_USE_RINGBUFFER
        thread->pipe_read_list = sw_calloc(serv->reactor_pipe_num, sizeof(int));
        if (thread->pipe_read_list == NULL)
        {
            swSysError("thread->buffer_pipe create failed");
            return SW_ERR;
        }
#endif

        for (i = 0; i < serv->worker_num; i++)
        {
            if (i % serv->reactor_num == reactor_id)
            {
                pipe_fd = serv->workers[i].pipe_master;

                //for request
                swBuffer *buffer = swBuffer_new(sizeof(swEventData));
                if (!buffer)
                {
                    swWarn("create buffer failed.");
                    break;
                }
                serv->connection_list[pipe_fd].in_buffer = buffer;

                //for response
                swSetNonBlock(pipe_fd);
                reactor->add(reactor, pipe_fd, SW_FD_PIPE);

                if (thread->notify_pipe == 0)
                {
                    thread->notify_pipe = serv->workers[i].pipe_worker;
                }

                /**
                 * mapping reactor_id and worker pipe
                 */
                serv->connection_list[pipe_fd].from_id = reactor_id;
                serv->connection_list[pipe_fd].fd = pipe_fd;
                serv->connection_list[pipe_fd].object = sw_malloc(sizeof(swLock));

                /**
                 * create pipe lock
                 */
                if (serv->connection_list[pipe_fd].object == NULL)
                {
                    swWarn("create pipe mutex lock failed.");
                    break;
                }
                swMutex_create(serv->connection_list[pipe_fd].object, 0);

#ifdef SW_USE_RINGBUFFER
                thread->pipe_read_list[j] = pipe_fd;
                j++;
#endif
            }
        }
    }

#ifdef SW_USE_TIMEWHEEL
    if (serv->heartbeat_idle_time > 0)
    {
        if (serv->heartbeat_idle_time < SW_TIMEWHEEL_SIZE)
        {
            reactor->timewheel = swTimeWheel_new(serv->heartbeat_idle_time);
            reactor->heartbeat_interval = 1;
        }
        else
        {
            reactor->timewheel = swTimeWheel_new(SW_TIMEWHEEL_SIZE);
            reactor->heartbeat_interval = serv->heartbeat_idle_time / SW_TIMEWHEEL_SIZE;
        }
        reactor->last_heartbeat_time = 0;
        if (reactor->timewheel == NULL)
        {
            swSysError("thread->timewheel create failed.");
            return SW_ERR;
        }
        reactor->timeout_msec = reactor->heartbeat_interval * 1000;
        reactor->onFinish = swReactorThread_onReactorCompleted;
        reactor->onTimeout = swReactorThread_onReactorCompleted;
    }
#endif

    //wait other thread
#ifdef HAVE_PTHREAD_BARRIER
    pthread_barrier_wait(&serv->barrier);
#else
    SW_START_SLEEP;
#endif
    //main loop
    reactor->wait(reactor, NULL);
    //shutdown
    reactor->free(reactor);

#ifdef SW_USE_TIMEWHEEL
    if (reactor->timewheel)
    {
        swTimeWheel_free(reactor->timewheel);
    }
#endif

    swString_free(SwooleTG.buffer_stack);
    pthread_exit(0);
    return SW_OK;
}


void swReactorThread_set_protocol(swServer *serv, swReactor *reactor)
{
    //UDP Packet
    reactor->setHandle(reactor, SW_FD_UDP, swReactorThread_onPackage);
    //Write
    reactor->setHandle(reactor, SW_FD_TCP | SW_EVENT_WRITE, swReactorThread_onWrite);
    //Read
    reactor->setHandle(reactor, SW_FD_TCP | SW_EVENT_READ, swReactorThread_onRead);

    swListenPort *ls;
    //listen the all tcp port
    LL_FOREACH(serv->listen_list, ls)
    {
        if (swSocket_is_dgram(ls->type))
        {
            continue;
        }
        swPort_set_protocol(ls);
    }
}

swReactorThread_onWrite: The Write Event Callback

  • After the master thread's main_reactor accepts a new connection, it sets swConnection.connect_notify to 1. At this point the reactor thread's job is not to send data to the client but to send an SW_EVENT_CONNECT event to a worker process

    • If the time wheel is in use, swTimeWheel_add adds the swConnection object to the time wheel for monitoring
    • If an onConnect callback exists, swServer_tcp_notify sends the event to a worker process (a sketch of this notify path follows the function below)
    • If out_buffer already contains data, that data is sent to the client
    • If enable_delay_receive is enabled, the connection socket is removed from the reactor until the server calls $serv->confirm($fd) to confirm the connection; otherwise the read event is enabled as well, so data sent by the client can be read.
  • If the heartbeat check or the time wheel detects a dead connection, close_notify is set to 1, and the worker must be notified of the close event
  • If out_buffer is not empty, the server has data to send to the client. The data is stored in the swBuffer linked list, where each node is one chunk; the chunk type is checked to see whether it is SW_CHUNK_CLOSE, SW_CHUNK_SENDFILE, or plain data.
  • swConnection_buffer_send sends plain data. It attempts one write to the socket; the possible outcomes are:

    • Everything was sent: continue the loop and send the next chunk
    • Only part was sent: continue the loop and send the rest of this chunk
    • send_wait is 1: break out of the loop and wait for the next write-ready event
    • An error occurred: continue the loop and retry
    • close_wait is 1: the connection is being closed, so stop monitoring the socket file descriptor and close it
  • Once the buffered data drops below buffer_size, overflow is reset to 0
  • If high_watermark is 1, out_buffer previously reached the high watermark; its length is compared again, and if it is now at or below buffer_low_watermark the worker process is notified so it can run the onBufferEmpty callback.
  • If out_buffer is empty, the reactor events for this socket are reset: the write event is removed and only the read event is kept. This step is required in level-triggered mode to avoid the write-ready callback firing repeatedly when there is nothing left to write.
static int swReactorThread_onWrite(swReactor *reactor, swEvent *ev)
{
    int ret;
    swServer *serv = SwooleG.serv;
    swBuffer_trunk *chunk;
    int fd = ev->fd;

    if (serv->factory_mode == SW_MODE_PROCESS)
    {
        assert(fd % serv->reactor_num == reactor->id);
        assert(fd % serv->reactor_num == SwooleTG.id);
    }

    swConnection *conn = swServer_connection_get(serv, fd);
    if (conn == NULL || conn->active == 0)
    {
        return SW_ERR;
    }

    swTraceLog(SW_TRACE_REACTOR, "fd=%d, conn->connect_notify=%d, conn->close_notify=%d, serv->disable_notify=%d, conn->close_force=%d",
            fd, conn->connect_notify, conn->close_notify, serv->disable_notify, conn->close_force);

    if (conn->connect_notify)
    {
        conn->connect_notify = 0;
#ifdef SW_USE_TIMEWHEEL
        if (reactor->timewheel)
        {
            swTimeWheel_add(reactor->timewheel, conn);
        }
#endif
#ifdef SW_USE_OPENSSL
        if (conn->ssl)
        {
            goto listen_read_event;
        }
#endif
        //notify worker process
        if (serv->onConnect)
        {
            swServer_tcp_notify(serv, conn, SW_EVENT_CONNECT);
            if (!swBuffer_empty(conn->out_buffer))
            {
                goto _pop_chunk;
            }
        }
        //delay receive, wait resume command.
        if (serv->enable_delay_receive)
        {
            conn->listen_wait = 1;
            return reactor->del(reactor, fd);
        }
        else
        {
#ifdef SW_USE_OPENSSL
            listen_read_event:
#endif
            return reactor->set(reactor, fd, SW_EVENT_TCP | SW_EVENT_READ);
        }
    }
    else if (conn->close_notify)
    {
#ifdef SW_USE_OPENSSL
        if (conn->ssl && conn->ssl_state != SW_SSL_STATE_READY)
        {
            return swReactorThread_close(reactor, fd);
        }
#endif
        swServer_tcp_notify(serv, conn, SW_EVENT_CLOSE);
        conn->close_notify = 0;
        return SW_OK;
    }
    else if (serv->disable_notify && conn->close_force)
    {
        return swReactorThread_close(reactor, fd);
    }

    _pop_chunk: while (!swBuffer_empty(conn->out_buffer))
    {
        chunk = swBuffer_get_trunk(conn->out_buffer);
        if (chunk->type == SW_CHUNK_CLOSE)
        {
            close_fd: reactor->close(reactor, fd);
            return SW_OK;
        }
        else if (chunk->type == SW_CHUNK_SENDFILE)
        {
            ret = swConnection_onSendfile(conn, chunk);
        }
        else
        {
            ret = swConnection_buffer_send(conn);
        }

        if (ret < 0)
        {
            if (conn->close_wait)
            {
                goto close_fd;
            }
            else if (conn->send_wait)
            {
                break;
            }
        }
    }

    if (conn->overflow && conn->out_buffer->length < conn->buffer_size)
    {
        conn->overflow = 0;
    }

    if (serv->onBufferEmpty && conn->high_watermark)
    {
        swListenPort *port = swServer_get_port(serv, fd);
        if (conn->out_buffer->length <= port->buffer_low_watermark)
        {
            conn->high_watermark = 0;
            swServer_tcp_notify(serv, conn, SW_EVENT_BUFFER_EMPTY);
        }
    }

    //remove EPOLLOUT event
    if (!conn->removed && swBuffer_empty(conn->out_buffer))
    {
        reactor->set(reactor, fd, SW_FD_TCP | SW_EVENT_READ);
    }
    return SW_OK;
}
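
swServer_tcp_notify, called above for SW_EVENT_CONNECT, SW_EVENT_CLOSE, and SW_EVENT_BUFFER_EMPTY, is not shown in this article. Conceptually it fills an swDataHead with only the event type and the connection's identity and hands it to the factory, so the worker receives a zero-length notification packet. Here is a minimal sketch under that assumption; the struct layout and the notify callback below are simplified stand-ins, not the real Swoole signatures.

#include <stdint.h>
#include <stdio.h>

/* simplified stand-in for swDataHead */
typedef struct {
    int fd;           /* connection (or session) identifier */
    uint16_t len;     /* 0: notification events carry no payload */
    int16_t from_id;  /* reactor id the connection belongs to */
    uint8_t type;     /* e.g. SW_EVENT_CONNECT / SW_EVENT_CLOSE */
    uint16_t from_fd; /* listening socket fd */
} demo_data_head;

enum { DEMO_EVENT_CONNECT = 1, DEMO_EVENT_CLOSE = 2 };

/* stand-in for factory->notify: here we just print instead of writing to a worker pipe */
static int demo_factory_notify(demo_data_head *ev)
{
    printf("notify worker: type=%d fd=%d from_id=%d\n", ev->type, ev->fd, ev->from_id);
    return 0;
}

/* sketch of the notify helper: fill the header from the connection and dispatch it */
static int demo_tcp_notify(int conn_fd, int from_id, int from_fd, uint8_t event_type)
{
    demo_data_head notify_event;
    notify_event.type = event_type;
    notify_event.from_id = (int16_t) from_id;
    notify_event.fd = conn_fd;
    notify_event.from_fd = (uint16_t) from_fd;
    notify_event.len = 0;               /* no body, the event itself is the message */
    return demo_factory_notify(&notify_event);
}

int main(void)
{
    return demo_tcp_notify(42, 0, 6, DEMO_EVENT_CONNECT);
}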

swConnection_buffer_send: Sending Plain Data

Note that the socket file descriptor in conn is non-blocking. This function calls swConnection_send once to try to send data; the possible outcomes are:

  • Everything was sent: swBuffer_pop_trunk removes the current list node
  • Only part was sent: offset is advanced
  • send_wait is set to 1: the socket is not writable for now
  • An unrecoverable error occurred: a warning is logged
  • close_wait is set to 1: the connection has been closed

Whatever the outcome, the function returns immediately after the send attempt; it never blocks, so the reactor thread's event loop never stalls.

int swConnection_buffer_send(swConnection *conn)
{
    int ret, sendn;

    swBuffer *buffer = conn->out_buffer;
    swBuffer_trunk *trunk = swBuffer_get_trunk(buffer);
    sendn = trunk->length - trunk->offset;

    if (sendn == 0)
    {
        swBuffer_pop_trunk(buffer, trunk);
        return SW_OK;
    }

    ret = swConnection_send(conn, trunk->store.ptr + trunk->offset, sendn, 0);
    if (ret < 0)
    {
        switch (swConnection_error(errno))
        {
        case SW_ERROR:
            swWarn("send to fd[%d] failed. Error: %s[%d]", conn->fd, strerror(errno), errno);
            break;
        case SW_CLOSE:
            conn->close_errno = errno;
            conn->close_wait = 1;
            return SW_ERR;
        case SW_WAIT:
            conn->send_wait = 1;
            return SW_ERR;
        default:
            break;
        }
        return SW_OK;
    }
    //trunk full send
    else if (ret == sendn || sendn == 0)
    {
        swBuffer_pop_trunk(buffer, trunk);
    }
    else
    {
        trunk->offset += ret;
    }
    return SW_OK;
}
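
swConnection_error is the helper that classifies errno into SW_WAIT, SW_CLOSE, or SW_ERROR in the switch above; its body is not included here. A minimal sketch of that kind of errno triage, assuming the usual POSIX semantics (the exact set of errno values the real function maps to each class may differ):

#include <errno.h>
#include <stdio.h>

enum { DEMO_ERROR = -1, DEMO_CLOSE = 2, DEMO_WAIT = 3 };

/* Classify a send()/recv() errno the way swConnection_buffer_send's caller expects:
 *   DEMO_WAIT  -> socket not ready, retry on the next writable event (send_wait = 1)
 *   DEMO_CLOSE -> the peer is gone, mark close_wait and tear the connection down
 *   DEMO_ERROR -> anything else, log a warning and keep the loop going
 */
static int demo_connection_error(int err)
{
    if (err == EAGAIN || err == EWOULDBLOCK || err == 0)
    {
        return DEMO_WAIT;
    }
    if (err == ECONNRESET || err == ECONNABORTED || err == EPIPE || err == ENOTCONN)
    {
        return DEMO_CLOSE;
    }
    return DEMO_ERROR;
}

int main(void)
{
    printf("EAGAIN -> %d, EPIPE -> %d, EINVAL -> %d\n",
           demo_connection_error(EAGAIN),
           demo_connection_error(EPIPE),
           demo_connection_error(EINVAL));
    return 0;
}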

swReactorThread_onRead: The Read-Ready Event Callback

  • When a read-ready event fires and the time wheel is in use, the connection's slot in the time wheel is refreshed (a sketch of the time wheel follows the function below)
  • last_time and last_time_usec are updated
  • port->onRead is called. Note that this onRead function was assigned by swPort_set_protocol when the reactor thread started; depending on settings such as open_eof_check and open_length_check, onRead points to a different handler.
static int swReactorThread_onRead(swReactor *reactor, swEvent *event)
{
    swServer *serv = reactor->ptr;
    /**
     * invalid event
     * The server has actively closed the connection, the client has also disconnected, and the fd has been reused.
     */
    if (event->socket->from_fd == 0)
    {
        return SW_OK;
    }
    swListenPort *port = swServer_get_port(serv, event->fd);
#ifdef SW_USE_OPENSSL
    if (swReactorThread_verify_ssl_state(reactor, port, event->socket) < 0)
    {
        return swReactorThread_close(reactor, event->fd);
    }
#endif

#ifdef SW_USE_TIMEWHEEL
    /**
     * TimeWheel update
     */
    if (reactor->timewheel && swTimeWheel_new_index(reactor->timewheel) != event->socket->timewheel_index)
    {
        swTimeWheel_update(reactor->timewheel, event->socket);
    }
#endif

    event->socket->last_time = serv->gs->now;
#ifdef SW_BUFFER_RECV_TIME
    event->socket->last_time_usec = swoole_microtime();
#endif

    return port->onRead(reactor, port, event);
}
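
swTimeWheel_add, swTimeWheel_update, and swTimeWheel_new_index are used here and in swReactorThread_onWrite without their source. The idea is a ring of buckets: a connection that shows activity is moved into the newest bucket, and on every heartbeat tick the reactor expires whatever is still sitting in the oldest bucket. The sketch below illustrates that mechanism with a simplified, self-contained layout; the field names and the fixed-size fd table are assumptions for illustration, not the real swTimeWheel structure.

#include <stdint.h>
#include <stdio.h>
#include <string.h>

#define WHEEL_SIZE 60          /* one bucket per heartbeat tick, illustrative */
#define MAX_FD     1024

typedef struct {
    uint16_t current;                  /* bucket that will expire on the next tick */
    uint8_t  bucket_of[MAX_FD];        /* which bucket each live fd currently sits in */
    uint8_t  in_wheel[MAX_FD];
} demo_timewheel;

/* the newest bucket: connections added or updated now expire the latest */
static uint16_t demo_new_index(demo_timewheel *tw)
{
    return (tw->current + WHEEL_SIZE - 1) % WHEEL_SIZE;
}

static void demo_add(demo_timewheel *tw, int fd)
{
    tw->bucket_of[fd] = (uint8_t) demo_new_index(tw);
    tw->in_wheel[fd] = 1;
}

/* called from the read callback: move an active connection to the newest bucket */
static void demo_update(demo_timewheel *tw, int fd)
{
    tw->bucket_of[fd] = (uint8_t) demo_new_index(tw);
}

/* one heartbeat tick: everything still sitting in the oldest bucket is idle */
static void demo_forward(demo_timewheel *tw)
{
    for (int fd = 0; fd < MAX_FD; fd++)
    {
        if (tw->in_wheel[fd] && tw->bucket_of[fd] == tw->current)
        {
            printf("fd %d idle for a full wheel turn, closing\n", fd);
            tw->in_wheel[fd] = 0;
        }
    }
    tw->current = (tw->current + 1) % WHEEL_SIZE;
}

int main(void)
{
    demo_timewheel tw;
    memset(&tw, 0, sizeof(tw));
    demo_add(&tw, 42);                 /* idle connection */
    demo_add(&tw, 7);                  /* active connection */
    for (int tick = 0; tick < WHEEL_SIZE; tick++)
    {
        demo_update(&tw, 7);           /* fd 7 keeps reading, so it is always re-armed */
        demo_forward(&tw);             /* fd 42 never reads, so it eventually times out */
    }
    return 0;
}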

The swPort_set_protocol Function

  • If open_eof_check is enabled, the data sent by a client connection is inspected, and a packet is delivered to the Worker process only when it ends with the configured string; otherwise data keeps being appended until the buffer overflows or a timeout aborts it. In this case onRead is swPort_onRead_check_eof
  • If open_length_check is enabled, length checking parses protocols of the fixed-header + body form; once enabled, the Worker process's onReceive is guaranteed to receive one complete packet each time. In this case onRead is swPort_onRead_check_length (a sketch of the length parsing follows the function below)
  • If no protocol option is set, packets handed to the worker are not guaranteed to be complete and the user has to reassemble them; in this case onRead is swPort_onRead_raw
void swPort_set_protocol(swListenPort *ls)
{
    //Thread mode must copy the data.
    //will free after onFinish
    if (ls->open_eof_check)
    {
        if (ls->protocol.package_eof_len > sizeof(ls->protocol.package_eof))
        {
            ls->protocol.package_eof_len = sizeof(ls->protocol.package_eof);
        }
        ls->protocol.onPackage = swReactorThread_dispatch;
        ls->onRead = swPort_onRead_check_eof;
    }
    else if (ls->open_length_check)
    {
        if (ls->protocol.package_length_type != '\0')
        {
            ls->protocol.get_package_length = swProtocol_get_package_length;
        }
        ls->protocol.onPackage = swReactorThread_dispatch;
        ls->onRead = swPort_onRead_check_length;
    }
    else if (ls->open_http_protocol)
    {
        if (ls->open_websocket_protocol)
        {
            ls->protocol.get_package_length = swWebSocket_get_package_length;
            ls->protocol.onPackage = swWebSocket_dispatch_frame;
            ls->protocol.package_length_size = SW_WEBSOCKET_HEADER_LEN + SW_WEBSOCKET_MASK_LEN + sizeof(uint64_t);
        }
#ifdef SW_USE_HTTP2
        else if (ls->open_http2_protocol)
        {
            ls->protocol.get_package_length = swHttp2_get_frame_length;
            ls->protocol.package_length_size = SW_HTTP2_FRAME_HEADER_SIZE;
            ls->protocol.onPackage = swReactorThread_dispatch;
        }
#endif
        ls->onRead = swPort_onRead_http;
    }
    else if (ls->open_mqtt_protocol)
    {
        ls->protocol.get_package_length = swMqtt_get_package_length;
        ls->protocol.onPackage = swReactorThread_dispatch;
        ls->onRead = swPort_onRead_check_length;
    }
    else if (ls->open_redis_protocol)
    {
        ls->protocol.onPackage = swReactorThread_dispatch;
        ls->onRead = swPort_onRead_redis;
    }
    else
    {
        ls->onRead = swPort_onRead_raw;
    }
}
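
swProtocol_get_package_length, assigned above when package_length_type is set, is not listed in this article. For a header such as package_length_type = 'N' (a 32-bit big-endian length field at package_length_offset), the parsing amounts to reading that field and adding package_body_offset, the number of header bytes not counted by the length field. The sketch below covers only that 'N' case under those assumptions; the real function supports the whole pack()-style type table and also rejects oversized packets.

#include <arpa/inet.h>   /* ntohl / htonl */
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <sys/types.h>

typedef struct {
    uint16_t package_length_offset;  /* where the length field starts in the header */
    uint16_t package_body_offset;    /* header bytes that are not counted in the length field */
} demo_protocol;

/* Return the total packet length, or 0 if the header has not fully arrived yet. */
static ssize_t demo_get_package_length(const demo_protocol *p, const char *data, uint32_t size)
{
    if (size < p->package_length_offset + sizeof(uint32_t))
    {
        return 0;  /* wait for more bytes before the length field can be read */
    }
    uint32_t body_len;
    memcpy(&body_len, data + p->package_length_offset, sizeof(body_len));
    return (ssize_t) ntohl(body_len) + p->package_body_offset;
}

int main(void)
{
    demo_protocol proto = { .package_length_offset = 0, .package_body_offset = 4 };
    char buf[64] = {0};
    uint32_t len = htonl(16);                 /* body is 16 bytes */
    memcpy(buf, &len, sizeof(len));
    printf("total packet length: %zd\n", demo_get_package_length(&proto, buf, sizeof(buf)));
    return 0;
}

swPort_onRead_check_length would call a function like this repeatedly on the receive buffer and hand each complete packet to protocol.onPackage once enough bytes have accumulated.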

The swPort_onRead_raw Function

  • swPort_onRead_raw is the simplest handler for forwarding data to a worker process
  • After calling swConnection_recv, three cases can occur:

    • An error occurred
    • No data was received, meaning the connection has been closed
    • Data was received
  • Once data is received, swReactorThread_dispatch sends it to the appropriate worker; task.target_worker_id is initialized to -1.
static int swPort_onRead_raw(swReactor *reactor, swListenPort *port, swEvent *event)
{
    int n;
    swDispatchData task;
    swConnection *conn =  event->socket;

    n = swConnection_recv(conn, task.data.data, SW_BUFFER_SIZE, 0);
    if (n < 0)
    {
        switch (swConnection_error(errno))
        {
        case SW_ERROR:
            swSysError("recv from connection#%d failed.", event->fd);
            return SW_OK;
        case SW_CLOSE:
            conn->close_errno = errno;
            goto close_fd;
        default:
            return SW_OK;
        }
    }
    else if (n == 0)
    {
        close_fd: swReactorThread_onClose(reactor, event);
        return SW_OK;
    }
    else
    {
        task.data.info.fd = event->fd;
        task.data.info.from_id = event->from_id;
        task.data.info.len = n;
        task.data.info.type = SW_EVENT_TCP;
        task.target_worker_id = -1;
        return swReactorThread_dispatch(conn, task.data.data, task.data.info.len);
    }
    return SW_OK;
}

swReactorThread_dispatch: Dispatching Data

  • swReactorThread_dispatch delivers messages to worker processes. How it delivers depends on the server configuration; the function distinguishes three very different setups: normal-mode dispatch, Stream-mode dispatch, and packet delivery through the RINGBUFFER shared memory pool
  • In normal mode the packet is split into chunks of at most SW_BUFFER_SIZE bytes, which are delivered to the worker process through a pipe. This mode applies to SW_DISPATCH_ROUND (round-robin), SW_DISPATCH_FDMOD (fixed by fd), SW_DISPATCH_QUEUE (preemptive), SW_DISPATCH_IPMOD (by IP), SW_DISPATCH_UIDMOD (by UID), and SW_DISPATCH_USERFUNC (user-defined)

    • Each chunk is wrapped in a swDispatchData object whose data.info.type is SW_EVENT_PACKAGE_START, except for the last chunk, whose type is SW_EVENT_PACKAGE_END
    • Note the factory_lock_target flag: it makes sure all the chunks are sent to the same worker process
  • Stream-mode dispatch is different from all the modes above: the worker is not chosen by the reactor thread; instead, worker processes themselves accept the reactor thread's requests.

    • In Stream mode, swStream_new first creates an swStream object and swStream_send then sends the data
    • Note that in this case task.data.info.type is SW_EVENT_PACKAGE_END, task.data.info.fd is conn->session_id rather than conn->fd, and task.data.info.len is 0
    • The details of the Stream-mode flow are covered in the article on the worker event loop.
  • The RINGBUFFER shared memory pool solves the large-packet problem: packet size is no longer limited, a single IPC operation delivers the whole packet, and there is no need to split it up and call the send system call repeatedly.

    • The RINGBUFFER path calls swReactorThread_alloc to allocate memory from thread->buffer_input, copies the data into that shared memory, stores the shared memory address in a swPackage object, and packs the swPackage into the swDispatchData object. Only the shared memory address travels between the reactor thread and the worker process, not the big packet itself; once the worker has the address it simply copies the data out of shared memory.
enum swFactory_dispatch_mode
{
    SW_DISPATCH_ROUND    = 1,
    SW_DISPATCH_FDMOD    = 2,
    SW_DISPATCH_QUEUE    = 3,
    SW_DISPATCH_IPMOD    = 4,
    SW_DISPATCH_UIDMOD   = 5,
    SW_DISPATCH_USERFUNC = 6,
    SW_DISPATCH_STREAM   = 7,
};

typedef struct _swDataHead
{
    int fd;
    uint16_t len;
    int16_t from_id;
    uint8_t type;
    uint8_t flags;
    uint16_t from_fd;
#ifdef SW_BUFFER_RECV_TIME
    double time;
#endif
} swDataHead;

typedef struct _swEventData
{
    swDataHead info;
    char data[SW_BUFFER_SIZE];
} swEventData;

typedef struct
{
    long target_worker_id;
    swEventData data;
} swDispatchData;

typedef struct _swPackage
{
    void *data;
    uint32_t length;
    uint32_t id;
} swPackage;

int swReactorThread_dispatch(swConnection *conn, char *data, uint32_t length)
{
    swFactory *factory = SwooleG.factory;
    swServer *serv = factory->ptr;
    swDispatchData task;

    task.data.info.from_fd = conn->from_fd;
    task.data.info.from_id = conn->from_id;
#ifdef SW_BUFFER_RECV_TIME
    task.data.info.time = conn->last_time_usec;
#endif

    if (serv->dispatch_mode == SW_DISPATCH_STREAM)
    {
        swStream *stream = swStream_new(serv->stream_socket, 0, SW_SOCK_UNIX_STREAM);
        if (stream == NULL)
        {
            return SW_ERR;
        }
        stream->response = swReactorThread_onStreamResponse;
        stream->session_id = conn->session_id;
        swListenPort *port = swServer_get_port(serv, conn->fd);
        swStream_set_max_length(stream, port->protocol.package_max_length);

        task.data.info.fd = conn->session_id;
        task.data.info.type = SW_EVENT_PACKAGE_END;
        task.data.info.len = 0;

        if (swStream_send(stream, (char*) &task.data.info, sizeof(task.data.info)) < 0)
        {
            return SW_ERR;
        }
        if (swStream_send(stream, data, length) < 0)
        {
            stream->cancel = 1;
            return SW_ERR;
        }
        return SW_OK;
    }

    task.data.info.fd = conn->fd;

    swTrace("send string package, size=%ld bytes.", (long)length);

#ifdef SW_USE_RINGBUFFER
    swReactorThread *thread = swServer_get_thread(serv, SwooleTG.id);

    swPackage package;
    package.length = length;
    package.data = swReactorThread_alloc(thread, package.length);

    task.data.info.type = SW_EVENT_PACKAGE;
    task.data.info.len = sizeof(package);

    memcpy(package.data, data, package.length);
    memcpy(task.data.data, &package, sizeof(package));

    task.target_worker_id = swServer_worker_schedule(serv, conn->fd, &task.data);

    //dispatch failed, free the memory.
    if (factory->dispatch(factory, &task) < 0)
    {
        thread->buffer_input->free(thread->buffer_input, package.data);
    }
    else
    {
        return SW_OK;
    }
#else

    task.data.info.type = SW_EVENT_PACKAGE_START;
    task.target_worker_id = -1;

    /**
     * lock target
     */
    SwooleTG.factory_lock_target = 1;

    size_t send_n = length;
    size_t offset = 0;

    while (send_n > 0)
    {
        if (send_n > SW_BUFFER_SIZE)
        {
            task.data.info.len = SW_BUFFER_SIZE;
        }
        else
        {
            task.data.info.type = SW_EVENT_PACKAGE_END;
            task.data.info.len = send_n;
        }

        task.data.info.fd = conn->fd;
        memcpy(task.data.data, data + offset, task.data.info.len);

        send_n -= task.data.info.len;
        offset += task.data.info.len;

        swTrace("dispatch, type=%d|len=%d\n", task.data.info.type, task.data.info.len);

        if (factory->dispatch(factory, &task) < 0)
        {
            break;
        }
    }

    /**
     * unlock
     */
    SwooleTG.factory_target_worker = -1;
    SwooleTG.factory_lock_target = 0;

#endif
    return SW_OK;
}

swReactorThread_alloc: Allocating Shared Memory

  • The shared memory is taken from buffer_input; if clients send data faster and in larger volumes than the worker processes can consume, the pool runs out of memory
  • When the pool is exhausted, swReactorThread_yield is called to pause sending data to the workers and let the reactor thread process messages coming back from the worker processes instead.
  • If the workers still have not released shared memory after the reactor thread has processed their messages, and the retry count reaches SW_RINGBUFFER_WARNING, the thread sleeps briefly
  • pipe_read_list is the list of pipe_master descriptors bound to this reactor thread; the workers bound to this reactor send messages back through these pipes after they finish processing
static sw_inline void* swReactorThread_alloc(swReactorThread *thread, uint32_t size)
{
    void *ptr = NULL;
    int try_count = 0;

    while (1)
    {
        ptr = thread->buffer_input->alloc(thread->buffer_input, size);
        if (ptr == NULL)
        {
            if (try_count > SW_RINGBUFFER_WARNING)
            {
                swWarn("memory pool is full. Wait memory collect. alloc(%d)", size);
                usleep(1000);
                try_count = 0;
            }
            try_count++;
            swReactorThread_yield(thread);
            continue;
        }
        break;
    }
    //debug("%p\n", ptr);
    return ptr;
}

static sw_inline void swReactorThread_yield(swReactorThread *thread)
{
    swEvent event;
    swServer *serv = SwooleG.serv;
    int i;
    for (i = 0; i < serv->reactor_pipe_num; i++)
    {
        event.fd = thread->pipe_read_list[i];
        swReactorThread_onPipeReceive(&thread->reactor, &event);
    }
    swYield();
}

The swFactoryProcess_dispatch Function

  • swFactoryProcess_dispatch is the factory->dispatch function mentioned above; it dispatches data to worker processes
  • It mainly calls swServer_worker_schedule to decide which worker process should receive the data.
  • swReactorThread_send2worker then performs the actual send
static sw_inline int swEventData_is_stream(uint8_t type)
{
    switch (type)
    {
    case SW_EVENT_TCP:
    case SW_EVENT_TCP6:
    case SW_EVENT_UNIX_STREAM:
    case SW_EVENT_PACKAGE_START:
    case SW_EVENT_PACKAGE:
    case SW_EVENT_PACKAGE_END:
    case SW_EVENT_CONNECT:
    case SW_EVENT_CLOSE:
    case SW_EVENT_PAUSE_RECV:
    case SW_EVENT_RESUME_RECV:
    case SW_EVENT_BUFFER_FULL:
    case SW_EVENT_BUFFER_EMPTY:
        return SW_TRUE;
    default:
        return SW_FALSE;
    }
}

static int swFactoryProcess_dispatch(swFactory *factory, swDispatchData *task)
{
    uint32_t send_len = sizeof(task->data.info) + task->data.info.len;
    int target_worker_id;
    swServer *serv = SwooleG.serv;
    int fd = task->data.info.fd;

    if (task->target_worker_id < 0)
    {
#ifndef SW_USE_RINGBUFFER
        if (SwooleTG.factory_lock_target)
        {
            if (SwooleTG.factory_target_worker < 0)
            {
                target_worker_id = swServer_worker_schedule(serv, fd, &task->data);
                SwooleTG.factory_target_worker = target_worker_id;
            }
            else
            {
                target_worker_id = SwooleTG.factory_target_worker;
            }
        }
        else
#endif
        {
            target_worker_id = swServer_worker_schedule(serv, fd, &task->data);
        }
    }
    else
    {
        target_worker_id = task->target_worker_id;
    }
    //discard the data packet.
    if (target_worker_id < 0)
    {
        return SW_OK;
    }

    if (swEventData_is_stream(task->data.info.type))
    {
        swConnection *conn = swServer_connection_get(serv, fd);
        if (conn == NULL || conn->active == 0)
        {
            swWarn("dispatch[type=%d] failed, connection#%d is not active.", task->data.info.type, fd);
            return SW_ERR;
        }
        //server active close, discard data.
        if (conn->closed)
        {
            //Connection has been closed by the server
            if (!(task->data.info.type == SW_EVENT_CLOSE && conn->close_force))
            {
                return SW_OK;
            }
        }
        //converted fd to session_id
        task->data.info.fd = conn->session_id;
        task->data.info.from_fd = conn->from_fd;
    }

    return swReactorThread_send2worker((void *) &(task->data), send_len, target_worker_id);
}

swServer_worker_schedule: The Scheduling Function

  • This function computes a key according to dispatch_mode
  • The preemptive mode is worth noting: it iterates over the workers, reads each worker's current status, and picks one that is SW_WORKER_IDLE. If every worker process is busy, it degrades to SW_DISPATCH_ROUND, regardless of whether the next worker in the rotation will actually be the first to finish; avoiding this is precisely the advantage Stream mode has over the other modes.
static sw_inline int swServer_worker_schedule(swServer *serv, int fd, swEventData *data)
{
    uint32_t key;

    //polling mode
    if (serv->dispatch_mode == SW_DISPATCH_ROUND)
    {
        key = sw_atomic_fetch_add(&serv->worker_round_id, 1);
    }
    //Using the FD touch access to hash
    else if (serv->dispatch_mode == SW_DISPATCH_FDMOD)
    {
        key = fd;
    }
    //Using the IP touch access to hash
    else if (serv->dispatch_mode == SW_DISPATCH_IPMOD)
    {
        swConnection *conn = swServer_connection_get(serv, fd);
        //UDP
        if (conn == NULL)
        {
            key = fd;
        }
        //IPv4
        else if (conn->socket_type == SW_SOCK_TCP)
        {
            key = conn->info.addr.inet_v4.sin_addr.s_addr;
        }
        //IPv6
        else
        {
#ifdef HAVE_KQUEUE
            key = *(((uint32_t *) &conn->info.addr.inet_v6.sin6_addr) + 3);
#else
            key = conn->info.addr.inet_v6.sin6_addr.s6_addr32[3];
#endif
        }
    }
    else if (serv->dispatch_mode == SW_DISPATCH_UIDMOD)
    {
        swConnection *conn = swServer_connection_get(serv, fd);
        if (conn == NULL || conn->uid == 0)
        {
            key = fd;
        }
        else
        {
            key = conn->uid;
        }
    }
    //schedule by dispatch function
    else if (serv->dispatch_mode == SW_DISPATCH_USERFUNC)
    {
        return serv->dispatch_func(serv, swServer_connection_get(serv, fd), data);
    }
    //Preemptive distribution
    else
    {
        int i;
        int found = 0;
        for (i = 0; i < serv->worker_num + 1; i++)
        {
            key = sw_atomic_fetch_add(&serv->worker_round_id, 1) % serv->worker_num;
            if (serv->workers[key].status == SW_WORKER_IDLE)
            {
                found = 1;
                break;
            }
        }
        if (unlikely(found == 0))
        {
            serv->scheduler_warning = 1;
        }
        swTraceLog(SW_TRACE_SERVER, "schedule=%d, round=%d", key, serv->worker_round_id);
        return key;
    }
    return key % serv->worker_num;
}

The swReactorThread_send2worker Function

  • swReactorThread_send2worker first tries to write to the pipe with a non-blocking write system call
  • If that fails with EAGAIN, the pipe's owning reactor thread is looked up via serv->connection_list[pipe_fd].from_id, the data is appended to the pipe's in_buffer, and pipe_fd is switched to monitor both read-ready and write-ready events (swReactorThread_loop only added the pipe for reading, without write-ready monitoring), waiting for pipe_master to become writable (a sketch of the write-ready drain follows the function below).
int swReactorThread_send2worker(void *data, int len, uint16_t target_worker_id)
{
    swServer *serv = SwooleG.serv;

    assert(target_worker_id < serv->worker_num);

    int ret = -1;
    swWorker *worker = &(serv->workers[target_worker_id]);

    //reactor thread
    if (SwooleTG.type == SW_THREAD_REACTOR)
    {
        int pipe_fd = worker->pipe_master;
        int thread_id = serv->connection_list[pipe_fd].from_id;
        swReactorThread *thread = swServer_get_thread(serv, thread_id);
        swLock *lock = serv->connection_list[pipe_fd].object;

        //lock thread
        lock->lock(lock);

        swBuffer *buffer = serv->connection_list[pipe_fd].in_buffer;
        if (swBuffer_empty(buffer))
        {
            ret = write(pipe_fd, (void *) data, len);
#ifdef HAVE_KQUEUE
            if (ret < 0 && (errno == EAGAIN || errno == ENOBUFS))
#else
            if (ret < 0 && errno == EAGAIN)
#endif
            {
                if (thread->reactor.set(&thread->reactor, pipe_fd, SW_FD_PIPE | SW_EVENT_READ | SW_EVENT_WRITE) < 0)
                {
                    swSysError("reactor->set(%d, PIPE | READ | WRITE) failed.", pipe_fd);
                }
                goto append_pipe_buffer;
            }
        }
        else
        {
            append_pipe_buffer:
            if (swBuffer_append(buffer, data, len) < 0)
            {
                swWarn("append to pipe_buffer failed.");
                ret = SW_ERR;
            }
            else
            {
                ret = SW_OK;
            }
        }
        //release thread lock
        lock->unlock(lock);
    }
    //master/udp thread
    else
    {
        int pipe_fd = worker->pipe_master;
        ret = swSocket_write_blocking(pipe_fd, data, len);
    }
    return ret;
}
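
swReactorThread_onPipeWrite, registered for SW_FD_PIPE | SW_EVENT_WRITE back in swReactorThread_loop, is the counterpart of the buffered branch above but is not listed in this article. Conceptually it drains the chunks queued in in_buffer once the pipe becomes writable again and switches the pipe back to read-only monitoring when the buffer is empty. Below is a minimal, self-contained sketch of such a drain loop; the chunk queue and the function names are illustrative assumptions, not the real Swoole API.

#include <errno.h>
#include <stdio.h>
#include <unistd.h>

/* one queued message waiting to be written to the worker pipe */
typedef struct demo_chunk {
    const char *data;
    size_t length;
    size_t offset;              /* how much of this chunk was already written */
    struct demo_chunk *next;
} demo_chunk;

/* Drain as much of the queue as the pipe accepts; return 1 when the queue is empty
 * (the caller would then drop SW_EVENT_WRITE and keep only SW_EVENT_READ on the pipe). */
static int demo_on_pipe_writable(int pipe_fd, demo_chunk **queue)
{
    while (*queue)
    {
        demo_chunk *c = *queue;
        ssize_t n = write(pipe_fd, c->data + c->offset, c->length - c->offset);
        if (n < 0)
        {
            if (errno == EAGAIN || errno == EWOULDBLOCK)
            {
                return 0;        /* pipe is full again, wait for the next writable event */
            }
            perror("write");
            return 0;
        }
        c->offset += (size_t) n;
        if (c->offset == c->length)
        {
            *queue = c->next;    /* chunk fully sent, pop it */
        }
    }
    return 1;                    /* nothing left: remove write-ready monitoring */
}

int main(void)
{
    int fds[2];
    if (pipe(fds) < 0) return 1;
    demo_chunk msg = { "hello worker", 12, 0, NULL };
    demo_chunk *queue = &msg;
    printf("drained=%d\n", demo_on_pipe_writable(fds[1], &queue));
    return 0;
}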