Swoole 源碼分析——進程管理 Swoole_Process

前言

swoole-1.7.2 增長了一個進程管理模塊,用來替代 PHPpcntl 擴展。php

PHP自帶的pcntl,存在不少不足,如react

  • pcntl 沒有提供進程間通訊的功能
  • pcntl 不支持重定向標準輸入和輸出
  • pcntl 只提供了 fork 這樣原始的接口,容易使用錯誤
  • swoole_process 提供了比 pcntl 更強大的功能,更易用的 API,使 PHP 在多進程編程方面更加輕鬆。

swoole_process::__construct 建立子進程

在進程初始化的時候,首先要判斷當前的環境:編程

  • CLI 模式下不能使用
  • server master 進程下而且已經啓動了 server 後是不能建立進程的,由於此時 master 進程已經建立了 多個 reator 線程,fork 後會將多線程也複製下來。
  • 一樣的道理,使用了異步的 AIO 的進程使用了線程池,fork 會出現很是複雜的帶線程 fork 問題。

若是當前環境能夠建立進程,那麼須要初始化如下屬性:segmentfault

  • process->id:若是是普通的客戶端進程,或者是 master 進程未啓動 server 的狀態, php_swoole_worker_round_id 就是建立的 process 進程數量,此時只須要遞增便可;若是 server 已啓動,那麼 php_swoole_worker_round_id 還要加上全部 worker 進程的數量。 php_swoole_worker_round_id 遞增就是 process->id
  • 設置重定向,讓進程的輸入輸出與主進程管道相關聯
  • swPipeUnsock_create 函數新建管道
static PHP_METHOD(swoole_process, __construct)
{
    zend_bool redirect_stdin_and_stdout = 0;
    long pipe_type = 2;
    zval *callback;

    //only cli env
    if (!SWOOLE_G(cli))
    {
        swoole_php_fatal_error(E_ERROR, "swoole_process only can be used in PHP CLI mode.");
        RETURN_FALSE;
    }

    if (SwooleG.serv && SwooleG.serv->gs->start == 1 && swIsMaster())
    {
        swoole_php_fatal_error(E_ERROR, "swoole_process can't be used in master process.");
        RETURN_FALSE;
    }

    if (SwooleAIO.init)
    {
        swoole_php_fatal_error(E_ERROR, "unable to create process with async-io threads.");
        RETURN_FALSE;
    }

    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "z|bl", &callback, &redirect_stdin_and_stdout, &pipe_type) == FAILURE)
    {
        RETURN_FALSE;
    }

    char *func_name = NULL;
    if (!sw_zend_is_callable(callback, 0, &func_name TSRMLS_CC))
    {
        swoole_php_fatal_error(E_ERROR, "function '%s' is not callable", func_name);
        efree(func_name);
        RETURN_FALSE;
    }
    efree(func_name);

    swWorker *process = emalloc(sizeof(swWorker));
    bzero(process, sizeof(swWorker));

    int base = 1;
    if (SwooleG.serv && SwooleG.serv->gs->start)
    {
        base = SwooleG.serv->worker_num + SwooleG.serv->task_worker_num + SwooleG.serv->user_worker_num;
    }
    if (php_swoole_worker_round_id == 0)
    {
        php_swoole_worker_round_id = base;
    }
    process->id = php_swoole_worker_round_id++;

    if (redirect_stdin_and_stdout)
    {
        process->redirect_stdin = 1;
        process->redirect_stdout = 1;
        process->redirect_stderr = 1;
        /**
         * Forced to use stream pipe
         */
        pipe_type = 1;
    }

    if (pipe_type > 0)
    {
        swPipe *_pipe = emalloc(sizeof(swPipe));
        int socket_type = pipe_type == 1 ? SOCK_STREAM : SOCK_DGRAM;
        if (swPipeUnsock_create(_pipe, 1, socket_type) < 0)
        {
            RETURN_FALSE;
        }

        process->pipe_object = _pipe;
        process->pipe_master = _pipe->getFd(_pipe, SW_PIPE_MASTER);
        process->pipe_worker = _pipe->getFd(_pipe, SW_PIPE_WORKER);
        process->pipe = process->pipe_master;

        zend_update_property_long(swoole_process_class_entry_ptr, getThis(), ZEND_STRL("pipe"), process->pipe_master TSRMLS_CC);
    }

    swoole_set_object(getThis(), process);
    zend_update_property(swoole_process_class_entry_ptr, getThis(), ZEND_STRL("callback"), callback TSRMLS_CC);
}

