實際上這也是進程之間的兩種關係,在學習這兩種關係以前,須要回顧一下順序程序與併發程序的特徵:算法
①、順序性安全
順序程序執行的順序是按照指令的前後順序來執行的,當前的指令需依賴於前一條指令,並與前一條指令構成了必定的因果關係,後一條指令的執行必定要在前一條指令的基礎之上纔可以運行。bash
②、封閉性:(運行環境的封閉性)服務器
也就是說順序程序在運行過程當中,它的運行環境不會受其它程序的影響。網絡
③、肯定性數據結構
只要給程序必定的輸入,無論程序是運行在比較快的機器上,仍是運行在比較慢的機器上,它必定會有特定的輸出。併發
④、可再現性框架
一個程序在必定時期運行的結果,跟另一個時期運行的結果能夠是同樣的,只要是具備相同的輸入,就必定具備相同的輸出,這跟"肯定性"是頗有關係。socket
①、共享性函數
②、併發性
③、隨機性
下面來正式理解進程同步與進程互斥:
如兩個小孩爭搶同一個玩具,這是一種互斥關係。
下面來舉一個同步的示例:汽車售票
一般狀況下,將這兩種關係統稱爲同步關係。
①、數據傳輸:一個進程須要將它的數據發送給另外一個進程
②、資源共享:多個進程之間共享一樣的資源。
③、通知事件:一個進程須要向另外一個或一組進程發送消息,通知它(它們)發生了某種事件(如進程終止時要通知父進程)。【如:上面說的汽車售票的例子】
④、進程控制:有些進程但願徹底控制另外一個進程的執行(如Debug進程),此時控制進程但願可以攔截另外一個進程的全部陷入和異常,並可以及時知道它的狀態改變。【能夠經過信號的方式來實現,如:SIGTRAP信號】
①、管道
其中匿名管道能夠用於親緣關係的進程之間進行通訊,而有名管道能夠用於不相關的進程之間進行通訊。
②、System V進程間通訊【使用最普遍】
這個是以後發展出來的,以後還會介紹。
③、POSIX進程間通訊
①、文件
這個其實也是進程間通訊的一種試,一個進程向一個文件寫數據,而另一個進程向一個文件讀數據。
②、文件鎖:是爲了互斥和同步用的
③、管道(pipe)和有名管理(FIFO)
④、信號(signal)
⑤、消息隊列
⑥、共享內存
⑦、信號量
其中System V包含:消息隊列、共享內存、信號量
⑧、互斥量
⑨、條件變量
⑩、讀寫鎖
⑪、套接字(socket)
下面來介紹一下System V IPC & POSIX IPC:
因爲這三種共享方式,就出現了三種進程間通訊對象的持續性,以下:
①、隨進程持續:一直存在直到打開的最後一個進程結束。(如pipe和FIFO)
②、隨內核持續:一直存在直到內核自舉或顯式刪除(如System V消息隊列、共享內存、信號量)
③、隨文件系統持續:一直存在直到顯式刪除,即便內核自舉還存在。(POSIX消息隊列、共享內存、信號量若是是使用映射文件來實現)
在上面已經介紹了進程的兩種關係:互斥和同步,死鎖則是進程的另一種關係:
死鎖是指多個進程之間相互等待對方的資源,而在獲得對方資源以前又不釋放本身的資源,這樣,形成循環等待的一種現象。若是全部進程都在等待一個不可能發生的事,則進程就死鎖了。
①、資源一次性分配:(破壞請求和保持條件)
②、可剝奪資源:破壞不可剝奪條件)
③、資源有序分配法:(破壞循環等待條件)
①、預防死鎖的幾種策略,會嚴重地損害系統性能。所以在避免死鎖時,要施加較弱的限制,從而得到較滿意的系統性能。
②、因爲在避免死鎖的策略中,容許進程動態地申請資源。於是,系統在進行資源分配以前預先計算資源分配的安全性。若這次分配不會致使系統進入不安全狀態,則將資源分配給進程;不然,進程等待。
下面來對死鎖進行舉例,其中最具備表明性的避免死鎖算法是銀行家算法。
爲保證資金的安全,銀行家規定:
(1) 當一個顧客對資金的最大需求量不超過銀行家現有的資金時就可接納該顧客;
(2) 顧客能夠分期貸款,但貸款的總數不能超過最大需求量
(3) 當銀行家現有的資金不能知足顧客尚需的貸款數額時,對顧客的貸款可推遲支付,但總能使顧客在有限的時間裏獲得貸款
(4) 當顧客獲得所需的所有資金後,必定能在有限的時間裏歸還全部的資金.
另外還有一個很經典的例子:哲學家就餐問題
1965年,Dijkstra(迪傑斯特拉)提出並解決了一個他稱之爲哲學家就餐的同步問題。從那時起,每一個發明新的同步原語的人都但願經過解決哲學家就餐問題來展現其同步原語的精妙之處。這個問題能夠簡單地描述以下:五個哲學家圍坐在一張圓桌周圍,每一個哲學家面前都有一盤通心粉。因爲通心粉很滑,因此須要兩把叉子才能夾住。相鄰兩個盤子之間放有一把叉子,餐桌如圖2-44所示。
哲學家的生活中有兩種交替活動時段:即吃飯和思考(這只是一種抽象,即對哲學家而言其餘活動都可有可無)。當一個哲學家以爲餓了時,他就試圖分兩次去取其左邊和右邊的叉子,每次拿一把,但不分次序。若是成功地獲得了兩把叉子,就開始吃飯,吃完後放下叉子繼續思考。關鍵問題是:能爲每個哲學家寫一段描述其行爲的程序,且決不會死鎖嗎?(要求拿兩把叉子是人爲規定的,咱們也能夠將意大利麪條換成中國菜,用米飯代替通心粉,用筷子代替叉子。)
哲學家就餐問題解法:
①、服務生解法
哲學家在拿刀叉以前,須要獲得服務生的贊成,也就是服務生是管理者,他在統一的分配刀叉,他在斷定當前的資源是否處於一個安全的狀態,若是資源處於一個安全的狀態,服務生就容許哲學家將叉子拿起,不然就不容許。
②、最多4個哲學家
這不是解決問題的最好方案,由於將咱們的限定條件更改了,4個哲學家有5把叉子勢必有一個哲學家能獲得兩把叉子,這實際上就是一種抽屜原則。
③、僅當一個哲學家兩邊叉子均可用時才容許他拿叉子
④、給全部哲學家編號,奇數號的哲學家必須首先拿左邊的筷子,偶數號的哲學家則反之
信號量和P、V原語由Dijkstra(迪傑斯特拉)提出,其中他有不少貢獻:
①、在程序設計中,提出了goto語句是有害的,因此能夠認爲他是程序設計語言之父。
②、在操做系統,提出了信號量、PV原語。
③、在網絡上,提出了最短路徑。
根據上面的描述,很容易獲得信號量它所擁有的數據結構,以下:
另外還能夠得出PV原語的僞代碼:
用PV原語主要是來解決進程的同步互斥的問題,因此,下面舉例來講明:
有一汽車租賃公司有兩部敞篷車能夠出租,假定同時來了四個顧客都要租敞篷車,那麼確定會有兩我的租不到:
用一個簡單的圖來描述一下:
另外須要注意:必須是同類的資源纔可以進行PV操做,若是一部是敞篷車,一部是普通的汽車,是不可以進行PV來解決同步問題的。
其實還能夠經過管道,可是,管道是基於字節流的,因此一般會將它稱爲流管道,數據與數據之間是沒有邊界的;而消息隊列是基於消息的,數據與數據之間是有邊界的,這是消息隊列跟管道有區別的地方,另一個差異就是在於接收:消息隊列在接收是不必定按先入先出,而管道必定是按照先入先出的原則來進行接收的。
關於這些,能夠經過命令來查看其值,以下:
上次提到過,System_V IPC對象有三種,以下:
這些IPC對象都是隨內核持續的,也就是說當訪問這些對象的最後一個進程結束時候,內核也不會自動刪除這些對象,直到咱們顯示刪除這些對象纔可以從內核中刪除掉,因此說內核必須爲每一個IPC對象維護一個數據結構,其形式以下:
下面來看下消息隊列的具體結構是怎麼樣的:
這裏先學前兩個函數:
下面則用代碼來學習一下該函數:
從圖中能夠看出建立失敗了,這是爲何呢?這時能夠查看其幫助:
實際上msgget函數相似於open函數同樣,若是在open一個文件時沒有指定O_CREATE選項,則不可以建立一個文件,一樣的:
若是建立失敗,則會返回:
因此修改代碼以下:
那建立成功的消息隊列怎麼查看呢?能夠經過以下命令:
若是再次運行呢?
其錯誤代碼是:
可見每運行一次則就建立成功一個新的消息隊列,並且key都是爲0,這意味着兩個進程就沒法共享同一個消息隊列了,可是同一個進程仍是能夠共享的,其實也能夠有一個辦法達到兩個不一樣進程進行共享,就是將消息隊列id保存到文件當中,另外一個進程讀取消息隊列id來得到消息隊列,這樣也能實現共享,只是麻煩一些,這裏就很少贅述了。另外若是key值爲IPC_PRIVATE,那麼沒有IPC_CREATE選項也同樣會建立成功,以下:
另一旦一個消息隊列建立成功以後,若是要打開一個消息隊列,這時候就不用指定IPC_CREATE了,並且參數值能夠直接填0:
接下來刪除一些已經建立的消息隊列,有兩種方式:
那若是像這剩下key全爲0的,用這種方式還能起做用麼,我們來試一下:
下面來講一個權限的問題:
下面來以600更高的權限來打開剛纔建立低權限的消息隊列:
那有木有一種辦法,在打開消息隊列時,就以原建立的權限打開,固然有,也就是打開時不指定權限既可,以下:
上面演示了各類msgget建立用法,下面來用圖來總結一下各個狀況:
接下來學習一下消息隊列的按制函數,以下:
上面已經用命令知道怎麼刪除已經建立的消息隊列了,下面採用代碼來實現消息隊列的刪除:
接下來來獲取消息隊列的信息,這時須要就須要關注第三個參數了,man查看一下:
而其中ipc_perm結構體內容以下:
下面來更改一下消息隊列的狀態,將權限666改成600,具體作法以下:
【說明】:上圖中用到了sscanf函數來將一個指定的字符串賦值給變量,對於scanf函數你們應該都不陌生,它是從標準輸入中賦值變量,而sscanf是將指定的字符串按指定的格式賦給變量,二者的惟一區別就是數據的來源變了,很容易理解。
編譯運行:
下面則開始用代碼來使用一下該發送函數:
在運行以前,先查看一下1234消息隊列是否已經建立:
用上次編寫的查看消息隊列狀態的程序來查看一下此時的狀態:
接下來運行發送消息程序:
接下來再來發送一個消息:
目前發送的字節總數爲300,尚未超過最大字節數msgmnb=16384,下面來看下若是超過了這個字節數,會怎麼樣?因此繼續發送消息:
這是因爲每條消息最大長度是有上限的(MSGMAX),它的上線就等於8192:
這已經在上節中介紹過,因此,將上面的8193改成8192,就不會發送失敗了,以下:
發送這時阻塞了,這是因爲原有的消息隊列中的總字節數8492+要發送的8192已經大於16384(消息隊列總的字節數),默認就會阻塞,也就是發送仍是沒成功,查看一下狀態既可知道:
這時能夠指定一個發送選項來更改阻塞行爲:
可見就不會阻塞了,返回EAGAIN錯誤了。
其中最主要是要了解第四個參數,msgtyp,以下:
下面用程序來驗證下,在運行這個程序時,能夠這樣使用:
要實現這樣的功能,須要用到getopt函數,因此首先須要經過該函數來解析命令參數,下面先來熟悉一下該函數,其實不是太難:
下面運行一下:
下面來解析-n-t選項,修改代碼以下:
關於getopt函數的使用基本就這些,仍是比較簡單,下面則正式開始編寫消息的接收功能:
msg_recv.c:
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#define ERR_EXIT(m) \
do \
{ \
perror(m); \
exit(EXIT_FAILURE); \
} while(0)
 struct msgbuf {
   long mtype; /* message type, must be > 0 */
   char mtext[1]; /* message data */
 };
