1.Echo網絡庫的編寫ios
1.1 Echo網絡庫1.0 c++
1.1.1 Echo網絡庫 1.0 框架分析
a)class InetAddress: 主要用來定義一個struct sockaddr_in 結構(用自定義端口號初始化),並提供獲取這個結構體成員如IP、Port等的接口; 數組
b)class Socket : 主要用來把一個普通的 sockfd 變爲 listenfd(這裏用一個sockfd初始化對象),提供bind 、listen、accept 等接口。服務器
c)class TcpConnection:包含一個TCP鏈接 和 一個Rio緩衝區,該類對外提供讀寫的接口而且經過調用 Rio 的函數封裝了本身的IO接口; 網絡
d)class PollPoller:封裝Poll模型的主要成員和接口,這裏增長了一個 map<fd, TcpConnectionPtr>類型的成員變量,用來表示 fd 到 TCP 鏈接的映射。數據結構
1.1.2 源碼和測試案例多線程
a)Rio 類app
#ifndef __RIO_H__ #define __RIO_H__ #include "NonCopyable.h" #include <stdio.h> #define RIO_BUFFER 8192 /* * 將read write 操做封裝到一個帶有緩衝區的 IO 系統中 * */ class Rio : NonCopyable{ public: explicit Rio(int fd); ssize_t readn(char *usrbuf, size_t n); ssize_t writen(const char *usrbuf, size_t n); ssize_t readline(char *usrbuf, size_t maxline); private: ssize_t read(char *usrbuf, size_t n); //這個函數供上述函數調用 int fd_; ssize_t left_; //緩衝區剩餘可讀取的字節數 char *bufptr_; //指向可讀取字節緩衝區 char buffer_[RIO_BUFFER]; }; #endif /*__RIO_H__*/ #include "Rio.h" #include <unistd.h> #include <string.h> #include <stdio.h> #include <errno.h> #define ERR_EXIT(m) \ do { \ perror(m);\ exit(EXIT_FAILURE);\ }while(0) Rio::Rio(int fd) :fd_(fd), left_(0), bufptr_(buffer_){ memset(buffer_, 0, sizeof buffer_); } ssize_t Rio::readn(char *usrbuf, size_t n){ char *bufptr = usrbuf; ssize_t nleft = n; ssize_t nread; while(nleft > 0){ nread = read(bufptr, nleft); if(nread == -1){ return -1; } else if(nread == 0) break; nleft -= nread; bufptr += nread; } return (n - nleft); } ssize_t Rio::writen(const char *usrbuf, size_t n){ const char *bufptr = usrbuf; size_t nleft = n; int nwrite; while(nleft > 0){ nwrite = write(fd_, bufptr, nleft); if(nwrite <= 0){ if(errno == EINTR) nwrite = 0; else return -1; } nleft -= nwrite; bufptr += nwrite; } return n; } /* * readline 每次都讀取一個字符 而後判斷是不是\n * 注意讀取一行時 如fgets 最後一個字符包含\n */ ssize_t Rio::readline(char *usrbuf, size_t maxline){ char *bufptr = usrbuf; ssize_t nleft = maxline - 1; ssize_t nread; char c; while(nleft > 0){ nread = read(&c, 1); if(nread == -1) return -1; else if(nread == 0) break; *bufptr++ = c; nleft --; if(c == '\n') break; } *bufptr = '\0'; return (maxline - 1 - nleft); } /* * 緩衝區中一直有數據可讀 而且每次都是直接從buffer_中讀取 * 再也不繫統調用 只有當緩衝區中沒有數據可讀時,才調用系統的read */ ssize_t Rio::read(char *usrbuf, size_t n){ ssize_t nread; while(left_ <= 0){ //緩衝區中沒有有效數據 nread = ::read(fd_, buffer_, sizeof buffer_); if(nread == -1){ if(errno ==EINTR) continue; return -1; } else if(nread == 0) return 0; left_ = nread; bufptr_ = buffer_; } int cnt = n; // cnt 存儲返回值,即用戶實際獲取的字節數 if(left_ < cnt) cnt = left_; ::memcpy(usrbuf, bufptr_, cnt); left_ -= cnt; bufptr_ += cnt; return cnt; }
#include "Rio.h" #include <iostream> #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #define ERR_EXIT(m) \ do { \ perror(m);\ exit(EXIT_FAILURE);\ }while(0) using namespace std; int main(int argc, const char *argv[]) { int fd = open("test.txt", O_RDONLY); if(fd == -1) ERR_EXIT("open"); Rio rio(fd); char buf[50] = {0}; rio.readline(buf, 50); cout << buf ; memset(buf, 0, sizeof buf); rio.readn(buf, sizeof buf); cout << buf ; close(fd); return 0; }
b)InetAddress 類框架
#ifndef __INET_ADDRESS_H__ #define __INET_ADDRESS_H__ #include "Copyable.h" #include <netinet/in.h> //strcuct sock_addr_in 結構在這裏定義 #include <string.h> // memset 函數 /* * 對 struct sockaddr_in 的定義 * */ typedef struct sockaddr SA; class InetAddress : private Copyable{ public: explicit InetAddress(uint16_t port); //explicit 禁止參數隱式轉換 InetAddress(const struct sockaddr_in &addr); const struct sockaddr_in *getSockAddrInet() const{ return &addr_; } void setSockAddrInet(const struct sockaddr_in &addr){ addr_= addr; } uint16_t portNetEndian() const{ //返回sockaddr_in的端口 return addr_.sin_port; } uint32_t ipNetEndian() const{ //返回IP return addr_.sin_addr.s_addr; } private: struct sockaddr_in addr_; }; inline InetAddress::InetAddress(uint16_t port){ ::memset(&addr_, 0, sizeof addr_); // ::表示全局做用域 避免重名 addr_.sin_family = AF_INET; addr_.sin_port = ::htons(port); addr_.sin_addr.s_addr = ::htonl(INADDR_ANY); } inline InetAddress::InetAddress(const struct sockaddr_in &addr) :addr_(addr){ } #endif /*__INET_ADDRESS_H__*/
c)Socket類socket
#ifndef __SOCKET_H__ #define __SOCKET_H__ #include "NonCopyable.h" /* * 將一個fd 變成listenfd, 並開始處理用戶的請求 * */ class InetAddress; class Socket : NonCopyable{ public: explicit Socket(int sockfd); ~Socket(); //關閉sockfd int fd() const{ return sockfd_; } void bindAddress(const InetAddress &addr); void listen(); int accept(); void shutdownWrite(); void setReusePort(); private: const int sockfd_; }; #endif /*__SOCKET_H__*/ #include "Socket.h" #include "InetAddress.h" #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <stdlib.h> #include <unistd.h> #include <stdio.h> #define ERR_EXIT(m) \ do { \ perror(m);\ exit(EXIT_FAILURE);\ }while(0) Socket::Socket(int sockfd) :sockfd_(sockfd){ } Socket::~Socket(){ ::close(sockfd_); } void Socket::bindAddress(const InetAddress &addr){ if(::bind(sockfd_, (struct sockaddr *)&addr, sizeof (struct sockaddr_in)) == -1) ERR_EXIT("bind"); } void Socket::listen(){ if(::listen(sockfd_, SOMAXCONN) == -1) ERR_EXIT("listen"); } int Socket::accept(){ int peerfd = ::accept(sockfd_, NULL, NULL); if(peerfd == -1) ERR_EXIT("accept"); return peerfd; } void Socket::shutdownWrite(){ ::shutdown(sockfd_, SHUT_WR); } void Socket::setReusePort(){ int on = 1; if(setsockopt(sockfd_, SOL_SOCKET, SO_REUSEADDR, &on, static_cast<socklen_t>(sizeof on)) == -1) ERR_EXIT("setsockopt"); }
d)TcpConnection 類
#ifndef __TCP_CONNECTION_H__ #define __TCP_CONNECTION_H__ #include "NonCopyable.h" #include "InetAddress.h" #include "Socket.h" #include "Rio.h" #include <string> #include <functional> #include <memory> /* * 表示一個 TCP 鏈接,提供收發信息,關閉鏈接等操做 */ class TcpConnection; typedef std::shared_ptr<TcpConnection> TcpConnectionPtr; //智能指針類型 class TcpConnection{ public: typedef std::function<void (const TcpConnectionPtr &)> TcpConnectionCallback; //回調函數類型,注意這裏含有一個智能指針參數,所以回調的時候注意傳參數 explicit TcpConnection(int sockfd); ~TcpConnection(); int fd() const{ return socket_.fd(); } // 經過調用 Rio 中的函數,封裝一套本身的 IO 接口 ssize_t readn(char *usrbuf, size_t n); ssize_t readline(char *usrbuf, size_t maxline); ssize_t writen(const char *usrbuf, size_t n); void send(const std::string &s); // 向TCP鏈接中的對端發送數據 std::string receive();//接收數據 void shutdown(); //關閉本鏈接的寫端 private: Socket socket_; //一個Socket對象就表示一條已經創建的TCP鏈接 Rio buffer_; //該TCP鏈接的讀寫緩衝區 即用本fd初始化 }; #endif /*__TCP_CONNECTION_H__*/ #include "TcpConnection.h" #include <stdlib.h> using namespace std; #define ERR_EXIT(m) \ do { \ perror(m);\ exit(EXIT_FAILURE);\ }while(0) TcpConnection::TcpConnection(int sockfd) :socket_(sockfd), buffer_(sockfd){ } TcpConnection::~TcpConnection(){ shutdown(); } ssize_t TcpConnection::readn(char *usrbuf, size_t n){ ssize_t nread = buffer_.readn(usrbuf, n); if(nread == -1) ERR_EXIT("read"); return nread; } ssize_t TcpConnection::readline(char *usrbuf, size_t maxline){ ssize_t nread = buffer_.readline(usrbuf, maxline); if(nread == -1) ERR_EXIT("readline"); return nread; } ssize_t TcpConnection::writen(const char *usrbuf, size_t n){ ssize_t nwrite = buffer_.writen(usrbuf, n); if(nwrite <= 0) ERR_EXIT("writen"); return nwrite; } void TcpConnection::send(const string &s){ writen(s.c_str(), s.size()); //調用本身的IO 函數 } string TcpConnection::receive(){ char buf[1024]; readline(buf, sizeof buf); return string(buf); } void TcpConnection::shutdown(){ socket_.shutdownWrite(); } #include "TcpConnection.h" #include "InetAddress.h" #include "Socket.h" #include <iostream> #include <stdlib.h> #define ERR_EXIT(m) \ do { \ perror(m);\ exit(EXIT_FAILURE);\ }while(0) /* * 測試 TcpConnection 類 * */ using namespace std; int main(int argc, const char *argv[]) { int fd = socket(AF_INET, SOCK_STREAM, 0); if(fd == -1) ERR_EXIT("socket"); InetAddress inet(9999); Socket sock(fd); sock.setReusePort(); sock.bindAddress(inet); sock.listen(); int peerfd; peerfd = sock.accept(); TcpConnection con(peerfd); char buf[1024]; while(1){ con.readline(buf, 1024); cout << "recv data: " << buf << endl; con.writen(buf, strlen(buf)); } return 0; }
e)PollPoller類
#ifndef __POLL_POLLELR_H__ #define __POLL_POLLELR_H__ #include "NonCopyable.h" #include "TcpConnection.h" #include <poll.h> #include <map> #include <functional> class PollPoller : private NonCopyable{ public: typedef TcpConnection::TcpConnectionCallback PollerCallback; //爲何要用TcpConnection的回調函數類型 explicit PollPoller(int listenfd); void poll(); void handleAccept(); void handleData(); void handleConnectionEvent(int peerfd); void handleMessageEvent(int peerfd); void handleCloseEvent(int i); void setConnectionCallback(const PollerCallback &cb){ onConnectionCallback_ = cb; } void setMessageCallback(const PollerCallback &cb){ onMessageCallback_ = cb; } void setCloseCallback(const PollerCallback &cb){ onCloseCallback_ = cb; } private: struct pollfd event_[2048]; int listenfd_; int maxi_; int nready_; std::map<int, TcpConnectionPtr> lists_; // 一個fd 對應 一個指向Tcp鏈接的智能指針(採用引用計數) PollerCallback onConnectionCallback_; PollerCallback onMessageCallback_; PollerCallback onCloseCallback_; typedef std::map<int, TcpConnectionPtr>::iterator TcpIterator; }; #endif /*__POLL_POLLELR_H__*/ #include "PollPoller.h" #include <utility> #include <assert.h> #include <iostream> #define ERR_EXIT(m) \ do { \ perror(m);\ exit(EXIT_FAILURE);\ }while(0) PollPoller::PollPoller(int listenfd) :listenfd_(listenfd){ int i; for(i = 0; i < 2048; i++) event_[i].fd = -1; event_[0].fd = listenfd_; event_[0].events = POLLIN; maxi_ = 0; nready_ = 0; } void PollPoller::poll(){ int ret; do{ ret = ::poll(event_, maxi_ + 1, 10000); }while(ret == -1 && errno == EINTR); if(ret == -1) ERR_EXIT("poll"); nready_ = ret; } void PollPoller::handleAccept(){ if(event_[0].revents & POLLIN){ int peerfd; if((peerfd = ::accept(listenfd_, NULL, NULL)) == -1) ERR_EXIT("accpet"); handleConnectionEvent(peerfd); } } void PollPoller::handleData(){ // 依次處理客戶 int i; for(i = 1; i <= maxi_; i++){ //注意 這裏從1開始 int peerfd = event_[i].fd; if(peerfd == -1) continue; if(event_[i].revents & POLLIN){ char buf[1024]; int nread = ::recv(peerfd, buf, sizeof buf, MSG_PEEK); //預讀取 if(nread == -1) ERR_EXIT("revc"); else if(nread == 0) handleCloseEvent(i); else handleMessageEvent(peerfd); } } } void PollPoller::handleConnectionEvent(int peerfd){ // 1.將新鏈接的fd 添加到 event 數組中 int i; for(i = 1; i < 2048; i++){ if(event_[i].fd == -1){ event_[i].fd = peerfd; event_[i].events = POLLIN; if(i > maxi_) maxi_ = i; break; } } if(i == 2048){ std::cout << " too many clients" << std::endl; exit(EXIT_FAILURE); } // 2.將該新的TCP鏈接添加到 map 中 TcpConnectionPtr conn(new TcpConnection(peerfd)); std::pair<TcpIterator, bool> ret = lists_.insert(make_pair(peerfd, conn)); assert(ret.second == true); //這裏確認是否成功插入 // 3.調用處理鏈接時要用的回調函數 onConnectionCallback_(conn); // 注意回調函數要傳入一個TcpConnectionPtr 對象表示當前 TCP 鏈接 } void PollPoller::handleMessageEvent(int peerfd){ TcpIterator it = lists_.find(peerfd); assert(it != lists_.end()); onMessageCallback_(it->second); } void PollPoller::handleCloseEvent(int i){ // 1.從event_中清除 assert(i >= 0 && i < 2048); int peerfd = event_[i].fd; assert(peerfd != -1); event_[i].fd = -1; //2.調用Close時的回調函數 並從map 中清除 TcpIterator it = lists_.find(peerfd); assert(it != lists_.end()); onCloseCallback_(it->second); lists_.erase(it); } #include "PollPoller.h" #include "Socket.h" #include "InetAddress.h" #include <iostream> using namespace std; #define ERR_EXIT(m) \ do { \ perror(m);\ exit(EXIT_FAILURE);\ }while(0) // 自定義的回調函數 void onConnect(const TcpConnectionPtr &conn){ conn->send("Hello!\r\n"); } void onMessage(const TcpConnectionPtr &conn){ string recvData(conn->receive()); cout << "recv data:" << recvData; conn->send(recvData); } void onClose(const TcpConnectionPtr &conn){ cout << "Close conn" << endl; conn->shutdown(); } int main(int argc, const char *argv[]) { int fd = socket(AF_INET, SOCK_STREAM, 0); if(fd == -1) ERR_EXIT("socket"); Socket sock(fd); InetAddress inet(9999); sock.bindAddress(inet); sock.listen(); PollPoller poller(sock.fd()); poller.setConnectionCallback(onConnect); poller.setMessageCallback(onMessage); poller.setCloseCallback(onClose); while(1){ poller.poll(); poller.handleAccept(); poller.handleData(); } return 0; }
1.2 Echo網絡庫 2.0
1.2.1 Echo 網絡庫2.0改進:
a)InetAddress 結構中新增獲取本地sockaddr_in 結構和對端sockaddr_in 結構的成員函數 以及 獲取點分十進制的IP和本地字節序端口號的函數;
b)TcpConnection 新增回調函數的設置和執行接口,其中在 shared_ptr 的 enable_shared_from_this 類中定義了成員函數 shared_from_this() ,返回 shared_ptr<TcpConnection> 類型的變量;此外還新增了兩個InetAddress類的成員變量即一個TCP鏈接兩端的sockaddr_in 結構體,並加入了構造函數的參數中。
c)PollPoller 中函數執行回調函數的方式發生改變,這裏須要向TcpConnection對象中註冊回調函數,而後由TcpConnectionPtr 智能指針對象調用TcpConnection中註冊的回調函數。
1.2.2 改進的程序
a)InetAddress類
#ifndef __INET_ADDRESS_H__ #define __INET_ADDRESS_H__ #include "Copyable.h" #include <netinet/in.h> //strcuct sock_addr_in 結構在這裏定義 #include <string.h> // memset 函數 #include <string> /* * 對 struct sockaddr_in 的定義 * */ typedef struct sockaddr SA; class InetAddress : private Copyable{ public: explicit InetAddress(uint16_t port); //explicit 禁止參數隱式轉換 InetAddress(const struct sockaddr_in &addr); const struct sockaddr_in *getSockAddrInet() const{ return &addr_; } void setSockAddrInet(const struct sockaddr_in &addr){ addr_= addr; } //返回網絡字節序的 IP 和 port uint16_t portNetEndian() const{ return addr_.sin_port; } uint32_t ipNetEndian() const{ return addr_.sin_addr.s_addr; } // 2.0 add // 返回主機字節序的IP 和 Port std::string toIP() const; uint16_t toPort() const; static InetAddress getLocalAddress(int sockfd); static InetAddress getPeerAddress(int sockfd); private: struct sockaddr_in addr_; }; #endif /*__INET_ADDRESS_H__*/ #include "InetAddress.h" #include <sys/socket.h> #include <arpa/inet.h> #include <stdlib.h> #include <stdio.h> #define ERR_EXIT(m) \ do { \ perror(m);\ exit(EXIT_FAILURE);\ }while(0) InetAddress::InetAddress(uint16_t port){ ::memset(&addr_, 0, sizeof addr_); // ::表示全局做用域 避免重名 addr_.sin_family = AF_INET; addr_.sin_port = ::htons(port); addr_.sin_addr.s_addr = ::htonl(INADDR_ANY); } InetAddress::InetAddress(const struct sockaddr_in &addr) :addr_(addr){ } std::string InetAddress::toIP() const{ return std::string(inet_ntoa(addr_.sin_addr)); } uint16_t InetAddress::toPort() const{ return ntohs(addr_.sin_port); } InetAddress InetAddress::getLocalAddress(int sockfd){ struct sockaddr_in addr; socklen_t len = sizeof addr; if(::getsockname(sockfd, (SA *)&addr, &len) == -1) ERR_EXIT("getsockname"); return InetAddress(addr); } InetAddress InetAddress::getPeerAddress(int sockfd){ struct sockaddr_in addr; socklen_t len = sizeof addr; if(::getpeername(sockfd, (SA *)&addr, &len) == -1) ERR_EXIT("getpeername"); return InetAddress(addr); }
b)TcpConnection 類
#ifndef __TCP_CONNECTION_H__ #define __TCP_CONNECTION_H__ #include "NonCopyable.h" #include "InetAddress.h" #include "Socket.h" #include "Rio.h" #include <string> #include <functional> #include <memory> /* * 表示一個 TCP 鏈接,提供收發信息,關閉鏈接等操做 */ class TcpConnection; typedef std::shared_ptr<TcpConnection> TcpConnectionPtr; //智能指針類型 class TcpConnection : private NonCopyable, public std::enable_shared_from_this<TcpConnection>{ public: typedef std::function<void (const TcpConnectionPtr &)> TcpConnectionCallback; //回調函數類型,注意這裏含有一個智能指針參數,所以回調的時候注意傳參數 TcpConnection(int sockfd, const InetAddress localAddr, const InetAddress peerAddr); ~TcpConnection(); int fd() const{ return socket_.fd(); } // 經過調用 Rio 中的函數,封裝一套本身的 IO 接口 ssize_t readn(char *usrbuf, size_t n); ssize_t readline(char *usrbuf, size_t maxline); ssize_t writen(const char *usrbuf, size_t n); void send(const std::string &s); // 向TCP鏈接中的對端發送數據 std::string receive();//接收數據 void shutdown(); //關閉本鏈接的寫端 //2.0 add //獲取本TCP 鏈接兩端的sockaddr 結構 const InetAddress &getLocalAddr() const{ return localAddr_; } const InetAddress &getPeerAddress() const{ return peerAddr_; } //註冊回調函數 void setConnectionCallback(const TcpConnectionCallback &cb){ onConnectionCallback_= cb; } void setMessageCallback(const TcpConnectionCallback &cb){ onMessageCallback_ = cb; } void setCloseCallback(const TcpConnectionCallback &cb){ onCloseCallback_ = cb; } //執行回調函數 void handleConnection(){ if(onConnectionCallback_) onConnectionCallback_(shared_from_this()); //shared_from_this 返回TcpConnectionPtr } void handleMessage(){ if(onMessageCallback_) onMessageCallback_(shared_from_this()); } void handleClose(){ if(onCloseCallback_) onCloseCallback_(shared_from_this()); else shutdown(); } private: Socket socket_; //一個Socket對象就表示一條已經創建的TCP鏈接 Rio buffer_; //該TCP鏈接的讀寫緩衝區 即用本fd初始化 //2.0 add const InetAddress localAddr_; const InetAddress peerAddr_; TcpConnectionCallback onConnectionCallback_; TcpConnectionCallback onMessageCallback_; TcpConnectionCallback onCloseCallback_; }; #endif /*__TCP_CONNECTION_H__*/ #include "TcpConnection.h" #include <stdlib.h> using namespace std; #define ERR_EXIT(m) \ do { \ perror(m);\ exit(EXIT_FAILURE);\ }while(0) TcpConnection::TcpConnection(int sockfd, const InetAddress localAddr, const InetAddress peerAddr) :socket_(sockfd), buffer_(sockfd), localAddr_(localAddr), peerAddr_(peerAddr){ } TcpConnection::~TcpConnection(){ shutdown(); } ssize_t TcpConnection::readn(char *usrbuf, size_t n){ ssize_t nread = buffer_.readn(usrbuf, n); if(nread == -1) ERR_EXIT("read"); return nread; } ssize_t TcpConnection::readline(char *usrbuf, size_t maxline){ ssize_t nread = buffer_.readline(usrbuf, maxline); if(nread == -1) ERR_EXIT("readline"); return nread; } ssize_t TcpConnection::writen(const char *usrbuf, size_t n){ ssize_t nwrite = buffer_.writen(usrbuf, n); if(nwrite <= 0) ERR_EXIT("writen"); return nwrite; } void TcpConnection::send(const string &s){ writen(s.c_str(), s.size()); //調用本身的IO 函數 } string TcpConnection::receive(){ char buf[1024]; readline(buf, sizeof buf); return string(buf); } void TcpConnection::shutdown(){ socket_.shutdownWrite(); }
c)PollPoller類
#ifndef __POLL_POLLELR_H__ #define __POLL_POLLELR_H__ #include "NonCopyable.h" #include "TcpConnection.h" #include <poll.h> #include <map> #include <functional> class PollPoller : private NonCopyable{ public: typedef TcpConnection::TcpConnectionCallback PollerCallback; explicit PollPoller(int listenfd); void poll(); void handleAccept(); void handleData(); void handleConnectionEvent(int peerfd); void handleMessageEvent(int peerfd); void handleCloseEvent(int i); void setConnectionCallback(const PollerCallback &cb){ onConnectionCallback_ = cb; } void setMessageCallback(const PollerCallback &cb){ onMessageCallback_ = cb; } void setCloseCallback(const PollerCallback &cb){ onCloseCallback_ = cb; } private: struct pollfd event_[2048]; int listenfd_; int maxi_; int nready_; std::map<int, TcpConnectionPtr> lists_; // 一個fd 對應 一個指向Tcp鏈接的智能指針(採用引用計數) PollerCallback onConnectionCallback_; PollerCallback onMessageCallback_; PollerCallback onCloseCallback_; typedef std::map<int, TcpConnectionPtr>::iterator TcpIterator; }; #endif /*__POLL_POLLELR_H__*/ #include "PollPoller.h" #include <utility> #include <assert.h> #include <iostream> #define ERR_EXIT(m) \ do { \ perror(m);\ exit(EXIT_FAILURE);\ }while(0) PollPoller::PollPoller(int listenfd) :listenfd_(listenfd){ int i; for(i = 0; i < 2048; i++) event_[i].fd = -1; event_[0].fd = listenfd_; event_[0].events = POLLIN; maxi_ = 0; nready_ = 0; } void PollPoller::poll(){ int ret; do{ ret = ::poll(event_, maxi_ + 1, 10000); }while(ret == -1 && errno == EINTR); if(ret == -1) ERR_EXIT("poll"); nready_ = ret; } void PollPoller::handleAccept(){ if(event_[0].revents & POLLIN){ int peerfd; if((peerfd = ::accept(listenfd_, NULL, NULL)) == -1) ERR_EXIT("accpet"); handleConnectionEvent(peerfd); } } void PollPoller::handleData(){ // 依次處理客戶 int i; for(i = 1; i <= maxi_; i++){ //注意 這裏從1開始 int peerfd = event_[i].fd; if(peerfd == -1) continue; if(event_[i].revents & POLLIN){ char buf[1024]; int nread = ::recv(peerfd, buf, sizeof buf, MSG_PEEK); //預讀取 if(nread == -1) ERR_EXIT("revc"); else if(nread == 0) handleCloseEvent(i); else handleMessageEvent(peerfd); } } } void PollPoller::handleConnectionEvent(int peerfd){ // 1.將新鏈接的fd 添加到 event 數組中 int i; for(i = 1; i < 2048; i++){ if(event_[i].fd == -1){ event_[i].fd = peerfd; event_[i].events = POLLIN; if(i > maxi_) maxi_ = i; break; } } if(i == 2048){ std::cout << " too many clients" << std::endl; exit(EXIT_FAILURE); } // 2.將該新的TCP鏈接添加到 map 中 TcpConnectionPtr conn(new TcpConnection(peerfd, InetAddress::getLocalAddress(peerfd), InetAddress::getPeerAddress(peerfd)) ); std::pair<TcpIterator, bool> ret = lists_.insert(make_pair(peerfd, conn)); assert(ret.second == true); //這裏確認是否成功插入 //2.0 add 註冊回調函數到Tcpconnection類中 conn->setConnectionCallback(onConnectionCallback_); conn->setMessageCallback(onMessageCallback_); conn->setCloseCallback(onCloseCallback_); // 3.調用處理鏈接時要用的回調函數 //onConnectionCallback_(conn); // 注意回調函數要傳入一個TcpConnectionPtr 對象表示當前 TCP 鏈接 //2.0 回調函數的方式發生改變 conn->handleConnection(); } void PollPoller::handleMessageEvent(int peerfd){ TcpIterator it = lists_.find(peerfd); assert(it != lists_.end()); //onMessageCallback_(it->second); //2.0 add it->second->handleMessage(); } void PollPoller::handleCloseEvent(int i){ // 1.從event_中清除 assert(i >= 0 && i < 2048); int peerfd = event_[i].fd; assert(peerfd != -1); event_[i].fd = -1; //2.調用Close時的回調函數 並從map 中清除 TcpIterator it = lists_.find(peerfd); assert(it != lists_.end()); // onCloseCallback_(it->second); //2.0 add it->second->handleClose(); lists_.erase(it); }
1.3 Echo服務器模型 3.0
1.3.1 Echo 服務器模型3.0改進
a)根據現有的類封裝一個TcpServer類,包含設置回調函數,開啓服務器等接口,到這裏一個服務器模型已經相對完整。
1.3.2 新增 TcpServer 類
#ifndef __TCP_SERVER_H #define __TCP_SERVER_H #include "InetAddress.h" #include "Socket.h" #include "TcpConnection.h" #include "PollPoller.h" #include <memory> class TcpServer : private NonCopyable{ public: typedef TcpConnection::TcpConnectionCallback TcpServerCallback; explicit TcpServer(const InetAddress &inet); // 設置回調函數 void setConnection(const TcpServerCallback &cb){ onConnect_ = cb; } void setMessage(const TcpServerCallback &cb){ onMessage_ = cb; } void setClose(const TcpServerCallback &cb){ onClose_ =cb; } // 開啓服務器 void start(); private: std::unique_ptr<Socket> socket_; std::unique_ptr<PollPoller> poller_; TcpServerCallback onConnect_; TcpServerCallback onMessage_; TcpServerCallback onClose_; }; #endif /*__TCP_SERVER_H*/ #include "TcpServer.h" #include <signal.h> #define ERR_EXIT(m) \ do { \ perror(m);\ exit(EXIT_FAILURE);\ }while(0) //4 add class IgnoreSigpipe{ public: IgnoreSigpipe(){ if(::signal(SIGPIPE, SIG_IGN) == SIG_ERR) ERR_EXIT("signal"); } }; IgnoreSigpipe initObj; //全局對象,系統初始化時必然處理SIGPIPE TcpServer::TcpServer(const InetAddress &addr){ int sockfd = ::socket(AF_INET, SOCK_STREAM, 0); if(sockfd == -1) ERR_EXIT("socket"); socket_.reset(new Socket(sockfd)); //reset是unique_ptr的一個成員函數 socket_->setReusePort(); socket_->bindAddress(addr); socket_->listen(); } void TcpServer::start(){ poller_.reset(new PollPoller(socket_->fd())); poller_->setConnectionCallback(onConnect_); poller_->setMessageCallback(onMessage_); poller_->setCloseCallback(onClose_); while(1){ poller_->poll(); poller_->handleAccept(); poller_->handleData(); } }
1.4 Echo 網絡庫 4.0
1.4.1 Echo 網絡庫 4.0 改進
a)增長了線程池機制 和 異常的處理;
b)線程池中將 PtrVector<Thread> threads_;改成std::vector<std::unique_ptr<Thread> > threads_;
c)在TcpServer類中增長了sigpipe信號的處理;
1.4.2 新增類
a)MutexLock Condition Thread ThreadPool
class Thread : NonCopyable{ public: typedef std::function<void ()> Callback_Func; Thread(Callback_Func func); ~Thread(); void start(); void join(); private: pthread_t tid_; bool is_started_; Callback_Func callback_; }; #endif #include "Thread.h" #include <iostream> class RunFunction{ public: typedef std::function<void ()> Callback_Func; RunFunction(Callback_Func func) :func_(func) { } void run(){ func_(); } private: std::function<void ()> func_; }; Thread::Thread(Callback_Func func) :tid_(0), is_started_(false), callback_(func) { } Thread::~Thread(){// if(is_started_){ pthread_detach(tid_); } } void *thread_func(void *arg){ RunFunction *pr = static_cast<RunFunction *>(arg); pr->run(); delete pr; return NULL; } void Thread::start(){ is_started_ = true; RunFunction *pr = new RunFunction(callback_); if(pthread_create(&tid_, NULL, thread_func, pr)){ delete pr; } } void Thread::join(){ is_started_ = false; pthread_join(tid_, NULL); } #ifndef __THREADPOOL_H__ #define __THREADPOOL_H__ #include "Condition.h" #include "MutexLock.h" #include "Thread.h" #include <functional> #include <queue> #include <memory> class Thread; class ThreadPool{ public: typedef std::function<void ()> Task; //任務類型 ThreadPool(size_t poolSize, size_t queueSize); ~ThreadPool(); void start(); void stop(); void add_task(const Task &tsk); Task get_task(); private: void runInThread(); //線程池內的回調函數 size_t poolSize_; size_t queueSize_; //PtrVector<Thread> threads_; //線程數組 // 4 add std::vector<std::unique_ptr<Thread> > threads_; std::queue<Task> que_; //任務隊列 MutexLock mutex_; Condition empty_; Condition full_; bool isStarted_; //線程池是否開啓 }; #endif /*__THREADPOOL_H__*/ #include "ThreadPool.h" #include "Thread.h" ThreadPool::ThreadPool(size_t poolSize, size_t queueSize) :poolSize_(poolSize), queueSize_(queueSize), empty_(mutex_), full_(mutex_), isStarted_(false) { } ThreadPool::~ThreadPool(){ if(isStarted_){ stop(); } } void ThreadPool::start(){ if(isStarted_) return; isStarted_ = true; size_t i; for(i = 0; i < poolSize_; i++){ threads_.push_back(std::unique_ptr<Thread>(new Thread(std::bind(&ThreadPool::runInThread, this)))); } for(i = 0; i < poolSize_; i++){ threads_[i]->start(); //數組中存放的是指針 Thread * } } void ThreadPool::stop(){ if(! isStarted_) return; { MutexLockGuard lock(mutex_); isStarted_ = false; full_.notifyAll(); } size_t i; for(i = 0; i < poolSize_; i++){ threads_[i]->join(); } while(!que_.empty()){ que_.pop(); } threads_.clear(); } void ThreadPool::runInThread(){ while(isStarted_){ Task tsk(get_task()); if(tsk) tsk(); //執行真正的任務 } } void ThreadPool::add_task(const Task &tsk){ MutexLockGuard lock(mutex_); while(que_.size() == queueSize_){ // 任務隊列已滿 empty_.wait(); } que_.push(tsk); full_.notify(); } ThreadPool::Task ThreadPool::get_task(){ MutexLockGuard lock(mutex_); while(que_.empty() && isStarted_){ full_.wait(); } //如果被stop函數喚醒 此時隊列已被清空 Task tsk; if(!que_.empty()){ tsk = que_.front(); que_.pop(); empty_.notify(); } return tsk; }
b)Exception 類
#ifndef __EXCEPTION_H__ #define __EXCEPTION_H__ #include <exception> #include <string> class Exception : public std::exception{ public: Exception(const char *); Exception(const std::string &); virtual ~Exception() throw();//表示這個函數不拋出異常 virtual const char * what() const throw(); const char* stackTrace()throw(); private: void fillStackTrace();// std::string message_; //異常的名字 std::string stack_; //棧痕跡 }; #endif #include "Exception.h" #include <execinfo.h> #include <stdlib.h> Exception::Exception(const char *s) :message_(s) { fillStackTrace(); } Exception::Exception(const std::string &s) :message_(s) { fillStackTrace(); } Exception::~Exception() throw() { } const char * Exception::what() const throw(){ return message_.c_str(); } void Exception::fillStackTrace(){ const int len = 200; void * buffer[len]; // 獲取棧痕跡 存儲在buffer數組中,這裏len是數組的最大長度,而返回值nptrs是數組的實際長度 int nptrs = ::backtrace(buffer, len); //把buffer中的地址轉化成字符串存儲在strings中 //這裏在生成strings的時候調用了malloc函數 動態分配了內存 所以後面須要free char** strings = ::backtrace_symbols(buffer, nptrs); if(strings){ int i; for(i = 0; i < nptrs; i++){ stack_.append(strings[i]); stack_.push_back('\n'); } } free(strings); } const char *Exception::stackTrace() throw(){ return stack_.c_str(); }
1.5 打包成Echo網絡庫放進系統庫文件中
1.5.1 步驟
a)執行make,生成echo頭文件夾和靜態庫libecho.a;
將echo安裝到/usr/include/下,將libecho.a放置/usr/lib/下。
編譯的時候須要加上-std=c++0x -lecho -lpthread
b)Makefile
.PHONY:clean CC=g++ CFLAGS=-Wall -g BIN=libecho.a OBJS=InetAddress.o Socket.o Rio.o TcpConnection.o PollPoller.o TcpServer.o Thread.o Condition.o ThreadPool.o Exception.o STARD=-std=c++0x -rdynamic LINKS=-lpthread $(BIN):$(OBJS) ar -crv $@ -o $^ chmod +x $@ rm -f *.o mkdir echo cp *.h echo/ %.o:%.cpp $(CC) $(CFLAGS) -c $< -o $@ $(STARD) clean: rm -rf *.o $(BIN) echo
1.6 小結(重點)
a)NonCopyable、Copyable表示對象是否具備value語義(複製和賦值),Echo中除了InetAddress以外,其他均禁用掉了value語義,這是爲了不潛在的BUG;
b)Exception相比標準庫的exception,增長了打印棧痕跡的功能;
c)ThreadPool系列,主要包含MutexLock、Condition、Thread、ThreadPool。其中大量採用了RAII技術,避免資源的泄露,對於Thread和ThreadPool,咱們採用了function做爲泛型技術,用戶只需註冊回調函數。
d)Timer,內部採用timerfd系列的定時器,不使用信號,而是使用fd可讀做爲定時器的觸發事件,這使得Timer能夠加入到IO複用模型,咱們採用的是Poll模型。也能夠單獨把Timer放到一個線程,這就是TimerThread的產生。
e)用戶註冊事件與回調流程:先註冊給TcpServer,而後是PollPoller,以後是TcpConnection,這樣完成了事件的註冊。回調函數由PollPoller觸發,經過map尋找到Tcp鏈接,而後調用裏面的回調函數。
f)TcpServer實質是一個IO複用模型,ThreadPool則是代筆多線程。用戶在使用時,能夠只選擇其一。若是計算任務負擔較重,能夠將計算任務與Tcp回發封裝成函數,交給線程池去計算。
g)此時,運行TcpServer的線程是一個IO線程,ThreadPool 裏面的線程專一於CPU密集型計算。
2.Echo 網絡庫的使用
2.1 轉換大小寫回顯服務器
2.1.1 轉換大小寫回顯服務器分析:
a)根據現有的網絡庫採用類的組合能夠編寫出新的服務器類,這個服務器類要作的事情有,設置回調函數,開啓服務器兩件事。
b)第一件事在Server類的構造函數中設置,所以本類中須要有要綁定的回調函數,這些回調函數主要是用來處理鏈接發生和關閉以及傳送數據時的情況。此外,對於任務型服務器,這裏能夠和線程池組合,將新任務放到任務隊列中,供線程池調度,對於任務較輕的服務器能夠不使用線程池。這裏將轉換大小寫和回顯操做封裝成 compute 函數,每次來一個任務就加入到任務隊列中。
2.1.2 程序示例
#ifndef __ECHO_SERVER_H #define __ECHO_SERVER_H #include <echo/TcpServer.h> #include <echo/ThreadPool.h> #include <echo/NonCopyable.h> class EchoServer : NonCopyable{ public: EchoServer(const InetAddress &addr); void start(); private: void onConnection(const TcpConnectionPtr &conn); void onMessage(const TcpConnectionPtr &conn); void onClose(const TcpConnectionPtr &conn); void compute(std::string s, const TcpConnectionPtr &conn); //轉換大小寫函數 TcpServer server_; ThreadPool pool_; }; #endif /*__ECHO_SERVER_H*/ #include "EchoServer.h" #include <functional> #include <iostream> using namespace std; using namespace placeholders; EchoServer::EchoServer(const InetAddress &addr) :server_(addr), pool_(1000, 4){// 任務數組的大小 和 線程池的大小 server_.setConnection(bind(&EchoServer::onConnection, this, _1)); server_.setMessage(bind(&EchoServer::onMessage, this, _1)); server_.setClose(bind(&EchoServer::onClose, this, _1)); } void EchoServer::onConnection(const TcpConnectionPtr &conn){ conn->send("Hello! Welcome to Echo Server!\r\n"); } void EchoServer::onMessage(const TcpConnectionPtr &conn){ string s = conn->receive(); cout << "recv : " << s; pool_.add_task(bind(&EchoServer::compute, this, s, conn)); } void EchoServer::onClose(const TcpConnectionPtr &conn){ cout << "Client closed\r\n" << endl; conn->shutdown(); } void EchoServer::compute(string s, const TcpConnectionPtr &conn){ int i; for(i = 0; i < s.size(); i++){ if(isupper(s[i])) s[i] = tolower(s[i]); else if(islower(s[i])) s[i] = toupper(s[i]); } conn->send(s); } void EchoServer::start(){ pool_.start(); server_.start(); } #include "EchoServer.h" using namespace std; int main(int argc, const char *argv[]) { EchoServer server(InetAddress(9999)); server.start(); return 0; }
2.2 文本查詢服務器
2.2.1 文本查詢程序分析:
a)程序的要求是,用戶輸入一個單詞,輸出該單詞的詞頻和每次出現該單詞的一行文本;
b)數據結構,用一個vector數組存儲要錄入文本的每行內容,用map<int, set>組合來實現單詞和行號集合的映射關係,這樣每次查詢時,會返回一個行號的集合set,根據set中的元素依次輸出vector中的元素內容便可;
c)本程序在構造函數中錄入文本,填充上述兩個數據結構,給出查詢接口,最後須要對查詢結果拼接成字符串,這在以後的網絡傳輸中比較方便。
d)最後使用Echo網絡庫將文本查詢類和TcpServer以及線程池類組合,封裝出本查詢服務器類,這裏相對上例只須要修改任務函數,此外還要注意,在用戶輸入的字符串在傳輸中加了\r\n,所以服務器須要對接收到的字符處理一下,不然將不匹配。
2.2.2 程序示例
#ifndef __TEXTQUERY_H__ #define __TEXTQUERY_H__ #include <string> #include <vector> #include <map> #include <set> class TextQuery{ public: typedef std::vector<std::string>::size_type LineNo; //行號的類型 TextQuery(const std::string &filename); std::string run_query(const std::string &word) const; std::string print_result(const std::string &word, const std::set<LineNo> &nums) const; private: void read_file(const std::string &filename); std::vector<std::string> lines_; //存儲文檔中每行的內容 std::map<std::string, std::set<LineNo> > word_map_; //單詞和行號之間的映射 }; #endif /*__TEXTQUERY_H__*/ #include "TextQuery.h" #include <sstream> #include <fstream> #include <echo/Exception.h> #include <stdio.h> using namespace std; TextQuery::TextQuery(const string &filename){ read_file(filename); } string TextQuery::run_query(const string &word) const{ map<string, set<LineNo> >::const_iterator loc = word_map_.find(word); if(loc == word_map_.end()) return print_result(word, set<LineNo>()); else return print_result(word, loc->second); } string TextQuery::print_result(const string &word, const set<LineNo> &nums) const{ string ret; char buf[16]; snprintf(buf, sizeof buf, "%u", nums.size()); ret += word + " occurs " + buf + " times\n"; set<LineNo>::const_iterator it = nums.begin(); for(; it != nums.end(); it++){ snprintf(buf, sizeof buf, "%u", *it + 1); ret += string("(line ") + buf + " ) "; ret += lines_[*it] + "\n"; } return ret; } void TextQuery::read_file(const string &filename){ ifstream in; in.open(filename.c_str()); if(!in) throw Exception("open file fail"); //讀入每一行的內容到數組vector中 string line; while(getline(in, line)) lines_.push_back(line); in.close(); //將每一行的內容抽取出單詞和行號 存入map中 for(LineNo num = 0; num != lines_.size(); num++){ istringstream line(lines_[num]); string word; while(line >> word){ word_map_[word].insert(num);//這裏word_map_[word]是一個set類型 } } } #ifndef __QUERY_SERVER_H #define __QUERY_SERVER_H #include <echo/TcpServer.h> #include <echo/ThreadPool.h> #include <echo/NonCopyable.h> #include "TextQuery.h" class QueryServer : NonCopyable{ public: QueryServer(const InetAddress &addr, const std::string &filename); void start(); private: void onConnection(const TcpConnectionPtr &conn); void onMessage(const TcpConnectionPtr &conn); void onClose(const TcpConnectionPtr &conn); void query(const std::string s, const TcpConnectionPtr &conn); //轉換大小寫函數 TcpServer server_; TextQuery query_; ThreadPool pool_; }; #endif /*__QUERY_SERVER_H*/ #include "QueryServer.h" #include <functional> #include <iostream> #include <fstream> #include <set> #include <stdio.h> using namespace std; using namespace placeholders; QueryServer::QueryServer(const InetAddress &addr, const string &filename) :server_(addr), pool_(1000, 4), query_(filename){// 任務數組的大小 和 線程池的大小 server_.setConnection(bind(&QueryServer::onConnection, this, _1)); server_.setMessage(bind(&QueryServer::onMessage, this, _1)); server_.setClose(bind(&QueryServer::onClose, this, _1)); } void QueryServer::onConnection(const TcpConnectionPtr &conn){ conn->send("Hello! Welcome to Echo Server!\r\n"); } void QueryServer::onMessage(const TcpConnectionPtr &conn){ string s = conn->receive(); cout << "recv : " << s; pool_.add_task(bind(&QueryServer::query, this, s, conn)); } void QueryServer::onClose(const TcpConnectionPtr &conn){ cout << "Client closed\r\n" << endl; conn->shutdown(); } void QueryServer::query(const string s, const TcpConnectionPtr &conn){ string word = s; if(word.substr(word.size()-2, 2) == "\r\n"){ word.erase(word.size() - 1); word.erase(word.size() - 1); } string ret = query_.run_query(word); cout << ret; conn->send(ret + "\r\n"); } void QueryServer::start(){ pool_.start(); server_.start(); } #include "QueryServer.h" using namespace std; int main(int argc, const char *argv[]) { QueryServer server(InetAddress(9999), argv[1]); server.start(); return 0; }
2.3 聊天室服務器
2.3.1 使用Echo網絡庫編寫一個簡單的聊天室,將一我的的消息轉發給全部在線的用戶,而且用一個靜態變量限制最大鏈接數。
2.3.2 程序示例
#ifndef __CHAT_SERVER_H #define __CHAT_SERVER_H #include <echo/NonCopyable.h> #include <echo/TcpServer.h> #include <set> class ChatServer : NonCopyable{ public: ChatServer(const InetAddress &addr); void start(); private: void onConnection(const TcpConnectionPtr &conn); void onMessage(const TcpConnectionPtr &conn); void onClose(const TcpConnectionPtr &conn); TcpServer server_; std::set<TcpConnectionPtr> clients_; //用set而不用vector 保證惟一性 const static size_t kMaxClients_ = 3; //用來控制鏈接數 }; #endif /*__CHAT_SERVER_H*/ #include "ChatServer.h" #include <iostream> using namespace std; using namespace placeholders; ChatServer::ChatServer(const InetAddress &addr) :server_(addr){ server_.setConnection(bind(&ChatServer::onConnection, this, _1)); server_.setMessage(bind(&ChatServer::onMessage, this, _1)); server_.setClose(bind(&ChatServer::onClose, this, _1)); } void ChatServer::onConnection(const TcpConnectionPtr &conn){ if(clients_.size() >= kMaxClients_){ conn->send("Chat Server is full, please try later\r\n"); conn->shutdown(); return; } clients_.insert(conn); cout << "New Client online IP: " << conn->getPeerAddress().toIP() \ << " Port: " << conn->getPeerAddress().toPort() << endl; conn->send("Hello ! Welcome to ChatRoom!\r\n");// } void ChatServer::onMessage(const TcpConnectionPtr &conn){ string s = "recv from " + conn->getPeerAddress().toIP() + " Massage: "; s += conn->receive(); set<TcpConnectionPtr>::iterator it = clients_.begin(); for(; it != clients_.end(); it ++){ (*it)->send(s); } } void ChatServer::onClose(const TcpConnectionPtr &conn){ cout << "client " << conn->getPeerAddress().toIP() << " leave !" << endl; clients_.erase(conn); conn->shutdown(); } void ChatServer::start(){ server_.start(); } ➜ chat_server ➜ chat_server cat ChatServer.h ChatServer.cpp main.cpp #ifndef __CHAT_SERVER_H #define __CHAT_SERVER_H #include <echo/NonCopyable.h> #include <echo/TcpServer.h> #include <set> class ChatServer : NonCopyable{ public: ChatServer(const InetAddress &addr); void start(); private: void onConnection(const TcpConnectionPtr &conn); void onMessage(const TcpConnectionPtr &conn); void onClose(const TcpConnectionPtr &conn); TcpServer server_; std::set<TcpConnectionPtr> clients_; //用set而不用vector 保證惟一性 const static size_t kMaxClients_ = 3; //用來控制鏈接數 }; #endif /*__CHAT_SERVER_H*/ #include "ChatServer.h" #include <iostream> using namespace std; using namespace placeholders; ChatServer::ChatServer(const InetAddress &addr) :server_(addr){ server_.setConnection(bind(&ChatServer::onConnection, this, _1)); server_.setMessage(bind(&ChatServer::onMessage, this, _1)); server_.setClose(bind(&ChatServer::onClose, this, _1)); } void ChatServer::onConnection(const TcpConnectionPtr &conn){ if(clients_.size() >= kMaxClients_){ conn->send("Chat Server is full, please try later\r\n"); conn->shutdown(); return; } clients_.insert(conn); cout << "New Client online IP: " << conn->getPeerAddress().toIP() \ << " Port: " << conn->getPeerAddress().toPort() << endl; conn->send("Hello ! Welcome to ChatRoom!\r\n");// } void ChatServer::onMessage(const TcpConnectionPtr &conn){ string s = "recv from " + conn->getPeerAddress().toIP() + " Massage: "; s += conn->receive(); set<TcpConnectionPtr>::iterator it = clients_.begin(); for(; it != clients_.end(); it ++){ (*it)->send(s); } } void ChatServer::onClose(const TcpConnectionPtr &conn){ cout << "client " << conn->getPeerAddress().toIP() << " leave !" << endl; clients_.erase(conn); conn->shutdown(); } void ChatServer::start(){ server_.start(); } #include "ChatServer.h" int main(int argc, const char *argv[]) { ChatServer server(InetAddress(9999)); server.start(); return 0; }