IPC研究(2) --- pipes

==================================================================== IPC ---- Pipes Related system calls: popen  (high-level, implemented by actually invoking shell and then command) pclose (redirect shell commands input and output, you can use these two calls to write tests) pipe exec dup mkfifo unlink Basics: redirect input and output (parent process) -->file_pipes[1] --> [PIPE] --> file_pipes[0] --> (child process) The child process may use the exec call to become a different program. You've got to pass the file descriptor as a parameter to the new program. pipe is unidirectional By default, file descriptors remain open across an execve().  File descriptors that  are  marked  close-on-exec are  closed;  see  the description of FD_CLOEXEC in fcntl(2).  也就是說,用execve或者exec-like functions來替換掉原進程(A)執行新進程(B)時,A的fd會被保留下來。此時,新進程(B)可使用這些fd。好比Example 1中,子進程被替換掉後,新進程其實是有子進程的fd的,之因此要傳參數給新進程,是由於它沒法知道哪一個fd是pipe的read end。若是事先知道的話,是不須要傳參數的;好比,咱們知道此程序中pipe的read end是3,那麼在procB.c中,read(fd, buf, BUFSIZ);改爲read(3, buf, BUFSIZ);也是徹底正確的。 在全部的fd中,有3個比較特別,0,1,2;它們的語義是預約義的,分別爲標準輸入,標準輸出和標準錯誤輸出。 由以上兩點,咱們能夠想到,若是利用dup函數,複製一個fd,而dup本身產生的fd是0(咱們只要先close(0)就能夠了),那麼,此時標準輸入實際上是從這個fd指向的文件(inode)中來讀取東西。若是以後,咱們用execve來替換掉這個進程執行新進程,那麼新進程的標準輸入,實際上是fd指向的文件。若是這個文件是一個pipe,那麼新進程的標準輸入其實就是pipe的read end。 由上,咱們能夠在不改變原來某些程序的狀況下,編寫新程序(pipe,fork, close, dup, execve),來創建一個pipe。 新程序 --》 (pipe) --》 原程序 (note:每一個進程有獨立的地址空間和fd table,因此此時若是系統運行其餘程序,打開一個文件,其fd依然是3,只是這個fd指向的是系統中另一個inode)。 mkfifo filename (create a named pipe in command line) 命名管道其實是在系統的file system中建立了一個named file,它的類型是pipe。因爲此FIFO存在於file system中,不相關的程序就能夠經過它來進行數據交互(不像unnamed pipe對數據交互雙方有很強的耦合要求。) 注意,管道是單向的。[procA] --- named pipe ---> [procB] 若是想讓數據雙向傳輸,有如下兩種方法: 1. [procA] --- named pipe1 ---> [procB]    [procA] <---named pipe2 ---- [procB] 2. [procA] --- named pipe1 ---> [procB]     雙方關閉管道,從新打開以下    [procB] --- named pipe1 ---> [procA] 咱們一般使用方法1. 注意,打開fifo是可能會block的,若是沒有O_NONBLOCK標誌的話。 若是有多個writer和一個reader,會出現什麼狀況呢? 【procA】| 【procB】||=== named pipe ===> 【procD】 【procC】| ABC的write request就可又能會交錯。好比A的一個write request寫了一半,B的write request就寫進來了。 如何避免這種狀況呢?讓write request < PIPE_BUF字節,系統就能保證,每一個write行爲都是atomic的。 Analysis: ==> Unnamed Pipes pipe IPC機制總算是比signal要強大一點。由於它能夠傳輸的信息量比較大,而不只僅只是一個integer。 可是,unnamed pipe的IPC機制的使用頗有限制,由於它本質上是經過一對fd來進行通信的。因此,它的應用基本只能侷限在如下兩種狀況: 1. 父子進程,由於子進程有parent進程的fd拷貝。 2. 子進程exec成另一個進程,同時經過arg將fd傳遞給新進程。 通訊雙方的耦合程度不言而喻,基本上比signal還要差。由於signal還有辦法經過程序名字得到進程名,從而能夠經過kill調用來進行通信。pipe IPC機制而言,你如何去獲得fd?/proc底下某進程有一堆fd,你咋知道哪一個是哪一個?若是進程A只有一對pipe fd,那麼在/proc目錄下還能夠經過readlink來進行判斷,那兩個fd屬於pipe;但要是A有好幾個pipe呢?因此,unnamed pipe的應用場景,基本上就是以上兩種。 3. pipe做爲標準輸入輸出使用,達到如下效果【新程序 --》 (pipe) --》 原程序】。分析見上。 (注意:這個應用場景頗有用!好比,咱們本身寫了一個程序,從標準輸入讀東西來進行處理的,那麼咱們能夠用以上方法,創建pipe在另一個程序中,使其輸入自動化而且能夠對輸入進行控制。同理,咱們能夠用dup來替換掉標準輸出,從而對某些程序的輸出進程自動化控制和分析。) 不過pipe這種單刀直入的思想,在不少應用場景下,頗有效果。 ==> Named Pipes Named Pipe機制比unnamed pipe機制要強大一點。緣由是它的reader和writer的耦合程度相對較低(只須要知道fifo的存在位置,如/tmp/my_fifo)。另外,從上面的分析能夠知道,若是多個writer的write request的字節數少於PIPE_BUF的話,每一個write行爲都是atomic的。這一點,使named pipe能夠做爲client/server的通信。 由此,named pipe的應用場景要比unnamed pipe要廣。 能夠以下 [client1] | [client2] | ---- serv_pipe ----> [server] [client3] |               | --- cli_pipe_1 --> [client1] [server] ---- | --- cli_pipe_2 --> [client2]               | --- cli_pipe_3 --> [client3] 只要定義好協議格式就能夠進行處理了。 另外,open pipe若是沒有O_NONBLOCK,是會block的。 好比open(pipe_fd, O_RDONLY);會一種block到另一個進程open(pipe_fd, O_WRONLY). 同理open(pipe_fd, O_WRONLY);會一種block到另一個進程open(pipe_fd, O_RDONLY). 因此,在寫client/server時要注意對pipe操做的順序,以避免發生兩邊都blcok了。             Examples: Example 1 ---- unamed pipes (procA sends some data to procB) /**  * procA.c  *  * write to pipe  **/ #include <unistd.h> #include <stdio.h> #include <stdlib.h> #include <string.h> void main() {     int file_pipes[2];     const char some_data[] = "Hello! I'm process A!";     char buf[BUFSIZ+1];     pid_t pid;     int ret;     memset(buf, 0, sizeof(buf));     ret = pipe(file_pipes);     if (ret < 0)     {         perror("create pipe failed");         exit(-1);     }     else            /* ret == 0 */     {         pid = fork();         if (pid <0)         {             perror("fork() failed");             exit(-1);         }         else if (pid == 0) /* child process */         {             sprintf(buf, "%d", file_pipes[0]);             execl("procB", "procB", buf, NULL);             exit(-1); /* we should not get here */         }         else        /* parent process */         {             write(file_pipes[1], some_data, strlen(some_data));             printf("[%d] ---- write finished! \n", getpid());         }     } } /**  * procB.c  *  * read from a pipe, display its contents  **/ #include <unistd.h> #include <stdio.h> #include <stdlib.h> #include <string.h> void main(int argc, char *argv[]) {   int fd;   char buf[BUFSIZ+1];   memset(buf, 0, sizeof(buf));   sscanf(argv[1], "%d", &fd);   read(fd, buf, BUFSIZ);   printf("[%d] ---- read data finish: %s \n", getpid(), buf); } Example 2 ---- pipes as standard input and standard output /**  * pipe_as_stdin.c  * procA --[pipe] --> procB  * procA passes a string to procB through a pipe  * procB is 'wc' program, it receives the string as if it was passed in from stdin  **/ #include <unistd.h> #include <stdio.h> #include <stdlib.h> #include <string.h> void main() {     char data[] = "Hello! This is a string to be passed to wc as an input!";     int file_pipes[2];     pid_t pid;     int ret;     ret = pipe(file_pipes);     if (ret < 0)     {         perror("create pipes failed\n");         exit(-1);     }     else     {         pid = fork();         if (pid < 0)         {             perror("fork failed");             exit(-1);         }         else if (pid == 0) /* child process */         {             close(0);             dup(file_pipes[0]);             close(file_pipes[1]);             close(file_pipes[0]);             execlp("wc", "wc", NULL);             exit(-1); /* we should never get here */         }         else         /* parent process */         {             close(file_pipes[0]);             write(file_pipes[1], data, sizeof(data));             close(file_pipes[1]);             printf("[%d] ---- write finished!\n", getpid());         }     } } /**  * pipe_as_stdout.c  * analyse how many hidden files there are in this directory  * 'ls -a' --[pipe]--> my program  **/ #include <unistd.h> #include <stdio.h> #include <stdlib.h> #include <string.h> void main() {     char buf[BUFSIZ+1];     int file_pipes[2];     pid_t pid;     int ret;     ret = pipe(file_pipes);     if (ret < 0)     {         perror("create pipes failed\n");         exit(-1);     }     else     {         pid = fork();         if (pid < 0)         {             perror("fork failed");             exit(-1);         }         else if (pid == 0) /* child process */         {             close(1);             dup(file_pipes[1]);             close(file_pipes[0]);             close(file_pipes[1]);             execlp("ls", "ls", "-a", NULL);             exit(-1); /* we should never get here */         }         else         /* parent process */         {             close(file_pipes[1]);             memset(buf, 0, sizeof(buf));             read(file_pipes[0], buf, BUFSIZ);             close(file_pipes[0]);             printf("[%d] ---- read finished -- \n %s \n", getpid(), buf);         }     } } Example 3 --- name pipes (client/server using fifo) #ifndef _CS_PIPE_COMMON_H #define _CS_PIPE_COMMON_H #include <unistd.h> #include <stdlib.h> #include <string.h> #include <stdio.h> #include <fcntl.h> #include <limits.h> #include <sys/types.h> #include <sys/stat.h> #include <ctype.h> #define SERVER_FIFO_NAME "/tmp/serv_fifo" #define CLIENT_FIFO_NAME "/tmp/cli_%d_fifo" #define ACTION_LEN 32 #define DATA_LEN 1024 #define BUFFER_SIZE PIPE_BUF struct cs_packet_t {     pid_t cli_pid;     char action[ACTION_LEN];     char data[DATA_LEN]; }; #endif /**  * client.c  *  * client/server using named pipes (FIFO)  **/ #include "common.h" static void print_pkt(struct cs_packet_t *pkt) {     printf("==========packet=================\n");     printf("client pid = %d \n", pkt->cli_pid);     printf("client action = %s \n", pkt->action);     printf("client data = %s \n", pkt->data);     printf("\n"); } void main() {     struct    cs_packet_t pkt;     pkt.cli_pid = getpid();     /* fifo initialization */     int serv_fd;     int cli_fd;     serv_fd = open(SERVER_FIFO_NAME, O_WRONLY);     if (serv_fd < 0)     {         perror("open server fifo failed");         exit(-1);     }     char client_fifo[32];     sprintf(client_fifo, CLIENT_FIFO_NAME, pkt.cli_pid);     int ret = mkfifo(client_fifo, 0777);     if (ret < 0)     {         perror ("create client fifo failed");         exit(-1);     }     /* loop to send the server uppercase requests and down case requests */     for (;;)     {         printf("<<debugging client>> --- in for loop\n");         sprintf(pkt.action, "upcase");         sprintf(pkt.data, "Data From Client %d", pkt.cli_pid);         write(serv_fd, &pkt, sizeof(pkt));         printf("sent packet finished: \n");         print_pkt(&pkt);         cli_fd = open(client_fifo, O_RDONLY); /* open client fifo for result */         struct cs_packet_t ret_pkt;         int read_res = read(cli_fd, &ret_pkt, sizeof(ret_pkt));         if (read_res > 0)         {             printf("receive packet form server: \n");             print_pkt(&ret_pkt);         }         close(cli_fd);         sleep(2);         sprintf(pkt.action, "downcase");         sprintf(pkt.data, "Data From Client %d", pkt.cli_pid);         write(serv_fd, &pkt, sizeof(pkt));         printf("sent packet finished: \n");         print_pkt(&pkt);         cli_fd = open(client_fifo, O_RDONLY); /* open client fifo for result */         read_res = read(cli_fd, &ret_pkt, sizeof(ret_pkt));         if (read_res > 0)         {             printf("receive packet form server: \n");             print_pkt(&ret_pkt);         }         close(cli_fd);         sleep(2);     } } /**  * server.c  *  * client/server using named pipes (FIFO)  **/ #include "common.h" #include <pthread.h> static void print_pkt(struct cs_packet_t *pkt) {     printf("==========packet=================\n");     printf("client pid = %d \n", pkt->cli_pid);     printf("client action = %s \n", pkt->action);     printf("client data = %s \n", pkt->data);     printf("\n"); } /**  * function: thread_handle_requests  *  * thread which handles client requests, sends back the result to client through cli_pipe_%d pipe and then exits  * client1 --(pkt1)--> |                  | --> pkt1 (thread1)  * client2 --(pkt2)--> | --> serv_buf --> | --> pkt2 (thread2)  * client3 --(pkt3)--> |                  | --> pkt3 (thread3)  * This is not a very good design. For practical server, we should queue the requests.  * N threads may loop to take the requests out of the queue and then handles it.  * This avoids the overhead of thead creation and destruction.  **/ void *thread_handle_requests(void *arg) /* arg is of type struct cs_packet_t */ {     struct cs_packet_t *pkt = (struct cs_packet_t*)malloc(sizeof(struct cs_packet_t));     if (pkt == NULL)     {         perror("Not Enough Memory In Heap!");         exit(-1);     }     memcpy(pkt, arg, sizeof(struct cs_packet_t));     printf("<<debugging server>> --- \n");     print_pkt(pkt);     pid_t cli_pid = pkt->cli_pid;     char *action = pkt->action;     char *data = pkt->data;     char client_fifo[32];    /* name of this client's fifo, cli_fifo_%d */     int client_fifo_fd;     memset(client_fifo, 0, sizeof(client_fifo));     sprintf(client_fifo, CLIENT_FIFO_NAME, cli_pid);     client_fifo_fd = open(client_fifo, O_WRONLY);     if (client_fifo_fd == -1)     {         perror("open client pipe failed");         exit(-1);     }     if (!strcmp(pkt->action, "upcase"))     {         char *tmp_char_ptr = data;         while (*tmp_char_ptr)         {             *tmp_char_ptr = toupper(*tmp_char_ptr);             tmp_char_ptr++;         }         write(client_fifo_fd, pkt, sizeof(struct cs_packet_t));     }     else if (!strcmp(pkt->action, "downcase"))     {         char *tmp_char_ptr = data;         while (*tmp_char_ptr)         {             *tmp_char_ptr = tolower(*tmp_char_ptr);             tmp_char_ptr++;         }         write(client_fifo_fd, pkt, sizeof(struct cs_packet_t));     }     else     {         sprintf(data, "Action %s not supported", pkt->action);         write(client_fifo_fd, pkt, sizeof(struct cs_packet_t));     }     close(client_fifo_fd);     free(pkt);     return NULL; } void main() {     char serv_buf[BUFFER_SIZE];     struct cs_packet_t pkt;     int server_fifo_fd;     pthread_t th;     /* make server fifo */     mkfifo(SERVER_FIFO_NAME, 0777);     server_fifo_fd = open(SERVER_FIFO_NAME, O_RDONLY);     if (server_fifo_fd == -1)     {         perror("create server fifo failed");         exit(-1);     }     /* read from server fifo */     for (;;)     {         int read_res = read(server_fifo_fd, &pkt, sizeof(pkt));         if (read_res > 0)         {             printf("<<debugging server>>----received a packet from %d\n", pkt.cli_pid);             /* create a thread to handle this request */             int ret = pthread_create(&th, NULL, thread_handle_requests, (void*)&pkt);             if (ret < 0)             {                 perror("create thread failed");                 exit(-1);             }         }     } }
相關文章
相關標籤/搜索