swoole_process->start 啓動進程

swoole_process->start 函數用於 fork 一個新進程,而且調用 php_swoole_process_startswoole

static PHP_METHOD(swoole_process, start)
{
    swWorker *process = swoole_get_object(getThis());

    if (process->pid > 0 && kill(process->pid, 0) == 0)
    {
        swoole_php_fatal_error(E_WARNING, "process has already been started.");
        RETURN_FALSE;
    }

    pid_t pid = fork();
    if (pid < 0)
    {
        swoole_php_fatal_error(E_WARNING, "fork() failed. Error: %s[%d]", strerror(errno), errno);
        RETURN_FALSE;
    }
    else if (pid > 0)
    {
        process->pid = pid;
        process->child_process = 0;
        zend_update_property_long(swoole_server_class_entry_ptr, getThis(), ZEND_STRL("pid"), process->pid TSRMLS_CC);
        RETURN_LONG(pid);
    }
    else
    {
        process->child_process = 1;
        SW_CHECK_RETURN(php_swoole_process_start(process, getThis() TSRMLS_CC));
    }
    RETURN_TRUE;
}

php_swoole_process_start 函數用於設定重定向和清理主進程殘留的一些功能:多線程

  • STDIN_FILENO 輸入、STDOUT_FILENO 輸出、STDERR_FILENO 錯誤輸出與 pipe_worker 相綁定,實現重定向功能。
  • 若是存在 SwooleG.main_reactor,刪除並釋放相關內存。
  • 清空主進程殘留的定時器與信號。
  • 設定 process_type 爲 0
  • 執行 _construct 回調函數
  • 若是在回調函數中調用了異步系統,啓動 php_swoole_event_wait 函數進行事件循環。
