Linux IPC實踐(7) --Posix消息隊列

1. 建立/獲取一個消息隊列ide

#include <fcntl.h>           /* For O_* constants */
#include <sys/stat.h>        /* For mode constants */
#include <mqueue.h>
mqd_t mq_open(const char *name, int oflag);	//專用於打開一個消息隊列
mqd_t mq_open(const char *name, int oflag, mode_t mode,
              struct mq_attr *attr);

參數:函數

   name:  消息隊列名字;ui

   oflag: 與open函數類型, 能夠是O_RDONLY, O_WRONLY, O_RDWR, 還能夠按位或上O_CREAT, O_EXCL, O_NONBLOCK.this

   mode: 若是oflag指定了O_CREAT, 須要指定mode參數;spa

   attr: 指定消息隊列的屬性;命令行

返回值:線程

   成功: 返回消息隊列文件描述符;指針

   失敗: 返回-1;code


注意-Posix IPC名字限制:blog

   1. 必須以」/」開頭, 而且後面不能還有」/」, 形如:/file-name;

   2. 名字長度不能超過NAME_MAX

   3. 連接時:Link with -lrt.

/** System V 消息隊列

經過msgget來建立/打開消息隊列

int msgget(key_t key, int msgflg);

**/

 

2. 關閉一個消息隊列

int mq_close(mqd_t mqdes);
/** System V 消息隊列沒有相似的該函數調用**/

3. 刪除一個消息隊列

int mq_unlink(const char *name);
/** System V 消息隊列
經過msgctl函數, 並將cmd指定爲IPC_RMID來實現
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
**/
//示例
int main()
{
    mqd_t mqid = mq_open("/abc", O_CREAT|O_RDONLY, 0666, NULL);
    if (mqid == -1)
        err_exit("mq_open error");
    cout << "mq_open success" << endl;
    mq_close(mqid);
    mq_unlink("/abc");
    cout << "unlink success" << endl;
}

4. 獲取/設置消息隊列屬性

int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
int mq_setattr(mqd_t mqdes, struct mq_attr *newattr,
                        struct mq_attr *oldattr);

參數:

   newattr: 須要設置的屬性

   oldattr: 原來的屬性

//struct mq_attr結構體說明
struct mq_attr
{
    long mq_flags;       /* Flags: 0 or O_NONBLOCK */
    long mq_maxmsg;      /* Max. # of messages on queue: 消息隊列可以保存的消息數 */
    long mq_msgsize;     /* Max. message size (bytes): 消息的最大長度 */
    long mq_curmsgs;     /* # of messages currently in queue: 消息隊列當前保存的消息數 */
};

/** System V 消息隊列

經過msgctl函數, 並將cmd指定爲IPC_STAT/IPC_SET來實現

int msgctl(int msqid, int cmd, struct msqid_ds *buf);

**/

/** 示例: 獲取消息隊列的屬性
**/
int main(int argc,char **argv)
{
    mqd_t mqid = mq_open("/test", O_RDONLY|O_CREAT, 0666, NULL);
    if (mqid == -1)
        err_exit("mq_open error");

    struct mq_attr attr;
    if (mq_getattr(mqid, &attr) == -1)
        err_exit("mq_getattr error");
    cout << "Max messages on queue: " << attr.mq_maxmsg << endl;
    cout << "Max message size: " << attr.mq_msgsize << endl;
    cout << "current messages: " << attr.mq_curmsgs << endl;

    mq_close(mqid);
    return 0;
}

5. 發送消息

int mq_send(mqd_t mqdes, const char *msg_ptr,
           size_t msg_len, unsigned msg_prio);

參數:

   msg_ptr: 指向須要發送的消息的指針

   msg_len: 消息長度

   msg_prio: 消息的優先級

/** System V 消息隊列

經過msgsnd函數來實現消息發送

int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);

**/

/** 示例: 向消息隊列中發送消息, prio須要從命令行參數中讀取 **/
struct Student
{
    char name[36];
    int age;
};
int main(int argc,char **argv)
{
    if (argc != 2)
        err_quit("./send <prio>");

    mqd_t mqid = mq_open("/test", O_WRONLY|O_CREAT, 0666, NULL);
    if (mqid == -1)
        err_exit("mq_open error");

    struct Student stu = {"xiaofang", 23};
    unsigned prio = atoi(argv[1]);
    if (mq_send(mqid, (const char *)&stu, sizeof(stu), prio) == -1)
        err_exit("mq_send error");

    mq_close(mqid);
    return 0;
}

