在使用epoll實現實際的傳輸層以前,先設計一個抽象的傳輸層,這個抽象的傳輸層是傳輸層實現的接口層。
接口層中一共有如下幾個通用的類或者接口:
(1)Socket:通用的套接字層,用於封裝本地套接字,同時會在析構時自動關閉套接字,避免資源泄漏。
(2)DataSink:通用的數據接收層,當傳輸層接收到數據時,會經過用戶定義的DataSink對象傳輸到外部。
(3)IStream:通用的數據流接口,表示可讀/寫的字節流類。
(4)IConnectable:一個接口,表示能夠連接到其它服務器。
(5)BasicServer:基本的服務器類,繼承了Socket類。
(6)BasicStream:基本的數據流類,繼承IStream和Socket類。
#ifndef SOCKET
#define SOCKET int32_t
#endif

typedef SOCKET NativeSocket;

/// Wraps a native socket descriptor and closes it when the wrapper is destroyed.
///
/// A default-constructed Socket holds -1 ("no descriptor") rather than 0, because
/// descriptor 0 is a real descriptor (stdin) and must not be closed by an empty wrapper.
/// Copying is disabled: two wrappers holding the same descriptor would both close it.
class Socket {
public:
    /// Creates a wrapper that does not own any descriptor yet.
    Socket() : _nativeSocket(-1) {
    }

    /// Takes ownership of an already-open descriptor.
    Socket(NativeSocket nativeSocket) : _nativeSocket(nativeSocket) {
    }

    Socket(const Socket&) = delete;
    Socket& operator=(const Socket&) = delete;

    /// Closes the held descriptor, if there is one.
    virtual ~Socket() {
        if (_nativeSocket >= 0) {
            close(_nativeSocket);
        }
    }

    /// Returns the held descriptor, or -1 when none has been set.
    NativeSocket GetNativeSocket() const {
        return _nativeSocket;
    }

    /// Replaces the held descriptor. The previous value is overwritten, not closed.
    void SetNativeSocket(NativeSocket nativeSocket) {
        _nativeSocket = nativeSocket;
    }

private:
    NativeSocket _nativeSocket;
};
class IStream;

/// Receiver interface for incoming data.
class DataSink {
public:
    /// Virtual so that implementations can be destroyed through a DataSink pointer.
    virtual ~DataSink() = default;

    /// Called with a block of received bytes.
    /// @param stream stream the bytes belong to
    /// @param buf    pointer to the first byte
    /// @param bytes  number of bytes available at buf
    /// @return implementation-defined status code
    virtual int32_t OnDataIndication(IStream *stream, const char *buf, int64_t bytes) = 0;
};
/// Interface of a readable/writable byte stream.
class IStream {
public:
    /// Callback type invoked with a block of received bytes.
    typedef std::function<void(const char* buf, int64_t size)> DataIndicationHandler;

    /// Virtual so that streams can be destroyed through an IStream pointer.
    virtual ~IStream() = default;

    /// Reads bytes into buffer.
    /// @param buffer   destination storage
    /// @param buffSize size of the destination storage in bytes
    /// @param readSize receives the number of bytes placed in buffer
    /// @return implementation-defined status code
    virtual int32_t Receive(char *buffer, int32_t buffSize, int32_t& readSize) = 0;

    /// Writes the content of byteArray to the stream.
    /// @return implementation-defined status code
    virtual int32_t Send(const ByteArray& byteArray) = 0;

    /// Installs the data callback.
    virtual void OnDataIndication(DataIndicationHandler handler) = 0;

    /// Returns the currently installed data callback.
    virtual DataIndicationHandler GetDataIndication() = 0;
};
/// Base class for socket-backed streams: a Socket that also exposes the IStream interface
/// and remembers a DataSink supplied by the user.
class BasicStream : public Socket, public IStream {
public:
    /// Wraps nativeSocket; no data sink is attached until SetDataSink is called.
    BasicStream(NativeSocket nativeSocket) : Socket(nativeSocket), _dataSink(nullptr) {}

    BasicStream(const BasicStream& stream) = delete;
    BasicStream& operator=(const BasicStream& stream) = delete;

    /// Stores the sink pointer; the stream does not take ownership of it.
    virtual void SetDataSink(DataSink* dataSink) {
        _dataSink = dataSink;
    }

