1. 基礎socket庫
socket.h:
/**
 * Network socket library: a thin C++ wrapper around the BSD socket / Winsock API.
 */
#ifndef Socket_h
#define Socket_h

#include <stdio.h>
#include <string>

#ifdef WIN32
// windows
#include <winsock.h>
typedef int socklen_t;
#else
// linux, MacOS
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include <errno.h>

// Give POSIX builds the Winsock spellings so the code below is platform-neutral.
#define INVALID_SOCKET -1
#define SOCKET_ERROR -1
typedef int SOCKET;
#endif

// Default backlog passed to listen().
#define SOCKET_BACKLOG 100

namespace avalon {
    /// Prints the last socket error (errno / WSAGetLastError) and returns its code.
    int socket_error();
    /// Initializes the platform socket layer (Winsock on Windows). Returns 0 on success, -1 on failure.
    int socket_init();
    /// Releases the platform socket layer (Winsock on Windows).
    int socket_clean();
    /// printf-style diagnostic output, prefixed by the library.
    void socket_debug(const char* message, ...);

    /**
     * Owns one socket descriptor and closes it in the destructor.
     *
     * NOTE(review): the class is copyable by default; a copy would share the
     * descriptor and both destructors would close it. Handle instances through
     * pointers, as the factories below do.
     */
    class Socket {
    public:
        /// Wraps an existing descriptor; returns nullptr if the descriptor is negative.
        static Socket* create(SOCKET socket_fd);
        /// Opens a new socket; returns nullptr if the system call fails.
        static Socket* create(int family, int type, int protocal = IPPROTO_IP);

    public:
        Socket(SOCKET socket_fd);
        Socket(int family, int type, int protocal = IPPROTO_IP);
        /// Replaces the wrapped descriptor (the previous one is not closed).
        Socket& operator = (SOCKET socket_fd);
        virtual ~Socket();

        /// Connects to host:port. Returns true on success.
        bool connect(const char* host, unsigned short port);
        /// Binds to the given port on all local interfaces. Returns true on success.
        bool bind(unsigned short port);
        /// Starts listening. Returns true on success.
        bool listen(int backlog = SOCKET_BACKLOG);
        /// Accepts one connection; if client_host is non-null the peer address
        /// is written into it as text. Returns nullptr on failure.
        Socket* accept(char* client_host = nullptr);

        /// Loops over ::send until len bytes were handed over or a call fails.
        ssize_t send(const char* buffer, size_t len, int flag = 0);
        /// Single ::recv call; returns its result unchanged.
        ssize_t recv(char* buffer, size_t len, int flag = 0);

        // These two are defined in the implementation file and used by the
        // poll-based server, but were missing from the class declaration.
        /// Loops over ::write until len bytes were written or a call fails.
        ssize_t write(const char* buffer, size_t len);
        /// Single ::read call; returns its result unchanged.
        ssize_t read(char* buffer, size_t len);

        /// Closes the descriptor and marks the object as closed.
        int close();
        SOCKET getSocketFD();
        /// Switches the descriptor between blocking and non-blocking mode.
        void set_blocking(const bool blocking);

    private:
        SOCKET _socket_fd;
        int _family;
        int _type;
        int _protocal;
    };
}

#endif
socket.cpp:
/**
 * Network socket library: implementation.
 */
#include "AvalonSocket.h"

#include <stdarg.h>  // va_list, va_start, va_end
#include <string.h>  // strerror

#ifdef WIN32
#pragma comment(lib, "wsock32")
#endif

// Non-zero makes socket_debug() also report the last socket error and throw it.
#define SOCKET_DEBUG_LEVEL 0

namespace avalon {

    /**
     * Prints the most recent socket error and returns its numeric code
     * (WSAGetLastError() on Windows, errno elsewhere).
     */
    int socket_error() {
        int error = 0;
#ifdef WIN32
        error = WSAGetLastError();
#else
        error = errno;
#endif
        printf("Avalon socket error: %d %s \n", error, strerror(error));
        return error;
    }

