Linux進程通訊之共享內存實現生產者/消費者模式

共享內存 

  共享內存是內核爲進程建立的一個特殊內存段,它將出如今進程本身的地址空間中,其它進程能夠將同一段共享內存鏈接(attach)到本身的地址空間。這是最快的進程間通訊方式,可是不提供任何同步功能(須要咱們信號量實現)。數組

  

  使用共享內存實現生產者消費者任務模式。併發

共享內存系統調用 

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
int semget(key_t key, int size, int flag);
void *shmat(int shmid, void *addr, int flag);
int shmdt(void *addr);
int shmctl(int shmid, int cmd, struct shmid_ds *buf);

shmget函數:

  功能:得到或建立一個共享內存標識符。函數

int semget(key_t key, size_t size, int shmflag);

 

  • 成功返回一個共享內存標識符,失敗返回-1;
  • 第一個參數key爲共享內存段命名(通常由ftok產生);
  • 第二個參數size爲須要共享的內存容量。(若是共享內存已存在時,不能不大於該共享內存段的大小);
  • 第三個參數設置訪問權限(低9位)與IPC_CREAT, IPC_EXCL 的按位或。

shmat函數

  功能:將共享內存段鏈接到一個進程的地址空間中。post

void *shmat(int  shm_id, const  void *addr, int shmflg) ;  
  • 成功返回共享存儲段鏈接的實際地址,失敗返回-1
  • 第一個參數shm_id爲shmget返回的共享內存標識符。
  • 第二個參數addr指明共享內存段要鏈接到的地址(進程空間內部地址),一般指定爲空指針,表示讓系統來選擇共享內存在進程地址空間中出現的地址。
  • 第三個參數shmflg能夠設置爲兩個標誌位(一般設置爲0)
  • SHM_RND( 表示第二個參數指定的地址應被向下靠攏到內存頁面大小的整數倍)
  • SHM_RDONLY,要鏈接的共享內存段是隻讀的。

shmdt函數

  功能:將共享內存從當前進程中分離。ui

int shmdt(const  void *shmaddr) ;    //其中shmaddr爲shmat返回的地址。

shmctl函數

  功能:查看及修改共享內存段的shmid_ds結構,刪除該結構以及相連的共享存儲段標識。spa

int shmctl(int  shm_id, int command, struct shmid_ds *buf) ; 
  • 成功返回0,失敗返回-1
  • 第二個參數commad取值:
  • IPC_STAT 獲取當前共享內存段的shmid_ds結構
  • IPC_SET 把共享內存段的當前關聯值設置爲shmid_ds結構給出的值
  • IPC_RMID 從系統中刪除該共享存儲段。
  • 第三個參數buf是一個結構指針,它指向共享內存模式和訪問權限的結構指針

    struct shmid_ds 
    { 
        uid_t shm_perm.uid; 
        uid_t shm_perm.gid; 
        mode_t shm_perm.mode; 
    };

 

生產者/消費者模式

  某個模塊負責產生數據(生產者),另外一個模塊來負責處理(消費者)。code

生產者任務流程圖

消費者任務流程圖

爲何要使用生產者消費者模式

  • 解耦 ----若是讓生產者直接與消費者交互,那麼生產者對於消費者就會產生依賴(也就是耦合);
  • 支持併發----傳統方式,在消費者的方法沒有返回以前,生產者只好一直等在那邊。萬一消費者處理數據很慢,生產者就會白白糟蹋大好時光。使用了生產者/消費者模式以後,生產者把製造出來的數據往緩衝區一丟,就能夠再去生產下一個數據;
  • 支持忙閒不均----當數據製造快的時候,消費者來不及處理,未處理的數據能夠暫時存在緩衝區中。

實現共享內存的生產者消費者模式 

  編寫兩個應用程序,其中一個應用程序實現生產者任務,一個應用程序實現消費者任務。
