內核(操做系統)一旦發現進程指定的一個或者多個IO條件準備讀或者準備寫的時候,就會給該進程發一個通知。當服務端要處理多個套接字文件描述符的時候,這個時候能夠採用IO複用,操做系統發現哪些套接字文件描述符可讀或可寫的時候,就會通知相應的進程纔去執行對應的read(保證文件描述符對應的地址有可用的數據返回,而不是因爲試探性的返回無用的值)或write操做。html
能夠舉個例子:linux
例如:如今李老師收取剛剛佈置給學生要默寫在紙上的古詩的做業。ios
第一種狀況:李老師按照學號的順序來收取,而且會等待將要收取做業的同窗贊成提交做業,直到該同窗提交做業,纔會去下一個學號的同窗那裏去詢問是否提交做業。(循環處理每一個socket,不支持高併發,效率低)編程
第二種狀況:李老師向其餘老師請求幫助,拉來了不少老師幫忙收做業,每一個老師處理一小部分學生的古詩詞做業的提交任務。(至關與建立多個進程或者線程處理socket)數組
第三種狀況:李老師站在講臺上,根據同窗們的反應來作出相應的動做(若是誰的要提交做業,該同窗就舉手),某些同窗舉手後,李就會去收取這些同窗的做業。(IO複用)網絡
將多個文件描述符集中到一塊兒統一監視。好比對多個套接字進行統一管理與調度 。數據結構
函數會作的事情包括:併發
檢測是否存在套接字接受數據socket
檢測是否存在套接字無阻塞的傳輸數據函數
哪些套接字發生了異常
該函數的調用時的順序
1. 設置文件描述符
將須要監視的文件描述符集中到一塊兒(fd_set),集中的時候要按照監視項來區分(包括接收,傳輸,異常)。fd_set
結構體以下:
/* The fd_set member is required to be an array of longs. */ typedef long int __fd_mask; /* Number of descriptors that can fit in an `fd_set'. */ #define __FD_SETSIZE 1024 /* It's easier to assume 8-bit bytes than to get CHAR_BIT. */ #define __NFDBITS (8 * (int) sizeof (__fd_mask)) /* fd_set for select and pselect. */ typedef struct { __fd_mask fds_bits[__FD_SETSIZE / __NFDBITS]; } fd_set;
fd_set
結構以下(有三種狀況,監聽接收做用的fd_set,監聽傳輸做用的fd_set,監聽異常做用的fd_set),當對應的區上的位置的值0被置爲1,表示該位置對應的文件描述符正在被監視,或可讀或可寫,亦或者是有異常:
在fd_set
中註冊文件描述符或者更改值的操做都是由相關宏來完成的。
/* Access macros for `fd_set'. */ #define FD_SET(fd, fdsetp) __FD_SET (fd, fdsetp) #define FD_CLR(fd, fdsetp) __FD_CLR (fd, fdsetp) #define FD_ISSET(fd, fdsetp) __FD_ISSET (fd, fdsetp) #define FD_ZERO(fdsetp) __FD_ZERO (fdsetp) FD_ZERO(fd_set* fdset); //將fd_set的全部位都初始化爲0 FD_SET(int fd, fd_set* fdset); //在fd_set中註冊文件描述符fd的信息 FD_CLR(int fd, fd_set* fdset); //從參數fd_set中清除文件描述符fd的信息 FD_ISSET(int fd, fd_set* fdset); //若參數fd_set所指向的變量包含文件描述符fd的信息,則返回1,不然返回0/* Access macros for `fd_set'. */ #define FD_SET(fd, fdsetp) __FD_SET (fd, fdsetp) #define FD_CLR(fd, fdsetp) __FD_CLR (fd, fdsetp) #define FD_ISSET(fd, fdsetp) __FD_ISSET (fd, fdsetp) #define FD_ZERO(fdsetp) __FD_ZERO (fdsetp) FD_ZERO(fd_set* fdset); //將fd_set的全部位都初始化爲0 FD_SET(int fd, fd_set* fdset); //在fd_set中註冊文件描述符fd的信息 FD_CLR(int fd, fd_set* fdset); //從參數fd_set中清除文件描述符fd的信息 FD_ISSET(int fd, fd_set* fdset); //若參數fd_set所指向的變量包含文件描述符fd的信息,則返回1,不然返回0
2.指定監視範圍和超時
select
函數的原型以下:
/* Check the first NFDS descriptors each in READFDS (if not NULL) for read readiness, in WRITEFDS (if not NULL) for write readiness, and in EXCEPTFDS (if not NULL) for exceptional conditions. If TIMEOUT is not NULL, time out after waiting the interval specified therein. Returns the number of ready descriptors, or -1 for errors. This function is a cancellation point and therefore not marked with __THROW. */ extern int select (int __nfds, fd_set *__restrict __readfds, fd_set *__restrict __writefds, fd_set *__restrict __exceptfds, struct timeval *__restrict __timeout);
第一個參數:監視對象文件描述符的數量
第二個參數:傳遞包含全部關注「是否存在待讀取」的文件描述符的fdset。
第三個參數:傳遞包含全部關注「是否可傳輸無阻塞數據」的文件描述符的fdset 。
第三個參數:傳遞包含全部關注「是否可發生異常」的文件描述符的fdset 。
第四個參數:爲防止陷入無限阻塞的狀態, 傳遞超時信息。
3. 調用select函數
調用函數返回結果:
-1:發送錯誤時返回-1。
0:超時返回0。
>0:返回發生時間的文件描述符。
4. 調用select查看結果
select
函數返回值若是是大於0的整數,表示相應數量的文件描述符發生了變化。以下圖示例,
調用函數select函數時,向其傳遞的fd_set變量將發送變化,全部的1都被爲0,但發生變化變化的文件描述符對應的位除外,如圖,調用select函數結束後,可知傳入的fd_set中只有fd1和fd3是1,即它們對應的文件描述符發生了變化。
理解select模型的關鍵在於理解fd_set(這裏聲明的變量名爲reads)這個數據結構,如今假設fd_set的大小是1字節,即8個bit(位)。執行過程能夠這樣表示:
第一步:FD_ZERO(&reads); 將read指向的fd_set初始化爲00000000。
將對應位設置處於監聽狀態,如fd=6,FD_SET(fd, &reads);此時fd_set變爲00100000。
若是還有須要監聽的文件描述符,fd1=1,fd2=2,經過FD_SET後結果變爲00100011。
調用select(7,reads,0,0,timeval)阻塞等待。默認是從位置0開始,因此要將最大的fd_max+1。
若是此時fd=2對應的文件描述服發生了可讀事件,select調用結束,此時fd_set對應的值是00000010,沒有事件發生所對應的位將被清空。
利用select監聽鍵盤輸入操做:
#include <iostream> #include <list> #include <unistd.h> #include <sys/wait.h> #include <sys/select.h> using namespace std; #define BUF_SIZE 30 int main(){ fd_set reads, temps; int result, str_len; char buf[BUF_SIZE]; struct timeval timeout; FD_ZERO(&reads); FD_SET(0, &reads); //0-該位置是控制檯的標準輸入 while (1) { temps = reads; timeout.tv_sec = 5; //秒 timeout.tv_usec = 0; //微秒 result = select(1, &temps, 0, 0, &timeout); if(result == -1){ puts("select() error..."); break; } else if(result == 0){ puts("Time wait..."); }else{ if(FD_ISSET(0, &temps)){ //fd_set指向的變量中包含文件描述符的信息,返回真 str_len = read(0, buf, BUF_SIZE); buf[str_len] = 0; printf("message from console: %s.", buf); } } } return 0; }
可監控的文件描述符的數量與機器對應的fd_set大小有關,即sizeof(fd_set);
將fd_set傳入到select函數調用前,還須要一個fd_set結構存儲源數據,用於和調用select函數後的fd_set進行逐位對比,若是有事件發生,則經過FD_ISSET返回;若是原來標記爲1,處於監聽的文件描述符但沒有事件發生,此時會將其置爲0;。如上面示例程序的reads和temps。
select上接收到普通數據或者帶外數據會使select返回不一樣的就緒狀態,普通數據觸發可讀狀態,帶外數據觸發異常狀態。
以下,是一個I/O複用的服務端的案例用來解決多客戶端請求的問題:
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <signal.h> #include <sys/wait.h> #include <arpa/inet.h> #include<sys/socket.h> #define BUF_SIZE 100 void error_handling(char* message); int main(int argc, char *argv[]){ int serv_sock, clnt_sock; struct sockaddr_in serv_adr, clnt_adr; struct timeval timeout; fd_set reads, cpy_reads; socklen_t adr_sz; int fd_max, str_len, fd_num, i; char buf[BUF_SIZE]; if(argc != 2){ printf("Usage : %s <port>\n", argv[0]); exit(1); } serv_sock = socket(PF_INET, SOCK_STREAM, 0); memset(&serv_adr, 0, sizeof(serv_adr)); serv_adr.sin_family = AF_INET; serv_adr.sin_port = htons(atoi(argv[1])); serv_adr.sin_addr.s_addr = htonl(INADDR_ANY); if(bind(serv_sock, (struct sockaddr*)&serv_adr, sizeof(serv_adr)) == -1){ error_handling("bind() error........."); } if(listen(serv_sock, 5) == -1){ error_handling("listen error........."); } FD_ZERO(&reads); FD_SET(serv_sock, &reads); fd_max = serv_sock; while(1){ //無限循環中調用select cpy_reads = reads; timeout.tv_sec = 5; timeout.tv_usec = 500; if((fd_num = select(fd_max+1, &cpy_reads, 0, 0, &timeout)) == -1) break; if(fd_num == 0) continue; for(i = 0; i < fd_max+1; i++){ //遍歷觀察那些文件描述符發生了變化 if(FD_ISSET(i, &cpy_reads)) //觀察fd_set中位發生變化 { if(i == serv_sock){ //若是是鏈接請求 adr_sz = sizeof(clnt_adr); clnt_sock = accept(serv_sock, (struct sockaddr*)&clnt_adr, &adr_sz); FD_SET(clnt_sock, &reads); if(fd_max < clnt_sock) //若是clnt_sock對應fd_set中位置大於原先設定的須要監聽的範圍,則修改監聽範圍。 fd_max = clnt_sock; printf("Connected client : %d \n", clnt_sock); } else{ //某些套接字文件描述符指向的信息發生了改變,即收到通知,該文件描述符如今可讀 str_len = read(i, buf, BUF_SIZE); if(str_len == 0){ FD_CLR(i,&reads); close(i); printf("close client:%d \n", i); }else{ write(i, buf, str_len); } } } } } close(serv_sock); return 0; } void error_handling(char* message){ fputs(message, stderr); fputc('\n', stderr); exit(1); }
epoll是在2.5.44版內核中提出的(在使用前,應該驗證一下內核版本,如今大部份內核版本都在2.6以上,能夠經過cat /proc/sys/kernel/osrelease
查看),並且epoll方式只在linux下體統支持。關於epoll實現的三個函數:
#include <sys/epoll.h> int epoll_create(int size); int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
epoll_create
int epoll_create(int size);
經過調用該函數執行成功後建立的文件描述符保存空間被稱爲「epoll例程」,參數size只是爲操做系統提供一個參考須要爲epoll例程多大的空間,即size大小並不等於最終的epoll例程大小。
epoll_ctl
生成epoll例程後,在其內部註冊監視對象文件描述符時須要用到epoll_ctl
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
參數說明:
epfd 用於註冊監視對象的epoll實例(文件描述符)
op 指定監視對象的添加,刪除或修改等操做
fd 須要監視對象的文件描述符
event 監視對象的事件類型
op能夠有3個值,分別爲:
EPOLL_CTL_ADD : 添加監聽的事件
EPOLL_CTL_DEL : 刪除監聽的事件
EPOLL_CTL_MOD : 修改監聽的事件
如epoll_ctl(A, EPOLL_CTL_ADD, B, C)
表示在epoll例程A中註冊文件描述符B用於監視參數C中的事件。
epoll_event的結構體以下:
struct epoll_event { __uint32_t events; /* Epoll events */ epoll_data_t data; /* User data variable 根據用戶需求定製 */ }; typedef union epoll_data { void *ptr; int fd; __uint32_t u32; __uint64_t u64; } epoll_data_t;
epoll_event的event中保存的常量及其對應的具體時間類型
EPOLLERR : 文件上發上了一個錯誤。這個事件是一直監控的,即便沒有明確指定
EPOLLHUP : 文件被掛斷。這個事件是一直監控的,即便沒有明確指定
EPOLLRDHUP : 對端關閉鏈接或者shutdown寫入半鏈接
EPOLLET : 開啓邊緣觸發,默認的是水平觸發,因此咱們並未看到EPOLLLT
EPOLLONESHOT : 一個事件發生並讀取後,文件自動再也不監控
EPOLLIN : 文件可讀
EPOLLPRI : 文件有緊急數據可讀
EPOLLOUT : 文件可寫
EPOLLWAKEUP : 若是EPOLLONESHOT和EPOLLET清除了,而且進程擁有CAP_BLOCK_SUSPEND權限,那麼這個標誌可以保證事件在掛起或者處理的時候,系統不會掛起或休眠
以下示例代碼展現其過程
struct epoll_event event; event.events = EPOLLIN; //發生須要讀取數據的狀況時 event.data.fd = sockfd; epoll_ctl(epfd,EPOLL_CTL_ADD,sockfd,event);
若是epoll_ctl
方法返回-1,則標誌出現了問題,咱們能夠讀取errno來定位錯誤,有以下errno會被設置:
EBADF : epfd或者fd不是一個有效的文件描述符
EEXIST : op爲EPOLL_CTL_ADD,但fd已經被監控
EINVAL : epfd是無效的epoll文件描述符
ENOENT : op爲EPOLL_CTL_MOD或者EPOLL_CTL_DEL,而且fd未被監控
ENOMEM : 沒有足夠的內存完成當前操做
ENOSPC : epoll實例超過了/proc/sys/fs/epoll/max_user_watches中限制的監聽數量
epoll_wait
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
參數說明:
epfd 用於註冊監視對象的epoll實例(文件描述符)
events 保存發生事件的文件描述符集合的結構體地址值(數組首地址)
maxevents 保存的最大事件數量
timeout 等待時間(毫秒),-1表示一直等待事件的發生
該函數的做用相似與select函數。該函數被調用後,返回發生事件的文件描述符數,同時,第二個參數保存發生事件的文件描述符集合。此時,就不須要向像select那樣針對全部文件描述符進行循環掃描,肯定發生事件的文件描述符。
工做模式
epoll對文件描述符的操做有兩種模式:LT(level trigger 條件觸發)和ET(edge trigger 邊緣觸發)。LT模式是默認模式,LT模式與ET模式的區別能夠經過TCP/IP網絡編程書中的案例進行解釋:
LT 水平觸發
兒子:「媽媽,我收到了5000元壓歲錢。」
媽媽:「恩,省着點花!」
兒子:「媽媽,我今天買了個ipad,花了3000元。」
媽媽:「噢,這東西真貴。」
兒子:「媽媽,我今天買好多吃的,還剩1000元。」
媽媽:「用完了這些錢,我可不會再給你了。」
兒子:「媽媽,那1000元我沒花,零花錢夠用了。」
媽媽:「恩,這纔是明智的作法!」
兒子:「媽媽,那1000元我沒花,我要攢起來。」
媽媽:「恩,加油!」
只要兒子手中還有錢,他就會一直彙報,這就是LT模式。有錢就是1,沒錢就是0,那麼只要兒子還有錢,這種事件就是1->1類型事件,天然是LT。將案例中兒子錢包換成輸入緩衝區,壓歲錢換成輸入數據,在條件觸發中,只要輸入緩衝區有數據,將將會以事件的方式再次註冊。
ET 邊緣觸發
兒子:「媽媽,我收到了5000元壓歲錢。」
媽媽:「恩,省着點花!」
兒子:「……」
媽媽:「你卻是說話啊?壓歲錢呢?!」
兒子從沒錢到有錢,是一個0->1的過程,所以爲ET。兒子和媽媽說過本身拿到了壓歲錢就完事了,至於怎麼花錢,還剩多少錢,一律不說。能夠看出,邊緣觸發中輸入緩衝區中收到數據時僅註冊一次,即便輸入緩衝區中還有數據,也不會再次註冊。
示例程序
基於socket的客戶端和服務端利用epoll來處理I/O複用問題:
服務端程序:
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <signal.h> #include <sys/wait.h> #include <arpa/inet.h> #include<sys/socket.h> #include<sys/epoll.h> #define BUF_SIZE 4 #define EPOLL_SIZE 50 void error_handling(char* message); int main(int argc, char *argv[]){ int serv_sock, clnt_sock; struct sockaddr_in serv_adr, clnt_adr; socklen_t adr_sz; int str_len, i; char buf[BUF_SIZE]; if(argc != 2){ printf("Usage : %s <port>\n", argv[0]); exit(1); } struct epoll_event* ep_events; struct epoll_event event; int epfd, event_cnt; serv_sock = socket(PF_INET, SOCK_STREAM, 0); memset(&serv_adr, 0, sizeof(serv_adr)); serv_adr.sin_family = AF_INET; serv_adr.sin_port = htons(atoi(argv[1])); serv_adr.sin_addr.s_addr = htonl(INADDR_ANY); if(bind(serv_sock, (struct sockaddr*)&serv_adr, sizeof(serv_adr)) == -1){ error_handling("bind() error........."); } if(listen(serv_sock, 5) == -1){ error_handling("listen error........."); } epfd = epoll_create(EPOLL_SIZE); //返回建立的epoll文件描述符 ep_events = (struct epoll_event*)malloc(sizeof(struct epoll_event)*EPOLL_SIZE); event.events = EPOLLIN; //發生讀取事件的時候 event.data.fd = serv_sock; epoll_ctl(epfd, EPOLL_CTL_ADD, serv_sock, &event); while(1){ event_cnt = epoll_wait(epfd, ep_events, EPOLL_SIZE, -1); if(event_cnt == -1){ error_handling("epoll_wait() failed..."); } puts("return epoll_wait"); for(i = 0; i < event_cnt; i++){ if(ep_events[i].data.fd == serv_sock){ //處理新進的鏈接請求 adr_sz = sizeof(clnt_adr); clnt_sock = accept(serv_sock, (struct sockaddr*)&clnt_adr, &adr_sz); event.data.fd = clnt_sock; event.events= EPOLLIN; epoll_ctl(epfd, EPOLL_CTL_ADD, clnt_sock, &event); printf("Connected client: %d \n", clnt_sock); }else{ str_len = read(ep_events[i].data.fd, buf, BUF_SIZE); if(str_len == 0){ // 關閉客戶端鏈接 epoll_ctl(epfd, EPOLL_CTL_DEL, ep_events[i].data.fd, NULL); //將讀取完的鏈接取消監聽 close(ep_events[i].data.fd); //關閉客戶端鏈接 printf("close client: %d \n", ep_events[i].data.fd); }else{ write(ep_events[i].data.fd, buf, str_len); } } } } close(serv_sock); close(epfd); return 0; } void error_handling(char* message){ fputs(message, stderr); fputc('\n', stderr); exit(1); }
客戶端程序:
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <sys/socket.h> #include <arpa/inet.h> #include <unistd.h> using namespace std; #define BUF_SIZE 1024 void errorhandling(char *message); int main(int argc, char *argv[]) { int sock; struct sockaddr_in serv_addr; char message[BUF_SIZE]; int str_len = 0 ,idx = 0, read_len = 0; if(argc != 3){ printf( "Usage : %d <IP> <port> ", argv[0]); exit(0); } sock = socket(PF_INET, SOCK_STREAM, 0); if(sock == -1){ errorhandling("socket() error;"); } memset(&serv_addr, 0, sizeof(serv_addr)); serv_addr.sin_family = AF_INET; serv_addr.sin_addr.s_addr = inet_addr(argv[1]); serv_addr.sin_port = htons(atoi(argv[2])); if(connect(sock, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) == -1){ errorhandling("connect error!"); }else{ printf("connected.....\n"); } while(1){ fgets(message, BUF_SIZE, stdin); fflush(stdin); if(!strcmp(message, "q\n") || !strcmp(message, "Q\n")) break; write(sock, message, strlen(message)); memset(message, 0, sizeof(message)); str_len = read(sock, message, BUF_SIZE - 1); message[str_len] = '\0'; printf("Message from server : %s", message); } close(sock); return 0; } void errorhandling(char *message){ fputs(message, stderr); fputc('\n', stderr); exit(1); }