    /**
     * printf-style diagnostic output. The formatted message is truncated to
     * the size of the local buffer (1024 bytes including the terminator).
     *
     * @param message printf format string, followed by its arguments.
     * @throws int    the error code, only when SOCKET_DEBUG_LEVEL is non-zero.
     */
    void socket_debug(const char* message, ...) {
#ifndef WIN32
        // Callers invoke socket_error() right after logging; keep errno intact
        // so the stdio calls below cannot overwrite the code being reported.
        const int saved_errno = errno;
#endif
        char buf[1024] = "";
        va_list args;
        va_start(args, message);
        vsnprintf(buf, sizeof(buf), message, args);
        va_end(args);

        std::string error = "Avalon sokcet: ";
        error.append(buf);
        error.append("\n");
        // The text may contain '%' (e.g. from a host name), so it must never
        // be used as the format string itself.
        printf("%s", error.c_str());
#ifndef WIN32
        errno = saved_errno;
#endif

        if (SOCKET_DEBUG_LEVEL) {
            int error_no = socket_error();
            if (error_no != -1) {
                throw error_no;
            }
        }
    }

    /** Initializes Winsock on Windows; does nothing elsewhere. Returns 0 on success, -1 on failure. */
    int socket_init() {
#ifdef WIN32
        WSADATA wsadata;
        WORD version = MAKEWORD(2, 0);
        int ret = WSAStartup(version, &wsadata);
        if (ret) {
            socket_debug("Initilize winsock error");
            return -1;
        }
#endif
        return 0;
    }

    /** Releases Winsock on Windows; does nothing elsewhere. */
    int socket_clean() {
#ifdef WIN32
        return WSACleanup();
#endif
        return 0;
    }

    /** Wraps an already-open descriptor. Returns nullptr for a negative descriptor. */
    Socket* Socket::create(SOCKET socket_fd) {
        if (socket_fd < 0) {
            socket_debug("socket_fd(%d) is invailed.", socket_fd);
            return nullptr;
        }

        Socket* socket = new Socket(socket_fd);
        if (!socket) {
            socket_debug("Create avalon socket failed.");
            return nullptr;
        }
        return socket;
    }

    /** Opens a new socket. Returns nullptr when the underlying socket() call fails. */
    Socket* Socket::create(int family, int type, int protocal) {
        Socket* socket = new Socket(family, type, protocal);
        if (!socket) {
            socket_debug("Create avalon socket failed.");
            return nullptr;
        }

        if (socket->getSocketFD() == INVALID_SOCKET) {
            delete socket;
            socket_debug("Create socket failed.");
            return nullptr;
        }
        socket_debug("Create socket(%d) successfully.", socket->getSocketFD());
        return socket;
    }

    Socket::Socket(SOCKET socket_fd)
    : _family(AF_INET)
    , _type(SOCK_STREAM)
    , _protocal(IPPROTO_IP)
    {
        _socket_fd = socket_fd;
    }

    Socket::Socket(int family, int type, int protocal)
    : _family(AF_INET)
    , _type(SOCK_STREAM)
    , _protocal(IPPROTO_IP)
    {
        _socket_fd = socket(family, type, protocal);
        // The requested parameters are recorded only if the socket was created.
        if (_socket_fd != INVALID_SOCKET) {
            _family = family;
            _type = type;
            _protocal = protocal;
        }
    }

    /** Replaces the wrapped descriptor; the previous descriptor is not closed. */
    Socket& Socket::operator = (SOCKET socket_fd) {
        _socket_fd = socket_fd;
        return *this;
    }

    Socket::~Socket() {
        if (_socket_fd != INVALID_SOCKET) {
            this->close();
        }
    }

    /**
     * Connects to a numeric address.
     *
     * @param host textual address understood by inet_pton for this socket's family.
     * @param port port in host byte order.
     * @return true on success, false if the address is invalid or connect fails.
     */
    bool Socket::connect(const char* host, unsigned short port) {
        if (!host) {
            socket_debug("Connect failed: host is null.");
            return false;
        }

        struct sockaddr_in remote_addr = {};
        remote_addr.sin_family = _family;
        remote_addr.sin_port = htons(port);

        // inet_pton reports failure through its return value (0: not a valid
        // address, -1: unsupported family); errno alone may be stale.
        if (inet_pton(_family, host, &remote_addr.sin_addr) != 1) {
            socket_debug("Invalid host address: %s", host);
            return false;
        }

        int ret = ::connect(_socket_fd, (struct sockaddr*)(&remote_addr), sizeof(remote_addr));
        if (ret == SOCKET_ERROR) {
            socket_debug("Connect %s:%d failed.", host, port);
            socket_error();
            return false;
        }

        socket_debug("Connect %s:%d successfully.", host, port);
        return true;
    }