int php_swoole_process_start(swWorker *process, zval *object TSRMLS_DC)
{
    process->pipe = process->pipe_worker;
    process->pid = getpid();

    if (process->redirect_stdin)
    {
        if (dup2(process->pipe, STDIN_FILENO) < 0)
        {
            swoole_php_fatal_error(E_WARNING, "dup2() failed. Error: %s[%d]", strerror(errno), errno);
        }
    }

    if (process->redirect_stdout)
    {
        if (dup2(process->pipe, STDOUT_FILENO) < 0)
        {
            swoole_php_fatal_error(E_WARNING, "dup2() failed. Error: %s[%d]", strerror(errno), errno);
        }
    }

    if (process->redirect_stderr)
    {
        if (dup2(process->pipe, STDERR_FILENO) < 0)
        {
            swoole_php_fatal_error(E_WARNING, "dup2() failed. Error: %s[%d]", strerror(errno), errno);
        }
    }

    /**
     * Close EventLoop
     */
    if (SwooleG.main_reactor)
    {
        SwooleG.main_reactor->free(SwooleG.main_reactor);
        SwooleG.main_reactor = NULL;
        swTraceLog(SW_TRACE_PHP, "destroy reactor");
    }

    bzero(&SwooleWG, sizeof(SwooleWG));
    SwooleG.pid = process->pid;
    if (SwooleG.process_type != SW_PROCESS_USERWORKER)
    {
        SwooleG.process_type = 0;
    }
    SwooleWG.id = process->id;

    if (SwooleG.timer.fd)
    {
        swTimer_free(&SwooleG.timer);
        bzero(&SwooleG.timer, sizeof(SwooleG.timer));
    }

    swSignal_clear();

    zend_update_property_long(swoole_process_class_entry_ptr, object, ZEND_STRL("pid"), process->pid TSRMLS_CC);
    zend_update_property_long(swoole_process_class_entry_ptr, object, ZEND_STRL("pipe"), process->pipe_worker TSRMLS_CC);

    zval *zcallback = sw_zend_read_property(swoole_process_class_entry_ptr, object, ZEND_STRL("callback"), 0 TSRMLS_CC);
    zval **args[1];

    if (zcallback == NULL || ZVAL_IS_NULL(zcallback))
    {
        swoole_php_fatal_error(E_ERROR, "no callback.");
        return SW_ERR;
    }

    zval *retval = NULL;
    args[0] = &object;
    sw_zval_add_ref(&object);

    if (sw_call_user_function_ex(EG(function_table), NULL, zcallback, &retval, 1, args, 0, NULL TSRMLS_CC) == FAILURE)
    {
        swoole_php_fatal_error(E_ERROR, "callback function error");
        return SW_ERR;
    }
    if (EG(exception))
    {
        zend_exception_error(EG(exception), E_ERROR TSRMLS_CC);
    }
    if (retval)
    {
        sw_zval_ptr_dtor(&retval);
    }

    if (SwooleG.main_reactor)
    {
        php_swoole_event_wait();
    }
    SwooleG.running = 0;

    zend_bailout();
    return SW_OK;
}

swoole_process->write/ swoole_process->read

主進程與子進程之間進行通訊可使用 writeread,若是使用了 swoole_event,會自動將管道轉爲非阻塞模式,由 reactor 進行事件循環讀寫,不然就會採用阻塞式讀寫。異步

static PHP_METHOD(swoole_process, write)
{
    char *data = NULL;
    zend_size_t data_len = 0;

    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s", &data, &data_len) == FAILURE)
    {
        RETURN_FALSE;
    }

    if (data_len < 1)
    {
        swoole_php_fatal_error(E_WARNING, "the data to send is empty.");
        RETURN_FALSE;
    }

    swWorker *process = swoole_get_object(getThis());
    if (process->pipe == 0)
    {
        swoole_php_fatal_error(E_WARNING, "no pipe, can not write into pipe.");
        RETURN_FALSE;
    }

    int ret;

    //async write
    if (SwooleG.main_reactor)
    {
        swConnection *_socket = swReactor_get(SwooleG.main_reactor, process->pipe);
        if (_socket && _socket->nonblock)
        {
            ret = SwooleG.main_reactor->write(SwooleG.main_reactor, process->pipe, data, (size_t) data_len);
        }
        else
        {
            goto _blocking_read;
        }
    }
    else
    {
        _blocking_read: ret = swSocket_write_blocking(process->pipe, data, data_len);
    }

    if (ret < 0)
    {
        swoole_php_error(E_WARNING, "write() failed. Error: %s[%d]", strerror(errno), errno);
        RETURN_FALSE;
    }
    ZVAL_LONG(return_value, ret);
}

static PHP_METHOD(swoole_process, read)
{
    long buf_size = 8192;

    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|l", &buf_size) == FAILURE)
    {
        RETURN_FALSE;
    }

    if (buf_size > 65536)
    {
        buf_size = 65536;
    }

    swWorker *process = swoole_get_object(getThis());

    if (process->pipe == 0)
    {
        swoole_php_fatal_error(E_WARNING, "no pipe, can not read from pipe.");
        RETURN_FALSE;
    }

    char *buf = emalloc(buf_size + 1);
    int ret = read(process->pipe, buf, buf_size);;
    if (ret < 0)
    {
        efree(buf);
        if (errno != EINTR)
        {
            swoole_php_error(E_WARNING, "read() failed. Error: %s[%d]", strerror(errno), errno);
        }
        RETURN_FALSE;
    }
    buf[ret] = 0;
    SW_ZVAL_STRINGL(return_value, buf, ret, 0);
    efree(buf);
}

