Linux網絡編程-IO複用技術

IO複用是Linux中的IO模型之一,IO複用就是進程預先告訴內核須要監視的IO條件,使得內核一旦發現進程指定的一個或多個IO條件就緒,就經過進程進程處理,從而不會在單個IO上阻塞了。Linux中,提供了select、poll、epoll三種接口函數來實現IO複用。ios

 

一、select函數

#include <sys/select.h>  
#include <sys/time.h>  
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);  
    // 返回:如有就緒描述符則爲其個數,超時爲0,出錯-1  

nfds參數指定了被監聽文件描述符的個數,一般設置爲監聽的全部描述符最大值加1,由於文件描述符是從0開始的。readfs、writefds和exceptfds分別對應可讀、可寫和異常等事件文件描述符集合,當調用select時,經過這3個參數傳入本身感興趣的文件描述符,select函數返回後,內核經過修改他們來通知應用程序那些文件描述符已經就緒。編程

fd_set結構體包含一個整形數組,該數組中每個元素的每一位標記一個文件描述符,fd_set容納的文件描述符數量由FD_SETSIZE指定,這就限制了select能同時處理的文件描述符最大個數。經過一些宏來操做fd_set結構體中的位:數組

#include <sys/select.h>  
FD_ZERO(fd_set *fdset);     /* 清除fdset全部標誌位 */  
FD_SET(int fd, fd_set fdset);       /* 設置fdset標誌位fd */  
FD_CLR(int fd, fd_set fdset);       /* 清除fdset標誌位fd */  
int FD_ISSET(int fd, fd_set *fdset);    /* 測試fdset的位fd是否被設置 */  

timeout參數用來設置select的超時時間,它是一個timeval結構類型指針,採用指針參數是應爲內核將修改它以告訴應用程序select等待了多久。不過咱們不能徹底信任select調用返回的timeout值,好比調用失敗後timeout的值是不肯定的。網絡

struct timeval  
{  
    long tv_sec;    //秒數  
    long tv_usec;   //微秒數  
};  

select提供了一個微妙的定時方案,若是給timeval的成員都賦值0,則select將當即返回;若是timeout爲NULL,則select將一直阻塞,直到某個文件描述符就緒。select成功時返回就緒的文件描述符的總數,若是在超時時間內沒有任何描述符就緒,select返回0,select失敗返回-1並設置errno。若是在select等待期間,程序收到信號,則select當即返回-1,並設置errno爲EINTR。socket

select缺點:函數

  • 每次調用select,都須要把fd集合從用戶態拷貝到內核態,這個開銷在fd不少時會很大
  • 每次調用select都須要在內核遍歷傳遞進來的全部fd,這個開銷在fd不少時也很大
  • select支持的文件描述符數量過小了,默認是1024

select測試用例:測試

#include <iostream>
#include <vector>

#include <unistd.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <string.h>

using namespace std;

int getMaxNumOfVector(vector<int> &fds);
vector<int> flipVector(vector<int> &fds);