    /// Returns the stored sink, or nullptr if none has been set.
    virtual DataSink* GetDataSink() {
        return _dataSink;
    }

    /// Const overload of GetDataSink.
    virtual const DataSink* GetDataSink() const {
        return _dataSink;
    }

private:
    DataSink* _dataSink;
};
/// Interface for objects that can connect to a remote host.
class IConnectable {
public:
    /// Virtual so that implementations can be destroyed through an IConnectable pointer.
    virtual ~IConnectable() = default;

    /// Connects to the given host and port.
    virtual void Connect(const std::string& host, int32_t port) = 0;
};
template <class ConnectionType> class BasicServer : public Socket { public: typedef std::function<void(IStream* stream)> ConnectIndicationHandler; typedef std::function<void(IStream* stream)> DisconnectIndicationHandler; BasicServer() { } virtual int32_t Listen(const std::string& host, int32_t port, int backlog) = 0; virtual void OnConnectIndication(ConnectIndicationHandler handler) = 0; virtual void OnDisconnectIndication(DisconnectIndicationHandler handler) = 0; virtual ConnectionType Accept(int32_t listenfd) = 0; };
在前面的內容中已經完成了抽象TP傳輸層和基礎工具(消息隊列、線程池、緩衝區抽象、事件循環和日誌工具)的實現,接下來在抽象TP傳輸層和基礎工具的基礎上完成基於epoll機制的服務器和客戶端的實現。
#pragma once #include <string> #include <sys/types.h> #include <sys/socket.h> #include <sys/time.h> #include <netinet/in.h> #include <arpa/inet.h> #include <unistd.h> #include "../include/Net.h" namespace meshy { class EpollStream : public BasicStream { public: EpollStream(NativeSocket nativeSocket) : BasicStream(nativeSocket){} int32_t Send(const ByteArray& byteArray) override { const char *buf = byteArray.data(); int32_t size = byteArray.size(); int32_t n = size; while(n > 0) { int32_t writeSize = write(GetNativeSocket(), buf + size - n, n); if(writeSize < n) { if (writeSize == -1 && errno != EAGAIN) { TRACE_ERROR("FATAL write data to peer failed!"); } break; } n -= writeSize; } return size; } int32_t Receive(char *buffer, int32_t buffSize, int32_t& readSize) override { readSize = 0; int32_t nread = 0; //NativeSocketEvent ev; while ((nread = read(GetNativeSocket(), buffer + readSize, buffSize - 1)) > 0) { readSize += nread; } return nread; } void OnDataIndication(DataIndicationHandler handler) override { } DataIndicationHandler GetDataIndication() override { return _handler; } private: DataIndicationHandler _handler; }; typedef shared_ptr<EpollStream> EpollStreamPtr; }
#pragma once #include <string> #include "../include/Net.h" #include "EpollStream.h" namespace meshy { class EpollServer : public BasicServer<EpollStreamPtr> { public: EpollServer(){} virtual int32_t Listen(const std::string& host, int32_t port, int backlog = 1) override; virtual EpollStreamPtr Accept(int32_t eventfd); virtual void OnConnectIndication(ConnectIndicationHandler handler) override { _connectHandler = handler; } virtual void OnDisconnectIndication(DisconnectIndicationHandler handler) override { _disconnectIndication = handler; } private: int32_t _SetNonBlocking(int32_t fd); int32_t _bind(const std::string& host, int32_t port); ConnectIndicationHandler _connectHandler; DisconnectIndicationHandler _disconnectIndication; }; }
#include "EpollServer.h"
#include "EpollLoop.h"

#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include <cerrno>
#include <cstring>
#include <memory>
#include <sstream>

namespace meshy {

/// Creates a non-blocking TCP socket with SO_REUSEADDR and binds it to host:port.
/// The descriptor is stored in this object as soon as it exists.
/// @return 0 on success, a negative value on failure.
int32_t EpollServer::_bind(const std::string& host, int32_t port) {
    const int32_t listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenfd < 0) {
        TRACE_ERROR("Create socket failed!");
        return -1;
    }
    // Store the descriptor first so that it is owned even if a later step fails.
    SetNativeSocket(listenfd);