swoole_process::signal 設置信號處理函數

爲異步的程序添加信號處理函數。首先程序會檢查當前的進程環境與註冊的信號,不符合條件的直接返回,例如:swoole_server 中不能設置 SIGTERMSIGALAM 信號,這兩個信號是 swoole 須要保留的,用戶不能進行修改。socket

若是此前該信號已存在信號處理函數,該函數會覆蓋之前的回調函數,以前的邏輯會再次執行一次,以後就會被銷燬。async

static PHP_METHOD(swoole_process, signal)
{
    zval *callback = NULL;
    long signo = 0;

    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "lz", &signo, &callback) == FAILURE)
    {
        return;
    }

    if (!SWOOLE_G(cli))
    {
        swoole_php_fatal_error(E_ERROR, "cannot use swoole_process::signal here.");
        RETURN_FALSE;
    }

    if (SwooleG.serv && SwooleG.serv->gs->start)
    {
        if ((swIsWorker() || swIsTaskWorker()) && signo == SIGTERM)
        {
            swoole_php_fatal_error(E_WARNING, "unable to register SIGTERM in worker/task process.");
            RETURN_FALSE;
        }
        else if (swIsManager() && (signo == SIGTERM || signo == SIGUSR1 || signo == SIGUSR2 || signo == SIGALRM))
        {
            swoole_php_fatal_error(E_WARNING, "unable to register SIGTERM/SIGUSR1/SIGUSR2/SIGALRM in manager process.");
            RETURN_FALSE;
        }
        else if (swIsMaster() && (signo == SIGTERM || signo == SIGUSR1 || signo == SIGUSR2 || signo == SIGALRM || signo == SIGCHLD))
        {
            swoole_php_fatal_error(E_WARNING, "unable to register SIGTERM/SIGUSR1/SIGUSR2/SIGALRM/SIGCHLD in manager process.");
            RETURN_FALSE;
        }
    }

    php_swoole_check_reactor();
    swSignalHander handler;

    if (callback == NULL || ZVAL_IS_NULL(callback))
    {
        callback = signal_callback[signo];
        if (callback)
        {
            swSignal_add(signo, NULL);
            SwooleG.main_reactor->defer(SwooleG.main_reactor, free_signal_callback, callback);
            signal_callback[signo] = NULL;
            RETURN_TRUE;
        }
        else
        {
            swoole_php_error(E_WARNING, "no callback.");
            RETURN_FALSE;
        }
    }
    else if (Z_TYPE_P(callback) == IS_LONG && Z_LVAL_P(callback) == (long) SIG_IGN)
    {
        handler = NULL;
    }
    else
    {
        char *func_name;
        if (!sw_zend_is_callable(callback, 0, &func_name TSRMLS_CC))
        {
            swoole_php_error(E_WARNING, "function '%s' is not callable", func_name);
            efree(func_name);
            RETURN_FALSE;
        }
        efree(func_name);

        callback = sw_zval_dup(callback);
        sw_zval_add_ref(&callback);

        handler = php_swoole_onSignal;
    }

    /**
     * for swSignalfd_setup
     */
    SwooleG.main_reactor->check_signalfd = 1;

    //free the old callback
    if (signal_callback[signo])
    {
        SwooleG.main_reactor->defer(SwooleG.main_reactor, free_signal_callback, signal_callback[signo]);
    }
    signal_callback[signo] = callback;

    /**
     * use user settings
     */
    SwooleG.use_signalfd = SwooleG.enable_signalfd;

    swSignal_add(signo, handler);

    RETURN_TRUE;
}

swoole_process::alarm 進程定時器

