Linux進程間通訊 -- 消息隊列

0. 前言

   進程是一個獨立的資源管理單元,不一樣進程間的資源是獨立的,不能在一個進程中訪問另外一個進程的用戶空間和內存空間。可是,進程不是孤立的,不一樣進程之間須要信息的交互和狀態的傳遞,所以須要進程間數據的傳遞、同步和異步的機制。linux

    固然,這些機制不能由哪個進程進行直接管理,只能由操做系統來完成其管理和維護,Linux提供了大量的進程間通訊機制,包括同一個主機下的不一樣進程和網絡主機間的進程通訊,以下圖所示:
mark編程

  • 同主機間的信息交互
  • 無名管道
    特色:多用於親緣關係進程間通訊,方向爲單向;爲阻塞讀寫;通訊進程雙方退出後自動消失
    問題:多進程用同一管道通訊容易形成交叉讀寫的問題
  • 有名管道
    FIFO(First In First Out),方向爲單向(雙向需兩個FIFO),以磁盤文件的方式存在;通訊雙方一方不存在則阻塞
  • 消息隊列
    可用於同主機任意多進程的通訊,但其可存放的數據有限,應用於少許的數據傳遞
  • 共享內存
    可實現同主機任意進程間大量數據的通訊,但多進程對共享內存的訪問存在着競爭
  • 同主機進程間同步機制:信號量(Semaphore)
  • 同主機進程間異步機制:信號(Signal)
  • 網絡主機間數據交互:Socket(套接字)

1. IPC

IPC, Inter-Process Communication,進程間通訊,包括消息隊列、信號量和共享內存三種機制。
IPC使用前必需要先建立,每種IPC都有其建立者、全部者和訪問權限。
使用ipcs能夠查看系統中的IPC工具:數組

[niesh@niesh ~]$ ipcs

--------- 消息隊列 -----------
鍵        msqid      擁有者  權限     已用字節數 消息

------------ 共享內存段 --------------
鍵        shmid      擁有者  權限     字節     nattch     狀態
0x00000000 131072     niesh      600        524288     2          目標
0x00000000 163841     niesh      600        4194304    2          目標

--------- 信號量數組 -----------
鍵        semid      擁有者  權限     nsems
  • key:
    用於建立ID值(ID值由一個進程建立的話,因爲進程資源的私有性,另外一個進程沒法獲取到該ID);採用統一key值建立的ID是相同的;
  • id:
    IPC機制的惟一標識

1). 獲取key值 - ftok():

  • 做用
    獲取key值網絡

  • 頭文件數據結構

    #include <sys/ipc.h>
  • 函數原型異步

    key_t ftok(const char *pathname, int proj_id)
  • 參數函數

pathname:文件名
proj_id: 做爲key值的組成部分,用到了低8位工具

  • 返回值
    成功:key值
bit 描述
31-24 proj_id & 0xFF (低8位)
23-16 stat(pathname).st_dev & 0xFF (低8位)
15-0 stat(pathname).st_ino & 0xFFFF (低16位)

失敗:-1ui

幾個結構體須要詳細瞭解:操作系統

  • struct msqid_ds: 消息隊列數據結構
  • struct msg: 單個消息的數據結構
  • struct msgbuf: 用戶自定義消息緩衝區
  • struct msginfo:

2. 消息隊列

1). 經常使用數據結構

mark

①. struct msqid_ds

/* FILE: /usr/include/linux/msg.h   */

/* Obsolete, used only for backwards compatibility and libc5 compiles */
struct msqid_ds {
    struct ipc_perm msg_perm;
    struct msg *msg_first;      /* first message on queue,unused  */
    struct msg *msg_last;       /* last message in queue,unused */
    __kernel_time_t msg_stime;  /* last msgsnd time */
    __kernel_time_t msg_rtime;  /* last msgrcv time */
    __kernel_time_t msg_ctime;  /* last change time */
    unsigned long  msg_lcbytes; /* Reuse junk fields for 32 bit */
    unsigned long  msg_lqbytes; /* ditto */
    unsigned short msg_cbytes;  /* current number of bytes on queue */
    unsigned short msg_qnum;    /* number of messages in queue */
    unsigned short msg_qbytes;  /* max number of bytes on queue */
    __kernel_ipc_pid_t msg_lspid;   /* pid of last msgsnd */
    __kernel_ipc_pid_t msg_lrpid;   /* last receive pid */
};
/* FILE: /usr/include/bits/ipc.h    */

