Swoole 源碼分析——Server模塊之初始化

前言

本節主要介紹 server 模塊進行初始化的代碼,關於初始化過程當中,各個屬性的意義,能夠參考官方文檔:php

SERVER 配置選項html

關於初始化過程當中,用於監聽的 socket 綁定問題,能夠參考:react

UNP 學習筆記——基本 TCP 套接字編程laravel

UNP 學習筆記——套接字選項編程

構造 server 對象

構造 server 對象最重要的是兩件事:swServer_init 初始化 server、爲 server 添加端口:緩存

PHP_METHOD(swoole_server, __construct)
{
    zend_size_t host_len = 0;
    char *serv_host;
    long sock_type = SW_SOCK_TCP;
    long serv_port = 0;
    long serv_mode = SW_MODE_PROCESS;

    swServer *serv = sw_malloc(sizeof (swServer));
    swServer_init(serv);

    serv->factory_mode = serv_mode;

    if (serv_port == 0 && strcasecmp(serv_host, "SYSTEMD") == 0)
    {
        if (swserver_add_systemd_socket(serv) <= 0)
        {
            swoole_php_fatal_error(E_ERROR, "failed to add systemd socket.");
            return;
        }
    }
    else
    {
        swListenPort *port = swServer_add_port(serv, sock_type, serv_host, serv_port);
    }
}

swServer_init 函數

  • swServer_init 函數主要爲 serv 對象賦值初值,若是想要更改 serv 對象各個屬性,能夠調用 set 函數
  • serv->gs 是全局共享內存
void swServer_init(swServer *serv)
{
    swoole_init();
    bzero(serv, sizeof(swServer));

    serv->factory_mode = SW_MODE_BASE;

    serv->reactor_num = SW_REACTOR_NUM > SW_REACTOR_MAX_THREAD ? SW_REACTOR_MAX_THREAD : SW_REACTOR_NUM;

    serv->dispatch_mode = SW_DISPATCH_FDMOD;

    serv->worker_num = SW_CPU_NUM;
    serv->max_connection = SwooleG.max_sockets < SW_SESSION_LIST_SIZE ? SwooleG.max_sockets : SW_SESSION_LIST_SIZE;

    serv->max_request = 0;
    serv->max_wait_time = SW_WORKER_MAX_WAIT_TIME;

    //http server
    serv->http_parse_post = 1;
    serv->upload_tmp_dir = sw_strdup("/tmp");

    //heartbeat check
    serv->heartbeat_idle_time = SW_HEARTBEAT_IDLE;
    serv->heartbeat_check_interval = SW_HEARTBEAT_CHECK;

    serv->buffer_input_size = SW_BUFFER_INPUT_SIZE;
    serv->buffer_output_size = SW_BUFFER_OUTPUT_SIZE;

    serv->task_ipc_mode = SW_TASK_IPC_UNIXSOCK;

    /**
     * alloc shared memory
     */
    serv->stats = SwooleG.memory_pool->alloc(SwooleG.memory_pool, sizeof(swServerStats));
    if (serv->stats == NULL)
    {
        swError("[Master] Fatal Error: failed to allocate memory for swServer->stats.");
    }
    serv->gs = SwooleG.memory_pool->alloc(SwooleG.memory_pool, sizeof(swServerGS));
    if (serv->gs == NULL)
    {
        swError("[Master] Fatal Error: failed to allocate memory for swServer->gs.");
    }

    SwooleG.serv = serv;
}

swoole_init 函數

  • swoole_init 函數用於初始化全局變量 SwooleG 的各個屬性
  • SwooleGS 是全局的共享內存
  • SwooleTG 是線程特有數據,每一個線程都有本身獨特的數據
extern swServerG SwooleG;              //Local Global Variable
extern SwooleGS_t *SwooleGS;           //Share Memory Global Variable
extern __thread swThreadG SwooleTG;   //Thread Global Variable

typedef struct
{
    swLock lock;
    swLock lock_2;
} SwooleGS_t;