int main(int argc, char **argv)
{
    vector<int> fds;
    int listenfd, connfd;
    struct sockaddr_in servaddr;
    
    listenfd = socket(AF_INET, SOCK_STREAM, 0);
    
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(8080);
    servaddr.sin_addr.s_addr = INADDR_ANY;
    
    bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr));
    listen(listenfd, 5);
    
    fd_set read_fd;
    fd_set write_fd;
    fd_set except_fd;
    char buff[1024];
    
    FD_ZERO(&read_fd);
    FD_ZERO(&write_fd);
    FD_ZERO(&except_fd);
    
    fds.push_back(STDIN_FILENO);
    fds.push_back(listenfd);
    
    bool running = true;
    while (running) {
        buff[0] = '\0';
        
        /**
         * 每次調用select都要從新初始化read_fd和except_fd中的文件描述符集
         */
        for (int i = 0; i < fds.size(); i++) {
            FD_SET(fds[i], &read_fd);
            if ((fds[i] != STDIN_FILENO) && (fds[i] != listenfd)) {
                //FD_SET(fds[i], &write_fd);
                FD_SET(fds[i], &except_fd);
            }
        }
        
        int event_num = select(getMaxNumOfVector(fds) + 1, &read_fd, &write_fd, &except_fd, NULL);
        if (event_num < 0) {
            cerr << "select error" << endl;
            break;
        }
        
        for (int i = 0; i < fds.size(); i++) {
            if (fds[i] == STDIN_FILENO) {
                // 從STDIN_FILENO中讀取數據
                if (FD_ISSET(STDIN_FILENO, &read_fd)) {
                    cin >> buff;
                    if (strcmp(buff, "quit") == 0) {
                        running = false;
                        break;
                    }
                    else {
                        cout << buff << endl;
                    }
                }
            }
            else if (fds[i] == listenfd) {
                if (FD_ISSET(listenfd, &read_fd)) {
                    connfd = accept(listenfd, NULL, NULL);
                    if (connfd < 0) {
                        running = false;
                        break;
                    }
                    
                    fds.push_back(connfd);
                    cout << "往fds添加 " << connfd << ", fds.size: " << fds.size() << endl;
                }
            }
            else {
                if (FD_ISSET(fds[i], &read_fd)) {
                    int len = recv(fds[i], buff, sizeof(buff) - 1, 0);
                    if (len < 0) {
                        cerr << "recv error" << endl;
                    }
                    else if (len == 0) {
                        cout << "從fds刪除 " << fds[i] << endl;
                        // 客戶端斷開了鏈接
                        fds[i] = -1;
                    }
                    else {
                        buff[len] = '\0';
                        cout << fds[i] << " recv: " << buff << endl;
                    }
                }
                else if (FD_ISSET(fds[i], &write_fd)) {
                    
                }
                else if (FD_ISSET(fds[i], &except_fd)) {
                    
                }
            }
        }
        
        fds = flipVector(fds);
    }
    
    // 關閉文件描述符
    for (int i = 0; i < fds.size(); i++) {
        close(fds[i]);
    }
    close(listenfd);

    return 0;
}

int getMaxNumOfVector(vector<int> &fds)
{
    int result = 0;
    
    for (int i = 0; i < fds.size(); i++) {
        if (fds[i] > result) {
            result = fds[i];
        }
    }
    
    return result;
}

vector<int> flipVector(vector<int> &fds) {
    vector<int> fdsnew;
    
    for (int i = 0; i < fds.size(); i++) {
        if (fds[i] != -1) {
            fdsnew.push_back(fds[i]);
        }
    }
    
    return fdsnew;
}

 

poll函數

#include <poll.h>  
int poll(struct pollfd *fds, nfds_t nfds, int timeout);  
    // 返回:如有就緒描述符則爲其數目,超時爲0,出錯-1 

poll系統調用和select相似,也是在必定時間內輪詢必定數量的文件描述符,測試是否有就緒者。nfds參數指定被監聽事件集合fds的大小,timeout指定poll的超時值,單位爲毫秒,當timeout爲-1時,poll調用將一直阻塞,直到某個事件發生;當timeout爲0時,poll調用立刻返回。ui

pollfd結構體spa

struct pollfd  
{  
    int fd;         /* 文件描述符 */  
    short events;       /* 註冊的事件 */  
    short revents;      /* 實際發生的事件,有內核填充 */  
};  

poll支持的事件類型:.net

poll函數測試用例,監聽多個socket鏈接和終端輸入,當在終端中輸入quit時退出程序:

#include <iostream>
#include <vector>

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/poll.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <string.h>

using namespace std;

vector<pollfd> flipVector(vector<pollfd> &fds);
struct pollfd *getPollfd(vector<pollfd> &fds, int *ppoll_size);