/* Data structure used to pass permission information to IPC operations.  */
struct ipc_perm
  {
    __key_t __key;          /* Key.  */
    __uid_t uid;            /* Owner's user ID.  */
    __gid_t gid;            /* Owner's group ID.  */
    __uid_t cuid;           /* Creator's user ID.  */
    __gid_t cgid;           /* Creator's group ID.  */
    unsigned short int mode;        /* Read/write permission.  */
    unsigned short int __pad1;
    unsigned short int __seq;       /* Sequence number.  */
    unsigned short int __pad2;
    __syscall_ulong_t __unused1;
    __syscall_ulong_t __unused2;
  };

②. struct msg

/*  FILE: /usr/src/kernels/3.10.0-327.el7.x86_64/include/linux/msg.h   */

/* one msg_msg structure for each message */
struct msg_msg {
	struct list_head m_list;
	long m_type;
	size_t m_ts;		/* message text size */
	struct msg_msgseg* next;
	void *security;
	/* the actual message follows immediately */
};

③. struct msgbuf (編程時,必須本身實現,由於mtext大小未定義)

/*FILE: /usr/include/linux/msg.h    */

/* message buffer for msgsnd and msgrcv calls */
struct msgbuf {
    long mtype;         /* type of message */
    char mtext[1];      /* 信息實體(用戶可自定義大小) */
};

④. struct msginfo

/* FILE: /usr/include/linux/msg.h   */

/* buffer for msgctl calls IPC_INFO, MSG_INFO */
struct msginfo 
{
    int msgpool; /* Size in kibibytes of buffer pool
                    used to hold message data;
                    unused within kernel */
    int msgmap;  /* Maximum number of entries in message
                    map; unused within kernel */
    int msgmax;  /* Maximum number of bytes that can be
                   written in a single message */
    int msgmnb;  /* Maximum number of bytes that can be
                   written to queue; used to initialize
                   msg_qbytes during queue creation (msgget(2)) */
    int msgmni;  /* Maximum number of message queues */
    int msgssz;  /* Message segment size;
                  unused within kernel */
    int msgtql;  /* Maximum number of messages on all queues
                  in system; unused within kernel */
    unsigned short int msgseg;  /* Maximum number of segments;
                                  unused within kernel */
};

2). 消息隊列的操做

①. 建立消息隊列 - msgget():

  • 做用
    建立消息隊列

  • 頭文件

    #include <sys/msg.h>
  • 函數原型

    int msgget(key_t key, int msgflg)
  • 參數

key: 有函數 ftok 返回的key值
msgflg: 消息隊列的訪問權限

msgflg num description
IPC_CREAT 0x1000 若key不存在,則建立;存在,則返回ID
IPC_EXCL 0x2000 若key存在,返回失敗
IPC_NOWAIT 0x4000 若須要等待,直接返回錯誤
  • 返回值
    成功:消息隊列的ID
    失敗:-1

②. 消息隊列屬性控制 - msgctl():

  • 做用
    設置/獲取消息隊列的屬性值

  • 頭文件

    #include <sys/msg.h>
  • 函數原型

    int msgctl(int msqid, int cmd, struct msqid_ds *buf)
  • 參數

msqid: 消息隊列ID
cmd: 要執行的操做

micro number description
IPC_RMID 0 刪除消息隊列
IPC_SET 1 設置ipc_perm的參數
IPC_STAT 2 從內核結構體msqid複製信息到msgqid_ds
IPC_INFO 3 獲取限制信息到msginfo結構體
MSG_INFO 12 同IPC_INFO,但會返回msginfo.msgpool/msgmap/msgtql
  • 返回值
    成功: 0(IPC_RMID/IPC_SET/IPC_STAT), 消息隊列數組索引的最大值(IPC_INFO/MSG_INFO)
    失敗:-1

③. 發送/接收消息隊列 - msgsnd()/msgrcv():

  • 做用
    發送消息到消息隊列(添加到尾端)/接收消息

  • 頭文件

    #include <sys/msg.h>
  • 函數原型

    int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg)

.

ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg)
  • 參數

msqid: 消息隊列的ID值,msgget()的返回值
msgp: struct msgbuf,(發送/接收數據的暫存區,由用戶自定義大小)
msgsz: 發送/接收消息的大小(範圍:0~MSGMAP)
msgflg: 當達到系統爲消息隊列所指定的界限時,採起的操做(通常設置爲0)
msgtyp:

msgtyp description
= 0 讀取隊列中的第一個消息
> 0 讀取消息隊列的第一條 msgbuf.mtype=msgtype的消息
< 0 讀取第一條 lowest msgbuf.mtype < abs(msgtyp) 的消息
  • 返回值
    成功: 0   (for msgsnd());  實際寫入到mtext的字符個數  (for msgrcv())
    失敗:-1