    /**
     * Enables SO_REUSEADDR and binds to `port` on all local interfaces.
     * @return true on success.
     */
    bool Socket::bind(unsigned short port) {
        int opt = 1;
        if (setsockopt(_socket_fd, SOL_SOCKET, SO_REUSEADDR, (char*)&opt, sizeof(opt)) < 0)
            return false;

        struct sockaddr_in local_addr = {};
        local_addr.sin_family = _family;
        local_addr.sin_addr.s_addr = INADDR_ANY;
        local_addr.sin_port = htons(port);

        int ret = ::bind(_socket_fd, (struct sockaddr*)(&local_addr), sizeof(local_addr));
        if (ret == SOCKET_ERROR) {
            socket_debug("Socket(%d) bind port(%d) failed.", _socket_fd, port);
            return false;
        }

        socket_debug("Socket(%d) bind port(%d) successfully.", _socket_fd, port);
        return true;
    }

    /** Puts the socket into listening state. @return true on success. */
    bool Socket::listen(int backlog) {
        int ret = ::listen(_socket_fd, backlog);
        if (ret == SOCKET_ERROR) {
            socket_debug("Socket(%d) listen failed.", _socket_fd);
            return false;
        }

        socket_debug("Socket(%d) Listen successfully.", _socket_fd);
        return true;
    }

    /**
     * Accepts one connection, retrying when the call is interrupted (EINTR).
     *
     * @param client_host optional output buffer for the peer's dotted-quad
     *                    address; it must hold at least 16 bytes because the
     *                    text is copied without a length limit.
     * @return a new Socket the caller must delete, or nullptr on failure.
     */
    Socket* Socket::accept(char* client_host) {
        struct sockaddr_in peer_addr = {};
        socklen_t len = sizeof(peer_addr);
        SOCKET ret = INVALID_SOCKET;

        while (true) {
            len = sizeof(peer_addr);  // value-result argument: reset before every attempt
            ret = ::accept(_socket_fd, (struct sockaddr*)(&peer_addr), &len);
            if (ret != INVALID_SOCKET)
                break;
            if (errno == EINTR)
                continue;

            socket_debug("Socket(%d) accept failed.", _socket_fd);
            socket_error();
            return nullptr;
        }

        avalon::Socket* socket = avalon::Socket::create(ret);
        if (!socket)
            return nullptr;

        const char* peer_ip = inet_ntoa(peer_addr.sin_addr);
        if (client_host) {
            sprintf(client_host, "%s", peer_ip);
        }

        socket_debug("Socket(%d) accept successfully, client socket: %d ip: %s", _socket_fd, socket->getSocketFD(), peer_ip);
        return socket;
    }

    /**
     * Sends the whole buffer, looping over partial sends.
     *
     * @return number of bytes sent. If a call fails before anything was sent,
     *         the failing call's result (-1 or 0) is returned; if it fails
     *         later, the bytes sent so far are returned.
     */
    ssize_t Socket::send(const char* buffer, size_t len, int flag) {
        size_t sent = 0;
        while (sent < len) {
            const ssize_t bytes = ::send(_socket_fd, buffer + sent, len - sent, flag);
            if (bytes <= 0) {
                // Check before accumulating so a -1 never corrupts the count.
                socket_error();
                if (sent == 0)
                    return bytes;
                break;
            }
            sent += static_cast<size_t>(bytes);
        }
        return static_cast<ssize_t>(sent);
    }

    /** Single ::recv call; returns its result unchanged. */
    ssize_t Socket::recv(char* buffer, size_t len, int flag) {
        return ::recv(_socket_fd, buffer, len, flag);
    }