6. 從消息隊列中讀取消息

ssize_t mq_receive(mqd_t mqdes, char *msg_ptr,
                       size_t msg_len, unsigned *msg_prio);

參數:

  msg_len: 讀取的消息的長度, 注意: 此值必定要等於mq_attr::mq_msgsize的值, 該值能夠經過mq_getattr獲取, 但通常是8192字節     [this must be greater than the mq_msgsize attribute of the queue (see mq_getattr(3)).]

  msg_prio: 保存獲取的消息的優先級

返回值:

  成功: 返回讀取的消息的字節數

  失敗: 返回-1

  注意: 讀取的永遠是消息隊列中優先級最高的最先的消息, 若是消息隊列爲, 若是不指定爲非阻塞模式, 則mq_receive會阻塞;

/** System V 消息隊列

經過msgrcv函數來實現消息發送的

ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);

**/

/** 示例: 從消息隊列中獲取消息 **/
int main(int argc,char **argv)
{
    mqd_t mqid = mq_open("/test", O_RDONLY);
    if (mqid == -1)
        err_exit("mq_open error");

    struct Student buf;
    int nrcv;
    unsigned prio;
    struct mq_attr attr;
    if (mq_getattr(mqid, &attr) == -1)
        err_exit("mq_getattr error");

    if ((nrcv = mq_receive(mqid, (char *)&buf, attr.mq_msgsize, &prio)) == -1)
        err_exit("mq_receive error");

    cout << "receive " << nrcv << " bytes, priority: " << prio << ", name: "
         << buf.name << ", age: " << buf.age << endl;

    mq_close(mqid);
    return 0;
}

7. 創建/刪除消息到達通知事件

int mq_notify(mqd_t mqdes, const struct sigevent *sevp);

參數sevp:

   NULL: 表示撤銷已註冊通知;

   非空: 表示當消息到達且消息隊列當前爲空, 那麼將獲得通知;

通知方式:

   1. 產生一個信號, 須要本身綁定

   2. 建立一個線程, 執行指定的函數

注意: 這種註冊的方式只是在消息隊列從空到非空時才產生消息通知事件, 並且這種註冊方式是一次性的!

//sigevent結構體
struct sigevent
{
    int          sigev_notify; /* Notification method */
    int          sigev_signo;  /* Notification signal */
    union sigval sigev_value;  /* Data passed with notification */
    void       (*sigev_notify_function) (union sigval);  /* Function used for thread notification (SIGEV_THREAD) */
    void        *sigev_notify_attributes; /* Attributes for notification thread (SIGEV_THREAD) */
    pid_t        sigev_notify_thread_id; /* ID of thread to signal (SIGEV_THREAD_ID) */
};
union sigval            /* Data passed with notification */
{
    int     sival_int;         /* Integer value */
    void   *sival_ptr;         /* Pointer value */
};

sigev_notify表明通知的方式: 通常經常使用兩種取值:SIGEV_SIGNAL, 以信號方式通知; SIGEV_THREAD, 以線程方式通知

若是以信號方式通知: 則須要設定一下兩個參數:

   sigev_signo: 信號的代碼

   sigev_value: 信號的附加數據(實時信號)

若是以線程方式通知: 則須要設定如下兩個參數:

   sigev_notify_function

   sigev_notify_attributes

/** Posix IPC所特有的功能, System V沒有 **/

/**示例: 將下面程序多運行幾遍, 尤爲是當消息隊列」從空->非空」, 屢次」從空->非空」, 當消息隊列不空時運行該程序時, 觀察該程序的狀態;
**/
mqd_t mqid;
long size;
void sigHandlerForUSR1(int signo)
{
    //將數據的讀取轉移到對信號SIGUSR1的響應函數中來
    struct Student buf;
    int nrcv;
    unsigned prio;
    if ((nrcv = mq_receive(mqid, (char *)&buf, size, &prio)) == -1)
        err_exit("mq_receive error");

    cout << "receive " << nrcv << " bytes, priority: " << prio << ", name: "
         << buf.name << ", age: " << buf.age << endl;
}