#define MSGMAX 8192//定義一個宏,表示一條消息的最大字節數
int main(int argc, char *argv[])
{
int flag = 0;
int type = 0;
int opt;
while (1)
{
opt = getopt(argc, argv, "nt:");
if (opt == '?')
exit(EXIT_FAILURE);
if (opt == -1)
break;
switch (opt)
{
case 'n':
/*printf("AAAA\n");*/
flag |= IPC_NOWAIT;
break;
case 't':
/*
printf("BBBB\n");
int n = atoi(optarg);
printf("n=%d\n", n);
*/
type = atoi(optarg);
break;
}
}
int msgid;
msgid = msgget(1234, 0);
if (msgid == -1)
ERR_EXIT("msgget");
struct msgbuf *ptr;
ptr = (struct msgbuf*)malloc(sizeof(long) + MSGMAX);
ptr->mtype = type;
int n = 0;
if ((n = msgrcv(msgid, ptr, MSGMAX, type, flag)) < 0)//接收消息
ERR_EXIT("msgsnd");
printf("read %d bytes type=%ld\n", n, ptr->mtype);
return 0;
}
複製代碼
下面再來發送一些消息:
下面來驗證一下當消息類型爲負數時的狀況,先清空消息:
當消息類型爲負時,還有一個特色,以下:
下面來驗證一下,從新發送幾個消息,並來分析接收的順序:
默認狀況下每條消息最大長度是有上限的(MSGMAX),它的上線就等於8192,當發送消息超過這個大小時,則會報錯,上面也已經論證過:
可是若是將msgflg設置成MSG_NOERROR,消息超過期,是不會報錯的,只是消息會被截斷。
下面用一個示意圖來表示其實現原理:
那麼服務器端是如何區分消息是發送給不一樣的客戶端的呢?很天然想到的就是用類型進行區分,給不一樣客戶端發送的是不一樣類型的消息,客戶端則接收對應類型的消息,那這個類型用什麼標識不一樣的客戶端呢?進程的pid則是一個很好的類型方案,以下:
首先實現服務器端:
其中服務器要乾的事,就是不斷地接收類型爲1的消息,而且將其消息回射給不一樣的客戶端,因此下面來實現一下:
而後取出前4個字節,表明客戶端的進程ID,以下:
接下來則將真正的內容打印出來,而且將其回射給客戶端:
這樣服務端就編寫好了,接下來編寫客戶端,其實現跟服務器很相似:
首先是不斷地從鍵盤中獲取數據,發送給服務器:
接下來,則是處理從服務器回射回來的數據,其寫法跟服務端的很相似:
【說明】:爲啥不清空前四個字節,若是清空了則下次發送消息時又得存,因此爲了簡單,就不清空了。
下面來發送消息:
從以上實驗結果來看,回射功能是沒問題,可是,服務器的回顯只打印了一條消息,這看樣子是個小bug,下面來解決一下:
而主要緣由是因爲服務器回顯給客戶端時形成type的改變,以下:
因此在客戶端每次循環時,就得每次都指定一下msg.mtype = 1,不然,實際上客戶端是自已發給本身,就形成了客戶端有回顯,而服務端沒有,改變以後,再次運行:
發現服務器的消息顯示還有些問題,緣由其實很簡單,就是因爲沒有清空以前數據形成,修改以下:
實際上,這裏還存在一種隱含的問題,會產生「死鎖」現象,下面來分析一下產生的緣由:
避免死鎖的方案須要採用另一種實現方式,這裏只是探討一下,能夠本身去實現:
一、用管道或者消息隊列傳遞數據
這個示意圖的功能是服務器向客戶端傳輸文件,以下:
①、首先要將文件從內核讀取到進程的用戶空間當中,因此這裏就涉及到了一次read()系統調用。
②、服務器須要將讀取到的數據拷貝到管道或消息隊列當中,涉及到第二次系統調用。
③、對於客戶端來講,須要從管道或消息隊列中讀取這些數據,涉及到第三次系統調用。
④、讀到這些數據到應用的數據緩衝區當中,而後將緩衝區的內容寫到輸出文件中,涉及到第四次系統調查用。
從以上步驟來看,總共涉及到了四次系統調用, 四次內存拷貝(從內核空間到用戶空間),那共享內容方式又如何呢?
二、用共享內存傳遞數據
下面來學習一下相關函數的使用:
下面來看一下內存映射文件示意圖:
下面則用代碼來實踐一下:
下面來建立一個文件:
接下來對文件進行映射:
當映射成功以後,接下來則往文件中寫入一些數據,這時候就能夠直接經過指針寫入了,對文件的操做就好像對內存的訪問,以下:
可見就經過內存的方式來對文件進行了數據寫入,這就是內存映射文件的做用。
下面來寫一個讀取文件內容的功能,將寫入的五個同窗的數據讀出來,基於mmap_write.c來寫,代碼差很少:
mmap_read.c:
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#define ERR_EXIT(m) \
do \
{ \
perror(m); \
exit(EXIT_FAILURE); \
} while(0)
typedef struct stu
{
char name[4];
int age;
} STU;
int main(int argc, char *argv[])
{
if (argc != 2)
{
fprintf(stderr, "Usage: %s <file>\n", argv[0]);
exit(EXIT_FAILURE);
}
int fd;
fd = open(argv[1], O_RDWR);
if (fd == -1)
ERR_EXIT("open");
STU *p;
p = (STU*)mmap(NULL, sizeof(STU)*5, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
if (p == NULL)
ERR_EXIT("mmap");
//從映射內存中,來讀取文件的內容
int i;
for (i=0; i<5; i++)
{
printf("name = %s age = %d\n", (p+i)->name, (p+i)->age);
}
munmap(p, sizeof(STU)*5);
printf("exit ...\n");
return 0;
}
複製代碼
對共享內存進行寫操做的時候:
實際上,這些操做並無馬上寫回到文件當中,內核也會選擇一個比較好的時機將這些內容寫入到文件當中,若是咱們想要馬上寫回到文件當中,則就能夠用msync函數了。
①、映射不能改變文件的大小
下面來修改一下程序來驗證一下:
②、可用於進程間通訊的有效地址空間不徹底受限於被映射文件的大小
mmap在映射的時候,是將其映射到一個內存頁面當中,也就是說,咱們能夠通信的區域是之內存頁面爲單位的,好比咱們映射了40個字節,可是內存頁面確定是大於40個字節的,因此可以通訊的有效地址空間確定是超過了40個字節,下面也用程序來講明一下:
在寫進程還沒結束時,再快速地運行讀進程時,從中能夠發現讀到了10個學生信息,這也就論證了這點,爲何能讀到10個學生信息,由於咱們所映射的內存共享區大於文件的內容,由於映射的時候是基於頁面來分配的,咱們映射了40個字節,可能分配了4K的空間,只要咱們在這4K的地址空間中訪問就不會出錯,若是咱們超過了4K的地址空間,則極可能會產生一個SIGBUS的信號,若是咱們訪問的大小超過了幾個內存頁面,則有可能還會產生一個SIGSEGV信號,這取決於咱們超出部份的大小。
而若是寫進程結束了,再來讀取,從實驗結果來看,後面的五個學生信息就讀取不到了,爲何呢?由於寫進程結束了,也就是先前的那塊內存映射區域,對於讀端進程來講已經看不到了,這時讀端進程又去從文件當中進行映射,只可以看到五個學生的信息了,因此說爲啥要用sleep 10來講明這一問題。
③、文件一旦被映射後,全部對映射區域的訪問其實是對內存區域的訪問。映射區域內容寫回文件時,所寫內容不能超過文件的大小,
跟消息隊列同樣,共享內存也是有本身的數據結構的,system v共享內存也是隨內核持續的,也就是說當最後一個訪問內存共享的進程結束了,內核也不會自動刪除共享內存段,除非顯示去刪除共享內在,其數據結構跟消息隊列很相似:
跟消息隊列同樣,共享內存也提供了四個函數:
下面詳細來看一下各函數的用法:
用法跟msgget函數如出一轍,下面用代碼來實驗一下:
當共享內存建立好以後,則但願往共享內存當中進行寫入操做,在寫入以前,須要將共享內存映射到進程的地址空間,接下來來看一下第二個函數:
關於這點,其實能夠從幫助文檔中查看到:
其中「SHMLBA」是等於4K=4096b的值,好比說shmaddr地址指定爲4097,而這時所鏈接地址並非4097,而是4097-(4097%4096)=4097-1=4096,而若是shmaddr地址指定爲8193,這時所鏈接的地址爲:8193-(8193%4096)=8193-1=8192,就是這個意思。
一般狀況下,shmflg都指定爲0,表示鏈接到的共享內存既可讀也能夠寫。
下面來看另一個函數:
因此接下來將共享內存映射到進程的地址空間當中,具體寫法以下:
當執行完以後,能夠解決映射:
爲了觀看到鏈接數,能夠sleep一段時間,修改程序以下:
接下來從共享內存中讀取數據:
以前已經說過,共享內存是隨內核持續存在的,也就是它不會自動從內核中刪除,能夠經過下面這個函數來手動刪除它,以下:
其中cmd的值可取以下:
下面則用此函數來刪除共享內存段,當進程結束以後:
固然這樣使用有點粗暴,還沒等別人從共享內存中讀走數據就被刪除了,下面來讓程序更加合理一點從內核刪除,當有人讀取了共享內存的數據時,會將quit字串寫到共享內存的前面四個字節當中,因此在寫程序中能夠循環來作一個監聽,以下:
接下來,修改讀取程序:
這時查看一下共享內存是否還存在於內核當中:
另外要注意的一點是:共享內存的前面四個字節的數據類型不必定,能夠爲整型,也能夠爲字符串,以下:
目前前四個字節爲整型,也能夠將數據寫入到姓名字段中,將結構體修改一下:
運行效果其實也是同樣的,爲何呢,由於咱們比較的僅僅只是內存:
只要保證有四個字節的空間,就可以存放四個字符,不在意是什麼數據類型。
經過上面的描述,很容易就能想到信號量的一上數據結構:
下面再來回顧一下P、V原語:
所謂的原語就是指這段代碼是原子性的,是不會被其它信號中斷的,
在Linux中,system v 信號量是以信號量集來實現的,跟其它system v IPC對象同樣,也有本身的數據結構:
信號量集也提供了一些函數來操做:
下面用具體代碼來實踐一下,會封裝一些對信號量集的一些函數:
另外,能夠用命令來刪除已經建立的信號量集,跟消息隊列同樣(ipcrm -S key 或ipcrm -s semid兩種):
下面建立了以後,則能夠封裝一個打開信號量集的方法:
當建立了一個信號量集,並裏面有一個信號量時,這時候最想作的事情是對進信號量設置一個計數值,因而第二個函數出現了:
其中先看下SETVAL參數,查看MAN幫助:
因而將其結構體拷貝一下,來給信號集來設置一個計數值:
有了設置計數值,那接下來就能夠用GETVAL來獲取信號量集中的信號量的計數值:
接下來封裝一個刪除指定的信號量集,注意:不能夠直接刪除某個信號量,只能刪除一個信號量集,並將裏面全部的信號量給刪除了,以下:
下面則在main中修改一下,來實驗下刪除功能是否有效:
能夠看出五秒以後,已經成功刪除了信號集。
接下來第三個函數是一個比較核心的函數,用來進行P、V操做的:
下面用它來封裝一下P、V操做:
下面對其sembuf進行進一步說明:
其中sem_op表示咱們要操做的方式,而代碼中咱們寫的是-1跟+1,實際上還能夠-2,-3,+2,+3,減一個大於零的數字,表示要將信號量的數值減去相應的值,若是當前的個數小於計數值時則會阻塞,處於等待狀態,當前前提是sem_flg等於0,若是sem_flg爲IPC_NOWAIT而又沒有可用資源時,這時semop函數就會返回失敗,返回-1,而且錯誤代碼爲EAGAIN;而當sem_flg爲SEM_UNDO,表示撤消,當一個進程終止的時候,對信號量所作的P或V操做會被撤消, 好比咱們對信號進行了一個P操做,對其進行了-1,當進程結束時,最後一次-1將會被撤消,一樣的,若是進行了一個V操做,也就是對其進行了+1,當進程結束時,最後的一次+1則會被撤消。
接下來再經過一個例子,來更好的理解信號量的一些機制:
當運行此程序時,會給出命令使用方法,帶不一樣的參數則會有不一樣的功能,下面具體解釋一下:
①、建立一個信號量集
②、刪除一個信號量集
③、進行一個P操做
④、進行一個V操做
⑤、對信號量集中的信號量設置一個初始的計數值
⑥、獲取信號量集中信號量的計數值
⑦、查看信號量集的權限
⑧、更改信號量集的權限
下面則具體來使用一下:
可是並不能無限往上加,整數的最大值是有限制的,實際上計數值內部是一個short類型,也就是範圍爲-32768~32767
接下來分析下程序,首先解析參數:
其中查看一下ftok函數幫助:
這裏用"s"一個字符的低八位能夠確何不爲0,而「.」表示當前路徑,兩個參數經過ftok就能夠產生惟一的一個key_t,至於內部怎麼實現不須要關心。
接下來來判斷這些參數選項:
【說明】:關於這裏面用到的方法全是上面封裝的
下面兩個參數是尚未在上面進行封裝過,關於權限的獲取和設置,下面來看下:
其中權限保存的字段能夠從man幫助中查看到:
因此很容易理解,下面爲了進一步演示信號量的其它P、V用法,下面更改一下程序:
也就是所作的最後一次操做將會被撤消,一樣的,對於v操做這個SEM_UNDO也一樣適用,這裏就不演示了。
下面會舉例用信號量來實現進程互斥,來進一步加深對信號量的認識。
先用圖來描述一下這個程序的一個意圖:
下面則開始實現,基於以前信號量的封裝:
print.c:
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/sem.h>
#include <sys/wait.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
union semun {
int val; /* Value for SETVAL */
struct semid_ds *buf; /* Buffer for IPC_STAT, IPC_SET */
unsigned short *array; /* Array for GETALL, SETALL */
struct seminfo *__buf; /* Buffer for IPC_INFO
(Linux-specific) */
};
#define ERR_EXIT(m) \
do \
{ \
perror(m); \
exit(EXIT_FAILURE); \
} while(0)
int sem_create(key_t key)
{
int semid;
semid = semget(key, 1, IPC_CREAT | IPC_EXCL | 0666);
if (semid == -1)
ERR_EXIT("semget");
return semid;
}
int sem_open(key_t key)
{
int semid;
semid = semget(key, 0, 0);
if (semid == -1)
ERR_EXIT("semget");
return semid;
}
int sem_setval(int semid, int val)
{
union semun su;
su.val = val;
int ret;
ret = semctl(semid, 0, SETVAL, su);
if (ret == -1)
ERR_EXIT("sem_setval");
return 0;
}
int sem_getval(int semid)
{
int ret;
ret = semctl(semid, 0, GETVAL, 0);
if (ret == -1)
ERR_EXIT("sem_getval");
return ret;
}
int sem_d(int semid)
{
int ret;
ret = semctl(semid, 0, IPC_RMID, 0);
if (ret == -1)
ERR_EXIT("semctl");
return 0;
}
int sem_p(int semid)
{
struct sembuf sb = {0, -1, 0};
int ret;
ret = semop(semid, &sb, 1);
if (ret == -1)
ERR_EXIT("semop");
return ret;
}
int sem_v(int semid)
{
struct sembuf sb = {0, 1, 0};
int ret;
ret = semop(semid, &sb, 1);
if (ret == -1)
ERR_EXIT("semop");
return ret;
}
int semid;
int main(int argc, char *argv[])
{
semid = sem_create(IPC_PRIVATE);//因爲是父子進程,因此能夠建立私有的信號量集
sem_setval(semid, 0);//初始化信號量計數值爲0
pid_t pid;
pid = fork();
if (pid == -1)
ERR_EXIT("fork");
if (pid > 0)
{//父進程
}
else
{//子進程
}
return 0;
}
複製代碼
接下來則進行值打印:
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/sem.h>
#include <sys/wait.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
union semun {
int val; /* Value for SETVAL */
struct semid_ds *buf; /* Buffer for IPC_STAT, IPC_SET */
unsigned short *array; /* Array for GETALL, SETALL */
struct seminfo *__buf; /* Buffer for IPC_INFO
(Linux-specific) */
};
#define ERR_EXIT(m) \
do \
{ \
perror(m); \
exit(EXIT_FAILURE); \
} while(0)
int sem_create(key_t key)
{
int semid;
semid = semget(key, 1, IPC_CREAT | IPC_EXCL | 0666);
if (semid == -1)
ERR_EXIT("semget");
return semid;
}
int sem_open(key_t key)
{
int semid;
semid = semget(key, 0, 0);
if (semid == -1)
ERR_EXIT("semget");
return semid;
}
int sem_setval(int semid, int val)
{
union semun su;
su.val = val;
int ret;
ret = semctl(semid, 0, SETVAL, su);
if (ret == -1)
ERR_EXIT("sem_setval");
return 0;
}
int sem_getval(int semid)
{
int ret;
ret = semctl(semid, 0, GETVAL, 0);
if (ret == -1)
ERR_EXIT("sem_getval");
return ret;
}
int sem_d(int semid)
{
int ret;
ret = semctl(semid, 0, IPC_RMID, 0);
if (ret == -1)
ERR_EXIT("semctl");
return 0;
}
int sem_p(int semid)
{
struct sembuf sb = {0, -1, 0};
int ret;
ret = semop(semid, &sb, 1);
if (ret == -1)
ERR_EXIT("semop");
return ret;
}
int sem_v(int semid)
{
struct sembuf sb = {0, 1, 0};
int ret;
ret = semop(semid, &sb, 1);
if (ret == -1)
ERR_EXIT("semop");
return ret;
}
int semid;
void print(char op_char)
{
int pause_time;
srand(getpid());//以當前進程作爲隨機數的種子
int i;
for (i=0; i<10; i++)//各輸出十次
{
sem_p(semid);//進行一個P操做
printf("%c", op_char);
fflush(stdout);//因爲沒有用\n,因此要想在屏幕中打印出字符,須要強制清空一下緩衝區
pause_time = rand() % 3;//在0,1,2秒中隨機
sleep(pause_time);
printf("%c", op_char);
fflush(stdout);
sem_v(semid);//進行一個V操做
pause_time = rand() % 2;
sleep(pause_time);//最後在0,1秒中隨機
}
}
int main(int argc, char *argv[])
{
semid = sem_create(IPC_PRIVATE);//因爲是父子進程,因此能夠建立私有的信號量集
sem_setval(semid, 0);//初始化信號量計數值爲0
pid_t pid;
pid = fork();
if (pid == -1)
ERR_EXIT("fork");
if (pid > 0)
{//父進程
sem_setval(semid, 1);//因爲計數值初使爲0,因此進行P操做時則會等待,爲了進行p操做,則設置值爲1
print('O');
wait(NULL);//等待子進程的退出
sem_d(semid);//最後刪除信號量值
}
else
{//子進程
print('X');
}
return 0;
}
複製代碼
從運行結果來看,o跟x必定是成對出現的,不可能出現ox一塊兒打印,這就是信號量達到互斥做用的效果。
下面迴歸到實際代碼上來,因爲此次的信號集中有多個信號量,因此這個實驗中就不能用以前封裝的方法了,需從新編寫:
dining.c:
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/sem.h>
#include <sys/wait.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
union semun {
int val; /* Value for SETVAL */
struct semid_ds *buf; /* Buffer for IPC_STAT, IPC_SET */
unsigned short *array; /* Array for GETALL, SETALL */
struct seminfo *__buf; /* Buffer for IPC_INFO
(Linux-specific) */
};
#define ERR_EXIT(m) \
do \
{ \
perror(m); \
exit(EXIT_FAILURE); \
} while(0)
int semid;
int main(int argc, char *argv[])
{
semid = semget(IPC_PRIVATE, 5, IPC_CREAT | 0666);//建立一個信號量集,裏面包含五個信號量,這裏也用私有的方式,由於會用子進程的方式模擬
if (semid == -1)
ERR_EXIT("semget");
//將五個信號量的計數值都初始爲1,資源均可用,模擬的是五把叉子
union semun su;
su.val = 1;
int i;
for (i=0; i<5; i++)
{
semctl(semid, i, SETVAL, su);
}
return 0;
}
複製代碼
而哲學家所作的事情以下:
接下來則實現wait_for_2fork()、free_2fork()兩個函數:
結合圖來想,就很容易明白這個算法,以下:
一樣的,釋放叉子相似:
至此解決哲學家就餐問題的代碼就寫完,下面來編譯運行一下:
從中能夠看到,沒有出現死鎖問題,下面從輸出結果來分析一下:
從結果分析來看:不可能兩個相鄰的哲學家同時處於「吃」的狀態,同時只可以有兩個哲學家處於「吃」的狀態。
接下來再來模擬一下死鎖的狀況,在模擬以前,注意:需手動將建立的信號量集給刪掉,由於剛纔運行是強制關閉程序的,另外在實現以前,須要思考一下怎麼樣能產生死鎖,其實思路很簡單,就是申請叉子的時候,一個個申請,而不是當只有兩個都有的狀況下才能申請,因此,修改代碼以下:
接下來實現wait_1fork():
從結果來看確實是阻塞了,因爲都拿起了左邊的叉子,並且都在等待右邊叉子,而都沒人釋放左叉子,因而乎死鎖就產生了。
最後貼上完整代碼:
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/sem.h>
#include <sys/wait.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
union semun {
int val; /* Value for SETVAL */
struct semid_ds *buf; /* Buffer for IPC_STAT, IPC_SET */
unsigned short *array; /* Array for GETALL, SETALL */
struct seminfo *__buf; /* Buffer for IPC_INFO
(Linux-specific) */
};
#define ERR_EXIT(m) \
do \
{ \
perror(m); \
exit(EXIT_FAILURE); \
} while(0)
#define DELAY (rand() % 5 + 1)//定義一個睡眠時間,1~5秒中
int semid;
//等待一把叉子
int wait_1fork(int no)
{
struct sembuf sb = {no, -1, 0};
int ret;
ret = semop(semid, &sb, 1);
if (ret == -1)
ERR_EXIT("semop");
return ret;
}
//等待左右兩個叉子
void wait_for_2fork(int no)
{
int left = no;
int right = (no + 1) % 5;
struct sembuf buf[2] = {
{left, -1, 0},
{right, -1, 0}
};
semop(semid, buf, 2);
}
//釋放左右兩信叉子
void free_2fork(int no)
{
int left = no;
int right = (no + 1) % 5;
struct sembuf buf[2] = {
{left, 1, 0},
{right, 1, 0}
};
semop(semid, buf, 2);
}
void philosophere(int no)
{
srand(getpid());//設置隨機的種子
for (;;)
{//不斷循環執行
/*
printf("%d is thinking\n", no);//首先思考
sleep(DELAY);
printf("%d is hungry\n", no);//餓了
wait_for_2fork(no);
printf("%d is eating\n", no);//當獲取到了左右兩把叉子,則開吃
sleep(DELAY);
free_2fork(no);//吃完則放下左右兩把叉子
*/
int left = no;
int right = (no + 1) % 5;
printf("%d is thinking\n", no);
sleep(DELAY);
printf("%d is hungry\n", no);
wait_1fork(left);
sleep(DELAY);
wait_1fork(right);
printf("%d is eating\n", no);
sleep(DELAY);
free_2fork(no);
}
}
int main(int argc, char *argv[])
{
semid = semget(IPC_PRIVATE, 5, IPC_CREAT | 0666);//建立一個信號量集,裏面包含五個信號量,這裏也用私有的方式,由於會用子進程的方式模擬
if (semid == -1)
ERR_EXIT("semget");
//將五個信號量的計數值都初始爲1,資源均可用,模擬的是五把叉子
union semun su;
su.val = 1;
int i;
for (i=0; i<5; i++)
{
semctl(semid, i, SETVAL, su);
}
//接下來建立四個子進程,加上父進程則爲5個,來模擬5個哲學家
int no = 0;
pid_t pid;
for (i=1; i<5; i++)
{
pid = fork();
if (pid == -1)
ERR_EXIT("fork");
if (pid == 0)
{
no = i;
break;
}
}
philosophere(no);
return 0;
}
複製代碼
下面用圖來講明一下該問題:
以上就是生產者消費者從邏輯上的一個解決方案,從中能夠看到這是互斥跟同步相結合的例子,下面則用所畫的這些模型來實現一下shmfifo
爲何要實現共享內存的先進先出的緩衝區(shmfifo)呢?實際上要實現進程間通訊能夠直接用消息隊列來實現先進先出的隊列,可是,因爲消息隊列還實現了其它的功能,若是僅僅只是想要先進先出這樣的一個功能的話,能使用共享內存來實現的話,效率會更高,由於對共享內存的訪問不涉及到對內核的操做,這個以前也有講過,所以就有必要實現一個shmfifo。
要實現這樣的一個緩衝區,咱們能夠作一些假定,假定放到緩衝區當中的數據塊是定長的,而且能夠有多個進程往緩衝區中寫入數據,也有多個進程往緩衝區中讀取數據,因此這是典型的生產者消費者問題,這塊緩衝區剛纔說過能夠用共享內存的方式來實現,可是有一個問題須要思考:生產者進程當前應該在什麼位置添加產品,消費者進程又從什麼位置消費產品呢?因此說還須要維護這些狀態,因此很天然地就能想到將這些狀態保存在共享內存當中,以下:
因爲多個生產者都能往裏面添加產品,多個消費者也可以從裏面消費產品,那生產者在生產產品的時候應該放在什麼位置呢?消費者又該從哪裏消費產品呢?下面來講明下:
而這時再次生產就會是在0的位置上開始了:
可見這是一個環形緩衝區,能夠重複利用的,基於這些分析下面來看一下所定義出來的數據結構:
有了這些數據結構實際上就可以實現了shmfifo了,下面實現一下:
因爲用到了信號量,因此將以前的信號量相關的函數及定義放到一個單獨的文件當中,裏面代碼都是以前學過的,就很少解釋了:
ipc.h:
#ifndef _IPC_H_
#define _IPC_H_
#include <sys/types.h>
#include <unistd.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/shm.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define ERR_EXIT(m) \
do \
{ \
perror(m); \
exit(EXIT_FAILURE); \
} while(0)
union semun {
int val; /* value for SETVAL */
struct semid_ds *buf; /* buffer for IPC_STAT, IPC_SET */
unsigned short *array; /* array for GETALL, SETALL */
/* Linux specific part: */
struct seminfo *__buf; /* buffer for IPC_INFO */
};
int sem_create(key_t key);
int sem_open(key_t key);
int sem_p(int semid);
int sem_v(int semid);
int sem_d(int semid);
int sem_setval(int semid, int val);
int sem_getval(int semid);
int sem_getmode(int semid);
int sem_setmode(int semid,char* mode);
#endif /* _IPC_H_ */
複製代碼
ipc.c:
#include "ipc.h"
int sem_create(key_t key)
{
int semid = semget(key, 1, 0666 | IPC_CREAT | IPC_EXCL);
if (semid == -1)
ERR_EXIT("semget");
return semid;
}
int sem_open(key_t key)
{
int semid = semget(key, 0, 0);
if (semid == -1)
ERR_EXIT("semget");
return semid;
}
int sem_p(int semid)
{
struct sembuf sb = {0, -1, 0};
int ret = semop(semid, &sb, 1);
if (ret == -1)
ERR_EXIT("semop");
return ret;
}
int sem_v(int semid)
{
struct sembuf sb = {0, 1, 0};
int ret = semop(semid, &sb, 1);
if (ret == -1)
ERR_EXIT("semop");
return ret;
}
int sem_d(int semid)
{
int ret = semctl(semid, 0, IPC_RMID, 0);
/*
if (ret == -1)
ERR_EXIT("semctl");
*/
return ret;
}
int sem_setval(int semid, int val)
{
union semun su;
su.val = val;
int ret = semctl(semid, 0, SETVAL, su);
if (ret == -1)
ERR_EXIT("semctl");
//printf("value updated...\n");
return ret;
}
int sem_getval(int semid)
{
int ret = semctl(semid, 0, GETVAL, 0);
if (ret == -1)
ERR_EXIT("semctl");
//printf("current val is %d\n", ret);
return ret;
}
int sem_getmode(int semid)
{
union semun su;
struct semid_ds sem;
su.buf = &sem;
int ret = semctl(semid, 0, IPC_STAT, su);
if (ret == -1)
ERR_EXIT("semctl");
printf("current permissions is %o\n",su.buf->sem_perm.mode);
return ret;
}
int sem_setmode(int semid,char* mode)
{
union semun su;
struct semid_ds sem;
su.buf = &sem;
int ret = semctl(semid, 0, IPC_STAT, su);
if (ret == -1)
ERR_EXIT("semctl");
printf("current permissions is %o\n",su.buf->sem_perm.mode);
sscanf(mode, "%o", (unsigned int*)&su.buf->sem_perm.mode);
ret = semctl(semid, 0, IPC_SET, su);
if (ret == -1)
ERR_EXIT("semctl");
printf("permissions updated...\n");
return ret;
}
複製代碼
以上文件是爲了實現shmfifo提供輔助功能的,下面則開始實現它,分頭文件及具體實現:
shmfifo.h:
#ifndef _SHM_FIFO_H_
#define _SHM_FIFO_H_
#include "ipc.h"
typedef struct shmfifo shmfifo_t;
typedef struct shmhead shmhead_t;
struct shmhead
{
unsigned int blksize; // 塊大小
unsigned int blocks; // 總塊數
unsigned int rd_index; // 讀索引
unsigned int wr_index; // 寫索引
};
struct shmfifo
{
shmhead_t *p_shm; // 共享內存頭部指針
char *p_payload; // 有效負載的起始地址
int shmid; // 共享內存ID
int sem_mutex; // 用來互斥用的信號量
int sem_full; // 用來控制共享內存是否滿的信號量
int sem_empty; // 用來控制共享內存是否空的信號量
};
shmfifo_t* shmfifo_init(int key, int blksize, int blocks);//初始化
void shmfifo_put(shmfifo_t *fifo, const void *buf);//添加數據到環形緩衝區
void shmfifo_get(shmfifo_t *fifo, void *buf);//從緩衝區中取數據
void shmfifo_destroy(shmfifo_t *fifo);//釋放共享內存的環形緩衝區
#endif /* _SHM_FIFO_H_ */
複製代碼
下面來具體實現一下些這函數:
這個方法既能夠建立共享內存信號量,也能夠打開共享內存信號量,因此下面能夠作一個判斷:
接下來還得初始化共享內存中的其它字段:
接下來對其信號量集中的信號進行初始化:
shmfifo的初始化函數就已經寫完了,接下來來實現第二個函數:shmfifo_put(生產產品),對於生產者的過程,上面也說明過,則嚴格按照該步驟來進行實現:
下面則開始實現,首先先按照流程把代碼框架寫出來:
那如何生產產品呢?先來看下圖:
首先進行數據偏移:
【說明】:關於memcpy函數的使用,說明以下:
在生產一個產品以後,下一次要生產的位置則要發生改變,因此:
這樣生產產品的函數實現就如上,相似的,消費產品實現就容易了,依照這個流程:
接下來實現最後一個函數,就是資源釋放:
shmfifo.c:
#include "shmfifo.h"
#include <assert.h>
shmfifo_t* shmfifo_init(int key, int blksize, int blocks)
{
//分配內存空間
shmfifo_t *fifo = (shmfifo_t *)malloc(sizeof(shmfifo_t));
assert(fifo != NULL);
memset(fifo, 0, sizeof(shmfifo_t));
int shmid;
shmid = shmget(key, 0, 0);
int size = sizeof(shmhead_t) + blksize*blocks;
if (shmid == -1)
{//建立共享內存
fifo->shmid = shmget(key, size, IPC_CREAT | 0666);
if (fifo->shmid == -1)
ERR_EXIT("shmget");
fifo->p_shm = (shmhead_t*)shmat(fifo->shmid, NULL, 0);
if (fifo->p_shm == (shmhead_t*)-1)
ERR_EXIT("shmat");
fifo->p_payload = (char*)(fifo->p_shm + 1);
fifo->sem_mutex = sem_create(key);
fifo->sem_full = sem_create(key+1);
fifo->sem_empty = sem_create(key+2);
sem_setval(fifo->sem_mutex, 1);
sem_setval(fifo->sem_full, blocks);
sem_setval(fifo->sem_empty, 0);
}
else
{//打開共享內存
fifo->shmid = shmid;
fifo->p_shm = (shmhead_t*)shmat(fifo->shmid, NULL, 0);
if (fifo->p_shm == (shmhead_t*)-1)
ERR_EXIT("shmat");
fifo->p_payload = (char*)(fifo->p_shm + 1);
fifo->sem_mutex = sem_open(key);
fifo->sem_full = sem_open(key+1);
fifo->sem_empty = sem_open(key+2);
}
return fifo;
}
void shmfifo_put(shmfifo_t *fifo, const void *buf)
{
sem_p(fifo->sem_full);
sem_p(fifo->sem_mutex);
//生產產品
memcpy(fifo->p_payload+fifo->p_shm->blksize*fifo->p_shm->wr_index,
buf, fifo->p_shm->blksize);
fifo->p_shm->wr_index = (fifo->p_shm->wr_index + 1) % fifo->p_shm->blocks;
sem_v(fifo->sem_mutex);
sem_v(fifo->sem_empty);
}
void shmfifo_get(shmfifo_t *fifo, void *buf)
{
sem_p(fifo->sem_empty);
sem_p(fifo->sem_mutex);
memcpy(buf, fifo->p_payload+fifo->p_shm->blksize*fifo->p_shm->rd_index,
fifo->p_shm->blksize);
fifo->p_shm->rd_index = (fifo->p_shm->rd_index + 1) % fifo->p_shm->blocks;
sem_v(fifo->sem_mutex);
sem_v(fifo->sem_full);
}
void shmfifo_destroy(shmfifo_t *fifo)
{
//刪除建立的信息量集
sem_d(fifo->sem_mutex);
sem_d(fifo->sem_full);
sem_d(fifo->sem_empty);
//刪除共享內存
shmdt(fifo->p_shm);//刪除共享內存頭部
shmctl(fifo->shmid, IPC_RMID, 0);//刪除整個共享內存
//釋放fifo的內存
free(fifo);
}
複製代碼
下面則寫兩個測試程序,分別用來生產、消費產品:
一樣的,取出存放進去的學生信息,以下:
竟然生產第一個產品的時候就已經報錯了,從這個錯誤當中很難定位到問題在哪,而這個例外確定是產生了一個信號,因此下面用gdb來調試一下程序,再正式調查試以前,須要將以前建立的共享內存及信號量集給清掉,不然再次運行就不會報這個錯,而是阻塞了:
而手動一個個去刪除這些比較麻煩,由於咱們已經編寫好了資源的釋放函數了,因此能夠編寫一個專門釋放的程序,以下:
下面先將以前資源清除掉:
下面再次運行就會拋出例外,因此此次就能夠進行gdb調試來看問題出在哪?
因此怎麼來修復這個問題就比較容易了,只要作下初始化操做既可:
接下來運行一下接收產品的程序,一塊兒來看下生產者消費者的一個效果:
從實驗結果來看,當接收了數據以後,原來還在等待的2個學生信息就被成功發送了,這就是生產者與消費者的一個效果,實際中生產者能夠有多個,消費者也能夠有多個,此次學的內容有些多,經過這個例子能夠學習怎麼利用共享內存和信號量來實現一個先進先出的環形緩衝區shmfifo.