前10種限於同一臺主機的兩個進程之間的IPC
shell
實現機制:
管道是由內核管理的一個緩衝區
緩存
管道的建立
服務器
int pipe(int fd[2]);
管道的關閉由系統負責
數據結構
當管道的一端被關閉後,下列規則起做用:
1.當讀一個寫端已被關閉的管道時,在全部數據都被讀取後,read返回0,以指示達到了文件結束處. 當讀一個沒有數據的管道時,read阻塞
2.若是寫一個讀端已被關閉的管道,則產生信號SIGPIPE。
若是忽略該信號或者捕捉該信號並從其處理程序返回,則write出錯返回,errno設置爲EPIPE。
函數
注意
1. 在寫管道時,常數PIPE_BUF規定了內核中管道緩存器的大小。
2. 若是對管道進行write調用,並且要求寫的字節數小於等於PIPE_BUF,則此操做不會與其餘進程
對同一管道(或FIFO)的write操做穿插進行
3. 但,如有多個進程同時寫一個管道(或FIFO),並且某個或某些進程要求寫的字節數超過PIPE_BUF字節數,
則數據可能與其餘寫操做的數據相穿插。
佈局
過濾器程序
性能
#include <stdio.h> FILE* popen(const char* cmdstring, const char* type) int pclose(FILE* fp);
函數popen:
先執行fork,而後調用 exec 以執行 cmdstring,而且返回一個標準I/O文件指針。
ui
注意
1. popen之中調用了fork函數,會出現與system同樣的問題:
調用popen的進程當心使用waitpid,以及設置SIGCHLD的信號處理函數
spa
過濾程序從標準輸入讀取數據,對其進行適當處理後寫到標準輸出。幾個過濾進程一般在shell管道命令中線性地鏈接。
當同一個程序產生某個過濾程序的輸入,同時又讀取該過濾程序的輸出時,則該過濾程序就成爲協同進程(coprocess)。
指針
// kill child first, when write to child, parent receives the signal static void sigpipe_handler(int sig) { fprintf(stderr, "SIGPIPE received from child.\n"); exit(1); }
int main(int argc, char* argv[]) { struct sigaction siga; siga.sa_handler = sigpipe_handler; sigemptyset(&siga.sa_mask); siga.sa_flags = SA_RESTART; if (sigaction(SIGPIPE, &siga, NULL) < 0){ fprintf(stderr, "sigaction err : %s\n", strerror(errno)); exit(-1); }
int p2c[2],c2p[2]; if (pipe(p2c) < 0 || pipe(c2p) < 0){ fprintf(stderr, "pipe err : %s\n", strerror(errno)); exit(-1); }
pid_t pid; if ((pid = fork()) < 0){ fprintf(stderr, "fork err : %s\n", strerror(errno)); exit(-1); } else if (pid == 0) { close(p2c[1]); close(c2p[0]); if (p2c[0] != STDIN_FILENO) { if (dup2(p2c[0],STDIN_FILENO) != STDIN_FILENO) { fprintf(stderr, "dup2 err : %s\n", strerror(errno)); exit(-1); } } if (c2p[1] != STDOUT_FILENO) { if (dup2(c2p[1],STDOUT_FILENO) != STDOUT_FILENO){ fprintf(stderr, "dup2 err : %s\n", strerror(errno)); exit(-1); } } if (execl("./child", "child", (char*)0) < 0) { fprintf(stderr, "execl err : %s\n", strerror(errno)); exit(-1); }
} else { close(c2p[1]); close(p2c[0]); char buf[BUFSIZ]; while (fgets(buf, BUFSIZ, stdin) != NULL) { if ( write(p2c[1], buf, strlen(buf)) < 0){ fprintf(stderr, "write err : %s\n", strerror(errno)); exit(-1); } int n; if (( n = read(c2p[0], buf, BUFSIZ)) < 0){ fprintf(stderr, "read err : %s\n", strerror(errno)); exit(-1); } buf[n] = '\0'; printf ("the answer is : %s\n", buf); } } return 0; }
-------------------------------------code A-------------------------------------------
如下代碼通過編譯生成的可執行文件的路徑名稱, 就是上面代碼的child
#include "apue.h" int main(){ int n, int1,int2; char line[MAXLINE]; /* if (setvbuf(stdin, NULL, _IOLBF, 0) != 0) err_sys("setvbuf error") ; if (setvbuf(stdout, NULL, _IOLBF, 0)!= 0) err_sys("setvbuf error") ; */ while ((n = read(STDIN_FILENO, line, MAXLINE)) > 0){ /*deal with the line*/ if (write(STDOUT_FILENO, line, n) != n) err_sys("write err."); } exit(0); }
-------------------------------------code B-------------------------------------------
注意:
若code A 調用 Code B,則它再也不工做。問題出在系統默認的標準I/O緩存機制上。
當B被調用時,對標準輸入的第一個fgets引發標準I/O庫分配一個緩存,並選擇緩存的類型。
由於標準輸入是個管道,因此isatty爲假,因而標準I/O庫由系統默認是全緩存的。
對標準輸出也有一樣的處理。當B從其標準輸入讀取而發生堵塞時,A從管道讀時也發生堵塞,因而產生了死鎖。
解決方法:
將管道設置爲行緩存,如Code B中被註釋掉的代碼所示
管道只能由相關進程使用. 但經過FIFO,不相關的進程也能交換數據。
建立FIFO
int mkfifo(const char* path, mode_t mode); int mkfifoat(int fd, const char* path, mode_t mode);
int main() { if (access(PIPE_PATH, F_OK) < 0) { if (errno == ENOENT) { if (mkfifo(PIPE_PATH, 0777) < 0) { fprintf(stderr, "mkfifo err : %s\n", strerror(errno)); exit(-1); } } else { fprintf(stderr, "access err : %s\n", strerror(errno)); exit(-1); } } int rfd = open(PIPE_PATH, O_RDONLY | O_NONBLOCK); if (rfd < 0) { fprintf(stderr, "open err : %s\n", strerror(errno)); exit(-1); } int n; char buf[1024]; while (( n = read(rfd, buf, sizeof (buf))) < 0) { if (errno == EAGAIN){ continue; } fprintf(stderr, "read err : %s\n", strerror(errno)); exit(-1); } buf[n] = '\0'; printf("%s\n", buf); return 0; }
FIFO的關閉由系統負責
FIFO有兩種用途:
1.FIFO由shell命令使用以便將數據從一條管道線傳送到另外一條,爲此無需建立中間臨時文件。
mkfifo fifo1 prog3 < fifo1 & prog1 < infile | tee fifo1 | prog2
2.FIFO用於客戶機-服務器應用程序中,以在客戶機和服務器之間傳遞數據。
注意
1. client一次發送的請求長度必須小於PIPE_BUF,避免屢次寫的交叉
2. client必須在請求中包含自身的ID,從而server知道將replies發送給哪一個client
3. 當客戶進程個數變爲0時,server將在well-known的FIFO讀到一個文件結束標誌。
解決方法:server以讀寫方式打開well-known的FIFO
XSI IPC: 消息隊列, 信號量, 共享存儲
標識符(int)是IPC對象的內部名, 鍵(key_t)是該對象的外部名稱
有多種方法使客戶機和服務器在同一IPC結構上會合:
--1. 服務器能夠指定關鍵字IPC_PRIVATE建立一個新IPC結構,將返回的標識符存放在某處以便客戶機取用。
關鍵字IPC_PRIVATE保證服務器建立一個新IPC結構。
缺點:
服務器要將整型標識符寫到文件中,而後客戶機在此後又要讀文件取得此標識符。
IPC_PRIVATE關鍵字也可用於父、子關係進程。父進程指定IPC_PRIVATE建立一個新IPC結構,
所返回的標識符在fork後可由子進程使用。子進程可將此標識符做爲exec函數的一個參數傳給一個新程序。
--2. 在一個公用頭文件中定義一個客戶機和服務器都承認的關鍵字。而後服務器指定此關鍵字建立一個新的IPC結構。
這種方法的問題是該關鍵字可能已與一個IPC結構相結合,在此狀況下,get函數(msgget、semget或shmget)出錯返回。
服務器必須處理這一錯誤,刪除已存在的IPC結構,而後試着再建立它。
--3. 客戶機和服務器認同一個路徑名和課題ID(課題ID是0~255之間的字符值),而後調用函數ftok將這兩個值
變換爲一個關鍵字。而後在方法2中使用此關鍵字。
key_t ftok(const char* path, int id); // path:現有文件路徑
注意 :
1. 對於不一樣文件的兩個路徑名若使用同一個項目ID,ftok可能會產生相同的鍵
建立IPC結構的注意事項
三個get函數(msgget、semget和shmget)都有兩個相似的參數key和一個整型的flag。
如若知足下列條件,則建立一個新的IPC結構(一般由服務器建立):
--1. key是IPC_PRIVATE,或
--2. key未與特定類型的IPC結構相結合,flag中指定了IPC_CREAT位。
注意:
爲訪問現存的隊列(客戶機),key必須等於建立該隊列時所指定的關鍵字,而且不該指定IPC_CREAT。
爲了訪問一個現存隊列,毫不能指定IPC_PRIVATE做爲關鍵字。由於這是一個特殊的鍵值,它老是用於建立一個新隊列。
爲了訪問一個用IPC_PRIVATE關鍵字建立的現存隊列,必定要知道與該隊列相結合的標識符,而後在其餘IPC調用中·使用該標識符。
若是但願建立一個新的IPC結構,保證不是引用具備同一標識符的一個現行IPC結構,
那麼必須在flag中同時指定IPC_CREAT和IPC_EXCL位。這樣作了之後,若是IPC結構已經存在就會形成出錯,
返回EEXIST,這與指定了O_CREAT和O_EXCL標誌的open相相似
struct ipc_perm { uid_t uid; /* owner’s effective user ID */ gid_t gid; /* owner’s effective group ID */ uid_t cuid; /* creator’s effective user ID */ gid_t cgid; /* creator’s effective group ID */ mode_t mode; /* access modes */ ... };
建立IPC結構時,對以上數據結構進行賦值
mode字段的值對於任何IPC結構都不存在執行許可權。
sysctl 觀察和修改內核配置參數
ipcs -l 顯示IPC相關的限制
缺點
1. IPC結構是在系統範圍內起做用的,沒有訪問計數。
例如,若是建立了一個消息隊列,在該隊列中放入了幾則消息,而後終止,可是該消息隊列及其內容並不被刪除。
它們餘留在系統中直至由某個進程調用讀消息或刪除消息隊列等
與管道相比,當最後一個訪問管道的進程終止時,管道就被徹底地刪除了。
對於FIFO而言,雖然當最後一個引用FIFO的進程終止時其名字仍保留在系統中,
可是留在FIFO中的數據卻在此時所有刪除。
2. 這些IPC結構並不按名字爲文件系統所知。咱們不能用第三、4章中所述的函數來存取它們或修改它們的特性。
爲了支持它們不得不增長了十多個全新的系統調用. 由於這些IPC不使用文件描述符,因此不能對它們使用
多路轉接I/O函數:select和poll。這就使得一次使用多個IPC結構,以及用文件或設備I/O來使用IPC結構很難作到。
優勢
a. 它們是可靠的,b. 流是受到控制的, c. 面向記錄, d. 能夠用非先進先出方式處理。
特色:能夠按非先進先出次序讀消息
數據結構
struct msqid_ds { // message queue id data structure struct ipc_perm msg_perm; /* 權限*/ msgqnum_t msg_qnum; /*消息數量*/ msglen_t msg_qbytes; /* max # of bytes on queue */ pid_t msg_lspid; /* pid of last msgsnd() */ pid_t msg_lrpid; /* pid of last msgrcv() */ time_t msg_stime; /* last-msgsnd() time */ time_t msg_rtime; /* last-msgrcv() time */ time_t msg_ctime; /* last-change time */ ... };
1.獲取已有的message queue id,或新建一個message queue並返回id
#include <sys/msg.h> int msgget(key_t key, int flag);
2. 對隊列進行操做
int msgctl(int msqid, int cmd, struct msqid_ds* buf);
三種操做:
IPC_STAT : 取此隊列的msqid_ds結構,並將其存放在buf指向的結構中
IPC_SET : 按由buf指向的結構中的值,設置與此隊列相關的結構中的四個字段. 只有超級用戶才能增長msg_qbytes的值
IPC_RMID : 從系統中刪除該消息隊列以及仍在該隊列上的全部數據。這種刪除當即生效。
仍在使用這一消息隊列的其餘進程在它們下一次試圖對此隊列進行操做時,將出錯返回EIDRM。
此命令只能由下列兩種進程執行:一種是其有效用戶ID等於msg_perm.cuid或msg_perm.uid;另外一種是具備超級用戶特權的進程。
3.發送消息
int msgsnd (int msqid, const void* ptr, size_t nbytes, int flag);
每一個消息都由三部分組成,它們是:正長整型的類型字段、非負長度以及實際數據。消息老是放在隊列尾端。
ptr能夠指向這樣的數據結構
struct msg_struct{ long msg_type;// 自定義 struct{ /*content*/ }; };
3.1. flag的值能夠指定爲IPC_NOWAIT。這相似於文件I/O的非阻塞I/O標誌。
3.2. 若消息隊列已滿,則指定IPC_NOWAIT使得msgsnd當即出錯返回EAGAIN。 若是沒有指定IPC_NOWAIT,則進程阻塞直到:
---a. 空間能夠容納要發送的消息
---b. 從系統中刪除了此隊列。errno設置爲EIDRM
---c. 捕捉到一個信號,並從信號處理程序返回。errno設置爲EINTR。
4.接收消息
ssize_t msgrcv (int msqid, void* ptr, size_t nbytes, long type, int flag);
4.1 如同msgsnd中同樣,ptr參數指向一個長整型數(返回的消息類型存放在其中),跟隨其後的是存放實際消息數據的緩存。
4.2 nbytes說明數據緩存的長度。若返回的消息大於nbytes,並且在flag中設置了MSG_NOERROR,則該消息被截短
4.3 若是沒有設置這一標誌,而消息又太長,則出錯返回E2BIG(消息仍留在隊列中)。
4.4 參數type使咱們能夠指定想要哪種消息:
---a. type == 0 返回隊列中的第一個消息。
---b. type > 0 返回隊列中消息類型爲type的第一個消息。
---c. type < 0 返回隊列中消息類型值小於或等於type絕對值,並且在這種消息中,其類型值又最小的消息。
非零type用於以非先進先出次序讀消息。例如,若應用程序對消息賦優先權,那麼type就能夠是優先權值。
若是一個消息隊列由多個客戶機和一個服務器使用,那麼type字段能夠用來包含客戶機進程ID。
4.5 能夠指定flag值爲IPC_NOWAIT,使操做不阻塞。這使得若是沒有所指定類型的消息,則msgrcv出錯返回ENOMSG。
4.6 若是沒有指定IPC_NOWAIT,則進程阻塞直至:
---a. 有了指定類型的消息
---b. 從系統中刪除了此隊列(出錯返回EIDRM)
---c. 捕捉到一個信號並從信號處理程序返回(出錯返回EINTR)
a.信號量定義爲含有一個或多個信號量值的集合。當建立一個信號量時,指定集合的各個值
b. 建立信息量(semget)與對其賦初值(semctl)分開。這是一個致命的弱點。
由於不能原子地建立一個信號量集合,而且對該集合中的全部值賦初值。
c. 進程在終止時並無釋放已經分配給它的信號量。下面將要說明的UNDO功能就是要處理這種狀況的。
數據結構
struct semid_ds{ struct ipc_perm sem_perm; /*see Section 14.6.2*/ ushort sem_nsems; /*#of semaphores in set 信號量集合的大小*/ time_t sem_otime; /*last-semop() time*/ time_t sem_ctime; /*last-change time*/ ... };
獲取一個信號量ID
int semget (key_t key, int nsems, int flag);/*nsems: 信號量的數量*/
若是是建立新集合(服務器中),則必須指定nsems。若是引用一個現存的集合(客戶機),則將nsems指定爲0。
int semctl (int semid, int semnum, int cmd, .../* union semun arg */);
semnum :[0, nsems - 1]
cmd: 見【apue 中文第三版 P457】, 共十種命令
最後一個參數是個聯合(union),而非指向一個聯合的指針。暫略
信號集操做
int semop (int semid, struct sembuf semoparray[], size_t nops);
struct sembuf{ unsigned short sem_num; short sem_op; // 操做 short sem_flg; // IPC_WAIT, SEM };
不管什麼時候只要爲信號量操做指定了SEM_UNDO標誌,而後分配資源(sem_op值小於0)
那麼內核就會記住對於該特定信號量,分配給咱們多少資源(sem_op的絕對值)。
當該進程終止時,內核都將檢驗該進程是否還有還沒有處理的信號量調整值,則按調整值對相應量值進行調整。
數據結構
struct shmid_ds { struct ipc_perm shm_perm; /* see Section 15.6.2 */ size_t shm_segsz; /* size of segment in bytes */ //共享內存的大小 pid_t shm_lpid; /* pid of last shmop() */ pid_t shm_cpid; /* pid of creator */ shmatt_t shm_nattch; /* number of current attaches */ time_t shm_atime; /* last-attach time */ time_t shm_dtime; /* last-detach time */ time_t shm_ctime; /* last-change time */ ... };
資源限制
1. 建立一個共享存儲結構或引用一個現有的共享存儲結構
int shmget(key_t key, size_t size, int flag);
建立時指定size, 引用時令size = 0, size爲系統頁長的整數倍
2. 對共享存儲段執行操做
int shmctl(int shmid, int cmd, struct shmid_ds *buf );
cmd : IPC_STAT IPC_SET IPC_RMID SHM_LOCK SHM_UNLOCK
3. 將共享存儲段鏈接到它的地址空間中
void* shmat (int shmid, const void* addr, int flag);
3.1. 若是addr爲0,則此段鏈接到由內核選擇的第一個可用地址上。(通常應指定add r爲0,以便由內核選擇地址。)
3.2. 若是在flag中指定了SHM_RDONLY位,則以只讀方式鏈接此段。不然以讀寫方式鏈接此段。
4. 當對共享存儲段的操做已經結束時,則調用shmdt脫接該段。
int shmdt(const void*addr); // dt : detach
shmdt沒有刪除其標識符以及其數據結構。該標識符仍然存在,直至某個進程調用shmctl特意刪除它。
將struct shmid_ds的shm_nattch減一
5. 特定系統上的存儲區佈局
mmap和共享存儲的異同
1.二者都實現進程間的通訊
2.mmap在不使用MAP_ANONYMOUS標誌時,還涉及到對磁盤文件的讀寫
6. 利用具備共享和robust屬性的互斥鎖管理共享內存
#define SHM_U_R 0400 #define SHM_U_W 0200 #define ERR_SYS(str, err) do{ \ fprintf(stderr, (str)); \ fprintf(stderr, strerror(err)); \ exit(0); \ } while(0) #define ERR_QUIT(str) do{ \ fprintf(stderr, (str)) \ exit(0); \ } while(0) struct ds_t{ int age; char name[32]; char sex; }; struct shm_struct{ pthread_mutex_t shm_mutex; struct ds_t shm_info; }; static void print_ds(const struct ds_t* ds_ptr){ printf("age = %d.\n",ds_ptr->age); printf("sex = %c.\n",ds_ptr->sex); printf("name = %s.\n",ds_ptr->name); }
static void shm_init(struct shm_struct* shm_ptr){ pthread_mutexattr_t mutex_attr; int err; if (( err = pthread_mutexattr_init(&mutex_attr) )!= 0) ERR_SYS("pthread_mutexattr_init failed.",err); if (( err = pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED) )!= 0) ERR_SYS("pthread_mutexattr_setpshared failed.", err); if (( err = pthread_mutexattr_setrobust(&mutex_attr, PTHREAD_MUTEX_ROBUST) )!= 0) ERR_SYS("pthread_mutexattr_setrobust failed.", err); if ((err = pthread_mutex_init(&shm_ptr->shm_mutex, &mutex_attr)) != 0) ERR_SYS("pthread_mutex_init failed.",err); struct ds_t tmp = { 1, "HELLO WORLD", 'M'}; shm_ptr->shm_info = tmp; }
static void shm_lock(struct shm_struct* ptr){ int err; if ((err = pthread_mutex_lock(&ptr->shm_mutex)) < 0){ ERR_SYS("ptread_muetx_lock failed", err); } else if (err == EOWNERDEAD){ if (pthread_mutex_consistent(&ptr->shm_mutex) < 0) ERR_SYS("pthread_mutex_consistent failed", err); else{ pthread_mutex_unlock(&ptr->shm_mutex); shm_lock(ptr); // just in case that another process locks mutex first, then dies again } //printf("success recovered!\n"); } } static void shm_unlock(struct shm_struct* ptr){ int err; if ((err = pthread_mutex_unlock(&ptr->shm_mutex)) < 0) ERR_SYS("ptread_muetx_lock failed", err); }
int main(){ size_t shm_size = sizeof (struct shm_struct); int shm_id; if ( ( shm_id = shmget(IPC_PRIVATE, shm_size, SHM_U_R | SHM_U_W)) < 0 ) ERR_SYS("shmget failed.",errno); struct shm_struct* addr; if ( ( addr = (struct shm_struct*) shmat(shm_id, 0, 0)) < 0 ) ERR_SYS("shmat failed.",errno); shm_init(addr); shm_lock(addr); // then, parent process dies before unlocks share memory pid_t pid; if ((pid = fork()) < 0 ){ ERR_SYS("fork failed",errno); }else if (pid == 0){ struct shm_struct* addr; if ( ( addr = (struct shm_struct*)shmat(shm_id, 0, 0)) < 0 ) ERR_SYS("shmat failed.",errno); shm_lock(addr); print_ds(&addr->shm_info); shm_unlock(addr); exit(0); } return 0; }
共享存儲可由不相關的進程使用。可是,若是進程是相關的,則SVR4提供了一種不一樣的方法
設備/dev/zero在讀時,是0字節的無限資源。此設備接收寫向它的任何數據,但忽略此數據。
咱們對此設備做爲IPC的興趣在於,當對其進行存儲映射時,它具備一些特殊性質:
a. 建立一個未名存儲區,其長度是mmap的第二個參數,將其取整爲系統上的最近頁長。存儲區都初始化爲0。
b. 若是多個進程的共同祖先進程對mmap指定了MAP_SHARED標誌,則這些進程可共享此存儲區。
int main(int argc, char* argv[]){ const char buf[] = "/dev/zero"; int fd; if ((fd = open(buf, O_RDWR)) < 0) ERR_SYS("open failed:",errno); struct shm_struct* shm; size_t shm_size = sizeof (struct shm_struct); if ((shm = (struct shm_struct*)mmap(0, shm_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0)) == (struct shm_struct*)MAP_FAILED){ ERR_SYS("mmap failed:", errno); } close(fd); // can close it shm_init(shm); pid_t pid; if ((pid = fork()) < 0){ ERR_SYS("fork err:", errno); }else if (pid == 0 ){ print_ds(&shm->shm_info); if (munmap(shm, shm_size) < 0) ERR_SYS("munmap failed:", errno); exit(0); } return 0; }
相比較"XSI信號量:
POSIX信號量:性能更好、接口使用更簡單、在刪除的表現更好
#include <semaphore> sem_t* sem_open(const char*name, int oflag, mode_t mode, unsigned int value);
其餘暫略
1. 學會使用管道和FIFO,由於在大量應用程序中仍可有效地使用這兩種基本技術。
2. 在新的應用程序中,要儘量避免使用消息隊列以及信號量,而應當考慮管道和記錄鎖
3. 由於它們與UNIX內核的其餘部分集成得要好得多。
4. 共享存儲段有其應用場合,而mmap函數則可能在之後的版本中起更大做用。