通過多路複用構建高性能服務器是一種常見的模型:單個I/O多路複用線程+一組工作線程,I/O線程負責協調分配任務,而實際工作交給工作線程處理。這種模型的好處在於高效併發和充分利用多線程的處理能力。
以memcached的構架圖爲例:
memcached的主線程用epoll監聽到EPOLLIN事件,而且觸發該事件的fd是服務器listen的fd,就accept該連接請求,返回的fd主線程並不處理,而是通過CQ隊列發送給工作線程去處理,工作線程又維護了一個epoll多路複用隊列,子線程的epoll輪詢和響應請求。
這種架構包含兩大塊:
1.多路複用
// C system headers
#include <fcntl.h>
#include <netinet/in.h>
#include <stdio.h>      // printf, perror
#include <stdlib.h>
#include <string.h>     // strlen
#include <strings.h>    // bzero
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>     // read, write, close

// C++ headers
#include <iostream>

using namespace std;
// TCP port the demo server listens on.
const int PORT = 8888;
// Upper bound on tracked connections: epoll_create size hint and event-array length.
const int MAX_CLIENT_NUM = 10000;
// Receive buffer size in bytes for a single read() from a client.
const int MAX_LEN = 2000;
// Put fd into non-blocking mode, preserving all its other status flags.
// Returns true on success, false if either fcntl call fails.
bool setfdnoblock(int fd)
{
    int flg = fcntl(fd, F_GETFL);
    if (flg < 0)
    {
        cout << "get fd flag failed" << endl;
        return false;
    }
    // Defect fix: the original returned false here silently, while the
    // F_GETFL failure above was logged — report this failure too.
    if (fcntl(fd, F_SETFL, O_NONBLOCK | flg) < 0)
    {
        cout << "set fd O_NONBLOCK failed" << endl;
        return false;
    }
    return true;
}
int CreateTcpServer(int port, int listennum)
{
int fd;
fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
sockaddr_in TcpServer;
bzero(&TcpServer, sizeof(TcpServer));
TcpServer.sin_family = AF_INET;
TcpServer.sin_port = htons(8888);
TcpServer.sin_addr.s_addr = htonl(INADDR_ANY);
int iRet = bind(fd, (struct sockaddr*)&TcpServer, sizeof(TcpServer));
if(-1 == iRet)
{
cout << "server bind error!" << endl;
return -1;
}
if(listen(fd, listennum) == -1)
{
cout << "server listen error" << endl;
return -1;
}
return fd;
}
int main()
{
int Serverfd = CreateTcpServer(PORT, MAX_CLIENT_NUM);
if(Serverfd == -1)
{
cout << "server create failed" << endl;
}
else
{
cout << "serverfd is :" << Serverfd << endl;
}
int Epollfd = epoll_create(MAX_CLIENT_NUM);
if(Epollfd == -1)
{
cout << "epoll_create failed" << endl;
}
epoll_event ev, events[MAX_CLIENT_NUM];
int nfds = 0;
int client = 0;
char buff[MAX_LEN];
sockaddr_in CliAddr;
unsigned int iCliSize = sizeof(CliAddr);
ev.events = EPOLLIN|EPOLLOUT;
ev.data.fd = Serverfd;
if(!setfdnoblock(Serverfd))
{
cout << "set serverfd no_block failed" << endl;
}
if(epoll_ctl(Epollfd, EPOLL_CTL_ADD, Serverfd, &ev))
{
cout << "epoll add serverfd error" << endl;
}
while(1)
{
nfds = epoll_wait(Epollfd, events, MAX_CLIENT_NUM, 100000);
if(nfds == -1)
{
cout << "error occur, exit" << endl;
return -1;
}
else if( nfds == 0)
{
cout << "epoll_wait return zero" << endl;
}
else
{
for(int i = 0; i < nfds; i++)
{
cout << "events[i].data.fd is :" << events[i].data.fd << endl;
if(events[i].data.fd == Serverfd)
{
cout << " Serverfd received event" << endl;
client = accept(Serverfd, (struct sockaddr*)&CliAddr, &iCliSize);
if(client == -1)
{
cout << "accept error" << endl;
return -1;
}
ev.data.fd = client;
if(!setfdnoblock(client))
{
cout << "set client fd no_block error" << endl;
}
if(epoll_ctl(Epollfd, EPOLL_CTL_ADD, client, &ev))
{
cout << "epoll add client error" << endl;
}
else
{
cout << "success add client" << endl;
}
}
else if(events[i].events&EPOLLIN)
{
cout << "recv client msg" << endl;
if(events[i].data.fd < 0)
{
cout << " event[i].data.fd is smaller than zero" << endl;
continue;
}
if(read(events[i].data.fd, buff, MAX_LEN) == -1)
{
perror("clifd read");
}
else
{
cout << "read client msg suc" << endl;
printf("%s",buff);
}
char resp[] = "recv a client msg, this is resp msg";
write(events[i].data.fd, resp, strlen(resp)+1);
//read and mod
}
else if(events[i].events&EPOLLOUT)
{
//send and mod
}
}
}
}
}memcached
例子中epoll listen和accept新連接,並響應新連接的請求。
2.工作線程or線程池
進程的線程數量並非越多越好,也不是越少越好,需要根據機器逐步調優。
工做線程的工做原理,
1.I/O線程把收到的請求放入隊列,並通知工做線程處理,隊列和通知機制能夠是傳統的加鎖消息隊列、信號量,也能夠是memcached+libevent的實現:CQ隊列裝消息,線程管道通知工做線程。
2.I/O線程沒有新的任務分配,工做線程阻塞或等待一段時間。
線程池用到的比較少,不作評價。