Swoole 源碼分析——基礎模塊之 Pipe 管道

前言

管道是進程間通訊 IPC 的最基礎的方式,管道有兩種類型:命名管道和匿名管道,匿名管道專門用於具備血緣關係的進程之間,完成數據傳遞,命名管道能夠用於任何兩個進程之間。swoole 中的管道都是匿名管道。react

swoole 中,有三種不一樣類型的管道,其中 swPipeBase 是最基礎的管道,swPipeUnsock 是利用 socketpair 實現的管道,swPipeEventfdeventfd 實現的管道。swoole 並無使用 FIFO 命名管道。segmentfault

Pipe 數據結構

無論哪一種類型的管道,其基礎都是 swPipe,該結構體包含一個具體的 pipeobject,表明着是否阻塞的 blocking,超時時間 timeout,還有對管道的操做函數readwritegetfdclose數組

typedef struct _swPipe
{
    void *object;
    int blocking;
    double timeout;

    int (*read)(struct _swPipe *, void *recv, int length);
    int (*write)(struct _swPipe *, void *send, int length);
    int (*getFd)(struct _swPipe *, int master);
    int (*close)(struct _swPipe *);
} swPipe;

swPipeBase 匿名管道

swPipeBase 數據結構

數據結構很是簡單,就是一個數組,存放着 pipe 的讀端和寫端。值得注意的是,swPipeBase 是半全工的管道,也就是說 pipes[0] 只能用於讀,pipes[1] 只能用於寫。swoole

當多個進程共享這個管道的時候,全部的進程讀取都須要 read 讀端 pipes[0],進程寫入消息都要 write 寫端 pipes[1]數據結構

所以使用這個匿名管道的時候,通常情形是一個進程只負責寫,另外一個進程只負責讀,只能單向傳遞消息,不能雙向傳遞,不然頗有可能讀到了本身剛剛發送的消息。socket

typedef struct _swPipeBase
{
    int pipes[2];
} swPipeBase;

swPipeBase 的建立

建立匿名管道就是調用 pipe 函數,程序自動設置管道爲非阻塞式。函數

int swPipeBase_create(swPipe *p, int blocking)
{
    int ret;
    swPipeBase *object = sw_malloc(sizeof(swPipeBase));
    if (object == NULL)
    {
        return -1;
    }
    p->blocking = blocking;
    ret = pipe(object->pipes);
    if (ret < 0)
    {
        swWarn("pipe() failed. Error: %s[%d]", strerror(errno), errno);
        sw_free(object);
        return -1;
    }
    else
    {
        //Nonblock
        swSetNonBlock(object->pipes[0]);
        swSetNonBlock(object->pipes[1]);
        p->timeout = -1;
        p->object = object;
        p->read = swPipeBase_read;
        p->write = swPipeBase_write;
        p->getFd = swPipeBase_getFd;
        p->close = swPipeBase_close;
    }
    return 0;
}

swPipeBase_read 管道的讀

因爲匿名管道被設置爲非阻塞式,沒法實現超時等待寫入。若是想要阻塞式的向管道寫入數據,設置必定超時時間,就須要利用 poll 函數。當 pipefd 可讀時,poll 馬上返回,或者達到超時時間。oop

static int swPipeBase_read(swPipe *p, void *data, int length)
{
    swPipeBase *object = p->object;
    if (p->blocking == 1 && p->timeout > 0)
    {
        if (swSocket_wait(object->pipes[0], p->timeout * 1000, SW_EVENT_READ) < 0)
        {
            return SW_ERR;
        }
    }
    return read(object->pipes[0], data, length);
}

int swSocket_wait(int fd, int timeout_ms, int events)
{
    struct pollfd event;
    event.fd = fd;
    event.events = 0;

    if (events & SW_EVENT_READ)
    {
        event.events |= POLLIN;
    }
    if (events & SW_EVENT_WRITE)
    {
        event.events |= POLLOUT;
    }
    while (1)
    {
        int ret = poll(&event, 1, timeout_ms);
        if (ret == 0)
        {
            return SW_ERR;
        }
        else if (ret < 0 && errno != EINTR)
        {
            swWarn("poll() failed. Error: %s[%d]", strerror(errno), errno);
            return SW_ERR;
        }
        else
        {
            return SW_OK;
        }
    }
    return SW_OK;
}

swPipeBase_write 管道的寫入

管道的寫入直接調用 write 便可,非阻塞式 IO 會馬上返回結果。ui