int main(int argc, char **argv)
{
    int listenfd, connfd;
    struct sockaddr_in servaddr;
    
    listenfd = socket(AF_INET, SOCK_STREAM, 0);
    
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(8080);
    servaddr.sin_addr.s_addr = INADDR_ANY;
    
    bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr));
    listen(listenfd, 5);
    
    struct pollfd poll_fd;
    vector<struct pollfd> fds;
    
    poll_fd.fd = STDIN_FILENO;
    poll_fd.events = POLLIN;
    fds.push_back(poll_fd);
    
    poll_fd.fd = listenfd;
    poll_fd.events = POLLIN;
    fds.push_back(poll_fd);
    
    char buff[1024];
    struct pollfd *ppoll = NULL;
    int poll_size = 0;
    
    ppoll = getPollfd(fds, &poll_size);
    
    bool running = true;
    while (running) {
        int oldSize = fds.size();
        buff[0] = '\0';
        
        int event_num = poll(ppoll, poll_size, -1);
        if (event_num < 0) {
            cerr << "select error" << endl;
            break;
        }
        
        int fds_size = fds.size();
        for (int i = 0; i < fds_size; i++) {
            if (ppoll[i].fd == STDIN_FILENO) {
                // 從STDIN_FILENO中讀取數據
                if (ppoll[i].revents & POLLIN) {
                    cin >> buff;
                    if (strcmp(buff, "quit") == 0) {
                        running = false;
                        break;
                    }
                    else {
                        cout << buff << endl;
                    }
                }
            }
            else if (ppoll[i].fd == listenfd) {
                if (ppoll[i].revents & POLLIN) {
                    connfd = accept(listenfd, NULL, NULL);
                    if (connfd < 0) {
                        running = false;
                        break;
                    }
                    
                    poll_fd.fd = connfd;
                    poll_fd.events = POLLIN;
                    fds.push_back(poll_fd);
                    cout << "往fds添加 " << connfd << ", fds.size: " << fds.size() << endl;
                }
            }
            else {
                if (ppoll[i].revents & POLLIN) {
                    int len = recv(ppoll[i].fd, buff, sizeof(buff) - 1, 0);
                    if (len < 0) {
                        cerr << "recv error" << endl;
                    }
                    else if (len == 0) {
                        cout << "從fds刪除 " << fds[i].fd << endl;
                        // 客戶端斷開了鏈接
                        fds[i].events = 0;
                        fds[i].fd = -1;
                    }
                    else {
                        buff[len] = '\0';
                        cout << fds[i].fd << " recv: " << buff << endl;
                    }
                }
            }
        }
        
        fds = flipVector(fds);
        if (oldSize != fds.size()) {
            free(ppoll);
            ppoll = getPollfd(fds, &poll_size);
        }
    }
    
    // 關閉文件描述符
    for (int i = 0; i < fds.size(); i++) {
        if (fds[i].fd != -1) {
            close(fds[i].fd);
        }
    }
    close(listenfd);

    return 0;
}

struct pollfd *getPollfd(vector<pollfd> &fds, int *ppoll_size)
{
    struct pollfd *poll = (struct pollfd *) malloc(fds.size() * sizeof(struct pollfd));
    for (int i = 0; i < fds.size(); i++) {
        poll[i].fd = fds[i].fd;
        poll[i].events = fds[i].events;
    }
    
    *ppoll_size = fds.size();
    return poll;
}

vector<pollfd> flipVector(vector<pollfd> &fds) {
    vector<pollfd> fdsnew;
    
    for (int i = 0; i < fds.size(); i++) {
        if (fds[i].fd != -1) {
            fdsnew.push_back(fds[i]);
        }
    }
    
    return fdsnew;
}

 

epoll系列函數

epoll是Linux特有的IO複用函數,它在實現和使用上與select和poll有很大差別,首先,epoll使用一組函數來完成操做,而不是單個函數。其次,epoll把用戶關心的文件描述符上的事件放在內核上的一個事件表中,從而無須像select和poll那樣每次調用都要重複傳入文件描述符集合事件表。但epoll須要使用一個額外的文件描述符,來惟一標識內核中這個事件表,這個文件描述符使用以下epoll_create函數建立:

#include <sys/epoll.h>  
int epoll_create(int size);  
    // 返回:成功返回建立的內核事件表對應的描述符,出錯-1  

size參數如今並不起做用,只是給內核一個提示,告訴它內核表須要多大,該函數返回的文件描述符將用做其餘全部epoll函數的第一個參數,以指定要訪問的內核事件表。用epoll_ctl函數操做內核事件表

#include <sys/epoll.h>  
int epoll_ctl(int opfd, int op, int fd, struct epoll_event *event);  

返回:成功返回0,出錯-1

         fd參數是要操做的文件描述符,op指定操做類型,操做類型有3種

  • EPOLL_CTL_ADD:往事件表中註冊fd上的事件
  • EPOLL_CTL_MOD:修改fd上的註冊事件
  • EPOLL_CTL_DEL:刪除fd上的註冊時間

event指定事件類型,它是epoll_event結構指針類型

struct epoll_event  
{  
    __uint32_t events;  /* epoll事件 */  
    epoll_data_t data;  /* 用戶數據 */  
};  

其中events描述事件類型,epoll支持的事件類型和poll基本相同,表示epoll事件類型的宏是在poll對應的宏加上」E」,好比epoll的數據可讀事件是EPOLLIN,但epoll有兩個額外的事件類型-EPOLLET和EPOLLONESHOT,它們對於高效運做很是關鍵,data用於存儲用戶數據,其類型epoll_data_t定義以下:

typedef union epoll_data  
{  
    void *ptr;  
    int fd;  
    uint32_t u32;  
    uint64_t u64;  
}epoll_data_t;  

epoll_data_t是一個聯合體,其4個成員最多使用的是fd,它指定事件所從屬的目標文件描述符,ptr成員可用來指定fd相關的用戶數據,但因爲opoll_data_t是一個聯合體,咱們不能同時使用fd和ptr,若是要將文件描述符嗯哼用戶數據關聯起來,以實現快速的數據訪問,則只能使用其餘手段,好比放棄使用fd成員,而在ptr指針指向的用戶數據中包含fd。

#include <sys/epoll.h>  
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);  
    // 返回:成功返回就緒的文件描述符個數,出錯-1  

timeout參數的含義與poll接口的timeout參數相同,maxevents參數指定最多監聽多少個事件,它必須大於0。

epoll_wait若是檢測到事件,就將全部就緒的事件從內核事件表(由epfd指定)中複製到events指定的數組中,這個數組只用來輸epoll_wait檢測到的就緒事件,而不像select和poll的參數數組既傳遞用於用戶註冊的事件,有用於輸出內核檢測到就緒事件,這樣極大提升了應用程序索引就緒文件描述符的效率。

epoll函數測試用例,監聽多個socket鏈接和終端輸入,當在終端中輸入quit時退出程序:

#include <iostream>

#include <stdio.h>  
#include <stdlib.h>  
#include <unistd.h>  
#include <sys/types.h>  
#include <sys/socket.h>  
#include <sys/epoll.h>  
#include <netinet/in.h>  
#include <arpa/inet.h>  
#include <fcntl.h>  
#include <string.h> 

using namespace std;

void addfd(int epollfd, int fd)
{
    epoll_event event;
    
    event.data.fd = fd;
    event.events = EPOLLIN;
    epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
}

void delfd(int epollfd, int fd)
{
    epoll_event event;
    
    event.data.fd = fd;
    event.events = EPOLLIN;
    epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &event);
}

int main(int argc, char **argv)
{
    int listenfd, connfd;
    struct sockaddr_in servaddr;
    
    listenfd = socket(AF_INET, SOCK_STREAM, 0);
    
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(8080);
    servaddr.sin_addr.s_addr = INADDR_ANY;
    
    bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr));
    listen(listenfd, 5);
    
    int epollfd = epoll_create(32);
    if (epollfd < 0) {
        cerr << "epoll_create error" << endl;
        exit(-1);
    }
    
    addfd(epollfd, STDIN_FILENO);
    addfd(epollfd, listenfd);
    
    epoll_event events[32];
    
    char buff[1024];
    bool running = true;
    while (running) {
        buff[0] = '\0';
        
        int event_num = epoll_wait(epollfd, events, 32, -1);
        if (event_num < 0) {
            cerr << "epoll_wait error" << endl;
            break;
        }
        
        for (int i = 0; i < event_num; i++) {
            int fd = events[i].data.fd;
            int event = events[i].events;
            
            if (fd == STDIN_FILENO) {
                // 從STDIN_FILENO中讀取數據
                if (event & EPOLLIN) {
                    cin >> buff;
                    if (strcmp(buff, "quit") == 0) {
                        running = false;
                        break;
                    }
                    else {
                        cout << buff << endl;
                    }
                }
            }
            else if (fd == listenfd) {
                if (event & EPOLLIN) {
                    connfd = accept(listenfd, NULL, NULL);
                    if (connfd < 0) {
                        running = false;
                        break;
                    }
                    
                    addfd(epollfd, connfd);
                    cout << "往epoll添加 " << connfd << endl;
                }
            }
            else {
                if (event & EPOLLIN) {
                    int len = recv(fd, buff, sizeof(buff) - 1, 0);
                    if (len < 0) {
                        cerr << "recv error" << endl;
                    }
                    else if (len == 0) {
                        cout << "從epoll刪除 " << fd << endl;
                        // 客戶端斷開了鏈接
                        delfd(epollfd, fd);
                    }
                    else {
                        buff[len] = '\0';
                        cout << fd << " recv: " << buff << endl;
                    }
                }
            }
        }
    }
    
    // 關閉文件描述符
    close(listenfd);

    return 0;
}

 

參考:

  一、《UNIX網絡編程》IO複用章節

  二、 網絡編程API-下 (I/O複用函數)

相關文章
相關標籤/搜索