    int32_t option = 1;
    setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &option, sizeof(option));

    if (_SetNonBlocking(listenfd) < 0) {
        return -1;
    }

    sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(static_cast<uint16_t>(port));
    if (inet_pton(AF_INET, host.c_str(), &addr.sin_addr) != 1) {
        TRACE_ERROR("Invalid listen address!");
        return -1;
    }

    const int32_t errorCode = bind(listenfd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
    if (errorCode < 0) {
        TRACE_ERROR("Bind socket failed!");
        return errorCode;
    }

    return 0;
}

/// Binds, listens, and registers the listening socket with the epoll loop.
/// @return 0 on success, a negative value on failure.
int32_t EpollServer::Listen(const std::string& host, int32_t port, int backlog) {
    const int32_t bindResult = _bind(host, port);
    if (bindResult < 0) {
        return bindResult;
    }

    const int32_t listenfd = GetNativeSocket();

    int32_t errorCode = listen(listenfd, backlog);
    if (-1 == errorCode) {
        TRACE_ERROR("Listen socket failed!");
        return errorCode;
    }
    TRACE_DEBUG("Listen Success");

    // Level-triggered on purpose: Accept takes one connection per call, so the loop must
    // be notified again while further connections are still queued.
    errorCode = EpollLoop::Get()->AddEpollEvents(EPOLLIN, listenfd);
    if (errorCode < 0) {
        TRACE_ERROR("epoll_ctl faild : listen socket");
        return errorCode;
    }

    EpollLoop::Get()->AddServer(listenfd, this);
    return 0;
}

/// Accepts one pending connection, makes it non-blocking and registers it for
/// edge-triggered read events.
/// @param eventfd unused here
/// @return the new stream, or a null pointer if no connection was accepted or setup failed.
EpollStreamPtr EpollServer::Accept(int32_t eventfd) {
    (void)eventfd;

    sockaddr_in addr;
    memset(&addr, 0, sizeof(sockaddr_in));
    // accept() reads this value as the size of addr, so it must be initialised.
    socklen_t addrlen = sizeof(addr);

    const NativeSocket connSocket =
        accept(GetNativeSocket(), reinterpret_cast<sockaddr*>(&addr), &addrlen);
    if (connSocket < 0) {
        // The listening socket is non-blocking; "nothing pending" is not an error.
        if (errno != EAGAIN && errno != EWOULDBLOCK) {
            TRACE_ERROR("Accept error");
        }
        return EpollStreamPtr(nullptr);
    }

    // From here on the stream owns the descriptor and closes it on every early return.
    EpollStreamPtr connection = std::make_shared<EpollStream>(connSocket);

    // Log the client IP (accept already filled addr).
    std::ostringstream os;
    os << "client:" << inet_ntoa(addr.sin_addr) << "connect success";
    std::string strInfo = os.str();
    TRACE_DEBUG(strInfo);

    if (_SetNonBlocking(connSocket) < 0) {
        return EpollStreamPtr(nullptr);
    }

    const int32_t errorCode = EpollLoop::Get()->AddEpollEvents(EPOLLIN | EPOLLET, connSocket);
    if (errorCode < 0) {
        TRACE_ERROR("epoll_ctl faild : conn socket");
        return EpollStreamPtr(nullptr);
    }

    //if ( _connectHandler ) {
    //    _connectHandler(connection.get());
    //}

    return connection;
}