    /**
     * Writes the whole buffer, looping over partial writes.
     * Return value follows the same rules as send().
     */
    ssize_t Socket::write(const char* buffer, size_t len) {
        size_t written = 0;
        while (written < len) {
            const ssize_t bytes = ::write(_socket_fd, buffer + written, len - written);
            if (bytes <= 0) {
                socket_error();
                if (written == 0)
                    return bytes;
                break;
            }
            written += static_cast<size_t>(bytes);
        }
        return static_cast<ssize_t>(written);
    }

    /** Single ::read call; returns its result unchanged. */
    ssize_t Socket::read(char* buffer, size_t len) {
        return ::read(_socket_fd, buffer, len);
    }

    /** Sets or clears O_NONBLOCK on the descriptor; silently returns if the flags cannot be read. */
    void Socket::set_blocking(const bool blocking) {
        int opts = fcntl(_socket_fd, F_GETFL);
        if (opts < 0)
            return;

        if (blocking)
            opts = (opts & ~O_NONBLOCK);
        else
            opts = (opts | O_NONBLOCK);
        fcntl(_socket_fd, F_SETFL, opts);
    }

    /**
     * Closes the descriptor and marks this object as closed.
     * @return the result of the underlying close call.
     */
    int Socket::close() {
        int ret = -1;
#ifdef WIN32
        ret = closesocket(_socket_fd);
#else
        ret = ::close(_socket_fd);
#endif
        if (ret == SOCKET_ERROR) {
            socket_debug("Socket(%d) close failed.", _socket_fd);
        }
        _socket_fd = INVALID_SOCKET;
        return ret;
    }