static int swPipeBase_write(swPipe *p, void *data, int length)
{
    swPipeBase *this = p->object;
    return write(this->pipes[1], data, length);
}

swPipeBase_getFd

本函數用於獲取管道的讀端或者寫端。this

static int swPipeBase_getFd(swPipe *p, int isWriteFd)
{
    swPipeBase *this = p->object;
    return (isWriteFd == 0) ? this->pipes[0] : this->pipes[1];
}

swPipeBase_close 關閉管道

static int swPipeBase_close(swPipe *p)
{
    int ret1, ret2;
    swPipeBase *this = p->object;
    ret1 = close(this->pipes[0]);
    ret2 = close(this->pipes[1]);
    sw_free(this);
    return 0 - ret1 - ret2;
}

swPipeEventfd 管道

swPipeEventfd 數據結構

數據結構中僅僅存放 eventfd 函數返回的文件描述符。

pipe 管道不一樣的是,eventfd 只有一個文件描述符,讀和寫都是對這個文件描述符進行操做。

該管道一樣也是隻適用於進程間單向通訊。

typedef struct _swPipeEventfd
{
    int event_fd;
} swPipeEventfd;

swPipeEventfd_read 管道的讀取

相似於匿名管道,eventfd 也不支持超時等待,所以仍是利用 poll 函數進行超時等待。

因爲 eventfd 多是阻塞式,所以 read 時可能會被信號打斷。

static int swPipeEventfd_read(swPipe *p, void *data, int length)
{
    int ret = -1;
    swPipeEventfd *object = p->object;

    //eventfd not support socket timeout
    if (p->blocking == 1 && p->timeout > 0)
    {
        if (swSocket_wait(object->event_fd, p->timeout * 1000, SW_EVENT_READ) < 0)
        {
            return SW_ERR;
        }
    }

    while (1)
    {
        ret = read(object->event_fd, data, sizeof(uint64_t));
        if (ret < 0 && errno == EINTR)
        {
            continue;
        }
        break;
    }
    return ret;
}

swPipeEventfd_write 管道的寫入

寫入和讀取的過程相似,注意被信號打斷後繼續循環便可。

static int swPipeEventfd_write(swPipe *p, void *data, int length)
{
    int ret;
    swPipeEventfd *this = p->object;
    while (1)
    {
        ret = write(this->event_fd, data, sizeof(uint64_t));
        if (ret < 0)
        {
            if (errno == EINTR)
            {
                continue;
            }
        }
        break;
    }
    return ret;
}

swPipeEventfd_getFd

static int swPipeEventfd_getFd(swPipe *p, int isWriteFd)
{
    return ((swPipeEventfd *) (p->object))->event_fd;
}

swPipeEventfd_close 關閉管道

static int swPipeEventfd_close(swPipe *p)
{
    int ret;
    ret = close(((swPipeEventfd *) (p->object))->event_fd);
    sw_free(p->object);
    return ret;
}

swPipeUnsock 管道

swPipeUnsock 數據結構

不一樣於 pipe 的匿名管道,swPipeUnsock 管道是雙向通訊的管道。

所以兩個進程利用 swPipeUnsock 管道進行通訊的時候,獨佔一個 sock,也就是說 A 進程讀寫都是用 socks[0]B 進程讀寫都是用 socks[1]socks[0] 寫入的消息會在 socks[1] 讀出來,反之,socks[0] 讀出的消息是 sock[1] 寫入的,這樣就實現了兩個進程的雙向通訊。

typedef struct _swPipeUnsock
{
    /**
     * master : socks[1]
     * worker : socks[0]
     */
    int socks[2];
    /**
     * master pipe is closed
     */
    uint8_t pipe_master_closed;
    /**
     * worker pipe is closed
     */
    uint8_t pipe_worker_closed;
} swPipeUnsock;

swPipeUnsock 的建立

swPipeUnsock 的建立主要是調用 socketpair 函數,protocol 決定了建立的 socketSOCK_DGRAM 類型仍是 SOCK_STREAM 類型。