對比 Swoole\Timer 來講,swoole_process::alarm 並非一個很是好的選擇,swoole_process::alarm 更加相似於真是的進程 alarm 定時器,alarm 只容許設定一個 alarm 信號,而 Swoole\Timer 因爲實現了一個定時任務最小堆,能夠在不一樣的時間間隔執行不一樣的任務。所以爲了區分二者,swoole 規定並不容許二者同時存在。函數

swoole_process::alarm 函數須要與 swoole_process::signal 相結合,由於其內部調用 setitimer,會週期發送 alarm 信號,須要在 swoole_process::signal 函數中設置 alarm 信號的回調函數。

static PHP_METHOD(swoole_process, alarm)
{
    long usec = 0;
    long type = ITIMER_REAL;

    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "l|l", &usec, &type) == FAILURE)
    {
        return;
    }

    if (!SWOOLE_G(cli))
    {
        swoole_php_fatal_error(E_ERROR, "cannot use swoole_process::alarm here.");
        RETURN_FALSE;
    }

    if (SwooleG.timer.fd != 0)
    {
        swoole_php_fatal_error(E_WARNING, "cannot use both 'timer' and 'alarm' at the same time.");
        RETURN_FALSE;
    }

    struct timeval now;
    if (gettimeofday(&now, NULL) < 0)
    {
        swoole_php_error(E_WARNING, "gettimeofday() failed. Error: %s[%d]", strerror(errno), errno);
        RETURN_FALSE;
    }

    struct itimerval timer_set;
    bzero(&timer_set, sizeof(timer_set));

    if (usec > 0)
    {
        long _sec = usec / 1000000;
        long _usec = usec - (_sec * 1000000);

        timer_set.it_interval.tv_sec = _sec;
        timer_set.it_interval.tv_usec = _usec;

        timer_set.it_value.tv_sec = _sec;
        timer_set.it_value.tv_usec = _usec;

        if (timer_set.it_value.tv_usec > 1e6)
        {
            timer_set.it_value.tv_usec = timer_set.it_value.tv_usec - 1e6;
            timer_set.it_value.tv_sec += 1;
        }
    }

    if (setitimer(type, &timer_set, NULL) < 0)
    {
        swoole_php_error(E_WARNING, "setitimer() failed. Error: %s[%d]", strerror(errno), errno);
        RETURN_FALSE;
    }

    RETURN_TRUE;
}

swoole_process->useQueue 消息隊列

useQueue 會利用 swMsgQueue_create 建立 process->queue

static PHP_METHOD(swoole_process, useQueue)
{
    long msgkey = 0;
    long mode = 2;

    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|ll", &msgkey, &mode) == FAILURE)
    {
        RETURN_FALSE;
    }

    swWorker *process = swoole_get_object(getThis());

    if (msgkey <= 0)
    {
        msgkey = ftok(sw_zend_get_executed_filename(), 1);
    }

    swMsgQueue *queue = emalloc(sizeof(swMsgQueue));
    if (swMsgQueue_create(queue, 1, msgkey, 0) < 0)
    {
        RETURN_FALSE;
    }
    if (mode & MSGQUEUE_NOWAIT)
    {
        swMsgQueue_set_blocking(queue, 0);
        mode = mode & (~MSGQUEUE_NOWAIT);
    }
    process->queue = queue;
    process->ipc_mode = mode;
    zend_update_property_long(swoole_process_class_entry_ptr, getThis(), ZEND_STRL("msgQueueId"), queue->msg_id TSRMLS_CC);
    zend_update_property_long(swoole_process_class_entry_ptr, getThis(), ZEND_STRL("msgQueueKey"), msgkey TSRMLS_CC);
    RETURN_TRUE;
}

swoole_process->push/swoole_process->pop 消息通訊

推送和消費消息就是利用 swMsgQueue_push/swMsgQueue_pop 函數。

