管道是進程間通訊 IPC
的最基礎的方式,管道有兩種類型:命名管道和匿名管道,匿名管道專門用於具備血緣關係的進程之間,完成數據傳遞,命名管道能夠用於任何兩個進程之間。swoole
中的管道都是匿名管道。react
在 swoole
中,有三種不一樣類型的管道,其中 swPipeBase
是最基礎的管道,swPipeUnsock
是利用 socketpair
實現的管道,swPipeEventfd
是 eventfd
實現的管道。swoole
並無使用 FIFO
命名管道。segmentfault
Pipe
數據結構無論哪一種類型的管道,其基礎都是 swPipe
,該結構體包含一個具體的 pipe
類 object
,表明着是否阻塞的 blocking
,超時時間 timeout
,還有對管道的操做函數read
、write
、getfd
、close
數組
typedef struct _swPipe { void *object; int blocking; double timeout; int (*read)(struct _swPipe *, void *recv, int length); int (*write)(struct _swPipe *, void *send, int length); int (*getFd)(struct _swPipe *, int master); int (*close)(struct _swPipe *); } swPipe;
swPipeBase
匿名管道swPipeBase
數據結構數據結構很是簡單,就是一個數組,存放着 pipe
的讀端和寫端。值得注意的是,swPipeBase
是半全工的管道,也就是說 pipes[0]
只能用於讀,pipes[1]
只能用於寫。swoole
當多個進程共享這個管道的時候,全部的進程讀取都須要 read
讀端 pipes[0]
,進程寫入消息都要 write
寫端 pipes[1]
。數據結構
所以使用這個匿名管道的時候,通常情形是一個進程只負責寫,另外一個進程只負責讀,只能單向傳遞消息,不能雙向傳遞,不然頗有可能讀到了本身剛剛發送的消息。socket
typedef struct _swPipeBase { int pipes[2]; } swPipeBase;
swPipeBase
的建立建立匿名管道就是調用 pipe
函數,程序自動設置管道爲非阻塞式。函數
int swPipeBase_create(swPipe *p, int blocking) { int ret; swPipeBase *object = sw_malloc(sizeof(swPipeBase)); if (object == NULL) { return -1; } p->blocking = blocking; ret = pipe(object->pipes); if (ret < 0) { swWarn("pipe() failed. Error: %s[%d]", strerror(errno), errno); sw_free(object); return -1; } else { //Nonblock swSetNonBlock(object->pipes[0]); swSetNonBlock(object->pipes[1]); p->timeout = -1; p->object = object; p->read = swPipeBase_read; p->write = swPipeBase_write; p->getFd = swPipeBase_getFd; p->close = swPipeBase_close; } return 0; }
swPipeBase_read
管道的讀因爲匿名管道被設置爲非阻塞式,沒法實現超時等待寫入。若是想要阻塞式的向管道寫入數據,設置必定超時時間,就須要利用 poll
函數。當 pipefd
可讀時,poll
馬上返回,或者達到超時時間。oop
static int swPipeBase_read(swPipe *p, void *data, int length) { swPipeBase *object = p->object; if (p->blocking == 1 && p->timeout > 0) { if (swSocket_wait(object->pipes[0], p->timeout * 1000, SW_EVENT_READ) < 0) { return SW_ERR; } } return read(object->pipes[0], data, length); } int swSocket_wait(int fd, int timeout_ms, int events) { struct pollfd event; event.fd = fd; event.events = 0; if (events & SW_EVENT_READ) { event.events |= POLLIN; } if (events & SW_EVENT_WRITE) { event.events |= POLLOUT; } while (1) { int ret = poll(&event, 1, timeout_ms); if (ret == 0) { return SW_ERR; } else if (ret < 0 && errno != EINTR) { swWarn("poll() failed. Error: %s[%d]", strerror(errno), errno); return SW_ERR; } else { return SW_OK; } } return SW_OK; }
swPipeBase_write
管道的寫入管道的寫入直接調用 write
便可,非阻塞式 IO
會馬上返回結果。ui
static int swPipeBase_write(swPipe *p, void *data, int length) { swPipeBase *this = p->object; return write(this->pipes[1], data, length); }
swPipeBase_getFd
本函數用於獲取管道的讀端或者寫端。this
static int swPipeBase_getFd(swPipe *p, int isWriteFd) { swPipeBase *this = p->object; return (isWriteFd == 0) ? this->pipes[0] : this->pipes[1]; }
swPipeBase_close
關閉管道static int swPipeBase_close(swPipe *p) { int ret1, ret2; swPipeBase *this = p->object; ret1 = close(this->pipes[0]); ret2 = close(this->pipes[1]); sw_free(this); return 0 - ret1 - ret2; }
swPipeEventfd
管道swPipeEventfd
數據結構數據結構中僅僅存放 eventfd
函數返回的文件描述符。
和 pipe
管道不一樣的是,eventfd
只有一個文件描述符,讀和寫都是對這個文件描述符進行操做。
該管道一樣也是隻適用於進程間單向通訊。
typedef struct _swPipeEventfd { int event_fd; } swPipeEventfd;
swPipeEventfd_read
管道的讀取相似於匿名管道,eventfd
也不支持超時等待,所以仍是利用 poll
函數進行超時等待。
因爲 eventfd
多是阻塞式,所以 read
時可能會被信號打斷。
static int swPipeEventfd_read(swPipe *p, void *data, int length) { int ret = -1; swPipeEventfd *object = p->object; //eventfd not support socket timeout if (p->blocking == 1 && p->timeout > 0) { if (swSocket_wait(object->event_fd, p->timeout * 1000, SW_EVENT_READ) < 0) { return SW_ERR; } } while (1) { ret = read(object->event_fd, data, sizeof(uint64_t)); if (ret < 0 && errno == EINTR) { continue; } break; } return ret; }
swPipeEventfd_write
管道的寫入寫入和讀取的過程相似,注意被信號打斷後繼續循環便可。
static int swPipeEventfd_write(swPipe *p, void *data, int length) { int ret; swPipeEventfd *this = p->object; while (1) { ret = write(this->event_fd, data, sizeof(uint64_t)); if (ret < 0) { if (errno == EINTR) { continue; } } break; } return ret; }
swPipeEventfd_getFd
static int swPipeEventfd_getFd(swPipe *p, int isWriteFd) { return ((swPipeEventfd *) (p->object))->event_fd; }
swPipeEventfd_close
關閉管道static int swPipeEventfd_close(swPipe *p) { int ret; ret = close(((swPipeEventfd *) (p->object))->event_fd); sw_free(p->object); return ret; }
swPipeUnsock
管道swPipeUnsock
數據結構不一樣於 pipe
的匿名管道,swPipeUnsock
管道是雙向通訊的管道。
所以兩個進程利用 swPipeUnsock
管道進行通訊的時候,獨佔一個 sock
,也就是說 A
進程讀寫都是用 socks[0]
,B
進程讀寫都是用 socks[1]
,socks[0]
寫入的消息會在 socks[1]
讀出來,反之,socks[0]
讀出的消息是 sock[1]
寫入的,這樣就實現了兩個進程的雙向通訊。
typedef struct _swPipeUnsock { /** * master : socks[1] * worker : socks[0] */ int socks[2]; /** * master pipe is closed */ uint8_t pipe_master_closed; /** * worker pipe is closed */ uint8_t pipe_worker_closed; } swPipeUnsock;
swPipeUnsock
的建立swPipeUnsock
的建立主要是調用 socketpair
函數,protocol
決定了建立的 socket
是 SOCK_DGRAM
類型仍是 SOCK_STREAM
類型。
int swPipeUnsock_create(swPipe *p, int blocking, int protocol) { int ret; swPipeUnsock *object = sw_malloc(sizeof(swPipeUnsock)); if (object == NULL) { swWarn("malloc() failed."); return SW_ERR; } bzero(object, sizeof(swPipeUnsock)); p->blocking = blocking; ret = socketpair(AF_UNIX, protocol, 0, object->socks); if (ret < 0) { swWarn("socketpair() failed. Error: %s [%d]", strerror(errno), errno); sw_free(object); return SW_ERR; } else { //Nonblock if (blocking == 0) { swSetNonBlock(object->socks[0]); swSetNonBlock(object->socks[1]); } int sbsize = SwooleG.socket_buffer_size; swSocket_set_buffer_size(object->socks[0], sbsize); swSocket_set_buffer_size(object->socks[1], sbsize); p->object = object; p->read = swPipeUnsock_read; p->write = swPipeUnsock_write; p->getFd = swPipeUnsock_getFd; p->close = swPipeUnsock_close; } return 0; } int swSocket_set_buffer_size(int fd, int buffer_size) { if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &buffer_size, sizeof(buffer_size)) < 0) { swSysError("setsockopt(%d, SOL_SOCKET, SO_SNDBUF, %d) failed.", fd, buffer_size); return SW_ERR; } if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &buffer_size, sizeof(buffer_size)) < 0) { swSysError("setsockopt(%d, SOL_SOCKET, SO_RCVBUF, %d) failed.", fd, buffer_size); return SW_ERR; } return SW_OK; }
swPipeUnsock_getFd
函數一樣的獲取管道文件描述符根據 master
來決定。
static int swPipeUnsock_getFd(swPipe *p, int master) { swPipeUnsock *this = p->object; return master == 1 ? this->socks[1] : this->socks[0]; }
swPipeUnsock_close
關閉管道關閉管道就是調用 close
來依次關閉兩個 socket
.
static int swPipeUnsock_close(swPipe *p) { swPipeUnsock *object = p->object; int ret = swPipeUnsock_close_ext(p, 0); sw_free(object); return ret; } int swPipeUnsock_close_ext(swPipe *p, int which) { int ret1 = 0, ret2 = 0; swPipeUnsock *object = p->object; if (which == SW_PIPE_CLOSE_MASTER) { if (object->pipe_master_closed) { return SW_ERR; } ret1 = close(object->socks[1]); object->pipe_master_closed = 1; } else if (which == SW_PIPE_CLOSE_WORKER) { if (object->pipe_worker_closed) { return SW_ERR; } ret1 = close(object->socks[0]); object->pipe_worker_closed = 1; } else { ret1 = swPipeUnsock_close_ext(p, SW_PIPE_CLOSE_MASTER); ret2 = swPipeUnsock_close_ext(p, SW_PIPE_CLOSE_WORKER); } return 0 - ret1 - ret2; }
tasker
模塊當調用 taskwait
函數後,投遞的 worker
進程會阻塞在 serv->task_notify[SwooleWG.id]
管道的讀取中,tasker
模塊處理完畢後,會向 serv->task_notify[source_worker_id]
管道寫入數據。
這個就是 pipe
函數或者 eventfd
建立的匿名管道的用途,用於單向的進程通訊(tasker
進程向 worker
進程傳遞數據)。
static inline int swPipeNotify_auto(swPipe *p, int blocking, int semaphore) { #ifdef HAVE_EVENTFD return swPipeEventfd_create(p, blocking, semaphore, 0); #else return swPipeBase_create(p, blocking); #endif }
worker
模塊manager
負責爲 worker
進程建立 pipe_master
與 pipe_worker
。用於 reactor
線程與 worker
進程直接進行通訊。
int swManager_start(swFactory *factory) { ... for (i = 0; i < serv->worker_num; i++) { if (swPipeUnsock_create(&object->pipes[i], 1, SOCK_DGRAM) < 0) { return SW_ERR; } serv->workers[i].pipe_master = object->pipes[i].getFd(&object->pipes[i], SW_PIPE_MASTER); serv->workers[i].pipe_worker = object->pipes[i].getFd(&object->pipes[i], SW_PIPE_WORKER); serv->workers[i].pipe_object = &object->pipes[i]; swServer_store_pipe_fd(serv, serv->workers[i].pipe_object); } ... }
當 reactor
線程啓動的時候,會將 pipe_master
加入 reactor
的監控當中。
static int swReactorThread_loop(swThreadParam *param) { ... for (i = 0; i < serv->worker_num; i++) { if (i % serv->reactor_num == reactor_id) { pipe_fd = serv->workers[i].pipe_master; swSetNonBlock(pipe_fd); reactor->add(reactor, pipe_fd, SW_FD_PIPE); if (thread->notify_pipe == 0) { thread->notify_pipe = serv->workers[i].pipe_worker; } } } ... }
在 worker
進程中,會將 pipe_worker
做爲另外一端 socket
放入 worker
的 reactor
事件循環中進行監控。
int swWorker_loop(swFactory *factory, int worker_id) { ... int pipe_worker = worker->pipe_worker; swSetNonBlock(pipe_worker); SwooleG.main_reactor->ptr = serv; SwooleG.main_reactor->add(SwooleG.main_reactor, pipe_worker, SW_FD_PIPE | SW_EVENT_READ); SwooleG.main_reactor->setHandle(SwooleG.main_reactor, SW_FD_PIPE, swWorker_onPipeReceive); SwooleG.main_reactor->setHandle(SwooleG.main_reactor, SW_FD_WRITE, swReactor_onWrite); ... }
tasker
進程tasker
進程中管道的建立是 swProcessPool_create
函數完成的。
int swProcessPool_create(swProcessPool *pool, int worker_num, int max_request, key_t msgqueue_key, int ipc_mode) { ... else if (ipc_mode == SW_IPC_UNIXSOCK) { pool->pipes = sw_calloc(worker_num, sizeof(swPipe)); if (pool->pipes == NULL) { swWarn("malloc[2] failed."); return SW_ERR; } swPipe *pipe; int i; for (i = 0; i < worker_num; i++) { pipe = &pool->pipes[i]; if (swPipeUnsock_create(pipe, 1, SOCK_DGRAM) < 0) { return SW_ERR; } pool->workers[i].pipe_master = pipe->getFd(pipe, SW_PIPE_MASTER); pool->workers[i].pipe_worker = pipe->getFd(pipe, SW_PIPE_WORKER); pool->workers[i].pipe_object = pipe; } } ... }
向 tasker
進程發佈任務的時候,會調用 swProcessPool_dispatch
函數,進而會向 pipe_master
管道寫入任務數據。
int swProcessPool_dispatch(swProcessPool *pool, swEventData *data, int *dst_worker_id) { ... ret = swWorker_send2worker(worker, data, sendn, SW_PIPE_MASTER | SW_PIPE_NONBLOCK); ... } int swWorker_send2worker(swWorker *dst_worker, void *buf, int n, int flag) { int pipefd, ret; if (flag & SW_PIPE_MASTER) { pipefd = dst_worker->pipe_master; } else { pipefd = dst_worker->pipe_worker; } ... if ((flag & SW_PIPE_NONBLOCK) && SwooleG.main_reactor) { return SwooleG.main_reactor->write(SwooleG.main_reactor, pipefd, buf, n); } else { ret = swSocket_write_blocking(pipefd, buf, n); } return ret; }
tasker
進程並無 reactor
事件循環,只會阻塞在某個系統調用中,若是 tasker
進程採用的是 unix socket
進行投遞任務的時候,就會阻塞在對管道的 read
當中。
static int swProcessPool_worker_loop(swProcessPool *pool, swWorker *worker) { ... while (SwooleG.running > 0 && task_n > 0) { ... else { n = read(worker->pipe_worker, &out.buf, sizeof(out.buf)); if (n < 0 && errno != EINTR) { swSysError("[Worker#%d] read(%d) failed.", worker->id, worker->pipe_worker); } } ... } ... }