int swPipeUnsock_create(swPipe *p, int blocking, int protocol)
{
    int ret;
    swPipeUnsock *object = sw_malloc(sizeof(swPipeUnsock));
    if (object == NULL)
    {
        swWarn("malloc() failed.");
        return SW_ERR;
    }
    bzero(object, sizeof(swPipeUnsock));
    p->blocking = blocking;
    ret = socketpair(AF_UNIX, protocol, 0, object->socks);
    if (ret < 0)
    {
        swWarn("socketpair() failed. Error: %s [%d]", strerror(errno), errno);
        sw_free(object);
        return SW_ERR;
    }
    else
    {
        //Nonblock
        if (blocking == 0)
        {
            swSetNonBlock(object->socks[0]);
            swSetNonBlock(object->socks[1]);
        }

        int sbsize = SwooleG.socket_buffer_size;
        swSocket_set_buffer_size(object->socks[0], sbsize);
        swSocket_set_buffer_size(object->socks[1], sbsize);

        p->object = object;
        p->read = swPipeUnsock_read;
        p->write = swPipeUnsock_write;
        p->getFd = swPipeUnsock_getFd;
        p->close = swPipeUnsock_close;
    }
    return 0;
}

int swSocket_set_buffer_size(int fd, int buffer_size)
{
    if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &buffer_size, sizeof(buffer_size)) < 0)
    {
        swSysError("setsockopt(%d, SOL_SOCKET, SO_SNDBUF, %d) failed.", fd, buffer_size);
        return SW_ERR;
    }
    if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &buffer_size, sizeof(buffer_size)) < 0)
    {
        swSysError("setsockopt(%d, SOL_SOCKET, SO_RCVBUF, %d) failed.", fd, buffer_size);
        return SW_ERR;
    }
    return SW_OK;
}

swPipeUnsock_getFd 函數

一樣的獲取管道文件描述符根據 master 來決定。

static int swPipeUnsock_getFd(swPipe *p, int master)
{
    swPipeUnsock *this = p->object;
    return master == 1 ? this->socks[1] : this->socks[0];
}

swPipeUnsock_close 關閉管道

關閉管道就是調用 close 來依次關閉兩個 socket.

static int swPipeUnsock_close(swPipe *p)
{
    swPipeUnsock *object = p->object;
    int ret = swPipeUnsock_close_ext(p, 0);
    sw_free(object);
    return ret;
}

int swPipeUnsock_close_ext(swPipe *p, int which)
{
    int ret1 = 0, ret2 = 0;
    swPipeUnsock *object = p->object;

    if (which == SW_PIPE_CLOSE_MASTER)
    {
        if (object->pipe_master_closed)
        {
            return SW_ERR;
        }
        ret1 = close(object->socks[1]);
        object->pipe_master_closed = 1;
    }
    else if (which == SW_PIPE_CLOSE_WORKER)
    {
        if (object->pipe_worker_closed)
        {
            return SW_ERR;
        }
        ret1 = close(object->socks[0]);
        object->pipe_worker_closed = 1;
    }
    else
    {
        ret1 = swPipeUnsock_close_ext(p, SW_PIPE_CLOSE_MASTER);
        ret2 = swPipeUnsock_close_ext(p, SW_PIPE_CLOSE_WORKER);
    }

    return 0 - ret1 - ret2;
}

管道的應用

tasker 模塊

當調用 taskwait 函數後,投遞的 worker 進程會阻塞在 serv->task_notify[SwooleWG.id] 管道的讀取中,tasker 模塊處理完畢後,會向 serv->task_notify[source_worker_id] 管道寫入數據。

這個就是 pipe 函數或者 eventfd 建立的匿名管道的用途,用於單向的進程通訊(tasker 進程向 worker 進程傳遞數據)。

static inline int swPipeNotify_auto(swPipe *p, int blocking, int semaphore)
{
#ifdef HAVE_EVENTFD
    return swPipeEventfd_create(p, blocking, semaphore, 0);
#else
    return swPipeBase_create(p, blocking);
#endif
}

worker 模塊

manager 負責爲 worker 進程建立 pipe_masterpipe_worker。用於 reactor 線程與 worker 進程直接進行通訊。

int swManager_start(swFactory *factory)
{
   ...
   
   for (i = 0; i < serv->worker_num; i++)
    {
        if (swPipeUnsock_create(&object->pipes[i], 1, SOCK_DGRAM) < 0)
        {
            return SW_ERR;
        }
        serv->workers[i].pipe_master = object->pipes[i].getFd(&object->pipes[i], SW_PIPE_MASTER);
        serv->workers[i].pipe_worker = object->pipes[i].getFd(&object->pipes[i], SW_PIPE_WORKER);
        serv->workers[i].pipe_object = &object->pipes[i];
        swServer_store_pipe_fd(serv, serv->workers[i].pipe_object);
    }
   
   ...

}