void swoole_init(void)
{
    struct rlimit rlmt;
    if (SwooleG.running)
    {
        return;
    }

    bzero(&SwooleG, sizeof(SwooleG));
    bzero(&SwooleWG, sizeof(SwooleWG));
    bzero(sw_error, SW_ERROR_MSG_SIZE);

    SwooleG.running = 1;
    SwooleG.enable_coroutine = 1;
    sw_errno = 0;

    SwooleG.log_fd = STDOUT_FILENO;
    SwooleG.cpu_num = sysconf(_SC_NPROCESSORS_ONLN);
    SwooleG.pagesize = getpagesize();
    SwooleG.pid = getpid();
    SwooleG.socket_buffer_size = SW_SOCKET_BUFFER_SIZE;

#ifdef SW_DEBUG
    SwooleG.log_level = 0;
#else
    SwooleG.log_level = SW_LOG_INFO;
#endif

    //get system uname
    uname(&SwooleG.uname);

    //random seed
    srandom(time(NULL));

    //init global shared memory
    SwooleG.memory_pool = swMemoryGlobal_new(SW_GLOBAL_MEMORY_PAGESIZE, 1);
    if (SwooleG.memory_pool == NULL)
    {
        printf("[Master] Fatal Error: global memory allocation failure.");
        exit(1);
    }
    SwooleGS = SwooleG.memory_pool->alloc(SwooleG.memory_pool, sizeof(SwooleGS_t));
    if (SwooleGS == NULL)
    {
        printf("[Master] Fatal Error: failed to allocate memory for SwooleGS.");
        exit(2);
    }

    //init global lock
    swMutex_create(&SwooleGS->lock, 1);
    swMutex_create(&SwooleGS->lock_2, 1);
    swMutex_create(&SwooleG.lock, 0);

    if (getrlimit(RLIMIT_NOFILE, &rlmt) < 0)
    {
        swWarn("getrlimit() failed. Error: %s[%d]", strerror(errno), errno);
        SwooleG.max_sockets = 1024;
    }
    else
    {
        SwooleG.max_sockets = (uint32_t) rlmt.rlim_cur;
    }

    SwooleTG.buffer_stack = swString_new(8192);
    if (SwooleTG.buffer_stack == NULL)
    {
        exit(3);
    }

    if (!SwooleG.task_tmpdir)
    {
        SwooleG.task_tmpdir = sw_strndup(SW_TASK_TMP_FILE, sizeof(SW_TASK_TMP_FILE));
        SwooleG.task_tmpdir_len = sizeof(SW_TASK_TMP_FILE);
    }

    char *tmp_dir = swoole_dirname(SwooleG.task_tmpdir);
    //create tmp dir
    if (access(tmp_dir, R_OK) < 0 && swoole_mkdir_recursive(tmp_dir) < 0)
    {
        swWarn("create task tmp dir(%s) failed.", tmp_dir);
    }
    if (tmp_dir)
    {
        sw_free(tmp_dir);
    }

    //init signalfd
#ifdef HAVE_SIGNALFD
    swSignalfd_init();
    SwooleG.use_signalfd = 1;
    SwooleG.enable_signalfd = 1;
#endif
    //timerfd
#ifdef HAVE_TIMERFD
    SwooleG.use_timerfd = 1;
#endif

    SwooleG.use_timer_pipe = 1;
}

swServer_add_port 函數

  • swServer_add_port 函數爲服務端添加監聽的端口
  • 首先須要檢測 listen_port_num 已監聽的端口不能大於 SW_MAX_LISTEN_PORT(默認爲 60000)
  • 若是 socket 的類型不是 unix sock,那麼端口號必須大於等於 0,小於 65535
  • host 主域名長度也不能大於 SW_HOST_MAXSIZE(104)
  • 而後從共享內存池中申請一個 swListenPort 類型的對象,而後調用 swPort_init 對端口對象進行初始化
  • 利用函數 swSocket_create 建立一個 socket 對象,並返回其文件描述符
  • 調用 swSocket_bind 函數將 socket 綁定到對應的主域與端口上來
  • 若是協議是數據報(UDP),而不是數據流時,須要設置 socket 的發送緩存與接收緩存爲 socket_buffer_size
  • 設置 socket 爲非阻塞、O_CLOEXEC(exec 以後文件描述符自動關閉)
  • 根據協議類型設置 have_udp_sockhave_tcp_sockudp_socket_ipv4/udp_socket_ipv6 等等屬性
  • 遞增 listen_port_num ,向單鏈表 listen_list 中添加 swListenPort 對象