static PHP_METHOD(swoole_process, push)
{
    char *data;
    zend_size_t length;

    struct
    {
        long type;
        char data[SW_MSGMAX];
    } message;

    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s", &data, &length) == FAILURE)
    {
        RETURN_FALSE;
    }

    if (length <= 0)
    {
        swoole_php_fatal_error(E_WARNING, "the data to push is empty.");
        RETURN_FALSE;
    }
    else if (length >= sizeof(message.data))
    {
        swoole_php_fatal_error(E_WARNING, "the data to push is too big.");
        RETURN_FALSE;
    }

    swWorker *process = swoole_get_object(getThis());

    if (!process->queue)
    {
        swoole_php_fatal_error(E_WARNING, "no msgqueue, can not use push()");
        RETURN_FALSE;
    }

    message.type = process->id;
    memcpy(message.data, data, length);

    if (swMsgQueue_push(process->queue, (swQueue_data *)&message, length) < 0)
    {
        RETURN_FALSE;
    }
    RETURN_TRUE;
}

static PHP_METHOD(swoole_process, pop)
{
    long maxsize = SW_MSGMAX;

    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|l", &maxsize) == FAILURE)
    {
        RETURN_FALSE;
    }

    if (maxsize > SW_MSGMAX || maxsize <= 0)
    {
        maxsize = SW_MSGMAX;
    }

    swWorker *process = swoole_get_object(getThis());
    if (!process->queue)
    {
        swoole_php_fatal_error(E_WARNING, "no msgqueue, can not use pop()");
        RETURN_FALSE;
    }

    struct
    {
        long type;
        char data[SW_MSGMAX];
    } message;

    if (process->ipc_mode == 2)
    {
        message.type = 0;
    }
    else
    {
        message.type = process->id;
    }

    int n = swMsgQueue_pop(process->queue, (swQueue_data *) &message, maxsize);
    if (n < 0)
    {
        RETURN_FALSE;
    }
    SW_RETURN_STRINGL(message.data, n, 1);
}

swoole_process::kill/swoole_process::wait

向進程發送信號 kill 與回收子進程 wait 邏輯比較簡單,就是調用對應的函數。值得注意的是 kill 以後的錯誤若是是 ESRCH,表明着相應的進程不存在。

static PHP_METHOD(swoole_process, kill)
{
    long pid;
    long sig = SIGTERM;

    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "l|l", &pid, &sig) == FAILURE)
    {
        RETURN_FALSE;
    }

    int ret = kill((int) pid, (int) sig);
    if (ret < 0)
    {
        if (!(sig == 0 && errno == ESRCH))
        {
            swoole_php_error(E_WARNING, "kill(%d, %d) failed. Error: %s[%d]", (int) pid, (int) sig, strerror(errno), errno);
        }
        RETURN_FALSE;
    }
    RETURN_TRUE;
}

static PHP_METHOD(swoole_process, wait)
{
    int status;
    zend_bool blocking = 1;

    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|b", &blocking) == FAILURE)
    {
        RETURN_FALSE;
    }

    int options = 0;
    if (!blocking)
    {
        options |= WNOHANG;
    }

    pid_t pid = swWaitpid(-1, &status, options);
    if (pid > 0)
    {
        array_init(return_value);
        add_assoc_long(return_value, "pid", pid);
        add_assoc_long(return_value, "code", WEXITSTATUS(status));
        add_assoc_long(return_value, "signal", WTERMSIG(status));
    }
    else
    {
        RETURN_FALSE;
    }
}

static sw_inline int swWaitpid(pid_t __pid, int *__stat_loc, int __options)
{
    int ret;
    do
    {
        ret = waitpid(__pid, __stat_loc, __options);
        if (ret < 0 && errno == EINTR)
        {
            continue;
        }
        break;
    } while(1);
    return ret;
}

原文地址:http://www.javashuo.com/article/p-ygukejgz-e.html

相關文章
相關標籤/搜索