生產者任務和消費者任務之間經過共享內存機制實現跨進程的共享緩衝池;在信號量集合中支持一個信號量(或利用一個POSIX信號量),實現對共享緩衝池的互斥訪問;緩衝區的分配計數經過修改緩衝池結構中的計數變量來實現。 blog

  緩衝池結構:進程

struct shared_use_st 
{ 
     //爲0表示對應的緩衝區未被生產者使用,可分配但不可消費;爲1表示對應    
     的緩衝區以被生產者使用,不可分配但可消費

    //5個字符串緩衝區
    char Buffer[5][100];
    int Index[5];
};    

  緩衝區的互斥訪問,5個緩衝區的緩衝池做爲一個臨界資源:

  • 當生產者任務從數據源—文件中讀取數據後將會申請一個緩衝區,並將此數據放入緩衝區中。
  • 消費者任務從一個緩衝區中取走數據,並將其中的內容打印輸出。
  • 當一個生產者任務正在訪問緩衝區時,其餘生產者和消費者任務不能訪問緩衝區

 

  • 當一個消費者任務正在訪問緩衝區時,其餘其餘生產者和消費者任務不能訪問緩衝區
  • 使用信號量集(包含1個信號量,其初始值爲1)實現對緩衝池的互斥訪問

 

源碼

#Producer.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <semaphore.h>
#include <fcntl.h>
#define TEXT_SZ 1024

//緩衝池結構
struct shared_use_st  
{  
    int Index[5];    //5個緩衝池,爲0表示對應的緩衝區未被生產者使用,可分配但不可消費;爲1表示對應的緩衝區已被生產者使用,不可分配但可消費
    char Buffer[5][TEXT_SZ];    //5個字符串緩衝區
    sem_t sem;        //信號量,同步功能
};

int main()
{
    int running = 1;
    int i = 0;  
    void *shm = NULL;        //共享存儲段鏈接的實際地址
    struct shared_use_st *shared = NULL;
    char buffer[BUFSIZ + 1];        //緩衝區存放字符
    int shmid;        //共享內存標識符
    //得到或建立一個共享內存標識符
    shmid = shmget((key_t)1121, sizeof(struct shared_use_st), 0666|IPC_CREAT);
    if(shmid == -1)        //獲取或建立一個共享內存標識符失敗
    {   
      exit(EXIT_FAILURE);
    }
    shm = shmat(shmid, (void*)0, 0);        //返回共享存儲段鏈接的實際地址
    if(shm == (void*)-1)
    {
        exit(EXIT_FAILURE);
    }
    printf("Memory attached at %ld\n", (intptr_t)shm);
    shared = (struct shared_use_st*)shm;        //緩衝池爲共享存儲段鏈接地址
    for( ; i < 5; i++ )
    {
        shared->Index[i] = 0;        //對緩衝池初始化,Index爲0表示能夠生產
    }
    sem_init(&(shared->sem),1,1);        //信號量化初始化,且信號量初始值爲第二個1
    i = 0;
    while(running)        //製造一個循環
    {
        if(sem_wait(&(shared->sem)) == -1)        //sem_wait爲P操做,減小信號量的值
        {
            printf("P操做 ERROR!\n");
            exit(EXIT_FAILURE);
        }
        for(i = 0; i < 5 && shared->Index[i] == 1; i++)
        ;
        if(i == 5)        //Index爲1表示緩衝池被消費者佔用
        {  
            //當五個空間都被消費者佔用時輸出「waiting...」
            sem_post(&shared->sem);        //sem_post爲V操做,用來增長信號量的值
            sleep(1);        //sleep一段時間,再次進入循環
            printf("Waiting for some time...\n");
        }
        else
        {
            sem_post(&shared->sem);        //V 操做增長信號量
               printf("Enter some text with keyboard: ");
               fgets(buffer, BUFSIZ, stdin);        //讀取stdin字符流最多BUFSIZ-1個,並存在buffer數組中 其中stdin是鍵盤輸入到緩衝區的字符
               strncpy(shared->Buffer[i%5], buffer,TEXT_SZ);        //讀取的字符串存入緩衝區shared->Buffer中
               shared->Index[i%5] = 1;        //表示該緩衝區被生產者使用了
            if(strncmp(buffer, "end", 3) == 0)        //緩衝區的字符爲end時,結束循環
            {
                running = 0;
            }
        }   
     }
     //將共享內存從當前進程中分離
    if(shmdt(shm) == -1)        //失敗
    {
        exit(EXIT_FAILURE);
    }
    /*查看及修改共享內存段的shmid_ds結構,刪除該結構以及相連的共享存儲段標識
    struct shmid_ds  
    {  
        uid_t shm_perm.uid;  
        uid_t shm_perm.gid;  
        mode_t shm_perm.mode;  
    };
    */
    if(shmctl(shmid, IPC_RMID, 0) == -1)        //失敗
    {
        exit(EXIT_FAILURE);
    }  
    exit(EXIT_SUCCESS);
}
#Consumer.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/shm.h>
#include <unistd.h>
#include <semaphore.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <fcntl.h>
#define TEXT_SZ 1024