enum swSocket_type
{
    SW_SOCK_TCP          =  1,
    SW_SOCK_UDP          =  2,
    SW_SOCK_TCP6         =  3,
    SW_SOCK_UDP6         =  4,
    SW_SOCK_UNIX_DGRAM   =  5,  //unix sock dgram
    SW_SOCK_UNIX_STREAM  =  6,  //unix sock stream
};

swListenPort* swServer_add_port(swServer *serv, int type, char *host, int port)
{
    if (serv->listen_port_num >= SW_MAX_LISTEN_PORT)
    {
        swoole_error_log(SW_LOG_ERROR, SW_ERROR_SERVER_TOO_MANY_LISTEN_PORT, "allows up to %d ports to listen", SW_MAX_LISTEN_PORT);
        return NULL;
    }
    if (!(type == SW_SOCK_UNIX_DGRAM || type == SW_SOCK_UNIX_STREAM) && (port < 0 || port > 65535))
    {
        swoole_error_log(SW_LOG_ERROR, SW_ERROR_SERVER_INVALID_LISTEN_PORT, "invalid port [%d]", port);
        return NULL;
    }
    if (strlen(host) + 1  > SW_HOST_MAXSIZE)
    {
        swoole_error_log(SW_LOG_ERROR, SW_ERROR_NAME_TOO_LONG, "address '%s' exceeds %d characters limit", host, SW_HOST_MAXSIZE - 1);
        return NULL;
    }

    swListenPort *ls = SwooleG.memory_pool->alloc(SwooleG.memory_pool, sizeof(swListenPort));
    if (ls == NULL)
    {
        swError("alloc failed");
        return NULL;
    }

    swPort_init(ls);
    ls->type = type;
    ls->port = port;
    strncpy(ls->host, host, strlen(host) + 1);

    if (type & SW_SOCK_SSL)
    {
        type = type & (~SW_SOCK_SSL);
        if (swSocket_is_stream(type))
        {
            ls->type = type;
            ls->ssl = 1;
#ifdef SW_USE_OPENSSL
            ls->ssl_config.prefer_server_ciphers = 1;
            ls->ssl_config.session_tickets = 0;
            ls->ssl_config.stapling = 1;
            ls->ssl_config.stapling_verify = 1;
            ls->ssl_config.ciphers = sw_strdup(SW_SSL_CIPHER_LIST);
            ls->ssl_config.ecdh_curve = sw_strdup(SW_SSL_ECDH_CURVE);
#endif
        }
    }

    //create server socket
    int sock = swSocket_create(ls->type);
    if (sock < 0)
    {
        swSysError("create socket failed.");
        return NULL;
    }
    //bind address and port
    if (swSocket_bind(sock, ls->type, ls->host, &ls->port) < 0)
    {
        close(sock);
        return NULL;
    }
    //dgram socket, setting socket buffer size
    if (swSocket_is_dgram(ls->type))
    {
        setsockopt(sock, SOL_SOCKET, SO_SNDBUF, &ls->socket_buffer_size, sizeof(int));
        setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &ls->socket_buffer_size, sizeof(int));
    }
    //O_NONBLOCK & O_CLOEXEC
    swoole_fcntl_set_option(sock, 1, 1);
    ls->sock = sock;

    if (swSocket_is_dgram(ls->type))
    {
        serv->have_udp_sock = 1;
        serv->dgram_port_num++;
        if (ls->type == SW_SOCK_UDP)
        {
            serv->udp_socket_ipv4 = sock;
        }
        else if (ls->type == SW_SOCK_UDP6)
        {
            serv->udp_socket_ipv6 = sock;
        }
    }
    else
    {
        serv->have_tcp_sock = 1;
    }

    LL_APPEND(serv->listen_list, ls);
    serv->listen_port_num++;
    return ls;
}

swPort_init 函數

  • swPort_init 函數用於初始化 swListenPort 對象
  • backlogtcp_keepcounttcp_keepidle 等等都是相應 socket 的屬性
  • 在外網通訊時,有些客戶端發送數據的速度較慢,每次只能發送一小段數據。這樣 onReceive 到的數據就不是一個完整的包。 還有些客戶端是逐字節發送數據的,若是每次回調 onReceive 會拖慢整個系統。Length_Check 和 EOF_Check 的使用package_length_typepackage_eof 等等就是相關參數的具體參數。
#define SW_DATA_EOF                "\r\n\r\n"