    SOCKET Socket::getSocketFD() {
        return _socket_fd;
    }
}
2. 多線程的模型:
在accept成功以後,爲每一個通訊socket建立新的進程或線程,單獨用於處理服務器和客戶端的通訊。但是系統對可建立的進程數量有限制;在linux下,線程也叫輕量級進程,因此即使建立的是線程,也會受到系統的限制,一般這個默認限制是2048個。而且進程或者線程數量過多,也會導致進程或者線程切換的開銷增大:
客戶端:
// Client body. This fragment runs inside an enclosing loop that is not shown
// here and that defines the int loop variable `i`.
avalon::Socket* socket = avalon::Socket::create(AF_INET, SOCK_STREAM);
if (socket) {
    if (!socket->connect("127.0.0.1", 6666)) {
        delete socket;  // do not leak the descriptor when the connection fails
        continue;
    }

    char buf[1024] = "";
    snprintf(buf, sizeof(buf), "%d I am a client socket!", i);
    ssize_t bytes = socket->send(buf, strlen(buf), 0);

    char recvBuf[1024];
    while (true) {
        memset(recvBuf, 0, sizeof(recvBuf));
        // Leave the last byte untouched so the data is always NUL-terminated
        // when printed with %s.
        bytes = socket->recv(recvBuf, sizeof(recvBuf) - 1);
        if (bytes > 0) {
            // bytes is ssize_t, which needs %zd rather than %d.
            printf("%d recv data from remote: %zd %s \n", i, bytes, recvBuf);
        } else if (bytes == 0) {
            printf("remote socket %d cloese. \n", socket->getSocketFD());
            break;
        } else {
            int error = avalon::socket_error();
            printf("%d socket error: %d %s \n", i, error, strerror(error));
            break;
        }
    }

    // NOTE(review): assumes nothing after this fragment still uses `socket`.
    delete socket;
}
服務端:
void communiction_handler(avalon::Socket* socket) { char buffer[1024]; while (true) { if (!socket) continue; printf("thread %ld \n", std::this_thread::get_id()); ssize_t bytes = socket->recv(buffer, 1024, 0); if (bytes > 0) { buffer[bytes] = '\0'; printf("recv msg from client: %s \n", buffer); const char* data = "I am remote.0123456789abcdefg!wwwwwer"; ssize_t sendedBytes = socket->send(data, strlen(data), 0); } else if (bytes == 0) { printf("client socket(%d) closed. thread(%ld) \n", socket->getSocketFD(), std::this_thread::get_id()); socket->close(); break; } else { int error_no = avalon::socket_error(); printf("recv msg from client failed %d %s \n", error_no, strerror(error_no)); socket->close(); break; } } } int main(int argc, const char * argv[]) { // 多線程 std::vector<std::thread> threads; avalon::Socket* listen_socket = avalon::Socket::create(AF_INET, SOCK_STREAM); do { if (!listen_socket->bind(6666)) break; if (!listen_socket->listen(10)) break; while (true) { // 多線程 avalon::Socket* clientSocket = listen_socket->accept(); if (clientSocket) { threads.push_back( std::move( std::thread(communiction_handler, clientSocket) ) ); } } while (false); for(std::thread& thread : threads){ thread.join(); } delete listen_socket; }
3. I/O多路複用
內核一旦檢測到某個I/O的讀取條件就緒,就通知用戶進程進行響應;
多路複用通常用於需要同時處理多個文件描述符、多個套接字、多種協議的情況;
相比使用多進程和多線程的機制,I/O多路複用具備系統開銷小的優點;
(1)select模型:
最大的問題就是連接數限制,一般是1024或者2048個,不過可以修改內核配置達到更多的連接數。但是由於select模型需要線性遍歷fd集合,因此如果連接數改得過大,例如10萬個,會導致線性遍歷的性能問題,最後的結果可能是導致超時。其次,就是內存拷貝問題:select模型在把fd消息通知給用戶的時候,採用的是將內核中的數據拷貝到用戶空間的方式:
服務端:
// // main.cpp // SocketServer // // Created by avl-showell on 16/8/8. // Copyright © 2016年 avl-showell. All rights reserved. // #include <iostream> #include "socket/AvalonSocket.h" #include <mutex> #include <condition_variable> #include <chrono> #include <thread> #include <vector> // select #include <sys/select.h> #include <sys/time.h> #define MAX_CLIENT_SOCKET_COUNT 10000 #define RECV_BUFFER_LEN 10 int main(int argc, const char * argv[]) { std::vector<avalon::Socket*> socket_fds(MAX_CLIENT_SOCKET_COUNT, nullptr); fd_set read_fds, write_fds; struct timeval timeout; char recvBuf[RECV_BUFFER_LEN]; avalon::Socket* listen_socket = avalon::Socket::create(AF_INET, SOCK_STREAM); do { if (!listen_socket->bind(6666)) break; if (!listen_socket->listen(10)) break; while (true) { // select int listen_socket_fd = listen_socket->getSocketFD(); int max_socket_fd = listen_socket_fd; FD_ZERO(&read_fds); FD_SET(listen_socket_fd, &read_fds); for (int i = 0; i < MAX_CLIENT_SOCKET_COUNT; ++i) { avalon::Socket* socket = socket_fds[i]; if (!socket) continue; SOCKET socket_fd = socket->getSocketFD(); if (socket_fd > 0) { FD_SET(socket_fd, &read_fds); if (socket_fd > max_socket_fd) max_socket_fd = socket_fd; } } timeout.tv_sec = 5; timeout.tv_usec = 0; int ret = select(max_socket_fd + 1, &read_fds, NULL, NULL, &timeout); if (ret == SOCKET_ERROR) { avalon::socket_error(); break; } else if (ret == 0) { printf("select socket timeout. 
\n"); continue; } else { printf("_______________ \n"); for (int i = 0; i < MAX_CLIENT_SOCKET_COUNT; ++i) { avalon::Socket* socket = socket_fds[i]; if (!socket) continue; SOCKET socket_fd = socket->getSocketFD(); if (socket_fd > 0 && FD_ISSET(socket_fd, &read_fds)) { int recvedBytes = 0; while (true) { memset(recvBuf, 0, RECV_BUFFER_LEN); int bytes = socket->recv(recvBuf, RECV_BUFFER_LEN); if (bytes > 0) { recvedBytes += bytes; socket->send(recvBuf, RECV_BUFFER_LEN); break; } else { avalon::socket_error(); delete socket; socket_fds[i] = nullptr; break; } } recvBuf[recvedBytes] = '\0'; printf("select: recv data from client: %s \n", recvBuf);
// 處理數據... } } if (FD_ISSET(listen_socket_fd, &read_fds)) { printf("select: new client connection. \n"); bool found = false; for (int i = 0; i < MAX_CLIENT_SOCKET_COUNT; ++i) { avalon::Socket* socket = socket_fds[i]; if (!socket) { avalon::Socket* clientSocket = listen_socket->accept(); if (clientSocket) { // clientSocket->set_blocking(false); socket_fds[i] = clientSocket; found = true; break; } } } if (!found) { printf("select: out of max sockets limit. \n"); } } } } } while (false); delete listen_socket; return 0; }
(2) poll模型:
poll模型和select模型相似,可是poll沒有最大文件數量的限制,不過依然存在將消息從內核空間拷貝到用戶空間的問題:
服務端:
#include <poll.h> #define MAX_CLIENT_SOCKET_COUNT 10 #define RECV_BUFFER_LEN 10 int main(int argc, const char * argv[]) { char recvBuf[RECV_BUFFER_LEN]; avalon::Socket* listen_socket = avalon::Socket::create(AF_INET, SOCK_STREAM); // poll struct pollfd client_fds[MAX_CLIENT_SOCKET_COUNT]; client_fds[0].fd = listen_socket->getSocketFD(); client_fds[0].events = POLLIN; for (int i = 1; i < MAX_CLIENT_SOCKET_COUNT; ++i) { client_fds[i].fd = -1; } int max_socket = 0; do { if (!listen_socket->bind(6666)) break; if (!listen_socket->listen(10)) break; while (true) { int ready = poll(client_fds, max_socket + 1, 3000); if (ready == -1) { avalon::socket_error(); break; } else if (ready == 0) { printf("select socket timeout. \n"); continue; } printf("_______________ \n"); if (client_fds[0].revents & POLLIN) { printf("select: new client connection. \n"); bool found = false; int i = 0; for (i = 1; i < MAX_CLIENT_SOCKET_COUNT; ++i) { avalon::Socket* socket = socket_fds[i]; if (!socket) { avalon::Socket* clientSocket = listen_socket->accept(); if (clientSocket) { client_fds[i].fd = clientSocket->getSocketFD(); client_fds[i].events = POLLIN; socket_fds[i] = clientSocket; found = true; break; } } } if (!found) { printf("select: out of max sockets limit. 
\n"); } else { if (i > max_socket) max_socket = i; } } for (int j = 1; j <= max_socket; ++j) { avalon::Socket* socket = socket_fds[j]; if (!socket) continue; if (client_fds[j].revents & (POLLIN | POLLERR)) { int recvedBytes = 0; while (true) { memset(recvBuf, 0, RECV_BUFFER_LEN); int bytes = socket->read(recvBuf, RECV_BUFFER_LEN); if (bytes > 0) { recvedBytes += bytes; int writedBytes = socket->write(recvBuf, RECV_BUFFER_LEN); recvBuf[bytes] = '\0'; printf("select: recv data from client: %s \n", recvBuf); if (bytes < RECV_BUFFER_LEN) break; } else { avalon::socket_error(); delete socket; client_fds[j].fd = -1; socket_fds[j] = nullptr; break; } } } } } } while (false); for(std::thread& thread : threads){ thread.join(); } delete listen_socket; return 0; }
(3) epoll模型
epoll模型是poll模型的改進版本,沒有文件描述符的限制,epoll只處理活躍的文件描述符,不會遍歷整個集合,並且epoll使用了內核中的「共享內存」,減小了內存的拷貝:
/*
 * Purpose: serve multiple sockets with epoll.
 * Listens on one port; every accepted connection is added to the epoll set.
 */
#include "select.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <errno.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <poll.h>
#include <sys/epoll.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <arpa/inet.h>

typedef struct _CLIENT {
    int fd;
    struct sockaddr_in addr; /* client's address information */
} CLIENT;

#define MYPORT 59000

// maximum number of connections/events handled per epoll_wait call
#define MAX_EVENTS 500

// current number of connections
int currentClient = 0;

// receive buffer; one extra byte keeps it NUL-terminated for printing
#define REVLEN 10
char recvBuf[REVLEN + 1];

// epoll state
// epoll descriptor
int epollfd;
// event array filled by epoll_wait
struct epoll_event eventList[MAX_EVENTS];

void AcceptConn(int srvfd);
void RecvData(int fd);

/*
 * epoll-based server on MYPORT.
 *
 * The descriptors are blocking, so they are registered level-triggered
 * (no EPOLLET): one accept/recv per notification is then sufficient, and
 * epoll_wait reports the descriptor again while work is still pending.
 */
int main()
{
    int sockListen;
    int timeout;
    struct sockaddr_in server_addr;

    // socket
    if ((sockListen = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
        printf("socket error\n");
        return -1;
    }

    bzero(&server_addr, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(MYPORT);
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);

    // bind
    if (bind(sockListen, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
        printf("bind error\n");
        close(sockListen);
        return -1;
    }

    // listen
    if (listen(sockListen, 5) < 0) {
        printf("listen error\n");
        close(sockListen);
        return -1;
    }

    // 1. create the epoll instance
    epollfd = epoll_create(MAX_EVENTS);
    if (epollfd < 0) {
        printf("epoll create fail\n");
        close(sockListen);
        return -1;
    }

    struct epoll_event event;
    event.events = EPOLLIN;
    event.data.fd = sockListen;

    // 2. epoll_ctl: register the listening socket
    if (epoll_ctl(epollfd, EPOLL_CTL_ADD, sockListen, &event) < 0) {
        printf("epoll add fail : fd = %d\n", sockListen);
        close(epollfd);
        close(sockListen);
        return -1;
    }

    // event loop
    while (1) {
        timeout = 3000;
        // 3. epoll_wait
        int ret = epoll_wait(epollfd, eventList, MAX_EVENTS, timeout);

        if (ret < 0) {
            if (errno == EINTR)
                continue;  // interrupted by a signal: not a failure
            printf("epoll error\n");
            break;
        } else if (ret == 0) {
            printf("timeout ...\n");
            continue;
        }

        // epoll_wait returns only the active descriptors; this is the key
        // difference from poll.
        int n = 0;
        for (n = 0; n < ret; n++) {
            const int fd = eventList[n].data.fd;

            if ((eventList[n].events & EPOLLERR) ||
                (eventList[n].events & EPOLLHUP) ||
                !(eventList[n].events & EPOLLIN)) {
                printf("epoll error\n");
                if (fd == sockListen) {
                    // The listening socket itself failed: nothing left to serve.
                    close(epollfd);
                    close(sockListen);
                    return -1;
                }
                // A failing or hung-up client must not stop the server;
                // closing the descriptor drops it from the epoll set.
                close(fd);
                continue;
            }

            if (fd == sockListen) {
                AcceptConn(sockListen);
            } else {
                RecvData(fd);
            }
        }
    }

    close(epollfd);
    close(sockListen);

    printf("test\n");
    return 0;
}

/**************************************************
Function: AcceptConn
Purpose:  accept one client connection and register it with epoll
Params:   srvfd: listening socket
***************************************************/
void AcceptConn(int srvfd)
{
    struct sockaddr_in sin;
    socklen_t len = sizeof(struct sockaddr_in);
    bzero(&sin, len);

    int confd = accept(srvfd, (struct sockaddr*)&sin, &len);
    if (confd < 0) {
        printf("bad accept\n");
        return;
    }
    printf("Accept Connection: %d", confd);

    //setnonblocking(confd);

    // 4. add the new connection to the epoll set
    struct epoll_event event;
    event.data.fd = confd;
    event.events = EPOLLIN;
    if (epoll_ctl(epollfd, EPOLL_CTL_ADD, confd, &event) < 0) {
        // An unregistered descriptor would never be serviced; do not leak it.
        printf("epoll add fail : fd = %d\n", confd);
        close(confd);
    }
}

/*
 * Reads the data currently available on `fd` (at most REVLEN bytes) into
 * recvBuf and prints it. Closes `fd` when the peer has closed the
 * connection or recv fails.
 */
void RecvData(int fd)
{
    memset(recvBuf, 0, sizeof(recvBuf));

    printf("RecvData function\n");

    // A single recv per notification: waiting here for a full buffer would
    // block every other connection on this single-threaded server.
    ssize_t ret = recv(fd, recvBuf, REVLEN, 0);
    if (ret < 0 && errno == EINTR) {
        return;  // still readable; epoll_wait reports it again
    }
    if (ret <= 0) {
        // 0: peer closed the connection; < 0: receive error.
        close(fd);
    } else if (ret == REVLEN) {
        // buffer completely filled
        printf("buf = %s\n", recvBuf);
    }

    printf("content is %s", recvBuf);
}