/// Switches fd to non-blocking mode by adding O_NONBLOCK to its status flags.
/// @return 0 on success, -1 if either fcntl call fails.
int32_t EpollServer::_SetNonBlocking(int32_t fd) {
    int32_t opts = fcntl(fd, F_GETFL);
    if (opts < 0) {
        TRACE_ERROR("fcntl(F_GETFL)");
        return -1;
    }

    opts = (opts | O_NONBLOCK);
    if (fcntl(fd, F_SETFL, opts) < 0) {
        TRACE_ERROR("fcntl(F_SETFL)");
        return -1;
    }

    return 0;
}

}
// 基於Epoll實現的客戶端 #include "../incldue/Net.h" #include <memory> class EpollClient : public IConnectable, public BasicStream { typdef std::shared_ptr<EpollClient*> EpollClientPtr; public: EpollClient(const EPollClient&) = delete; EpollClient& operator=(const EpollClient&) = delete; virtual int32_t Receive(char *buffer, int32_t buffSize, int32_t& readSize) override; virtual int32_t Send(const ByteArray& byteArray) override; virtual void OnConnectIndication(ConnectIndicationHandler handler) override { _connectHandler = handler; } virtual void OnDisconnectIndication(DisconnectIndicationHandler handler) override { _disconnectIndication = handler; } virtual void Connect(const std::string& host, int32_t port) override; static EpollClientPtr Connect(const std::string& host, int32_t port); protected: EpollClient(NativeSocket nativeSocket) :BasicStream(nativeSocket) { } int32_t _SetNonBlocking(int32_t fd); private: ConnectIndicationHandler _connectHandler; DisconnectIndicationHandler _disconnectIndication; }
#include "EpollClient.h" #include "EpollLoop.h" #include <unistd.h> #include <fcntl.h> #include <sys/types.h> #include <sys/socket.h> int32_t EpollClient::Receive(char *buffer, int32_t buffSize, int32_t& readSize) { readSize = 0; int32_t nread = 0; //NativeSocketEvent ev; while ((nread = read(GetNativeSocket(), buffer + readSize, buffSize - 1)) > 0) { readSize += nread; } return nread; } int32_t EpollClient::Send(const ByteArray& byteArray) { const char *buf = byteArray.data(); int32_t size = byteArray.size(); int32_t n = size; while(n > 0) { int32_t writeSize = write(GetNativeSocket(), buf + size - n, n); if(writeSize < n) { if (writeSize == -1 && errno != EAGAIN) { TRACE_ERROR("FATAL write data to peer failed!"); } break; } n -= writeSize; } return size; } void EpollClient::Connect(const std::string& host, int32_t port) { sockaddr_in addr; bzero(&addr, sizeof(addr)); addr.sin_family = AF_INET; addr.sin_port = htons(port); addr.sin_addr.s_addr = inet_addr(host.c_str()); _SetNonBlocking(GetNativeSocket()); if (connect(sock, (sockaddr *)&addr, sizeof(addr)) < 0) { TREACE_ERROR("connect faild"); return; } } EpollClientPtr EpollClient::Connect(const std::string& host, int32_t port) { NativeSocket sock = socket(AF_INET, SOCK_STREAM, 0); if(sock < 0) { TRACE_ERROR("EpollClient socket faild"); retrun nullptr; } EpollClientPtr client = EpollStreamPtr(new EpollClient(sock)); client->Connect(host, port); EpollLoop *pEpollLoop = EpollLoop::Get(); if(pEpollLoop) { if(pEpollLoop->AddEpollEvents(EPOLLIN | EPOLLET, sock) < 0) { TRACE_ERROR("cleint AddEpollEvents faild"); return nullptr; } } pEpollLoop->AddStream(client); return client; } // 設置阻塞模式 int32_t EpollServer::_SetNonBlocking(int32_t fd) { int32_t opts = fcntl(fd, F_GETFL); if(opts < 0) { TRACE_ERROR("fcntl(F_GETFL)"); return -1; } opts = (opts | O_NONBLOCK); if(fcntl(fd, F_SETFL, opts) < 0) { TRACE_ERROR("fcntl(F_SETFL)"); return -1; } return 0; }
#pragma once #include "../include/Loop.h" #include "../include/Logger.h" #include "EpollServer.h" #include <stdint.h> #include <map> #include <stdint.h> #include <sys/epoll.h> namespace meshy { #define FD_SIZE 1024 #define BUFF_SIZE 1024 class EpollLoop : public Loop { public: static EpollLoop* Get(); virtual ~EpollLoop(); int32_t AddEpollEvents(int32_t events, int32_t fd); // 第二個參數爲要監聽的描述符 int32_t ModifyEpollEvents(int32_t events, int32_t fd); int32_t DelEpollEvents(int32_t events, int32_t fd); void AddServer(NativeSocket socket, EpollServer* server); void AddStream(EpollStreamPtr stream); protected: EpollLoop(); void _Run() override; void _EpollThread(); void _HandleEvent(int32_t eventfd, struct epoll_event* events, int32_t nfds); void _Initialize(); int32_t _Accept(int32_t eventfd, int32_t listenfd); void _Read(int32_t eventfd, int32_t fd, uint32_t events); private: int32_t _eventfd; bool _shutdown; std::map<NativeSocket, EpollServer*> _servers; std::map <NativeSocket, EpollStreamPtr> _streams; }; }
#include "EpollLoop.h"
#include "../include/util.h"