3. 示例代碼

本程序主要是實現兩個進程經過消息隊列發送信息:

  • server:
  1. 等待接收客戶端發送的數據,若時間超出600s,則自動exit;
  2. 當收到信息後,打印接收到的數據;並原樣的發送給客戶端,由客戶端顯示
  • client:
  1. 啓動兩個進程(父子進程),父進程用於發送數據,子進程接收由server發送的數據;
  2. 發送數據:由使用者手動輸入信息,回車後發送;當寫入「end~」後,退出本進程
  3. 接收數據:接收由Server端發送的數據信息,並打印

代碼以下:
1. Client

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/msg.h>
#include <sys/ipc.h>
#include <signal.h>

#define BUF_SIZE 128

//Rebuild the strcut (must be)
struct msgbuf
{
    long mtype;
    char mtext[BUF_SIZE];
};


int main(int argc, char *argv[])
{
    //1. creat a mseg queue
    key_t key;
    int msgId;
    
    printf("THe process(%s),pid=%d started~\n", argv[0], getpid());

    key = ftok(".", 0xFF);
    msgId = msgget(key, IPC_CREAT|0644);
    if(-1 == msgId)
    {
        perror("msgget");
        exit(EXIT_FAILURE);
    }

    //2. creat a sub process, wait the server message
    pid_t pid;
    if(-1 == (pid = fork()))
    {
        perror("vfork");
        exit(EXIT_FAILURE);
    }

    //In child process
    if(0 == pid)
    {
        while(1)
        {
            alarm(0);
            alarm(100);     //if doesn't receive messge in 100s, timeout & exit
            struct msgbuf rcvBuf;
            memset(&rcvBuf, '\0', sizeof(struct msgbuf));
            msgrcv(msgId, &rcvBuf, BUF_SIZE, 2, 0);                
            printf("Server said: %s\n", rcvBuf.mtext);
        }
        
        exit(EXIT_SUCCESS);
    }

    else    //parent process
    {
        while(1)
        {
            usleep(100);
            struct msgbuf sndBuf;
            memset(&sndBuf, '\0', sizeof(sndBuf));
            char buf[BUF_SIZE] ;
            memset(buf, '\0', sizeof(buf));
            
            printf("\nInput snd mesg: ");
            scanf("%s", buf);
            
            strncpy(sndBuf.mtext, buf, strlen(buf)+1);
            sndBuf.mtype = 1;

            if(-1 == msgsnd(msgId, &sndBuf, strlen(buf)+1, 0))
            {
                perror("msgsnd");
                exit(EXIT_FAILURE);
            }
            
            //if scanf "end~", exit
            if(!strcmp("end~", buf))
                break;
        }
        
        printf("THe process(%s),pid=%d exit~\n", argv[0], getpid());
    }

    return 0;
}

2. Server

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/msg.h>
#include <sys/ipc.h>
#include <signal.h>

#define BUF_SIZE 128

//Rebuild the strcut (must be)
struct msgbuf
{
    long mtype;
    char mtext[BUF_SIZE];
};


int main(int argc, char *argv[])
{
    //1. creat a mseg queue
    key_t key;
    int msgId;
    
    key = ftok(".", 0xFF);
    msgId = msgget(key, IPC_CREAT|0644);
    if(-1 == msgId)
    {
        perror("msgget");
        exit(EXIT_FAILURE);
    }

    printf("Process (%s) is started, pid=%d\n", argv[0], getpid());

    while(1)
    {
        alarm(0);
        alarm(600);     //if doesn't receive messge in 600s, timeout & exit
        struct msgbuf rcvBuf;
        memset(&rcvBuf, '\0', sizeof(struct msgbuf));
        msgrcv(msgId, &rcvBuf, BUF_SIZE, 1, 0);                
        printf("Receive msg: %s\n", rcvBuf.mtext);
        
        struct msgbuf sndBuf;
        memset(&sndBuf, '\0', sizeof(sndBuf));

        strncpy((sndBuf.mtext), (rcvBuf.mtext), strlen(rcvBuf.mtext)+1);
        sndBuf.mtype = 2;

        if(-1 == msgsnd(msgId, &sndBuf, strlen(rcvBuf.mtext)+1, 0))
        {
            perror("msgsnd");
            exit(EXIT_FAILURE);
        }
            
        //if scanf "end~", exit
        if(!strcmp("end~", rcvBuf.mtext))
             break;
    }
        
    printf("THe process(%s),pid=%d exit~\n", argv[0], getpid());

    return 0;
}
相關文章
相關標籤/搜索