Unix下可用的I/O模型有五種:阻塞式I/O、非阻塞式I/O、I/O複用(select和poll)、信號驅動式I/O(SIGIO)以及異步I/O(POSIX的aio_系列函數)。
詳見《Unix網絡編程》卷一第六章。
select()和poll()在Unix系統中存在的時間很長,主要優點在於可移植性,主要缺點在於當同時檢查大量的文件描述符時性能擴展性不佳。
epoll API的關鍵優點在於能讓應用高效地檢查大量的文件描述符,主要缺點是它是專屬於Linux系統的API。
select()首次出現在BSD系統的套接字API中。
select()系統調用的用途:在一段指定的時間內,監聽用戶感興趣的文件描述符上的可讀、可寫和異常事件。
系統調用select()會一直阻塞,直到一個或多個文件描述符成爲就緒態(或指定的超時時間到期)。
#include <sys/select.h>
#include <sys/time.h>

/*
 * Returns the number of ready descriptors, 0 on timeout, or -1 on error.
 *
 * The timeout parameter is deliberately not const-qualified: POSIX declares
 * it as a plain `struct timeval *`, and an implementation is allowed to
 * modify the structure it points to, so callers must not assume it is
 * unchanged after the call.
 */
int select(int maxfdp1, fd_set *readset, fd_set *writeset,
           fd_set *exceptset, struct timeval *timeout);
探究一下fd_set的結構:
/*typesizes.h*/ #define __FD_SETSIZE 1024 /*select.h*/ typedef long int __fd_mask; //long int類型共有多少bits #define __NFDBITS (8 * (int) sizeof (__fd_mask)) typedef struct { //long int型數組,數組大小 = 描述符最大數 / long int的位數 //數組大小爲 __FD_SETSIZE bits __fd_mask fds_bits[__FD_SETSIZE / __NFDBITS]; } fd_set;
select()程序示例:
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <stdarg.h>

/* Print the command-line synopsis to stderr and terminate with status 1. */
static void usageError(const char* progName){
    fprintf(stderr, "Usage: %s {timeout | -} fd-num[rw]...\n", progName);
    fprintf(stderr, " - means infinite timeout; \n");
    fprintf(stderr, " r = monitor for read\n");
    fprintf(stderr, " w = monitor for wirite\n\n");
    fprintf(stderr, " e.g.: %s - 0rw 1w\n", progName);
    exit(1);
}

/*
 * Report a command-line usage error (printf-style format) on stderr and
 * terminate with EXIT_FAILURE.
 */
void cmdLineErr(const char *format, ...)
{
    va_list argList;

    fflush(stdout);           /* Flush any pending stdout */

    fprintf(stderr, "Command-line usage error: ");
    va_start(argList, format);
    vfprintf(stderr, format, argList);
    va_end(argList);

    fflush(stderr);           /* In case stderr is not line-buffered */
    exit(EXIT_FAILURE);
}

/*
 * Monitor the descriptors named on the command line with select().
 *
 * argv[1] is a timeout in seconds, or "-" for no timeout. Every following
 * argument is a descriptor number followed by "r", "w" or both, naming the
 * conditions to monitor. After select() returns, the ready count and the
 * per-descriptor state are printed.
 */
int main(int argc, char* argv[]){
    fd_set readfds, writefds;
    int ready, nfds, fd, numRead, j;
    struct timeval timeout;
    struct timeval *pto;
    char buf[10];             /* Receives at most "rw" plus the terminator */

    if(argc < 2 || strcmp(argv[1], "--help") == 0){
        usageError(argv[0]);
    }

    /* A NULL timeout pointer makes select() block indefinitely. */
    if(strcmp(argv[1], "-") == 0){
        pto = NULL;
    }
    else{
        char *end;
        long secs;

        /* Reject text that is not a complete, non-negative number. */
        errno = 0;
        secs = strtol(argv[1], &end, 0);
        if(end == argv[1] || *end != '\0' || errno == ERANGE || secs < 0){
            cmdLineErr("invalid timeout: %s\n", argv[1]);
        }

        pto = &timeout;
        timeout.tv_sec = secs;
        timeout.tv_usec = 0;
    }

    /* Build the descriptor sets from the remaining arguments. */
    nfds = 0;
    FD_ZERO(&readfds);
    FD_ZERO(&writefds);

    for(j = 2; j < argc; j++){
        numRead = sscanf(argv[j], "%d%2[rw]", &fd, buf);
        if(numRead != 2){
            usageError(argv[0]);
        }

        /* FD_SET() is only defined for 0 <= fd < FD_SETSIZE. */
        if(fd < 0){
            cmdLineErr("file descriptor must not be negative (%d)\n", fd);
        }
        if(fd >= FD_SETSIZE){
            cmdLineErr("file descriptor exceeds limit (%d)\n", FD_SETSIZE);
        }

        /* select() takes the highest descriptor number plus one. */
        if(fd >= nfds){
            nfds = fd + 1;
        }
        if(strchr(buf, 'r') != NULL){
            FD_SET(fd, &readfds);
        }
        if(strchr(buf, 'w') != NULL){
            FD_SET(fd, &writefds);
        }
    }

    ready = select(nfds, &readfds, &writefds, NULL, pto);
    if(ready == -1){
        perror("select");
        exit(1);
    }

    printf("ready = %d\n", ready);
    for(fd = 0; fd < nfds; fd++){
        printf("%d: %s%s\n", fd,
               FD_ISSET(fd, &readfds) ? "r" : "",
               FD_ISSET(fd, &writefds) ? "w" : "");
    }

    /* Show whatever value the timeout structure holds after the call. */
    if(pto != NULL){
        printf("timeout after select(): %ld.%03ld\n",
               (long) timeout.tv_sec, (long) timeout.tv_usec / 1000);
    }
    exit(0);
}
select處理正常數據和帶外數據:
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <libgen.h>

/*
 * Accept a single TCP connection on the given address and port, then use
 * select() to receive both normal data (read set) and out-of-band data
 * (exception set) from it until the peer closes or an error occurs.
 *
 * Usage: prog ip_address port_number
 */
int main(int argc, char* argv[]){
    if(argc <= 2){
        printf("usage: %s ip_adress port_number\n", basename(argv[0]));
        return 1;
    }
    const char* ip = argv[1];
    int port = atoi(argv[2]);

    int ret = 0;
    struct sockaddr_in address;
    memset(&address, 0, sizeof(address));
    address.sin_family = AF_INET;
    /* inet_pton() returns 1 only when the text was a valid address. */
    if(inet_pton(AF_INET, ip, &address.sin_addr) != 1){
        printf("invalid ip address: %s\n", ip);
        return 1;
    }
    address.sin_port = htons(port);

    int listenfd = socket(PF_INET, SOCK_STREAM, 0);
    assert(listenfd >= 0);
    ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
    assert(ret != -1);
    ret = listen(listenfd, 5);
    assert(ret != -1);

    struct sockaddr_in client_address;
    socklen_t client_addrlength = sizeof(client_address);
    int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
    if(connfd < 0){
        printf("error is: %d\n", errno);
        close(listenfd);
        /* Without a connection there is nothing to monitor; using
         * connfd == -1 with FD_SET()/select() would be invalid. */
        return 1;
    }

    char buf[1024];
    fd_set read_fds;
    fd_set exception_fds;
    FD_ZERO(&read_fds);
    FD_ZERO(&exception_fds);

    while(true){
        memset(buf, '\0', sizeof(buf));
        /* select() overwrites the sets, so re-arm them on every pass. */
        FD_SET(connfd, &read_fds);
        FD_SET(connfd, &exception_fds);

        ret = select(connfd + 1, &read_fds, NULL, &exception_fds, NULL);
        if(ret < 0){
            printf("selection failure\n");
            break;
        }

        if(FD_ISSET(connfd, &read_fds)){
            /* Readable: receive ordinary data. */
            ret = recv(connfd, buf, sizeof(buf)-1, 0);
            if(ret <= 0){
                break;
            }
            printf("get %d bytes of normal data: %s\n", ret, buf);
        }
        else if(FD_ISSET(connfd, &exception_fds)){
            /* Exceptional condition: receive with MSG_OOB. */
            ret = recv(connfd, buf, sizeof(buf)-1, MSG_OOB);
            if(ret <= 0){
                break;
            }
            printf("get %d bytes of oob data: %s\n", ret, buf);
        }
    }

    close(connfd);
    close(listenfd);
    return 0;
}
poll函數起源於SVR3,最初侷限於流設備,SVR4取消了這種限制,允許poll工作在任何描述符上。
poll提供的功能與select相似,不過在處理流設備時,它可以提供額外的信息。
#include <poll.h> struct pollfd{ int fd; short events; //指定要測試的條件 short revents;//返回描述符的狀態 } //如有就緒描述符返回其數目,超時返回0,出錯返回-1 int poll(struct pollfd *fdarray, unsigned long nfds, int timeout);
select()同poll()返回正整數的區別:如果一個文件描述符在返回的集合中出現了不止一次,系統調用select()會將同一個文件描述符計數多次。而系統調用poll()返回的是就緒態文件描述符個數,且一個文件描述符只會統計一次,就算在相應的revents字段中設定了多個位掩碼也是如此。
poll示例程序:
#include <time.h>
#include <poll.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <stdarg.h>

/*
 * Create num-pipes pipes, write one byte to the write end of num-writes
 * randomly chosen pipes (default 1), then call poll() on all read ends and
 * print which of them are readable.
 *
 * Usage: prog num-pipes [num-writes]
 */
int main(int argc, char* argv[]){
    int numPipes, j, ready, randPipe, numWrites;
    int (*pfds)[2];           /* One {read end, write end} pair per pipe */
    struct pollfd *pollFd;

    if(argc < 2 || strcmp(argv[1], "--help") == 0){
        printf("%s num-pipes [num-writes]\n", argv[0]);
        exit(1);
    }

    /* A count of zero would make "random() % numPipes" divide by zero and
     * a negative one would become a huge allocation size, so reject both. */
    numPipes = strtol(argv[1], NULL, 10);
    if(numPipes <= 0){
        printf("num-pipes must be a positive integer\n");
        exit(1);
    }

    pfds = (int (*)[2])calloc(numPipes, sizeof(int [2]));
    if(pfds == NULL){
        printf("error malloc");
        exit(1);
    }
    pollFd = (struct pollfd *)calloc(numPipes, sizeof(struct pollfd));
    if(pollFd == NULL){
        printf("error malloc");
        exit(1);
    }

    for(j = 0; j < numPipes; j++){
        if(pipe(pfds[j]) == -1){
            printf("error pipe %d", j);
            exit(1);
        }
    }

    /* Make the requested number of randomly chosen pipes readable. */
    numWrites = (argc > 2) ? strtol(argv[2], NULL, 10) : 1;
    srandom((int)time(NULL));
    for(j = 0; j < numWrites; j++){
        randPipe = random() % numPipes;
        printf("Writing to fd: %3d (read fd: %3d)\n",
               pfds[randPipe][1], pfds[randPipe][0]);
        if (write(pfds[randPipe][1], "a", 1) == -1){
            printf("write %d", pfds[randPipe][1]);
            exit(1);
        }
    }

    /* Watch the read end of every pipe for input. */
    for(j = 0; j < numPipes; j++){
        pollFd[j].fd = pfds[j][0];
        pollFd[j].events = POLLIN;
    }

    ready = poll(pollFd, numPipes, -1);
    if(ready == -1){
        printf("poll error");
        exit(1);
    }
    printf("poll() returned: %d\n", ready);

    for(j = 0; j < numPipes; j++){
        if(pollFd[j].revents & POLLIN){
            printf("Readable: %d %3d\n", j, pollFd[j].fd);
        }
    }

    free(pollFd);
    free(pfds);
    return 0;
}
epoll API由三個系統調用組成:
- epoll_create():創建一個epoll實例。
- epoll_ctl():操作同epoll實例相關聯的興趣列表。
- epoll_wait():返回與epoll實例相關聯的就緒列表中的成員。
epoll實例:epoll API的核心數據結構,和一個打開的文件描述符相關聯。這個文件描述符不是用來做I/O操作的,相反它是內核數據結構的句柄,這些內核數據結構實現了兩個目的:
- 記錄進程聲明過的感興趣的文件描述符列表(興趣列表,interest list);
- 維護處於I/O就緒態的文件描述符列表(就緒列表,ready list)。
#include <sys/epoll.h>

// Creates an epoll instance and returns a file descriptor referring to it.
// `size` is the caller's estimate of how many descriptors will be monitored.
int epoll_create(int size);
參數size指定我們想要通過epoll實例來檢查的描述符個數,它不是上限,只是告知內核應該如何爲內部數據結構劃分初始大小。
函數返回epoll實例的文件描述符,該文件描述符不再需要時應使用close()關閉。
當所有與epoll實例相關的文件描述符都被關閉時,實例被銷燬,相關資源被釋放。(多個文件描述符可能引用到相同的epoll實例,這是由於調用了fork()或dup()之類的函數所致)。
linux2.6.8版以來,size參數被忽略不用。
linux2.6.27以來,Linux支持一個新的系統調用epoll_create1():
- 去掉了無用的參數size
- 增加了一個可用來修改系統調用行爲的flags參數
- flags目前只支持一個標誌:EPOLL_CLOEXEC,它使內核在新的文件描述符上啓用執行時關閉(close-on-exec)標誌(FD_CLOEXEC)
#include <sys/epoll.h>

// Modifies the interest list of the epoll instance referred to by epfd.
// The event argument is a `struct epoll_event`; no `struct epoll` type exists.
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *ev);
成功返回0,失敗返回-1並設置errno。
參數fd:指明修改興趣列表中哪個文件描述符的設定
參數op:指定需要執行的操作,可取EPOLL_CTL_ADD(添加)、EPOLL_CTL_MOD(修改)或EPOLL_CTL_DEL(刪除)。
參數ev:
/* Caller-defined payload stored with a registered descriptor; the members
 * share storage, so only one of them is meaningful at a time. */
typedef union epoll_data{
    void *ptr;
    int fd;
    uint32_t u32;
    uint64_t u64;
}epoll_data_t;

/* Declared after epoll_data_t because it embeds one. */
struct epoll_event{
    uint32_t events;    // epoll events, a bit mask
    epoll_data_t data;  // user data
};
max_user_watches上限
每個註冊到epoll實例上的文件描述符需要佔用一小段不能被交換的內核內存空間,因此內核提供了一個接口用來定義每個用戶可以註冊到epoll實例上的文件描述符總數。
這個上限值可以通過max_user_watches來查看和修改,max_user_watches是專屬於Linux系統的/proc/sys/fs/epoll目錄下的一個文件。默認上限值根據可用系統內存計算得出。
#include <sys/epoll.h>

// Waits for events on the epoll instance epfd and stores up to maxevents
// entries in the caller-supplied array evlist.
int epoll_wait(int epfd, struct epoll_event *evlist, int maxevents, int timeout);
成功返回就緒態的文件描述符的個數,失敗返回-1並設置errno
參數evlist指向的結構體數組中返回的是有關就緒態文件描述符的信息。數組evlist的空間由調用者負責申請,所包含的元素個數在參數maxevents中指定。
在數組evlist中每個元素返回的都是單個就緒態文件描述符的信息:events字段返回該描述符上已發生事件的位掩碼,data字段返回使用epoll_ctl()註冊該描述符時在ev.data中指定的值。
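參數timeout用來確定epoll_wait()的阻塞行爲:
- timeout等於-1:調用將一直阻塞,直到興趣列表中的文件描述符上有事件產生,或者捕獲到一個信號爲止;
- timeout等於0:執行一次非阻塞式的檢查,看興趣列表中的文件描述符上產生了哪些事件;
- timeout大於0:調用將阻塞至多timeout毫秒,直到文件描述符上有事件發生,或者捕獲到一個信號爲止。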
在多線程程序中,可以在一個線程中使用epoll_ctl()將文件描述符添加到另一個線程中由epoll_wait()所監視的epoll實例的興趣列表中去。這些對興趣列表的修改將立刻得到處理,而epoll_wait()調用將返回有關新添加的文件描述符的就緒信息。
epoll事件:除了有一個額外的前綴E外,大多數位掩碼的名稱同poll中對應的事件掩碼名稱相同。例外情況:EPOLLET和EPOLLONESHOT在poll中沒有對應的事件掩碼。
epoll程序示例:
#include <sys/epoll.h>
#include <fcntl.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>

#define MAX_BUF 1000      /* Maximum bytes fetched by a single read() */
#define MAX_EVENTS 5      /* Maximum events returned by one epoll_wait() call */

/*
 * Open every file named on the command line, register each descriptor with
 * an epoll instance for EPOLLIN, and loop on epoll_wait(): readable
 * descriptors are read and echoed, descriptors reporting EPOLLHUP or
 * EPOLLERR are closed. The program ends once all descriptors are closed.
 */
int main(int argc, char* argv[]){
    int epfd, ready, fd, s, j, numOpenFds;
    struct epoll_event ev;
    struct epoll_event evlist[MAX_EVENTS];
    char buf[MAX_BUF];

    if(argc < 2 || strcmp(argv[1], "--help")==0){
        printf("usage: %s file...\n", argv[0]);
        exit(1);
    }

    epfd = epoll_create(argc - 1);
    if(epfd == -1){
        printf("error epoll_create");
        exit(1);
    }

    /* Open each file and add it to the interest list. */
    for(j = 1; j < argc; j++){
        fd = open(argv[j], O_RDONLY);
        if(fd == -1){
            printf("error open");
            exit(1);
        }
        printf("Opened \"%s\" on fd %d\n", argv[j], fd);

        ev.events = EPOLLIN;
        ev.data.fd = fd;
        if(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev) == -1){
            printf("error epoll_ctl");
            exit(1);
        }
    }

    numOpenFds = argc - 1;

    while(numOpenFds > 0){
        printf("About to epoll_wait()\n");
        ready = epoll_wait(epfd, evlist, MAX_EVENTS, -1);
        if(ready == -1){
            /* Restart the wait when it was interrupted by a signal. */
            if(errno == EINTR)continue;
            else{
                printf("error epoll_wait");
                exit(1);
            }
        }

        printf("Ready: %d\n", ready);

        for(j = 0; j < ready; j++){
            printf(" fd = %d; events: %s%s%s\n", evlist[j].data.fd,
                   (evlist[j].events & EPOLLIN) ? "EPOLLIN ":"",
                   (evlist[j].events & EPOLLHUP) ? "EPOLLHUP":"",
                   (evlist[j].events & EPOLLERR) ? "EPOLLERR":"");

            if(evlist[j].events & EPOLLIN){
                s = read(evlist[j].data.fd, buf, MAX_BUF);
                if(s == -1){
                    /* Stop here: s is used below as the "%.*s" precision,
                     * and a negative precision would print the
                     * unterminated buffer without any length limit. */
                    printf("error read");
                    exit(1);
                }
                printf(" read %d bytes : %.*s",s,s,buf);
            }
            else if(evlist[j].events & (EPOLLHUP | EPOLLERR)){
                /* Reached only when EPOLLIN is not set, so pending input is
                 * always read before the descriptor is closed. */
                printf(" closing fd %d\n", evlist[j].data.fd);
                if(close(evlist[j].data.fd) == -1){
                    printf("error close");
                    exit(1);
                }
                numOpenFds--;
            }
        }
    }

    printf("All file descriptors closed; bye\n");
    exit(0);
}
ET模式比LT模式觸發事件的次數更少:
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <libgen.h>
#include <sys/epoll.h>
#include <pthread.h>

#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 10

/* Put fd into non-blocking mode and return its previous file status flags. */
int setnonblocking(int fd){
    int old_option = fcntl(fd, F_GETFL);
    int new_option = old_option | O_NONBLOCK;
    fcntl(fd, F_SETFL, new_option);
    return old_option;
}

/*
 * Register fd on epollfd for EPOLLIN (adding EPOLLET when enable_et is
 * true) and switch fd to non-blocking mode.
 */
void addfd(int epollfd, int fd, bool enable_et){
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN;
    if(enable_et){
        event.events |= EPOLLET;
    }
    epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
    setnonblocking(fd);
}

/*
 * Level-triggered handling of `number` events: accept new connections on
 * listenfd (registered without EPOLLET) and do ONE recv() per readable
 * event on a connection.
 */
void lt(epoll_event *events, int number, int epollfd, int listenfd){
    char buf[BUFFER_SIZE];
    for(int i = 0; i < number; i++){
        int sockfd = events[i].data.fd;
        if(sockfd == listenfd){
            struct sockaddr_in client_address;
            socklen_t client_addrlength = sizeof(client_address);
            int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
            if(connfd < 0){
                /* Never register an invalid descriptor. */
                printf("accept failure: %d\n", errno);
                continue;
            }
            addfd(epollfd, connfd, false);
        }
        else if(events[i].events & EPOLLIN){
            printf("event trigger once\n");
            memset(buf, '\0', BUFFER_SIZE);
            int ret = recv(sockfd, buf, BUFFER_SIZE-1,0);
            if(ret <= 0){
                close(sockfd);
                continue;
            }
            printf("get %d bytes of content: %s\n", ret, buf);
        }
        else{
            printf("something else happened \n");
        }
    }
}

/*
 * Edge-triggered handling of `number` events: accept new connections on
 * listenfd (registered with EPOLLET) and, for each readable event, keep
 * calling recv() until it reports EAGAIN/EWOULDBLOCK, the peer closes, or
 * an error occurs.
 */
void et(epoll_event* events, int number, int epollfd, int listenfd){
    char buf[BUFFER_SIZE];
    for(int i = 0; i < number; i++){
        int sockfd = events[i].data.fd;
        if(sockfd == listenfd){
            struct sockaddr_in client_address;
            socklen_t client_addrlength = sizeof(client_address);
            int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
            if(connfd < 0){
                /* Never register an invalid descriptor. */
                printf("accept failure: %d\n", errno);
                continue;
            }
            addfd(epollfd, connfd, true);
        }
        else if(events[i].events & EPOLLIN){
            printf("event trigger once\n");
            while(true){
                memset(buf, '\0',BUFFER_SIZE);
                int ret = recv(sockfd, buf, BUFFER_SIZE-1, 0);
                if(ret < 0){
                    /* Socket drained: wait for the next edge. */
                    if((errno == EAGAIN) || (errno == EWOULDBLOCK)){
                        printf("read later\n");
                        break;
                    }
                    close(sockfd);
                    break;
                }
                else if(ret == 0){
                    /* Peer closed. Leave the loop so the closed descriptor
                     * is not passed to recv() and close() again. */
                    close(sockfd);
                    break;
                }
                else{
                    printf("get %d bytes of content: %s\n",ret, buf);
                }
            }
        }
        else{
            printf("something else happend \n");
        }
    }
}

/*
 * Listen on the given address and port and dispatch epoll events to lt()
 * (or et(), if the call below is swapped) to compare how often each mode
 * reports readable data.
 *
 * Usage: prog ip_address port_number
 */
int main(int argc, char* argv[]){
    if(argc <= 2){
        printf("usage: %s ip_address port_number\n", basename(argv[0]));
        return 1;
    }
    const char *ip = argv[1];
    int port = atoi(argv[2]);

    int ret = 0;
    struct sockaddr_in address;
    memset(&address, 0, sizeof(address));
    address.sin_family = AF_INET;
    inet_pton(AF_INET, ip, &address.sin_addr);
    address.sin_port = htons(port);

    int listenfd = socket(PF_INET, SOCK_STREAM, 0);
    assert(listenfd >= 0);
    ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
    assert(ret != -1);
    ret = listen(listenfd, 5);
    assert(ret != -1);

    epoll_event events[MAX_EVENT_NUMBER];
    int epollfd = epoll_create(5);
    assert(epollfd != -1);
    /* The listening socket stays level-triggered: both handlers accept only
     * one connection per event, so with EPOLLET a second connection that
     * arrived at the same time would never be reported again. */
    addfd(epollfd, listenfd, false);

    while(true){
        int nready = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
        if(nready < 0){
            /* An interrupted wait is not a failure; retry. */
            if(errno == EINTR){
                continue;
            }
            printf("epoll failure\n");
            break;
        }
        lt(events, nready, epollfd, listenfd);
        //et(events, nready, epollfd, listenfd);
    }

    close(listenfd);
    return 0;
}
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <libgen.h>
#include <sys/epoll.h>
#include <pthread.h>

#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 1024

/* Arguments handed to a worker thread. */
struct fds{
    int epollfd;   /* epoll instance the socket is registered with */
    int sockfd;    /* connection the worker should read from */
};

/* Put fd into non-blocking mode and return its previous file status flags. */
int setnonblocking(int fd){
    int old_option = fcntl(fd, F_GETFL);
    int new_option = old_option | O_NONBLOCK;
    fcntl(fd, F_SETFL, new_option);
    return old_option;
}

/*
 * Register fd on epollfd for edge-triggered EPOLLIN (adding EPOLLONESHOT
 * when oneshot is true) and switch fd to non-blocking mode.
 */
void addfd(int epollfd, int fd, bool oneshot){
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET;
    if(oneshot){
        event.events |= EPOLLONESHOT;
    }
    epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
    setnonblocking(fd);
}

/* Re-arm fd on epollfd with EPOLLIN | EPOLLET | EPOLLONESHOT. */
void reset_oneshot(int epollfd, int fd){
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
    epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event);
}

/*
 * Thread entry point. `arg` is a heap-allocated `fds` owned by this thread;
 * it is freed here. Reads from the socket until it would block (then re-arms
 * the one-shot registration), the peer closes, or an error occurs.
 * Always returns NULL.
 */
void *worker(void *arg){
    fds *ctx = (fds*)arg;
    int sockfd = ctx->sockfd;
    int epollfd = ctx->epollfd;
    free(ctx);

    printf("start new thread to receive data on fd: %d\n", sockfd);
    char buf[BUFFER_SIZE];
    memset(buf, '\0', BUFFER_SIZE);

    while(1){
        int ret = recv(sockfd, buf, BUFFER_SIZE-1, 0);
        if(ret == 0){
            close(sockfd);
            printf("foreiner closed the connection\n");
            break;
        }
        else if(ret < 0){
            if(errno == EAGAIN || errno == EWOULDBLOCK){
                /* Nothing left to read: re-arm the registration. */
                reset_oneshot(epollfd, sockfd);
                printf("read later\n");
                break;
            }
            if(errno == EINTR){
                continue;
            }
            /* Any other error is fatal for this connection; looping again
             * would spin forever on the same failure. */
            close(sockfd);
            break;
        }
        else{
            /* Terminate at the received length so bytes left over from an
             * earlier, longer message are not printed. */
            buf[ret] = '\0';
            printf("get content: %s\n", buf);
            sleep(5);
        }
    }
    printf("end thread receving data on fd : %d\n", sockfd);
    return NULL;
}

/*
 * Listen on the given address and port; every readable event on a
 * connection (registered with EPOLLONESHOT) is handed to a new detached
 * worker thread.
 *
 * Usage: prog ip_address port_number
 */
int main(int argc, char* argv[]){
    /* Both the address and the port are required (argv[2] is read below). */
    if(argc <= 2){
        printf("usage: %s ip_address port_number\n", basename(argv[0]));
        return 1;
    }
    const char* ip = argv[1];
    int port = atoi(argv[2]);

    int ret = 0;
    struct sockaddr_in address;
    memset(&address, 0, sizeof(address));
    address.sin_family = AF_INET;
    inet_pton(AF_INET, ip, &address.sin_addr);
    address.sin_port = htons(port);

    int listenfd = socket(PF_INET, SOCK_STREAM, 0);
    assert(listenfd >= 0);
    ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
    assert(ret != -1);
    ret = listen(listenfd, 5);
    assert(ret != -1);

    epoll_event events[MAX_EVENT_NUMBER];
    int epollfd = epoll_create(5);
    assert(epollfd != -1);
    /* The listening socket is registered WITHOUT EPOLLONESHOT: with it,
     * only a single accept event would ever be delivered. */
    addfd(epollfd, listenfd, false);

    while(1){
        int nready = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
        if(nready < 0){
            if(errno == EINTR){
                continue;
            }
            printf("epoll failure\n");
            break;
        }

        for(int i = 0; i < nready; i++){
            int sockfd = events[i].data.fd;
            if(sockfd == listenfd){
                struct sockaddr_in client_address;
                socklen_t client_addrlength = sizeof(client_address);
                int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
                if(connfd < 0){
                    printf("accept failure: %d\n", errno);
                    continue;
                }
                /* One-shot, so a connection is reported to at most one
                 * worker until that worker re-arms it. */
                addfd(epollfd, connfd, true);
            }
            else if(events[i].events & EPOLLIN){
                /* The argument block is heap-allocated and owned by the
                 * worker: a stack variable here would be reused by the next
                 * loop iteration while the thread may still be reading it. */
                fds *ctx = (fds*)malloc(sizeof(*ctx));
                if(ctx == NULL){
                    printf("out of memory\n");
                    reset_oneshot(epollfd, sockfd);
                    continue;
                }
                ctx->epollfd = epollfd;
                ctx->sockfd = sockfd;

                pthread_t thread;
                if(pthread_create(&thread, NULL, worker, (void*)ctx) != 0){
                    printf("pthread_create failure\n");
                    free(ctx);
                    /* Re-arm so the pending data is reported again. */
                    reset_oneshot(epollfd, sockfd);
                    continue;
                }
                /* Nobody joins the workers; detach so their resources are
                 * released when they finish. */
                pthread_detach(thread);
            }
            else{
                printf("something else happened \n");
            }
        }
    }

    close(listenfd);
    return 0;
}