//緩衝池結構
struct shared_use_st  
{  
    int Index[5];    //5個緩衝池,爲0表示對應的緩衝區未被生產者使用,可分配但不可消費;爲1表示對應的緩衝區被生產者使用,不可分配但可消費
    char Buffer[5][TEXT_SZ];    //5個字符串緩衝區
    sem_t sem;        //信號量,同步功能
};

int main()
{
    int running = 1;
    int i = 0; 
    void *shm = NULL;        //共享存儲段鏈接的實際地址
    struct shared_use_st *shared = NULL;    //緩衝池
    int shmid;        //聲明共享內存標識符
    
    shmid = shmget((key_t)1121, sizeof(struct shared_use_st), 0666|IPC_CREAT); //得到或建立一個共享內存標識符
    if(shmid == -1)        //獲取或建立一個共享內存標識符失敗
    {  
        exit(EXIT_FAILURE);
    }
    
    //將共享內存段鏈接到一個進程的地址空間中,返回void *指針
    shm = shmat(shmid, 0, 0);    //返回共享存儲段鏈接的實際地址    
    if(shm == (void*)-1)        //失敗
    {
        exit(EXIT_FAILURE);
    }
    printf("Memory attached at %ld\n", (intptr_t)shm);
    shared = (struct shared_use_st*)shm;        //緩衝池爲共享存儲段鏈接地址
    while(running)
    {
        if(sem_wait(&(shared->sem)) == -1)        //sem_wait爲P操做,減小信號量的值
        {
            printf("P操做 ERROR!\n");
            exit(EXIT_FAILURE);
        }
        for(i = 0; i < 5 && shared->Index[i] == 0; i++)
           ;
           //五個緩衝區沒有都被生產者佔用
        if(i != 5)
        {   
            printf("You wrote: %s\n", shared->Buffer[i%5]);        //打印出生產者寫入的字符
            shared->Index[i%5] = 0;        //爲0時,表示已被消費者使用
            sem_post(&shared->sem);        //sem_post爲V操做
            sleep(1); 
            if( strncmp(shared->Buffer[i%5], "end", 3) == 0 )        //緩衝區的字符爲end時,結束循環
            {
                 running= 0;
            }
        }
        //五個空間都被佔用,輸出waiting...  
          else
          {
             sem_post(&shared->sem);        //V操做
             sleep(1);  
            printf("Waiting for some time...\n");
        }
    }
    //將共享內存從當前進程中分離
    if(shmdt(shm) == -1)        //分離失敗
    {
        exit(EXIT_FAILURE);
    }
    if(shmctl(shmid, IPC_RMID, 0) == -1)    //失敗
    {
        exit(EXIT_FAILURE);
    }  
    exit(EXIT_SUCCESS);
}
相關文章
相關標籤/搜索