前面兩篇文章分解介紹了匿名管道和命名管道方式的進程間通訊,本文將介紹Linux消息隊列(posix)的通訊機制和特色。異步
消息隊列的實現分爲兩種,一種爲System V的消息隊列,一種是Posix消息隊列;這篇文章將主要圍繞Posix消息隊列介紹;函數
消息隊列能夠認爲是一個消息鏈表,某個進程往一個消息隊列中寫入消息以前,不須要另外某個進程在該隊列上等待消息的達到,這一點與管道和FIFO相反。Posix消息隊列與System V消息隊列的區別以下:atom
(1) 對Posix消息隊列的讀老是返回最高優先級的最先消息,對System V消息隊列的讀則能夠返回任意指定優先級的消息。spa
(2)當往一個空隊列放置一個消息時,Posix消息隊列容許產生一個信號或啓動一個線程,System V消息隊列則不提供相似的機制。線程
2.1 打開一個消息隊列指針
#include <mqueue.h>code
typedef int mqd_t;blog
mqd_t mq_open(const char *name, int oflag, ... /* mode_t mode, struct mq_attr *attr */);隊列
返回: 成功時爲消息隊列描述字,出錯時爲-1。 進程
功能: 建立一個新的消息隊列或打開一個已存在的消息的隊列。
2.2 關閉一個消息隊列
#include <mqueue.h>
int mq_close(mqd_t mqdes);
返回: 成功時爲0,出錯時爲-1。
功能: 關閉已打開的消息隊列。
2.3 刪除一個消息隊列
#include <mqueue.h>
int mq_unlink(const char *name)
返回: 成功時爲0,出錯時爲-1
功能: 從系統中刪除消息隊列。
這三個函數操做的代碼以下:
#include <mqueue.h> #include <unistd.h> #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <stdio.h> int main(int argc, char* argv[]) { int flag = O_RDWR | O_CREAT | O_EXCL; int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH; mqd_t mqid = mq_open("/mq_test", flag, mode,NULL); if (mqid == -1) { printf("mqueue create failed!\n"); return 1; } else { printf("mqueue create success!\n"); } mq_close(mqid); return 0; }
#include <mqueue.h> #include <unistd.h> int main(int argc, char* argv[]) { mq_unlink("/mq_test"); return 0; }
注意:編譯posix mqueue時,要鏈接運行時庫(runtime library),既-lrt選項,運行結果以下:
關於mqueue更多詳細內容能夠使用:man mq_overview命令查看,裏面有一條須要注意的是,Linux下的Posix消息隊列是在vfs中建立的,能夠用
mount -t mqueue none /dev/mqueue
將消息隊列掛在在/dev/mqueue目錄下,便於查看。
2.4 mq_close()和mq_unlink()
mq_close()的功能是關閉消息隊列的文件描述符,但消息隊列並不從系統中刪除,要刪除一個消息隊列,必須調用mq_unlink();這與文件系統的unlink()機制是同樣的。
#include <mqueue.h>
int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
int mq_setattr(mqd_t mqdes, const struct mq_attr *attr, struct mq_attr *attr);
均返回:成功時爲0, 出錯時爲-1
每一個消息隊列有四個屬性:
struct mq_attr
{
long mq_flags; /* message queue flag : 0, O_NONBLOCK */
long mq_maxmsg; /* max number of messages allowed on queue*/
long mq_msgsize; /* max size of a message (in bytes)*/
long mq_curmsgs; /* number of messages currently on queue */
};
#include <mqueue.h>
int mq_send(mqd_t mqdes, const char *ptr, size_t len, unsigned int prio);
返回:成功時爲0,出錯爲-1
ssize_t mq_receive(mqd_t mqdes, char *ptr, size_t len, unsigned int *priop);
返回:成功時爲消息中的字節數,出錯爲-1
mqsend代碼以下:
#include <unistd.h> #include <stdio.h> #include <string.h> #include <mqueue.h> #include <sys/stat.h> #include <sys/types.h> int main(int argc, char* argv[]) { int flag = O_RDWR; int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH; mqd_t mqid = mq_open("/mq_test",flag,mode,NULL); if (mqid == -1) { printf("open mqueue failed!\n"); return 1; } char *buf = "hello, i am sender!"; mq_send(mqid,buf,strlen(buf),20); mq_close(mqid); return 0; }
mqrecv代碼以下:
#include <unistd.h> #include <stdio.h> #include <string.h> #include <mqueue.h> #include <sys/stat.h> #include <sys/types.h> int main(int argc, char* argv[]) { int flag = O_RDWR; int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH; mqd_t mqid = mq_open("/mq_test",flag,mode,NULL); if (mqid == -1) { printf("open mqueue failed!\n"); return 1; } struct mq_attr attr; mq_getattr(mqid,&attr); char buf[256] = {0}; int priority = 0; mq_receive(mqid,buf,attr.mq_msgsize,&priority); printf("%s\n",buf); mq_close(mqid); return 0; }
運行結果以下:
首先咱們運行三次send,而後運行四次recv,可見recv的前三次是能夠收到消息隊列裏的三個消息的,當運行第四次的時,系統消息隊列裏爲空,recv就會阻塞;關於非阻塞式mqueue見下文。
如前文介紹,poxis消息隊列運行異步通知,以告知什麼時候有一個消息放置到某個空消息隊列中,這種通知有兩種方式能夠選擇:
(1)產生一個信號
(2)建立一個線程來執行一個指定的函數
這種通知經過mq_notify() 函數創建。該函數爲指定的消息消息隊列創建或刪除異步事件通知,
#include <mqueue.h>
int mq_notify(mqd_t mqdes, const struct sigevent* notification);
(1)若是notification參數爲非空,那麼當前進程但願在有一個消息到達所指定的先前爲空的對列時獲得通知。
(2)若是notification參數爲空,並且當前進程被註冊爲接收指定隊列的通知,那麼已存在的註冊將被撤銷。
(3)任意時刻只有一個進程能夠被註冊爲接收某個給定隊列的通知。
(4)當有一個消息到達先前爲空的消息隊列,並且已有一個進程被註冊爲接收該隊列的通知時,只有在沒有任何線程阻塞在該隊列的mq_receive調用中的前提下,通知纔會發出。即說明,在mq_receive調用中的阻塞比任何通知的註冊都優先。
(5)當前通知被髮送給它的註冊進程時,其註冊即被撤銷。該進程必須再次調用mq_notify以從新註冊。
sigevent結構以下:
union sigval{ int sival_int; /*integer value*/ void *sival_ptr; /*pointer value*/ }; struct sigevent{ int sigev_notify; /*SIGEV_{NONE, SIGNAL, THREAD}*/ int sigev_signo; /*signal number if SIGEV_SIGNAL*/ union sigval sigev_value; void (*sigev_notify_function)(union sigval); pthread_attr_t *sigev_notify_attributes; };
5.1 mq_notify() 使用信號處理程序
一個正確的使用非阻塞mq_receive的信號通知的例子:
#include <unistd.h> #include <stdio.h> #include <mqueue.h> #include <sys/stat.h> #include <sys/types.h> #include <signal.h> #include <stdlib.h> #include <errno.h> void sig_usr1(int ); volatile sig_atomic_t mqflag; int main(int argc, char* argv[]) { mqd_t mqid = 0; void *buff; struct mq_attr attr; struct sigevent sigev; sigset_t zeromask,newmask,oldmask; int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH; mqid = mq_open("/mq_test",O_RDONLY | O_NONBLOCK,mode,NULL); // 非阻塞式打開mqueue mq_getattr(mqid,&attr); buff = malloc(attr.mq_msgsize); sigemptyset(&zeromask); sigemptyset(&newmask); sigemptyset(&oldmask); sigaddset(&newmask,SIGUSR1); // 初始化信號集 signal(SIGUSR1,sig_usr1); // 信號處理程序 sigev.sigev_notify = SIGEV_SIGNAL; sigev.sigev_signo = SIGUSR1; int n = mq_notify(mqid,&sigev); // 啓用通知 for (;;) { sigprocmask(SIG_BLOCK,&newmask,&oldmask); while(mqflag == 0) sigsuspend(&zeromask); mqflag = 0; ssize_t n; mq_notify(mqid, &sigev); // 從新註冊 while( (n = mq_receive(mqid,buff,attr.mq_msgsize,NULL)) >=0) printf("SIGUSR1 received, read %ld bytes.\n",(long)n); //讀取消息 if(errno != EAGAIN) printf("mq_receive error\n"); sigprocmask(SIG_UNBLOCK,&newmask,NULL); } return 0; } void sig_usr1(int signo) { mqflag = 1; }
這裏爲何使用的是非阻塞式mq_receive,爲何不在信號處理程序中打印接收到的字符請參閱《unp 第二卷》
5.2 mq_notify() 使用線程處理程序
異步事件通知的另外一種方式是把sigev_notify設置成SIGEV_THREAD,這會建立一個新線程,該線程調用由sigev_notify_function指定的函數,所用的參數由sigev_value指定,新線程的屬性由sigev_notify_attributes指定,要指定線程的默認屬性的話,傳空指針。新線程是做爲脫離線程建立的。
#include <unistd.h> #include <stdio.h> #include <mqueue.h> #include <sys/stat.h> #include <sys/types.h> #include <signal.h> #include <stdlib.h> #include <errno.h> #include <pthread.h> mqd_t mqid = 0; struct mq_attr attr; struct sigevent sigev; static void notify_thread(union sigval); int main(int argc, char* argv[]) { int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH; mqid = mq_open("/mq_test",O_RDONLY | O_NONBLOCK,mode,NULL); mq_getattr(mqid,&attr); sigev.sigev_notify = SIGEV_THREAD; sigev.sigev_notify_function = notify_thread; sigev.sigev_value.sival_ptr = NULL; sigev.sigev_notify_attributes = NULL; int n = mq_notify(mqid,&sigev); for (;;) pause(); return 0; } static void notify_thread(union sigval arg) { ssize_t n; char* buff; printf("notify_thread_started!\n"); buff = malloc(attr.mq_msgsize); mq_notify(mqid, &sigev); while( (n = mq_receive(mqid,buff,attr.mq_msgsize,NULL)) >=0) printf("SIGUSR1 received, read %ld bytes.\n",(long)n); if(errno != EAGAIN) printf("mq_receive error\n"); free(buff); pthread_exit(0); }