進程間通訊的方式中,咱們將多個進程共享同一塊存儲區來進行數據交換的方式稱爲共享內存通訊。源於它直接將「內存」共享的特殊機制,它成爲最快的一種IPC通訊方式;然而它也不完美,它雖快,可是沒有同步機制;一般在一個服務進程對共享存儲區還未完成寫操做以前,客戶進程是不該當去取這些數據的,可沒了同步,那可就亂了套了。數組
這種狀況不是咱們所願意看到的,因此基於此 咱們經常須要爲用到的共享內存段添加上同步的機制,使之「完美」起來。一般呢,實現同步咱們很天然地會想到信號量,是的,這裏我就是這麼幹的。用基於信號量的PV操做實現完成一個帶同步機制的共享內存段,這樣它就能夠像一個「fifo」隊列同樣了,而且它傳輸效率還會表現得很是不錯,雖然它還比較簡陋~。數據結構
先來講說用到的信號量及處理函數吧。信號量和其它的IPC結構(前面總結過管道、消息隊列)不一樣。它本質就是一個計數器,用於爲多個進程提供對共享數據對象的訪問。一般進程爲了得到共享的資源,須要執行如下操做:iphone
①測試控制該資源的信號量ide
②若此信號量>0,則進程可使用該資源。此種狀況下,進程會將信號量值減1,代表它使用了一個資源單位。函數
③不然,此信號量的值爲0,則使該進程進入休眠狀態,直至信號量值>1。若是有進程正在休眠狀態等待此信號量,則喚醒它們測試
還有就是爲了正確的實現信號量,信號量值的測試及減1操做還應當是原子的。爲此,信號量一般是在內核中實現的。通常而言,信號量初值能夠是任意一個正值,該值代表有多少個共享資源單位可供共享應用。然而遺憾的是,這裏我用到的XSI信號量也是有缺陷的。spa
這源於①信號量並不是是單個非負值,它被定義爲一個可能含有多個信號量值的集合。一般在建立信號量的時候,對該集合中信號量數量進行指定 ②信號量建立獨立於它的初始化。這是最致命的,由於這將致使的是不能原子的建立一個信號量集合,並對該集合中的各個信號量值賦初值。③有的程序1在終止時可能並無釋放掉已經分配給它的信號量。操作系統
而對於信號處理函數一般有3個,首先是3d
1.semget函數指針
做用:建立一個新的信號量或取得一個已有的信號量
原型:int semget(key_t key, int nsems, int semflg)
參數:
int nsems //它表明信號量集合中有多少個信號量。若是是建立新集合,則必須指定它;若是是引用現有的信號集(一般客戶進程中),則將其指定爲0.
int semflg //和前面IPC機制相似,用來初始化信號集維護的semid_ds結構中的ipc_perm結構成員sem_perm。一般用IPC_CREAT|0644建立,要直接打開已存在的話 也直接填0就好
2.semctl函數
用途:該函數用來直接控制信號量信息.也就是直接刪除信號量或者初始化信號量.
原型:int semctl(int semid, int semnum, int cmd, ...)
參數:
int semid //semget函數返回的信號量標識符.
int semnum, //它表明要給該信號量集中的第幾個信號量設置初值
int cmd //一般用SETVAL/GETVAL設置和獲取信號量集中的一個單獨的信號量。具體還有
IPC_STAT //讀取一個信號量集的數據結構semid_ds,並將其存儲在semun中的buf參數中。 IPC_SET //設置信號量集的數據結構semid_ds中的元素ipc_perm,其值取自semun中的buf參數。 IPC_RMID //將信號量集從內存中刪除。 GETALL //用於讀取信號量集中的全部信號量的值。 GETNCNT //返回正在等待資源的進程數目。 GETPID //返回最後一個執行semop操做的進程的PID。 GETVAL //返回信號量集中的一個單個的信號量的值。 GETZCNT //返回這在等待徹底空閒的資源的進程數目。 SETALL //設置信號量集中的全部的信號量的值。 SETVAL //設置信號量集中的一個單獨的信號量的值
若是有第四個參數,取決於所請求的命令,若是使用該參數,它一般是一個union semum結構,定義以下:
union semun { int val; /* Value for SETVAL */一般就要它就夠了 struct semid_ds *buf; unsigned short *arry; };
賦值形式:semun.val = 2 //初值放在val中
執行PV操做
3.semop函數
用途:用來改變信號量的值,該函數是具備原子性的。
原型:int semop(int semid, struct sembuf *sops, size_t nsops)
struct sembuf
{
unsigned short sem_num; /* semaphore number */除非使用一組信號量,不然它爲0
short sem_op; /* semaphore operation */ p -1, v 1
short sem_flg; /* operation flags */ 填 0就好 SEM_NOWAIT SEM_UNDO
}
注意這當中的sem_op參數,信號量集合中的各個成員的操做都應由相應的sem_op值規定。此值可正可負可爲0,相應的值就表明對於進程中佔用的資源數量,
同時這值會加到信號量值上,若指定undo標誌,還從信號量調整值上減去sem_op.
下面就能夠開始操做一段共享內存,使其帶有同步的機制,而後模擬重現咱們操做系統書上的那個經典的生產消費者問題,而且解決它。這裏我畫個圖幫助整理思路:
首先,定義出一個管理內存段的「shmfifo」結構,該結構中具體可用一個shm_head結構來管理數據的讀、寫位置和大小的信息。同時,shimfifo結構中還維護了3個用於解決互斥和同步的信號量sem_mutex,sem_empty, sem_full。
維護的shm_head結構存放在共享內存的頭部,寫入數據從payload處開始寫入一個數據塊大小,每次寫入以後,便更新頭部的wr_idx位,payload可由_head+1獲得;一樣,最開始讀出數據時也是從payload處開始讀,每次讀完便更新wr_idx。好,到這裏就有了大體的思路。因而能夠實現出來它的頭文件
#ifndef __SHMFIFO_H__ #define __SHMFIFO_H__ #include <sys/ipc.h> #include <string.h> #include <stdlib.h> #include <stdio.h> #include <sys/ipc.h> #include <sys/sem.h> #include <sys/shm.h> typedef struct shmhead { int rd_idx; // 讀入數據索引 int wr_idx; // 寫數據索引 int blocks; // 存數據塊數量 int blksz; // 每一個數據塊大小 }shmhead_t; typedef struct shmfifo { shmhead_t *p_head; // 共享內存的起始地址 char * p_payload; // 有效數據的起始地址 int shmid; // 打開的共享內存id int sem_mutex; // 互斥量 int sem_empty; // 還剩多少個能夠消費
int sem_full; // 剩餘多少個地方能夠生產 }shmfifo_t; // 初始化函數 shmfifo_t *shmfifo_init(key_t key, int blocks, int blksz); // 放入數據 void shmfifo_put(shmfifo_t *fifo, const void *buf); // 取得數據 void shmfifo_get(shmfifo_t *fifo, void *buf); // 結構銷燬 void shmfifo_destroy(shmfifo_t *fifo); #endif //__SHMFIFO_H__
緊接着要考慮就該結構的初始化。首先,確定是先要爲結構開出空間來,其大小不難分析應該爲shm_head大小加上blocks*blksz,其次就是一步步對這些變量和信號進行初始化了。
緊接着,對於放數據和取數據就依葫蘆畫瓢就行了,值得注意就是對信號量的處理,進行PV操做時,針對寫數據時,咱們須要先P(sem_empty)保證先有地方能夠放數據,其次才進行P(sem_mutex)保證互斥性。(不然,會由於在放入數據時進行了P(sem_mutex)操做,在還沒來得及讀數據時,就將內存段放滿,進而使取數據操做阻塞在信號量sem_mutex<0條件上,最終致使死鎖。但這裏若能保證在內存段還未放滿時,讀數據進程能獲得調度,那麼就不會有這樣的問題了;好比,這裏可讓寫數據sleep一會,而後執行讀數據操做或者 先進行讀數據,而後再寫數據;你能夠去試試看~ )
而後放數據完成,即可進行V(sem_full)接着V(sem_mutex)。在進行取數據操做時同理。基於此,即可有如下代碼:
#include "shmfifo.h" typedef union semun{ int val; }semun; // 初始化 shmfifo_t* shmfifo_init(key_t key, int blocks, int blksz) { shmfifo_t *p = malloc(sizeof(shmfifo_t)); int shmid = shmget(key, 0, 0); int len = sizeof(shmhead_t) + blocks*blksz; //共享內存段大小 if(shmid == -1 ) // 內存段不存在,建立 { shmid = shmget(key, len, IPC_CREAT|0644); if ( shmid == -1) perror("shmget"),exit(1); //初始化內存段頭 p->p_head = shmat(shmid, NULL, 0); //將開出的內存段掛載到進程地址空間 p->p_head->rd_idx = 0; p->p_head->wr_idx = 0; p->p_head->blocks = blocks; p->p_head->blksz = blksz; //初始化後段 p->p_payload = (char*)(p->p_head+1); p->shmid = shmid; p->sem_mutex = semget(key, 1, IPC_CREAT|0644); p->sem_empty = semget(key+1, 1, IPC_CREAT|0644); p->sem_full = semget(key+2, 1, IPC_CREAT|0644); semun su = {1}; //設置互斥信號量初值爲1 semctl(p->sem_mutex, 0, SETVAL, su); su.val = blocks; semctl(p->sem_empty, 0, SETVAL, su); su.val = 0; //初始不能消費 semctl(p->sem_full, 0, SETVAL, su); } else //內存段存在 ,打開 { p->p_head = shmat(shmid, NULL, 0); p->p_payload = (char*)(p->p_head+1); p->shmid = shmid; p->sem_mutex = semget(key, 0, 0); // p->sem_empty = semget(key+1, 0, 0); p->sem_full = semget(key+2, 0, 0); } return p; } static void P(int id) { struct sembuf sb[1] = {0,-1, 0}; semop(id, sb, 1); } static void V(int id) { struct sembuf sb[1] = {0, 1, 0}; semop(id, sb, 1); } // 放入數據 void shmfifo_put(shmfifo_t *fifo, const void *buf) { P(fifo->sem_empty); //有多少地方可供生產,確保有空位生產 P(fifo->sem_mutex); //保證進程互斥 memcpy(fifo->p_payload + fifo->p_head->wr_idx * fifo->p_head->blksz, //寫入位置 buf, fifo->p_head->blksz); //每次寫入一個數據塊大小 fifo->p_head->wr_idx = (fifo->p_head->wr_idx+1) %fifo->p_head->blocks; //取模,保證數據存滿時,轉從payload處寫數據 V(fifo->sem_full); V(fifo->sem_mutex); } // 取得數據 void shmfifo_get(shmfifo_t* pFifo, void *buf) { P(pFifo->sem_full); //確保有數據可取 P(pFifo->sem_mutex); //從內存段讀取,拷入buf中 memcpy(buf, pFifo->p_payload + pFifo->p_head->rd_idx* pFifo->p_head->blksz, pFifo->p_head->blksz); pFifo->p_head->rd_idx = (pFifo->p_head->rd_idx+1) %pFifo->p_head->blocks; //取模,保證數據存滿時,轉從payload處取數據 V(pFifo->sem_empty); V(pFifo->sem_mutex); } // 銷燬 void shmfifo_destroy(shmfifo_t* pFifo) { shmdt(pFifo->p_head); //取消內存段掛載 shmctl(pFifo->shmid, IPC_RMID, 0); //釋放掉該內存段 //刪除信號量 semctl(pFifo->sem_mutex, 0, IPC_RMID, 0); semctl(pFifo->sem_empty, 0, IPC_RMID, 0); semctl(pFifo->sem_full, 0, IPC_RMID, 0); free(pFifo); }
最後就是分別實現get.c和put.c進行驗證,
get.c
#include "shmfifo.h" #include <unistd.h> typedef struct Products{ int id; char pro_name[10]; }Pro; int main() { shmfifo_t* fifo = shmfifo_init(12345, 3, sizeof(Pro)); Pro p; while( 1){ memset(&p, 0x00, sizeof(p)); shmfifo_get(fifo, &p); printf("id:%d, 產品名:%s\n", p.id, p.pro_name); sleep(1); } shmfifo_destroy(fifo); }
put.c
#include "shmfifo.h" typedef struct Product { int id; char pro_name[10]; }Pro; int main() { shmfifo_t *fifo = shmfifo_init(12345, 4, sizeof(Pro)); Pro p; for (int i=0; i<20; ++i) { memset(&p, 0x00, sizeof(p)); sprintf(p.pro_name, "iphone%d", i); p.id = i+1; shmfifo_put(fifo, &p); printf("put %d ok\n", i); } }
驗證同步,當寫進程結束,讀數據進程便阻塞
驗證互斥,進程1寫第9個數據時,我另開啓了一進程寫數據,從右側可見這兩進程是交替進行寫操做的