void swPort_init(swListenPort *port)
{
    port->sock = 0;
    port->ssl = 0;

    //listen backlog
    port->backlog = SW_BACKLOG;
    //tcp keepalive
    port->tcp_keepcount = SW_TCP_KEEPCOUNT;
    port->tcp_keepinterval = SW_TCP_KEEPINTERVAL;
    port->tcp_keepidle = SW_TCP_KEEPIDLE;
    port->open_tcp_nopush = 1;

    port->protocol.package_length_type = 'N';
    port->protocol.package_length_size = 4;
    port->protocol.package_body_offset = 4;
    port->protocol.package_max_length = SW_BUFFER_INPUT_SIZE;

    port->socket_buffer_size = SwooleG.socket_buffer_size;

    char eof[] = SW_DATA_EOF;
    port->protocol.package_eof_len = sizeof(SW_DATA_EOF) - 1;
    memcpy(port->protocol.package_eof, eof, port->protocol.package_eof_len);
}
  • c:有符號、1字節
  • C:無符號、1字節
  • s :有符號、主機字節序、2字節
  • S:無符號、主機字節序、2字節
  • n:無符號、網絡字節序、2字節
  • N:無符號、網絡字節序、4字節
  • l:有符號、主機字節序、4字節(小寫L)
  • L:無符號、主機字節序、4字節(大寫L)
  • v:無符號、小端字節序、2字節
  • V:無符號、小端字節序、4字節

swSocket_create 建立 socket

swSocket_create 函數會根據 type 的類型來調用 socket 系統調用服務器

int swSocket_create(int type)
{
    int _domain;
    int _type;

    switch (type)
    {
    case SW_SOCK_TCP:
        _domain = PF_INET;
        _type = SOCK_STREAM;
        break;
    case SW_SOCK_TCP6:
        _domain = PF_INET6;
        _type = SOCK_STREAM;
        break;
    case SW_SOCK_UDP:
        _domain = PF_INET;
        _type = SOCK_DGRAM;
        break;
    case SW_SOCK_UDP6:
        _domain = PF_INET6;
        _type = SOCK_DGRAM;
        break;
    case SW_SOCK_UNIX_DGRAM:
        _domain = PF_UNIX;
        _type = SOCK_DGRAM;
        break;
    case SW_SOCK_UNIX_STREAM:
        _domain = PF_UNIX;
        _type = SOCK_STREAM;
        break;
    default:
        swWarn("unknown socket type [%d]", type);
        return SW_ERR;
    }
    return socket(_domain, _type, 0);
}

swSocket_bind 綁定端口

  • SO_REUSEADDR 容許啓動一個監聽服務器並捆綁衆所周知端口,即便之前創建的該端口用做它們的本地端口的鏈接仍存在。swoole

    • 若是不對TCP的套接字選項進行任何限制時,若是啓動兩個進程,第二個進程就會在調用bind函數的時候出錯(Address already in use)。
    • 若是在調用bind以前咱們設置了SO_REUSEADDR,可是不在第二個進程啓動前close這個套接字,那麼第二個進程仍然會在調用bind函數的時候出錯(Address already in use)。
    • 若是在調用bind以前咱們設置了SO_REUSEADDR,並接收了一個客戶端鏈接,而且在第二個進程啓動前關閉了bind的套接字,這個時候第一個進程只擁有一個套接字(與客戶端的鏈接),那麼第二個進程則能夠bind成功,符合預期。
  • 相對 SO_REUSEADDR 來講,SO_REUSEPORT 沒有那麼多的限制條件,容許兩個毫無血緣關係的進程使用相同的 IP 地址同時監聽同一個端口,而且不會出現驚羣效應
  • 對於 UNIX SOCKET,須要設置 sun_familysun_path
  • 對於 IPV4,須要設置 sin_familysin_portsin_addr;對於 IPV6,須要設置 sin6_familysin6_portsin6_addr,而後調用 bind 函數;
  • 若是 port 爲0,說明服務器綁定的是任意端口,bind 函數會將系統所選擇的端口返回給 sockaddr 對象
int swSocket_bind(int sock, int type, char *host, int *port)
{
    int ret;

    struct sockaddr_in addr_in4;
    struct sockaddr_in6 addr_in6;
    struct sockaddr_un addr_un;
    socklen_t len;

    //SO_REUSEADDR option
    int option = 1;
    if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &option, sizeof(int)) < 0)
    {
        swoole_error_log(SW_LOG_WARNING, SW_ERROR_SYSTEM_CALL_FAIL, "setsockopt(%d, SO_REUSEADDR) failed.", sock);
    }
    //reuse port
