相比於System V消息隊列的問題之一是沒法通知一個進程什麼時候在某個隊列中放置了一個消息,POSIX消息隊列容許異步事件通知(asyschronous event notification),有兩種方式進行選擇:安全
1,產生一個信號數據結構
2,建立一個線程來執行一個指定的函數異步
指定隊列創建和刪除異步事件通知: async
#include <mqueue.h>函數
int mq_notify(mqd_t mqdes, const struct sigevent *sevp);atom
struct sigeventspa
{線程
int sigev_notify; //notification type SIGEV_{NONE, SIGNAL, THREAD}code
int sigev_signo; //signal number if SIGEV_SIGNAL隊列
union sigval sigev_value; // passed to signal handler or thread
// following two is SIGEV_THREAD
void (*sigev_notify_function)(union sigval);
pthread_attr_t *sigev_notify_attributes;
}
union sigval
{
int sival_int; //integer value
void *sival_ptr; //pointer value
}
運用此函數的若干規則:
(1),若是notification爲非空,那麼此進程被註冊爲接收該隊列的通知
(2),若是notification爲空,那麼此進程已存在的註冊將被撤銷
(3),任什麼時候刻只有一個進程能夠被註冊爲接收某個隊列的通知
(4),若是當一個消息到達隊列時,此時正好有個mq_receive阻塞在該隊列上,那麼通知不會發出
(5),當通知到達註冊進程時,其註冊即被撤銷
注意:通知只有在一個消息被放置到一個空隊列上時纔會發出,若是這個消息隊列不是空隊列,則通知無效。
例子:簡單通知
代碼:
//mqnotifysig1.c
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <mqueue.h> #include <signal.h> mqd_t mqd; void *buff; struct mq_attr attr; struct sigevent sigev; static void sig_usr1(int); int main(int argc, char **argv) { if (argc != 2) { printf("usage: mqnotifysig1 <name>"); exit(0); } if ((mqd = mq_open(argv[1], O_RDONLY)) == -1) { printf("open error"); exit(0); } mq_getattr(mqd, &attr); buff = malloc(attr.mq_msgsize); signal(SIGUSR1, sig_usr1); sigev.sigev_notify = SIGEV_SIGNAL; sigev.sigev_signo = SIGUSR1; mq_notify(mqd, &sigev); for ( ; ; ) pause(); exit(0); } static void sig_usr1(int signo) { ssize_t n; mq_notify(mqd, &sigev); n = mq_receive(mqd, buff, attr.mq_msgsize, NULL); printf("SIGUSR1 received, read %ld bytes\n", (long) n); return; }
異步信號安全函數:
這裏涉及到一個異步信號安全(async-signal-safe)的概念:就是在信號處理的函數中有些函數是不能用的,這麼講,可重入的函數就是信號安全。
知足下列條件的函數是不可重入的:
1) 函數體內使用了靜態的數據結構;
2) 函數體內調用了malloc() 或者free() 函數;
3) 函數體內調用了標準I/O 函數。4) 謹慎使用堆棧。最好先在使用前先OS_ENTER_KERNAL 。
例子:信號通知,讓處理程序僅僅設置一個全局標誌,有某個線程檢查該標誌以肯定什麼時候接收到一個信息
代碼:
//mqnotifysig2.c
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <mqueue.h> #include <signal.h> volatile sig_atomic_t mqflag; static void sig_usr1(int); int main(int argc, char **argv) { mqd_t mqd; void *buff; ssize_t n; sigset_t zeromask, newmask, oldmask; struct mq_attr attr; struct sigevent sigev; if (argc != 2) { mq_notify(mqd, &sigev); printf("usage: mqnotifysig2 <name>"); exit(0); } mqd = mq_open(argv[1], O_RDONLY); mq_getattr(mqd, &attr); buff = malloc(attr.mq_msgsize); sigemptyset(&zeromask); sigemptyset(&newmask); sigemptyset(&oldmask); sigaddset(&newmask, SIGUSR1); signal(SIGUSR1, sig_usr1); sigev.sigev_notify = SIGEV_SIGNAL; sigev.sigev_signo = SIGUSR1; mq_notify(mqd, &sigev); for( ; ; ) { sigprocmask(SIG_BLOCK, &newmask, &oldmask); while (mqflag == 0) sigsuspend(&zeromask); mqflag = 0; mq_notify(mqd, &sigev); n = mq_receive(mqd, buff, attr.mq_msgsize, NULL); printf("read %ld bytes\n",(long) n); sigprocmask(SIG_UNBLOCK, &newmask, NULL); } exit(0); } static void sig_usr1(int signo) { mqflag = 1; printf("functin sig_usr1 has been called\n"); return; }
上面的例子有個小小的bug,就是在一個通知來了以後一段時間,即便有消息發送到隊列中,也不會發送出通知來的。這時咱們能夠經過將mq_receive非阻塞讀取來避免。同時,在上面的例子中,主線程被阻塞,信號處理程序執行,主線程再次執行,這樣效率很不高,咱們能夠經過阻塞在某個函數裏,僅僅等待該信號的遞交,而不是讓內核執行一個只爲設置一個標誌的信號處理程序。因此咱們用到sigwait函數。
#include <signal.h>
int sigwait(const sigset_t *set, int *sig);
//mqnotifysig4.c
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <mqueue.h> #include <signal.h> #include <errno.h> int main(int argc, char **argv) { int signo; mqd_t mqd; void *buff; ssize_t n; sigset_t newmask; struct mq_attr attr; struct sigevent sigev; if (argc != 2) { printf("usage: mqnotifysig3 <name>"); exit(0); } mqd = mq_open(argv[1], O_RDONLY | O_NONBLOCK); mq_getattr(mqd, &attr); buff = malloc(attr.mq_msgsize); sigemptyset(&newmask); sigaddset(&newmask, SIGUSR1); sigprocmask(SIG_BLOCK, &newmask, NULL); sigev.sigev_notify = SIGEV_SIGNAL; sigev.sigev_signo = SIGUSR1; mq_notify(mqd, &sigev); for ( ; ; ) { sigwait(&newmask, &signo); if (signo == SIGUSR1) { mq_notify(mqd, &sigev); while ((n = mq_receive(mqd, buff, attr.mq_msgsize, NULL)) >= 0) { printf("reda %ld bytes\n",(long) n); } if (errno != EAGAIN) { printf("mq_receive error"); exit(0); } } } exit(0); }
消息隊列描述符並非普通的文件描述符,因此咱們沒法使用select機制來高效率的處理,可是咱們能夠模仿出來,經過在信號處理函數中,往一個管道寫入一個字符,那麼在主線程中select此管道描述就OK 了。
例子:
代碼:
//mqnotifysig5.c
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <mqueue.h> #include <signal.h> #include <errno.h> int pipefd[2]; static void sig_usr1(int); int main(int argc, char **argv) { int nfds; char c; fd_set rset; mqd_t mqd; ssize_t n; void *buff; struct mq_attr attr; struct sigevent sigev; if (argc != 2) { printf("usage: mq_notifysig5 <name>"); exit(0); } mqd = mq_open(argv[1], O_RDONLY | O_NONBLOCK); mq_getattr(mqd, &attr); buff = malloc(attr.mq_msgsize); pipe(pipefd); signal(SIGUSR1, sig_usr1); sigev.sigev_notify = SIGEV_SIGNAL; sigev.sigev_signo = SIGUSR1; mq_notify(mqd, &sigev); FD_ZERO(&rset); for ( ; ; ) { FD_SET(pipefd[0], &rset); nfds = select(pipefd[0] + 1, &rset, NULL, NULL, NULL); if (FD_ISSET(pipefd[0], &rset)) { read(pipefd[0], &c, 1); mq_notify(mqd, &sigev); while ((n = mq_receive(mqd, buff, attr.mq_msgsize, NULL)) >= 0) { printf("read %ld bytes\n", (long) n); } if (errno != EAGAIN) { printf("mq_receive error"); exit(0); } } } exit(0); } static void sig_usr1(int signo) { write(pipefd[1], "", 1); return; }
異步事件通知的另外一種方式是把sigev_notify設置成SIGEV_THREAD,這樣當一消息放到一個空的隊列上,會建立一個線程。
例子:
代碼:
//mqnotifysigthread1.c
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <mqueue.h> #include <signal.h> #include <errno.h> mqd_t mqd; struct mq_attr attr; struct sigevent sigev; static void notify_thread(union sigval); int main(int argc, char **argv) { if (argc != 2) { printf("usage: mqnotifythread1 <name>"); exit(0); } mqd = mq_open(argv[1], O_RDONLY | O_NONBLOCK); mq_getattr(mqd, &attr); sigev.sigev_notify = SIGEV_THREAD; sigev.sigev_value.sival_ptr = NULL; sigev.sigev_notify_function = notify_thread; sigev.sigev_notify_attributes = NULL; mq_notify(mqd, &sigev); for ( ; ; ) pause; exit(0); } static void notify_thread(union sigval arg) { ssize_t n; void *buff; printf("notify_thread started\n"); buff = malloc(attr.mq_msgsize); mq_notify(mqd, &sigev); while ((n = mq_receive(mqd, buff, attr.mq_msgsize, NULL)) >= 0) { printf("read %ld bytes\n", (long) n); } if (errno != EAGAIN) { printf("mq_receive error\n"); exit(0); } free(buff); pthread_exit(NULL); }