#include <cerrno>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <sstream>
#include <string>
#include <thread>

namespace meshy {

/// Returns the single loop instance (constructed on first use).
EpollLoop* EpollLoop::Get() {
    static EpollLoop epoolLoop;
    return &epoolLoop;
}

EpollLoop::EpollLoop() {
    _Initialize();
    _shutdown = false;
}

EpollLoop::~EpollLoop() {
    _shutdown = true;
}

int32_t EpollLoop::AddEpollEvents(int32_t events, int32_t fd) {
    epoll_event ev;
    ev.events = events;
    ev.data.fd = fd;

    return epoll_ctl(_eventfd, EPOLL_CTL_ADD, fd, &ev);
}

int32_t EpollLoop::ModifyEpollEvents(int32_t events, int32_t fd) {
    epoll_event event;
    event.events = events;
    event.data.fd = fd;

    return epoll_ctl(_eventfd, EPOLL_CTL_MOD, fd, &event);
}

int32_t EpollLoop::DelEpollEvents(int32_t events, int32_t fd) {
    epoll_event event;
    event.events = events;
    event.data.fd = fd;

    return epoll_ctl(_eventfd, EPOLL_CTL_DEL, fd, &event);
}

/// Starts the epoll thread and returns immediately; the thread is detached.
void EpollLoop::_Run() {
    std::thread ThreadProc(&EpollLoop::_EpollThread, this);
    ThreadProc.detach();
}

/// Thread body: waits for events and dispatches them until _shutdown is set.
void EpollLoop::_EpollThread() {
    TRACE_DEBUG("_EPollThread");
    epoll_event events[FD_SIZE];

    while (!_shutdown) {
        const int32_t nfds = epoll_wait(_eventfd, events, FD_SIZE, -1);
        if (-1 == nfds) {
            // A signal interrupting the wait is not fatal; just wait again.
            if (errno == EINTR) {
                continue;
            }
            TRACE_ERROR("FATAL epoll_wait failed!");
            exit(EXIT_FAILURE);
        }

        _HandleEvent(_eventfd, events, nfds);
    }
}

/// Dispatches each ready descriptor: listening sockets go to _Accept, others to _Read.
void EpollLoop::_HandleEvent(int32_t eventfd, struct epoll_event* events, int32_t nfds) {
    for (int i = 0; i < nfds; ++i) {
        const int32_t fd = events[i].data.fd;  // descriptor the event belongs to
        const uint32_t ready = events[i].events;

        if (_servers.find(fd) != _servers.end()) {
            _Accept(eventfd, fd);
            continue;
        }

        // Hang-up and error conditions are routed to _Read as well, so the failing read
        // there removes the stream instead of leaving it registered forever.
        if (ready & (EPOLLIN | EPOLLERR | EPOLLHUP)) {
            _Read(eventfd, fd, ready);
        }

        if (ready & EPOLLOUT) {
            // Nothing to do for writable notifications yet.
        }
    }
}

/// Creates the epoll instance; logs and returns early if that fails.
void EpollLoop::_Initialize() {
    _eventfd = epoll_create(FD_SIZE);
    if (_eventfd < 0) {
        TRACE_ERROR("epoll_create failed");
        return;
    }

    std::ostringstream os;
    os << "epoll_create success, fdsize:" << FD_SIZE;
    std::string strInfo = os.str();
    TRACE_DEBUG(strInfo);
}

/// Lets the server registered for listenfd accept a connection and stores the new stream.
/// @return 0 if a stream was added, -1 otherwise.
int32_t EpollLoop::_Accept(int32_t eventfd, int32_t listenfd) {
    TRACE_DEBUG("_Accept");

    const std::map<NativeSocket, EpollServer*>::iterator found = _servers.find(listenfd);
    if (found == _servers.end() || found->second == nullptr) {
        return -1;
    }

    EpollStreamPtr connection = found->second->Accept(eventfd);
    if (connection != nullptr) {
        AddStream(connection);
        return 0;
    }

    return -1;
}

/// Reads what is available on fd, logs it, and drops the stream when the peer has closed
/// or the read failed.
void EpollLoop::_Read(int32_t eventfd, int32_t fd, uint32_t events) {
    (void)eventfd;
    (void)events;
    TRACE_DEBUG("_Read");

    // find() instead of operator[]: the latter would insert a null entry for an unknown fd.
    const std::map<NativeSocket, EpollStreamPtr>::iterator found = _streams.find(fd);
    if (found == _streams.end() || !found->second) {
        TRACE_WARNING("_Read: no stream registered for descriptor");
        return;
    }
    // Hold our own reference so erasing the map entry below cannot destroy it mid-use.
    EpollStreamPtr stream = found->second;

    char buffer[BUFF_SIZE] = {0};
    int32_t readSize = 0;
    const int32_t nread = stream->Receive(buffer, BUFF_SIZE, readSize);
    const int savedErrno = errno;

    const bool readFailed =
        (nread == -1 && savedErrno != EAGAIN && savedErrno != EWOULDBLOCK);
    if (readFailed || readSize == 0) {
        _streams.erase(fd);

        // Print error message. snprintf bounds the output; strerror text has no fixed size.
        char message[256];
        snprintf(message, sizeof(message), "errno: %d: %s, nread: %d, n: %d",
                 savedErrno, strerror(savedErrno), nread, readSize);
        TRACE_WARNING(message);
        return;
    }

    char utf8_buff[BUFF_SIZE] = {0};
    int32_t destreadsize = GB2312ToUTF8(buffer, utf8_buff, BUFF_SIZE);

    std::ostringstream os;
    os << "srcreadSize:" << readSize << "--destreadsize:" << BUFF_SIZE - destreadsize;
    std::string strInfo = os.str();
    TRACE_DEBUG(strInfo);

    TRACE_INFO(std::string(utf8_buff));

    // Write buf to the receive queue.
    //_Enqueue(stream, buffer, readSize);

    // A final read result of 0 means the peer closed after sending this data; with
    // edge-triggered registration there will be no further event for it.
    if (nread == 0) {
        _streams.erase(fd);
    }
}

void EpollLoop::AddServer(NativeSocket socket, EpollServer* server) {
    _servers.insert(std::pair<NativeSocket, EpollServer*>(socket, server));
}

void EpollLoop::AddStream(EpollStreamPtr stream) {
    _streams[stream->GetNativeSocket()] = stream;
}

}
#include "stdio.h" #include "./epoll/EpollServer.h" #include "./epoll/EpollLoop.h" #include <thread> #include <chrono> int main() { meshy::EpollLoop *pEpollLoop = meshy::EpollLoop::Get(); pEpollLoop->Start(); std::string ip = "192.168.1.40"; int32_t port = 1122; meshy::EpollServer server; server.Listen(ip, port, 1); while (1) { std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } return 0; }
# Builds the epoll server demo into the executable "out".
COMPILE = g++
# -pthread (rather than -lpthread placed before the objects) applies to both the compile
# and the link step and does not depend on the position of the library on the link line.
FLAGS = -Wall -g -O0 -std=c++11 -pthread
OBJS = main.o Logger.o EpollServer.o EpollLoop.o

.PHONY: all clean

all: out

out: $(OBJS)
	$(COMPILE) $(FLAGS) -o out $(OBJS)

main.o: main.cpp
	$(COMPILE) $(FLAGS) -c main.cpp

Logger.o: Logger.cpp
	$(COMPILE) $(FLAGS) -c Logger.cpp

EpollServer.o: ./epoll/EpollServer.cpp
	$(COMPILE) $(FLAGS) -c ./epoll/EpollServer.cpp

EpollLoop.o: ./epoll/EpollLoop.cpp
	$(COMPILE) $(FLAGS) -c ./epoll/EpollLoop.cpp

# -f so that cleaning an already clean tree is not an error.
clean:
	rm -f *.o out
代碼下載地址: https://pan.baidu.com/s/1P4wM6pCz-S8Dbtf-4xKlqA