int main(int argc,char **argv)
{
    // 安裝信號響應函數
    if (signal(SIGUSR1, sigHandlerForUSR1) == SIG_ERR)
        err_exit("signal error");

    mqid = mq_open("/test", O_RDONLY);
    if (mqid == -1)
        err_exit("mq_open error");

    // 獲取消息的最大長度
    struct mq_attr attr;
    if (mq_getattr(mqid, &attr) == -1)
        err_exit("mq_getattr error");
    size = attr.mq_msgsize;

    // 註冊消息到達通知事件
    struct sigevent event;
    event.sigev_notify = SIGEV_SIGNAL;  //指定以信號方式通知
    event.sigev_signo = SIGUSR1;        //指定以SIGUSR1通知
    if (mq_notify(mqid, &event) == -1)
        err_exit("mq_notify error");

    //死循環, 等待信號到來
    while (true)
        pause();

    mq_close(mqid);
    return 0;
}
/** 示例:屢次註冊notify, 這樣就能過屢次接收消息, 可是仍是不能從隊列非空的時候進行接收, 將程序改造以下:
**/
mqd_t mqid;
long size;
struct sigevent event;
void sigHandlerForUSR1(int signo)
{
    // 注意: 是在消息被讀走以前進行註冊,
    // 否則該程序就感應不到消息隊列"從空->非空"的一個過程變化了
    if (mq_notify(mqid, &event) == -1)
        err_exit("mq_notify error");

    //將數據的讀取轉移到對信號SIGUSR1的響應函數中來
    struct Student buf;
    int nrcv;
    unsigned prio;
    if ((nrcv = mq_receive(mqid, (char *)&buf, size, &prio)) == -1)
        err_exit("mq_receive error");

    cout << "receive " << nrcv << " bytes, priority: " << prio << ", name: "
         << buf.name << ", age: " << buf.age << endl;
}

int main(int argc,char **argv)
{
    // 安裝信號響應函數
    if (signal(SIGUSR1, sigHandlerForUSR1) == SIG_ERR)
        err_exit("signal error");

    mqid = mq_open("/test", O_RDONLY);
    if (mqid == -1)
        err_exit("mq_open error");

    // 獲取消息的最大長度
    struct mq_attr attr;
    if (mq_getattr(mqid, &attr) == -1)
        err_exit("mq_getattr error");
    size = attr.mq_msgsize;

    // 註冊消息到達通知事件
    event.sigev_notify = SIGEV_SIGNAL;  //指定以信號方式通知
    event.sigev_signo = SIGUSR1;        //指定以SIGUSR1通知
    if (mq_notify(mqid, &event) == -1)
        err_exit("mq_notify error");

    //死循環, 等待信號到來
    while (true)
        pause();

    mq_close(mqid);
    return 0;
}

mq_notify 注意點總結:

   1. 任什麼時候刻只能有一個進程能夠被註冊爲接收某個給定隊列的通知;

   2. 當有一個消息到達某個先前爲空的隊列, 並且已有一個進程被註冊爲接收該隊列的通知時, 只有沒有任何線程阻塞在該隊列的mq_receive調用的前提下, 通知纔會發出;

   3. 當通知被髮送給它的註冊進程時, 該進程的註冊被撤銷. 進程必須再次調用mq_notify以從新註冊(若是須要的話),可是要注意: 從新註冊要放在從消息隊列讀出消息以前而不是以後(如同示例程序);

 

附-查看已經成功建立的Posix消息隊列

#其存在與一個虛擬文件系統中, 須要將其掛載到系統中才能查看

  Mounting the message queue filesystem On Linux, message queues are created in a virtual filesystem.  

(Other implementations may also  provide such a feature, but the details are likely to differ.)  This 

file system can be mounted (by the superuser, 注意是使用root用戶才能成功) using the following commands:

mkdir /dev/mqueue

mount -t mqueue none /dev/mqueue

還可使用cat查看該消息隊列的狀態, rm刪除:

cat /dev/mqueue/abc 

rm abc

還可umount該文件系統

umount /dev/mqueue

 

附-Makefile

.PHONY: clean all
CC = g++
CPPFLAGS = -Wall -g
BIN = main
SOURCES = $(BIN.=.cpp)
all: $(BIN)

%.o: %.c
	$(CC) $(CPPFLAGS) -c $^ -o $@
main: main.o
	$(CC) $(CPPFLAGS) $^ -lrt -o $@

clean:
	-rm -rf $(BIN) *.o bin/ obj/ core
相關文章
相關標籤/搜索