#ifdef HAVE_REUSEPORT
    if (SwooleG.reuse_port)
    {
        if (setsockopt(sock, SOL_SOCKET, SO_REUSEPORT, &option, sizeof(int)) < 0)
        {
            swSysError("setsockopt(SO_REUSEPORT) failed.");
            SwooleG.reuse_port = 0;
        }
    }
#endif
    //unix socket
    if (type == SW_SOCK_UNIX_DGRAM || type == SW_SOCK_UNIX_STREAM)
    {
        bzero(&addr_un, sizeof(addr_un));
        unlink(host);
        addr_un.sun_family = AF_UNIX;
        strncpy(addr_un.sun_path, host, sizeof(addr_un.sun_path) - 1);
        ret = bind(sock, (struct sockaddr*) &addr_un, sizeof(addr_un));
    }
    //IPv6
    else if (type > SW_SOCK_UDP)
    {
        bzero(&addr_in6, sizeof(addr_in6));
        inet_pton(AF_INET6, host, &(addr_in6.sin6_addr));
        addr_in6.sin6_port = htons(*port);
        addr_in6.sin6_family = AF_INET6;
        ret = bind(sock, (struct sockaddr *) &addr_in6, sizeof(addr_in6));
        if (ret == 0 && *port == 0)
        {
            len = sizeof(addr_in6);
            if (getsockname(sock, (struct sockaddr *) &addr_in6, &len) != -1)
            {
                *port = ntohs(addr_in6.sin6_port);
            }
        }
    }
    //IPv4
    else
    {
        bzero(&addr_in4, sizeof(addr_in4));
        inet_pton(AF_INET, host, &(addr_in4.sin_addr));
        addr_in4.sin_port = htons(*port);
        addr_in4.sin_family = AF_INET;
        ret = bind(sock, (struct sockaddr *) &addr_in4, sizeof(addr_in4));
        if (ret == 0 && *port == 0)
        {
            len = sizeof(addr_in4);
            if (getsockname(sock, (struct sockaddr *) &addr_in4, &len) != -1)
            {
                *port = ntohs(addr_in4.sin_port);
            }
        }
    }
    //bind failed
    if (ret < 0)
    {
        swoole_error_log(SW_LOG_WARNING, SW_ERROR_SYSTEM_CALL_FAIL, "bind(%s:%d) failed. Error: %s [%d]", host, *port, strerror(errno), errno);
        return SW_ERR;
    }

    return ret;
}

swoole_fcntl_set_option 函數爲文件描述符設置選項

  • 此函數主要是利用 fcntl 函數爲文件描述符設置阻塞/非阻塞、CLOEXEC 等屬性。
void swoole_fcntl_set_option(int sock, int nonblock, int cloexec)
{
    int opts, ret;

    if (nonblock >= 0)
    {
        do
        {
            opts = fcntl(sock, F_GETFL);
        }
        while (opts < 0 && errno == EINTR);

        if (opts < 0)
        {
            swSysError("fcntl(%d, GETFL) failed.", sock);
        }

        if (nonblock)
        {
            opts = opts | O_NONBLOCK;
        }
        else
        {
            opts = opts & ~O_NONBLOCK;
        }

        do
        {
            ret = fcntl(sock, F_SETFL, opts);
        }
        while (ret < 0 && errno == EINTR);

        if (ret < 0)
        {
            swSysError("fcntl(%d, SETFL, opts) failed.", sock);
        }
    }

#ifdef FD_CLOEXEC
    if (cloexec >= 0)
    {
        do
        {
            opts = fcntl(sock, F_GETFD);
        }
        while (opts < 0 && errno == EINTR);

        if (opts < 0)
        {
            swSysError("fcntl(%d, GETFL) failed.", sock);
        }

        if (cloexec)
        {
            opts = opts | FD_CLOEXEC;
        }
        else
        {
            opts = opts & ~FD_CLOEXEC;
        }

        do
        {
            ret = fcntl(sock, F_SETFD, opts);
        }
        while (ret < 0 && errno == EINTR);

        if (ret < 0)
        {
            swSysError("fcntl(%d, SETFD, opts) failed.", sock);
        }
    }
#endif
}
相關文章
相關標籤/搜索