reactor 線程啓動的時候,會將 pipe_master 加入 reactor 的監控當中。

static int swReactorThread_loop(swThreadParam *param)
{

   ...
    
   for (i = 0; i < serv->worker_num; i++)
   {
       if (i % serv->reactor_num == reactor_id)
       {
           pipe_fd = serv->workers[i].pipe_master;
           
           swSetNonBlock(pipe_fd);
           reactor->add(reactor, pipe_fd, SW_FD_PIPE);

           if (thread->notify_pipe == 0)
           {
               thread->notify_pipe = serv->workers[i].pipe_worker;
           }
       
       }
   
   }
   ...
}

worker 進程中,會將 pipe_worker 做爲另外一端 socket 放入 workerreactor 事件循環中進行監控。

int swWorker_loop(swFactory *factory, int worker_id)
{
    ...
    
    int pipe_worker = worker->pipe_worker;

    swSetNonBlock(pipe_worker);
    SwooleG.main_reactor->ptr = serv;
    SwooleG.main_reactor->add(SwooleG.main_reactor, pipe_worker, SW_FD_PIPE | SW_EVENT_READ);
    SwooleG.main_reactor->setHandle(SwooleG.main_reactor, SW_FD_PIPE, swWorker_onPipeReceive);
    SwooleG.main_reactor->setHandle(SwooleG.main_reactor, SW_FD_WRITE, swReactor_onWrite);
    
    ...


}

tasker 進程

tasker 進程中管道的建立是 swProcessPool_create 函數完成的。

int swProcessPool_create(swProcessPool *pool, int worker_num, int max_request, key_t msgqueue_key, int ipc_mode)
{
    ...
    
    else if (ipc_mode == SW_IPC_UNIXSOCK)
    {
        pool->pipes = sw_calloc(worker_num, sizeof(swPipe));
        if (pool->pipes == NULL)
        {
            swWarn("malloc[2] failed.");
            return SW_ERR;
        }

        swPipe *pipe;
        int i;
        for (i = 0; i < worker_num; i++)
        {
            pipe = &pool->pipes[i];
            if (swPipeUnsock_create(pipe, 1, SOCK_DGRAM) < 0)
            {
                return SW_ERR;
            }
            pool->workers[i].pipe_master = pipe->getFd(pipe, SW_PIPE_MASTER);
            pool->workers[i].pipe_worker = pipe->getFd(pipe, SW_PIPE_WORKER);
            pool->workers[i].pipe_object = pipe;
        }
    }
    
    ...
}

tasker 進程發佈任務的時候,會調用 swProcessPool_dispatch 函數,進而會向 pipe_master 管道寫入任務數據。

int swProcessPool_dispatch(swProcessPool *pool, swEventData *data, int *dst_worker_id)
{
    ...
    
    ret = swWorker_send2worker(worker, data, sendn, SW_PIPE_MASTER | SW_PIPE_NONBLOCK);
    
    ...
}

int swWorker_send2worker(swWorker *dst_worker, void *buf, int n, int flag)
{
    int pipefd, ret;

    if (flag & SW_PIPE_MASTER)
    {
        pipefd = dst_worker->pipe_master;
    }
    else
    {
        pipefd = dst_worker->pipe_worker;
    }
    
    ...
    
    if ((flag & SW_PIPE_NONBLOCK) && SwooleG.main_reactor)
    {
        return SwooleG.main_reactor->write(SwooleG.main_reactor, pipefd, buf, n);
    }
    else
    {
        ret = swSocket_write_blocking(pipefd, buf, n);
    }

    return ret;


}

tasker 進程並無 reactor 事件循環,只會阻塞在某個系統調用中,若是 tasker 進程採用的是 unix socket 進行投遞任務的時候,就會阻塞在對管道的 read 當中。

static int swProcessPool_worker_loop(swProcessPool *pool, swWorker *worker)
{
    ...
    
    while (SwooleG.running > 0 && task_n > 0)
    {
        ...
        
        else
        {
            n = read(worker->pipe_worker, &out.buf, sizeof(out.buf));
            if (n < 0 && errno != EINTR)
            {
                swSysError("[Worker#%d] read(%d) failed.", worker->id, worker->pipe_worker);
            }
        }
        
        ...
    }

    ...
}

原文地址:http://www.javashuo.com/article/p-aperambe-bh.html

相關文章
相關標籤/搜索