做者:快課網——Jay13html
原文連接:http://www.cricode.com/3510.htmlnode
本文介紹幾種服務器網絡編程模型。廢話很少說,直接正題。linux
同步阻塞迭代模型是最簡單的一種IO模型。ios
其核心代碼以下:git
1
2
3
4
5
6
7
8
|
bind(srvfd);
listen(srvfd);
for(;;){
clifd = accept(srvfd,...); //開始接受客戶端來的鏈接
read(clifd,buf,...); //從客戶端讀取數據
dosomthingonbuf(buf);
write(clifd,buf) //發送數據到客戶端
}
|
上面的程序存在以下一些弊端:程序員
1)若是沒有客戶端的鏈接請求,進程會阻塞在accept系統調用處,程序不能執行其餘任何操做。(系統調用使得程序從用戶態陷入內核態,具體請參考:程序員的自我修養)github
2)在與客戶端創建好一條鏈路後,經過read系統調用從客戶端接受數據,而客戶端合適發送數據過來是不可控的。若是客戶端遲遲不發生數據過來,則程序一樣會阻塞在read調用,此時,若是另外的客戶端來嘗試鏈接時,都會失敗。編程
3)一樣的道理,write系統調用也會使得程序出現阻塞(例如:客戶端接受數據異常緩慢,致使寫緩衝區滿,數據遲遲發送不出)。數組
同步阻塞迭代模型有諸多缺點。多進程併發模型在同步阻塞迭代模型的基礎上進行了一些改進,以免是程序阻塞在read系統調用上。服務器
多進程模型核心代碼以下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
bind(srvfd);
listen(srvfd);
for(;;){
clifd = accept(srvfd,...); //開始接受客戶端來的鏈接
ret = fork();
switch( ret )
{
case -1 :
do_err_handler();
break;
case 0 : // 子進程
client_handler(clifd);
break ;
default : // 父進程
close(clifd);
continue ;
}
}
//======================================================
void client_handler(clifd){
read(clifd,buf,...); //從客戶端讀取數據
dosomthingonbuf(buf);
write(clifd,buf) //發送數據到客戶端
}
|
上述程序在accept系統調用時,若是沒有客戶端來創建鏈接,擇會阻塞在accept處。一旦某個客戶端鏈接創建起來,則當即開啓一個新的進程來處理與這個客戶的數據交互。避免程序阻塞在read調用,而影響其餘客戶端的鏈接。
在多進程併發模型中,每個客戶端鏈接開啓fork一個進程,雖然linux中引入了寫實拷貝機制,大大下降了fork一個子進程的消耗,但若客戶端鏈接較大,則系統依然將不堪負重。經過多線程(或線程池)併發模型,能夠在必定程度上改善這一問題。
在服務端的線程模型實現方式通常有三種:
(1)按需生成(來一個鏈接生成一個線程)
(2)線程池(預先生成不少線程)
(3)Leader follower(LF)
爲簡單起見,以第一種爲例,其核心代碼以下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
void *thread_callback( void *args ) //線程回調函數
{
int clifd = *(int *)args ;
client_handler(clifd);
}
//===============================================================
void client_handler(clifd){
read(clifd,buf,...); //從客戶端讀取數據
dosomthingonbuf(buf);
write(clifd,buf) //發送數據到客戶端
}
//===============================================================
bind(srvfd);
listen(srvfd);
for(;;){
clifd = accept();
pthread_create(...,thread_callback,&clifd);
}
|
服務端分爲主線程和工做線程,主線程負責accept()鏈接,而工做線程負責處理業務邏輯和流的讀取等。所以,即便在工做線程阻塞的狀況下,也只是阻塞在線程範圍內,對繼續接受新的客戶端鏈接不會有影響。
第二種實現方式,經過線程池的引入能夠避免頻繁的建立、銷燬線程,能在很大程序上提高性能。但無論如何實現,多線程模型先天具備以下缺點:
1)穩定性相對較差。一個線程的崩潰會致使整個程序崩潰。
2)臨界資源的訪問控制,在加大程序複雜性的同時,鎖機制的引入會是嚴重下降程序的性能。性能上可能會出現「辛辛苦苦好幾年,一晚上回到解放前」的狀況。
多進程模型和多線程(線程池)模型每一個進程/線程只能處理一路IO,在服務器併發數較高的狀況下,過多的進程/線程會使得服務器性能降低。而經過多路IO複用,能使得一個進程同時處理多路IO,提高服務器吞吐量。
在Linux支持epoll模型以前,都使用select/poll模型來實現IO多路複用。
以select爲例,其核心代碼以下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
bind(listenfd);
listen(listenfd);
FD_ZERO(&allset);
FD_SET(listenfd, &allset);
for(;;){
select(...);
if (FD_ISSET(listenfd, &rset)) { /*有新的客戶端鏈接到來*/
clifd = accept();
cliarray[] = clifd; /*保存新的鏈接套接字*/
FD_SET(clifd, &allset); /*將新的描述符加入監聽數組中*/
}
for(;;){ /*這個for循環用來檢查全部已經鏈接的客戶端是否由數據可讀寫*/
fd = cliarray[i];
if (FD_ISSET(fd , &rset))
dosomething();
}
}
|
select IO多路複用一樣存在一些缺點,羅列以下:
相比select模型,poll使用鏈表保存文件描述符,所以沒有了監視文件數量的限制,但其餘三個缺點依然存在。
拿select模型爲例,假設咱們的服務器須要支持100萬的併發鏈接,則在__FD_SETSIZE 爲1024的狀況下,則咱們至少須要開闢1k個進程才能實現100萬的併發鏈接。除了進程間上下文切換的時間消耗外,從內核/用戶空間大量的無腦內存拷貝、數組輪詢等,是系統難以承受的。所以,基於select模型的服務器程序,要達到10萬級別的併發訪問,是一個很難完成的任務。
epoll IO多路複用:一個看起來很美好的解決方案。 因爲文章:高併發網絡編程之epoll詳解中對epoll相關實現已經有詳細解決,這裏就直接摘錄過來。
因爲epoll的實現機制與select/poll機制徹底不一樣,上面所說的 select的缺點在epoll上不復存在。
設想一下以下場景:有100萬個客戶端同時與一個服務器進程保持着TCP鏈接。而每一時刻,一般只有幾百上千個TCP鏈接是活躍的(事實上大部分場景都是這種狀況)。如何實現這樣的高併發?
在select/poll時代,服務器進程每次都把這100萬個鏈接告訴操做系統(從用戶態複製句柄數據結構到內核態),讓操做系統內核去查詢這些套接字上是否有事件發生,輪詢完後,再將句柄數據複製到用戶態,讓服務器應用程序輪詢處理已發生的網絡事件,這一過程資源消耗較大,所以,select/poll通常只能處理幾千的併發鏈接。
epoll的設計和實現與select徹底不一樣。epoll經過在Linux內核中申請一個簡易的文件系統(文件系統通常用什麼數據結構實現?B+樹)。把原先的select/poll調用分紅了3個部分:
1)調用epoll_create()創建一個epoll對象(在epoll文件系統中爲這個句柄對象分配資源)
2)調用epoll_ctl向epoll對象中添加這100萬個鏈接的套接字
3)調用epoll_wait收集發生的事件的鏈接
如此一來,要實現上面說是的場景,只須要在進程啓動時創建一個epoll對象,而後在須要的時候向這個epoll對象中添加或者刪除鏈接。同時,epoll_wait的效率也很是高,由於調用epoll_wait時,並無一股腦的向操做系統複製這100萬個鏈接的句柄數據,內核也不須要去遍歷所有的鏈接。
下面來看看Linux內核具體的epoll機制實現思路。
當某一進程調用epoll_create方法時,Linux內核會建立一個eventpoll結構體,這個結構體中有兩個成員與epoll的使用方式密切相關。eventpoll結構體以下所示:
1
2
3
4
5
6
7
8
|
struct eventpoll{
....
/*紅黑樹的根節點,這顆樹中存儲着全部添加到epoll中的須要監控的事件*/
struct rb_root rbr;
/*雙鏈表中則存放着將要經過epoll_wait返回給用戶的知足條件的事件*/
struct list_head rdlist;
....
};
|
每個epoll對象都有一個獨立的eventpoll結構體,用於存放經過epoll_ctl方法向epoll對象中添加進來的事件。這些事件都會掛載在紅黑樹中,如此,重複添加的事件就能夠經過紅黑樹而高效的識別出來(紅黑樹的插入時間效率是lgn,其中n爲樹的高度)。
而全部添加到epoll中的事件都會與設備(網卡)驅動程序創建回調關係,也就是說,當相應的事件發生時會調用這個回調方法。這個回調方法在內核中叫ep_poll_callback,它會將發生的事件添加到rdlist雙鏈表中。
在epoll中,對於每個事件,都會創建一個epitem結構體,以下所示:
1
2
3
4
5
6
7
|
struct epitem{
struct rb_node rbn;//紅黑樹節點
struct list_head rdllink;//雙向鏈表節點
struct epoll_filefd ffd; //事件句柄信息
struct eventpoll *ep; //指向其所屬的eventpoll對象
struct epoll_event event; //期待發生的事件類型
}
|
當調用epoll_wait檢查是否有事件發生時,只須要檢查eventpoll對象中的rdlist雙鏈表中是否有epitem元素便可。若是rdlist不爲空,則把發生的事件複製到用戶態,同時將事件數量返回給用戶。
epoll數據結構示意圖
從上面的講解可知:經過紅黑樹和雙鏈表數據結構,並結合回調機制,造就了epoll的高效。
OK,講解完了Epoll的機理,咱們便能很容易掌握epoll的用法了。一句話描述就是:三步曲。
第一步:epoll_create()系統調用。此調用返回一個句柄,以後全部的使用都依靠這個句柄來標識。
第二步:epoll_ctl()系統調用。經過此調用向epoll對象中添加、刪除、修改感興趣的事件,返回0標識成功,返回-1表示失敗。
第三部:epoll_wait()系統調用。經過此調用收集收集在epoll監控中已經發生的事件。
最後,附上一個epoll編程實例。(此代碼做者爲sparkliang)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
|
//
// a simple echo server using epoll in linux
//
// 2009-11-05
// 2013-03-22:修改了幾個問題,1是/n格式問題,2是去掉了原代碼不當心加上的ET模式;
// 原本只是簡單的示意程序,決定仍是加上 recv/send時的buffer偏移
// by sparkling
//
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <errno.h>
#include <iostream>
using namespace std;
#define MAX_EVENTS 500
struct myevent_s
{
int fd;
void (*call_back)(int fd, int events, void *arg);
int events;
void *arg;
int status; // 1: in epoll wait list, 0 not in
char buff[128]; // recv data buffer
int len, s_offset;
long last_active; // last active time
};
// set event
void EventSet(myevent_s *ev, int fd, void (*call_back)(int, int, void*), void *arg)
{
ev->fd = fd;
ev->call_back = call_back;
ev->events = 0;
ev->arg = arg;
ev->status = 0;
bzero(ev->buff, sizeof(ev->buff));
ev->s_offset = 0;
ev->len = 0;
ev->last_active = time(NULL);
}
// add/mod an event to epoll
void EventAdd(int epollFd, int events, myevent_s *ev)
{
struct epoll_event epv = {0, {0}};
int op;
epv.data.ptr = ev;
epv.events = ev->events = events;
if(ev->status == 1){
op = EPOLL_CTL_MOD;
}
else{
op = EPOLL_CTL_ADD;
ev->status = 1;
}
if(epoll_ctl(epollFd, op, ev->fd, &epv) < 0)
printf("Event Add failed[fd=%d], evnets[%d]\n", ev->fd, events);
else
printf("Event Add OK[fd=%d], op=%d, evnets[%0X]\n", ev->fd, op, events);
}
// delete an event from epoll
void EventDel(int epollFd, myevent_s *ev)
{
struct epoll_event epv = {0, {0}};
if(ev->status != 1) return;
epv.data.ptr = ev;
ev->status = 0;
epoll_ctl(epollFd, EPOLL_CTL_DEL, ev->fd, &epv);
}
int g_epollFd;
myevent_s g_Events[MAX_EVENTS+1]; // g_Events[MAX_EVENTS] is used by listen fd
void RecvData(int fd, int events, void *arg);
void SendData(int fd, int events, void *arg);
// accept new connections from clients
void AcceptConn(int fd, int events, void *arg)
{
struct sockaddr_in sin;
socklen_t len = sizeof(struct sockaddr_in);
int nfd, i;
// accept
if((nfd = accept(fd, (struct sockaddr*)&sin, &len)) == -1)
{
if(errno != EAGAIN && errno != EINTR)
{
}
printf("%s: accept, %d", __func__, errno);
return;
}
do
{
for(i = 0; i < MAX_EVENTS; i++)
{
if(g_Events[i].status == 0)
{
break;
}
}
if(i == MAX_EVENTS)
{
printf("%s:max connection limit[%d].", __func__, MAX_EVENTS);
break;
}
// set nonblocking
int iret = 0;
if((iret = fcntl(nfd, F_SETFL, O_NONBLOCK)) < 0)
{
printf("%s: fcntl nonblocking failed:%d", __func__, iret);
break;
}
// add a read event for receive data
EventSet(&g_Events[i], nfd, RecvData, &g_Events[i]);
EventAdd(g_epollFd, EPOLLIN, &g_Events[i]);
}while(0);
printf("new conn[%s:%d][time:%d], pos[%d]\n", inet_ntoa(sin.sin_addr),
ntohs(sin.sin_port), g_Events[i].last_active, i);
}
// receive data
void RecvData(int fd, int events, void *arg)
{
struct myevent_s *ev = (struct myevent_s*)arg;
int len;
// receive data
len = recv(fd, ev->buff+ev->len, sizeof(ev->buff)-1-ev->len, 0);
EventDel(g_epollFd, ev);
if(len > 0)
{
ev->len += len;
ev->buff[len] = '\0';
printf("C[%d]:%s\n", fd, ev->buff);
// change to send event
EventSet(ev, fd, SendData, ev);
EventAdd(g_epollFd, EPOLLOUT, ev);
}
else if(len == 0)
{
close(ev->fd);
printf("[fd=%d] pos[%d], closed gracefully.\n", fd, ev-g_Events);
}
else
{
close(ev->fd);
printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror(errno));
}
}
// send data
void SendData(int fd, int events, void *arg)
{
struct myevent_s *ev = (struct myevent_s*)arg;
int len;
// send data
len = send(fd, ev->buff + ev->s_offset, ev->len - ev->s_offset, 0);
if(len > 0)
{
printf("send[fd=%d], [%d<->%d]%s\n", fd, len, ev->len, ev->buff);
ev->s_offset += len;
if(ev->s_offset == ev->len)
{
// change to receive event
EventDel(g_epollFd, ev);
EventSet(ev, fd, RecvData, ev);
EventAdd(g_epollFd, EPOLLIN, ev);
}
}
else
{
close(ev->fd);
EventDel(g_epollFd, ev);
printf("send[fd=%d] error[%d]\n", fd, errno);
}
}
void InitListenSocket(int epollFd, short port)
{
int listenFd = socket(AF_INET, SOCK_STREAM, 0);
fcntl(listenFd, F_SETFL, O_NONBLOCK); // set non-blocking
printf("server listen fd=%d\n", listenFd);
EventSet(&g_Events[MAX_EVENTS], listenFd, AcceptConn, &g_Events[MAX_EVENTS]);
// add listen socket
EventAdd(epollFd, EPOLLIN, &g_Events[MAX_EVENTS]);
// bind & listen
sockaddr_in sin;
bzero(&sin, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = INADDR_ANY;
sin.sin_port = htons(port);
bind(listenFd, (const sockaddr*)&sin, sizeof(sin));
listen(listenFd, 5);
}
int main(int argc, char **argv)
{
unsigned short port = 12345; // default port
if(argc == 2){
port = atoi(argv[1]);
}
// create epoll
g_epollFd = epoll_create(MAX_EVENTS);
if(g_epollFd <= 0) printf("create epoll failed.%d\n", g_epollFd);
// create & bind listen socket, and add to epoll, set non-blocking
InitListenSocket(g_epollFd, port);
// event loop
struct epoll_event events[MAX_EVENTS];
printf("server running:port[%d]\n", port);
int checkPos = 0;
while(1){
// a simple timeout check here, every time 100, better to use a mini-heap, and add timer event
long now = time(NULL);
for(int i = 0; i < 100; i++, checkPos++) // doesn't check listen fd
{
if(checkPos == MAX_EVENTS) checkPos = 0; // recycle
if(g_Events[checkPos].status != 1) continue;
long duration = now - g_Events[checkPos].last_active;
if(duration >= 60) // 60s timeout
{
close(g_Events[checkPos].fd);
printf("[fd=%d] timeout[%d--%d].\n", g_Events[checkPos].fd, g_Events[checkPos].last_active, now);
EventDel(g_epollFd, &g_Events[checkPos]);
}
}
// wait for events to happen
int fds = epoll_wait(g_epollFd, events, MAX_EVENTS, 1000);
if(fds < 0){
printf("epoll_wait error, exit\n");
break;
}
for(int i = 0; i < fds; i++){
myevent_s *ev = (struct myevent_s*)events[i].data.ptr;
if((events[i].events&EPOLLIN)&&(ev->events&EPOLLIN)) // read event
{
ev->call_back(ev->fd, events[i].events, ev->arg);
}
if((events[i].events&EPOLLOUT)&&(ev->events&EPOLLOUT)) // write event
{
ev->call_back(ev->fd, events[i].events, ev->arg);
}
}
}
// free resource
return 0;
}
|