這是學習網絡編程後寫的一個練手的小程序,能夠幫助複習socket,I/O複用,非阻塞I/O等知識點。linux
經過回顧寫的過程當中遇到的問題的形式記錄程序的關鍵點,最後給出完整程序代碼。ios
0. 功能編程
編寫一個簡易羣聊程序,程序具有的基本功能:小程序
服務器:支持多個客戶端鏈接,並將每一個客戶端發過來的消息發給全部其餘的客戶端服務器
客戶端:可以鏈接服務器,並向服務器發送消息,同時接收服務器發過來的任何消息網絡
1. Server I/O模型多線程
採用事件驅動(I/O複用)+ 非阻塞I/O的模型,即Reactor模式。I/O複用採用linux下的epoll機制。socket
相關API介紹見最後,先梳理幾個寫程序的時候想到的問題。函數
1.1 I/O複用爲何搭配非阻塞I/O?(select/epoll返回可讀後還用非阻塞是否是沒有意義?)oop
select/epoll返回了可讀,並不必定表明能讀,在返回可讀到調用read函數之間,是有時間間隙的。內核可能把數據丟失,也可能存在好比多個線程監聽該socket,
數據被別人讀走的狀況。因此這裏使用非阻塞I/O是有意義的。
能夠參考知乎這個問題 https://www.zhihu.com/question/37271342
1.2 epoll的條件觸發LT(水平觸發)和邊緣觸發ET區別,如何正確地處理ET模式下的讀操做?
簡單講,以讀取數據操做舉例。條件觸發,只要輸入緩衝中還有數據,就會以事件方式再次註冊;
而邊緣觸發中僅在輸入緩衝收到數據時註冊一次該事件(你沒讀完也epoll_wait也再也不返回了)。
因此若是使用邊緣觸發發生輸入相關事件,須要讀取輸入緩衝中的所有數據。方法是一直讀,直到read返回-1,而且變量errno中的值爲EAGAIN,說明沒有數據可讀。
因此在這裏再次考慮一下1.1中的問題,epoll若是採用邊緣觸發,更要使用非阻塞I/O,不然可能就由於無數據可讀阻塞整個線程了。
1.3 select與epoll的差異
一個老生常談的問題,select函數效率低主要有如下兩個緣由,首先是每次調用select函數時須要向操做系統傳遞監視對象信息,其次是調用後針對全部文件描述符的循環語句。
第一點對效率的影響更大。
此外,epoll還支持ET模式,而select只支持LT模式。
但select也有優勢,好比兼容性好(大多數操做系統支持),在服務端介入者少的狀況下仍然能夠考慮使用select。
1.4 epoll相關API
// 建立一個epoll句柄,參數size向操做系統建議epoll例程大小 int epoll_create(int size) /* 函數功能: epoll事件註冊函數 參數epfd爲epoll的句柄,即epoll_create返回值 參數op表示動做,用3個宏來表示: EPOLL_CTL_ADD(註冊新的fd到epfd), EPOLL_CTL_MOD(修改已經註冊的fd的監聽事件), EPOLL_CTL_DEL(從epfd刪除一個fd); 其中參數fd爲須要監聽的標示符; 參數event告訴內核須要監聽的事件,event的結構以下: struct epoll_event { __uint32_t events; //Epoll events epoll_data_t data; //User data variable }; 其中介紹events是宏的集合,經常使用的有: EPOLLIN:有數據可讀 EPOLLONESHOT:發生一次事件後,相應的文件描述符再也不收到事件通知。所以須要向第二個參數傳遞EPOLL_CTL_MOD再次設置事件。 例如在多線程處理時,若是某個線程在處理fd的同時,又有新的一批數據發來,該fd可讀,那麼該fd會被分給另外一個線程,這樣兩個線程處理同一個fd確定就不對了,
這時用EPOLLONESHOT能夠解決。在fd返回可讀後,須要顯式地設置一下才能讓epoll從新返回這個fd。 */ int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) // 等待事件的產生,函數返回須要處理的事件數目 int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout)
2. Client怎麼處理?
Client採用分割讀寫的方式,開兩個進程。父進程負責負責接受數據,子進程負責發送數據。
if (pid == 0) { //子進程負責寫操做 write_routine(sock); } else { //父進程負責讀操做 read_routine(sock); }
3. 代碼
代碼中有詳細註釋
1 //utility.h 2 #ifndef UTILITY_H_ 3 #define UTILITY_H_ 4 5 #include <iostream> 6 #include <list> 7 #include <sys/types.h> 8 #include <sys/socket.h> 9 #include <netinet/in.h> 10 #include <arpa/inet.h> 11 #include <sys/epoll.h> 12 #include <fcntl.h> 13 #include <errno.h> 14 #include <unistd.h> 15 #include <stdio.h> 16 #include <stdlib.h> 17 #include <string.h> 18 19 using namespace std; 20 21 // clients_list save all the clients's socket 22 list<int> clients_list; 23 24 /********************** macro defintion **************************/ 25 // server ip 26 #define SERVER_IP "127.0.0.1" 27 28 // server port 29 #define SERVER_PORT 8888 30 31 //epoll size 32 #define EPOLL_SIZE 5000 33 34 //message buffer size 35 #define BUF_SIZE 0xFFFF 36 37 #define SERVER_WELCOME "Welcome you to join the chatroom! Your chat ID is: Client #%d" 38 39 #define SERVER_MESSAGE "ClientID %d say >> %s" 40 41 // exit 42 #define EXIT "EXIT" 43 44 #define CAUTION "There is only one int the chatroom!" 45 46 /********************** some function **************************/ 47 /** 48 *設置非阻塞IO 49 **/ 50 int setnonblocking(int sockfd) 51 { 52 fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFD, 0)| O_NONBLOCK); 53 return 0; 54 } 55 56 /** 57 * 將文件描述符fd添加到epollfd標示的內核事件表中, 並註冊EPOLLIN事件, 58 * EPOOLET代表是ET工做方式,根據enable_et來斷定是否設置邊緣觸發。 59 * 最後將文件描述符設置非阻塞方式 60 **/ 61 void addfd( int epollfd, int fd, bool enable_et ) 62 { 63 struct epoll_event ev; 64 ev.data.fd = fd; 65 ev.events = EPOLLIN; 66 if( enable_et ) 67 ev.events = EPOLLIN | EPOLLET; 68 epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev); 69 setnonblocking(fd); 70 printf("fd added to epoll!\n\n"); 71 } 72 73 /** 74 * 羣發消息 75 **/ 76 int sendBroadcastmessage(int clientfd) 77 { 78 // buf[BUF_SIZE] receive new chat message 79 // message[BUF_SIZE] save format message 80 char buf[BUF_SIZE], message[BUF_SIZE]; 81 bzero(buf, BUF_SIZE); 82 bzero(message, BUF_SIZE); 83 84 // receive message 85 printf("read from client(clientID = %d)\n", clientfd); 86 int len = recv(clientfd, buf, BUF_SIZE, 0); 87 88 if(len == 0) // len = 0 means the client closed connection 89 { 90 close(clientfd); 91 clients_list.remove(clientfd); //server remove the client 92 printf("ClientID = %d closed.\n now there are %d client in the chatroom\n", clientfd, (int)clients_list.size()); 93 94 } 95 else //broadcast message 96 { 97 if(clients_list.size() == 1) { // this means There is only one int the chatroom 98 send(clientfd, CAUTION, strlen(CAUTION), 0); 99 return len; 100 } 101 // format message to broadcast 102 sprintf(message, SERVER_MESSAGE, clientfd, buf); 103 104 list<int>::iterator it; 105 for(it = clients_list.begin(); it != clients_list.end(); ++it) { 106 if(*it != clientfd){ 107 if( send(*it, message, BUF_SIZE, 0) < 0 ) { perror("error"); exit(-1);} 108 } 109 } 110 } 111 return len; 112 } 113 #endif // UTILITY_H_
1 //Server.cpp 2 3 #include "utility.h" 4 5 int main(int argc, char *argv[]) 6 { 7 //服務器端口號和IP地址 8 struct sockaddr_in serverAddr; 9 serverAddr.sin_family = PF_INET; 10 serverAddr.sin_port = htons(SERVER_PORT); 11 serverAddr.sin_addr.s_addr = inet_addr(SERVER_IP); 12 //建立監聽套接字 13 int listener = socket(PF_INET, SOCK_STREAM, 0); 14 if(listener < 0) { 15 perror("listener"); exit(-1); 16 } 17 printf("listen socket created \n"); 18 //綁定地址 19 if( bind(listener, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) < 0) { 20 perror("bind error"); 21 exit(-1); 22 } 23 //listen 24 int ret = listen(listener, 5); 25 if(ret < 0) { 26 perror("listen error"); 27 exit(-1); 28 } 29 printf("Start to listen: %s\n", SERVER_IP); 30 //建立epoll事件表 31 int epfd = epoll_create(EPOLL_SIZE); 32 if(epfd < 0) { 33 perror("epfd error"); 34 exit(-1); 35 } 36 printf("epoll created, epollfd = %d\n", epfd); 37 static struct epoll_event events[EPOLL_SIZE]; 38 //註冊監聽套接字到epoll事件表 39 addfd(epfd, listener, true); 40 //main loop 41 while(1) 42 { 43 //epoll_events_count指明待處理事件數 44 int epoll_events_count = epoll_wait(epfd, events, EPOLL_SIZE, -1); 45 if (epoll_events_count < 0) { 46 perror("epoll failure"); 47 break; 48 } 49 50 printf("epoll_events_count = %d\n", epoll_events_count); 51 //處理事件 52 for (int i = 0; i < epoll_events_count; ++i) 53 { 54 int sockfd = events[i].data.fd; 55 //sockfd == listener代表有新鏈接 56 if(sockfd == listener) 57 { 58 struct sockaddr_in client_address; 59 socklen_t client_addrLength = sizeof(struct sockaddr_in); 60 int clientfd = accept( listener, ( struct sockaddr* )&client_address, &client_addrLength ); 61 62 printf("client connection from: %s : % d(IP : port), clientfd = %d \n", 63 inet_ntoa(client_address.sin_addr), ntohs(client_address.sin_port), clientfd); 64 65 //把新鏈接加入epoll事件表中 66 addfd(epfd, clientfd, true); 67 68 // 把clientfd加入客戶鏈接的list內 69 clients_list.push_back(clientfd); 70 printf("Add new clientfd = %d to epoll\n", clientfd); 71 printf("Now there are %d clients int the chat room\n", (int)clients_list.size()); 72 73 // 想新鏈接發送歡迎信息 74 printf("welcome message\n"); 75 char message[BUF_SIZE]; 76 bzero(message, BUF_SIZE); 77 sprintf(message, SERVER_WELCOME, clientfd); 78 int ret = send(clientfd, message, BUF_SIZE, 0); 79 if(ret < 0) { 80 perror("send error"); 81 exit(-1); 82 } 83 } 84 //sockfd != listener代表以前的鏈接發來數據,將數據羣發給全部鏈接對象 85 else 86 { 87 printf("i got an message"); 88 int ret = sendBroadcastmessage(sockfd); 89 if(ret < 0) { perror("error");exit(-1); } 90 } 91 } 92 } 93 close(listener); //close socket 94 close(epfd); //close epoll instance 95 return 0; 96 }
1 //Client.cpp 2 3 #include "utility.h" 4 5 void write_routine(int sock); 6 void read_routine(int sock); 7 int main(int argc, char *argv[]) 8 { 9 //服務器IP和端口 10 struct sockaddr_in serverAddr; 11 serverAddr.sin_family = PF_INET; 12 serverAddr.sin_port = htons(SERVER_PORT); 13 serverAddr.sin_addr.s_addr = inet_addr(SERVER_IP); 14 15 // create socket 16 int sock = socket(PF_INET, SOCK_STREAM, 0); 17 if(sock < 0) { perror("sock error"); exit(-1); } 18 19 // 鏈接服務器 20 if(connect(sock, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) < 0) { 21 perror("connect error"); 22 exit(-1); 23 } 24 char buf[BUF_SIZE]; 25 int str_len = read(sock, buf, BUF_SIZE); 26 if (str_len == 0) { 27 return 0; 28 } 29 buf[str_len] = 0; 30 printf("%s\n", buf); 31 32 pid_t pid = fork(); 33 if (pid == 0) { 34 //子進程負責寫操做 35 write_routine(sock); 36 } 37 else { 38 //父進程負責讀操做 39 read_routine(sock); 40 } 41 42 return 0; 43 } 44 45 void read_routine(int sock) { 46 char buf[BUF_SIZE]; 47 while(1) { 48 memset(buf, 0, sizeof(buf)); 49 int str_len = read(sock, buf, BUF_SIZE); 50 if (str_len == 0) { 51 return; 52 } 53 buf[str_len] = 0; 54 printf("%s", buf); 55 } 56 } 57 58 void write_routine(int sock) { 59 char buf[BUF_SIZE]; 60 while(1) { 61 memset(buf, 0, sizeof(buf)); 62 fgets(buf, BUF_SIZE, stdin); 63 if (!strcmp(buf, "exit\n")) { 64 shutdown(sock, SHUT_WR); 65 return; 66 } 67 write(sock, buf, strlen(buf)); 68 } 69 }