GitHub source repository: https://github.com/chenshuo/m...
Build errors you hit while compiling muduo can usually be found in the project's GitHub issues, and most are easy to resolve. The one I ran into was simply a missing Boost development package.
On CentOS 7, run: sudo yum install boost-devel
The architecture diagram is taken from 《Linux多線程服務端編程:使用muduo C++網絡庫》 (Linux Multithreaded Server Programming: Using the muduo C++ Network Library).
Let's take the pingpong program under examples/ as the walkthrough example.
server.cc: set up the callbacks and start the server
void onConnection(const TcpConnectionPtr& conn)
{
  if (conn->connected())
  {
    conn->setTcpNoDelay(true);
  }
}

void onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp)
{
  conn->send(buf);
}

int main(int argc, char* argv[])
{
  if (argc < 4)
  {
    fprintf(stderr, "Usage: server <address> <port> <threads>\n");
  }
  else
  {
    LOG_INFO << "pid = " << getpid() << ", tid = " << CurrentThread::tid();
    Logger::setLogLevel(Logger::WARN);

    const char* ip = argv[1];
    uint16_t port = static_cast<uint16_t>(atoi(argv[2]));
    InetAddress listenAddr(ip, port);
    int threadCount = atoi(argv[3]);

    EventLoop loop;
    TcpServer server(&loop, listenAddr, "PingPong");

    server.setConnectionCallback(onConnection);
    server.setMessageCallback(onMessage);

    if (threadCount > 1)
    {
      server.setThreadNum(threadCount);
    }

    server.start();
    loop.loop();
  }
}
In main() a TcpServer object is constructed; the TcpServer constructor creates an Acceptor, and the Acceptor constructor in turn creates a Channel. The Channel holds a pointer to the EventLoop, and the EventLoop relies on a Poller to fetch every kind of event.
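To make that ownership chain concrete before diving into the real sources, here is a stripped-down skeleton. These are not muduo's actual declarations; member names are abbreviated and only the creates/owns relationships are kept.

class EventLoop;                  // forward declaration so the sketch compiles standalone

class Poller                      // chosen by Poller::newDefaultPoller(): epoll or poll
{
 public:
  explicit Poller(EventLoop* loop) : ownerLoop_(loop) {}
 private:
  EventLoop* ownerLoop_;
};

class Channel                     // one per file descriptor; holds its event callbacks
{
 public:
  Channel(EventLoop* loop, int fd) : loop_(loop), fd_(fd) {}
 private:
  EventLoop* loop_;
  int fd_;
};

class EventLoop                   // one per thread: poll, dispatch, repeat
{
 public:
  EventLoop() : poller_(new Poller(this)) {}
  ~EventLoop() { delete poller_; }
 private:
  Poller* poller_;                // created via Poller::newDefaultPoller(this) in muduo
};

class Acceptor                    // wraps the listening fd through its Channel
{
 public:
  Acceptor(EventLoop* loop, int listenFd) : acceptChannel_(loop, listenFd) {}
 private:
  Channel acceptChannel_;
};

class TcpServer                   // built in main(); builds the Acceptor
{
 public:
  TcpServer(EventLoop* loop, int listenFd) : loop_(loop), acceptor_(loop, listenFd) {}
 private:
  EventLoop* loop_;               // the "base" loop passed in from main()
  Acceptor acceptor_;
};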
The code can be approached from main() and read bottom-up. To make the mechanism easier to trace, this walkthrough is presented in reverse order, starting from the Poller and working back up to main().
1. Poller: the epoll flavor (one of the two poller implementations)
fillActiveChannels(numEvents, activeChannels);
channel->set_revents(events_[i].events);
EPollPoller::EPollPoller(EventLoop* loop)
  : Poller(loop),
    epollfd_(::epoll_create1(EPOLL_CLOEXEC)),
    events_(kInitEventListSize)
{
  if (epollfd_ < 0)
  {
    LOG_SYSFATAL << "EPollPoller::EPollPoller";
  }
}

EPollPoller::~EPollPoller()
{
  ::close(epollfd_);
}

Timestamp EPollPoller::poll(int timeoutMs, ChannelList* activeChannels)
{
  LOG_TRACE << "fd total count " << channels_.size();
  int numEvents = ::epoll_wait(epollfd_,
                               &*events_.begin(),
                               static_cast<int>(events_.size()),
                               timeoutMs);
  int savedErrno = errno;
  Timestamp now(Timestamp::now());
  if (numEvents > 0)
  {
    LOG_TRACE << numEvents << " events happened";
    fillActiveChannels(numEvents, activeChannels);
    if (implicit_cast<size_t>(numEvents) == events_.size())
    {
      events_.resize(events_.size()*2);
    }
  }
  else if (numEvents == 0)
  {
    LOG_TRACE << "nothing happened";
  }
  else
  {
    // error happens, log uncommon ones
    if (savedErrno != EINTR)
    {
      errno = savedErrno;
      LOG_SYSERR << "EPollPoller::poll()";
    }
  }
  return now;
}

Poller* Poller::newDefaultPoller(EventLoop* loop)
{
  if (::getenv("MUDUO_USE_POLL"))
  {
    return new PollPoller(loop);
  }
  else
  {
    return new EPollPoller(loop);
  }
}

void EPollPoller::fillActiveChannels(int numEvents,
                                     ChannelList* activeChannels) const
{
  assert(implicit_cast<size_t>(numEvents) <= events_.size());
  for (int i = 0; i < numEvents; ++i)
  {
    Channel* channel = static_cast<Channel*>(events_[i].data.ptr);
#ifndef NDEBUG
    int fd = channel->fd();
    ChannelMap::const_iterator it = channels_.find(fd);
    assert(it != channels_.end());
    assert(it->second == channel);
#endif
    channel->set_revents(events_[i].events);
    activeChannels->push_back(channel);
  }
}
2. EventLoop: fetching events of every kind
poller_(Poller::newDefaultPoller(this))
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
currentActiveChannel_->handleEvent(pollReturnTime_);
EventLoop::EventLoop()
  : looping_(false),
    quit_(false),
    eventHandling_(false),
    callingPendingFunctors_(false),
    iteration_(0),
    threadId_(CurrentThread::tid()),
    poller_(Poller::newDefaultPoller(this)),
    timerQueue_(new TimerQueue(this)),
    wakeupFd_(createEventfd()),
    wakeupChannel_(new Channel(this, wakeupFd_)),
    currentActiveChannel_(NULL)
{
  LOG_DEBUG << "EventLoop created " << this << " in thread " << threadId_;
  if (t_loopInThisThread)
  {
    LOG_FATAL << "Another EventLoop " << t_loopInThisThread
              << " exists in this thread " << threadId_;
  }
  else
  {
    t_loopInThisThread = this;
  }
  wakeupChannel_->setReadCallback(
      boost::bind(&EventLoop::handleRead, this));
  // we are always reading the wakeupfd
  wakeupChannel_->enableReading();
}

void EventLoop::loop()
{
  assert(!looping_);
  assertInLoopThread();
  looping_ = true;
  quit_ = false;  // FIXME: what if someone calls quit() before loop() ?
  LOG_TRACE << "EventLoop " << this << " start looping";

  while (!quit_)
  {
    activeChannels_.clear();
    pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
    ++iteration_;
    if (Logger::logLevel() <= Logger::TRACE)
    {
      printActiveChannels();
    }
    // TODO sort channel by priority
    eventHandling_ = true;
    for (ChannelList::iterator it = activeChannels_.begin();
         it != activeChannels_.end(); ++it)
    {
      currentActiveChannel_ = *it;
      currentActiveChannel_->handleEvent(pollReturnTime_);
    }
    currentActiveChannel_ = NULL;
    eventHandling_ = false;
    doPendingFunctors();
  }

  LOG_TRACE << "EventLoop " << this << " stop looping";
  looping_ = false;
}
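The constructor above also creates wakeupFd_ and wakeupChannel_, and each iteration of loop() ends with doPendingFunctors(). Together these let other threads hand work to the loop thread, which is how TcpServer later calls ioLoop->runInLoop(). The runInLoop/queueInLoop code is not quoted in this post; the sketch below is my reading of that hand-off, and member names such as mutex_ and pendingFunctors_ are assumptions, not excerpts.

// Sketch of the cross-thread hand-off implied by wakeupFd_ / doPendingFunctors().
void EventLoop::runInLoop(const Functor& cb)
{
  if (isInLoopThread())
  {
    cb();                       // already on the loop thread: run immediately
  }
  else
  {
    queueInLoop(cb);            // otherwise queue it and wake the loop up
  }
}

void EventLoop::queueInLoop(const Functor& cb)
{
  {
    MutexLockGuard lock(mutex_);      // assumed member names
    pendingFunctors_.push_back(cb);
  }
  if (!isInLoopThread() || callingPendingFunctors_)
  {
    wakeup();                   // make epoll_wait() return promptly
  }
}

void EventLoop::wakeup()
{
  uint64_t one = 1;
  ::write(wakeupFd_, &one, sizeof one);   // wakeupChannel_ becomes readable
}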
3. Channel: dispatching the events handed back by the EventLoop
revents_ has already been filled in by the epoll poller (see fillActiveChannels above).
void Channel::handleEvent(Timestamp receiveTime)
{
  boost::shared_ptr<void> guard;
  if (tied_)
  {
    guard = tie_.lock();
    if (guard)
    {
      handleEventWithGuard(receiveTime);
    }
  }
  else
  {
    handleEventWithGuard(receiveTime);
  }
}

void Channel::handleEventWithGuard(Timestamp receiveTime)
{
  eventHandling_ = true;
  LOG_TRACE << reventsToString();
  if ((revents_ & POLLHUP) && !(revents_ & POLLIN))
  {
    if (logHup_)
    {
      LOG_WARN << "fd = " << fd_ << " Channel::handle_event() POLLHUP";
    }
    if (closeCallback_) closeCallback_();
  }

  if (revents_ & POLLNVAL)
  {
    LOG_WARN << "fd = " << fd_ << " Channel::handle_event() POLLNVAL";
  }

  if (revents_ & (POLLERR | POLLNVAL))
  {
    if (errorCallback_) errorCallback_();
  }
  if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
  {
    if (readCallback_) readCallback_(receiveTime);
  }
  if (revents_ & POLLOUT)
  {
    if (writeCallback_) writeCallback_();
  }
  eventHandling_ = false;
}
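handleEvent only covers the dispatch side. The registration side, i.e. how a fd gets into epoll in the first place (for example wakeupChannel_->enableReading() in the EventLoop constructor), roughly follows the path sketched below. This is a simplified reading of Channel::update / EventLoop::updateChannel / EPollPoller::updateChannel, not a verbatim quote; a real poller also tracks whether the fd was already added and picks EPOLL_CTL_ADD / MOD / DEL accordingly.

// Sketch of the registration path (simplified, not quoted from muduo).
void Channel::enableReading()
{
  events_ |= (POLLIN | POLLPRI);
  update();                          // -> loop_->updateChannel(this)
}

void EventLoop::updateChannel(Channel* channel)
{
  assertInLoopThread();
  poller_->updateChannel(channel);   // forward to the concrete poller
}

void EPollPoller::updateChannel(Channel* channel)
{
  struct epoll_event event = {};
  event.events = channel->events();
  event.data.ptr = channel;          // this is what fillActiveChannels() reads back
  ::epoll_ctl(epollfd_, EPOLL_CTL_ADD, channel->fd(), &event);
}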
4. Acceptor
loop_(loop),
acceptChannel_(loop, acceptSocket_.fd()),
acceptChannel_.setReadCallback(boost::bind(&Acceptor::handleRead, this));
Acceptor::Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport)
  : loop_(loop),
    acceptSocket_(sockets::createNonblockingOrDie(listenAddr.family())),
    acceptChannel_(loop, acceptSocket_.fd()),
    listenning_(false),
    idleFd_(::open("/dev/null", O_RDONLY | O_CLOEXEC))
{
  assert(idleFd_ >= 0);
  acceptSocket_.setReuseAddr(true);
  acceptSocket_.setReusePort(reuseport);
  acceptSocket_.bindAddress(listenAddr);
  acceptChannel_.setReadCallback(
      boost::bind(&Acceptor::handleRead, this));
}

void Acceptor::handleRead()
{
  loop_->assertInLoopThread();
  InetAddress peerAddr;
  //FIXME loop until no more
  int connfd = acceptSocket_.accept(&peerAddr);
  if (connfd >= 0)
  {
    // string hostport = peerAddr.toIpPort();
    // LOG_TRACE << "Accepts of " << hostport;
    if (newConnectionCallback_)
    {
      newConnectionCallback_(connfd, peerAddr);
    }
    else
    {
      sockets::close(connfd);
    }
  }
  else
  {
    LOG_SYSERR << "in Acceptor::handleRead";
    // Read the section named "The special problem of
    // accept()ing when you can't" in libev's doc.
    // By Marc Lehmann, author of libev.
    if (errno == EMFILE)
    {
      ::close(idleFd_);
      idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL);
      ::close(idleFd_);
      idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);
    }
  }
}
5. TcpServer
acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)),
ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));
After TcpServer receives a new connection in newConnection(), it creates a TcpConnection to handle the subsequent sending and receiving of protocol messages.
TcpServer::TcpServer(EventLoop* loop,
                     const InetAddress& listenAddr,
                     const string& nameArg,
                     Option option)
  : loop_(CHECK_NOTNULL(loop)),
    ipPort_(listenAddr.toIpPort()),
    name_(nameArg),
    acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)),
    threadPool_(new EventLoopThreadPool(loop, name_)),
    connectionCallback_(defaultConnectionCallback),
    messageCallback_(defaultMessageCallback),
    nextConnId_(1)
{
  acceptor_->setNewConnectionCallback(
      boost::bind(&TcpServer::newConnection, this, _1, _2));
}

void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
  loop_->assertInLoopThread();
  EventLoop* ioLoop = threadPool_->getNextLoop();
  char buf[64];
  snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);
  ++nextConnId_;
  string connName = name_ + buf;

  LOG_INFO << "TcpServer::newConnection [" << name_
           << "] - new connection [" << connName
           << "] from " << peerAddr.toIpPort();
  InetAddress localAddr(sockets::getLocalAddr(sockfd));
  // FIXME poll with zero timeout to double confirm the new connection
  // FIXME use make_shared if necessary
  TcpConnectionPtr conn(new TcpConnection(ioLoop,
                                          connName,
                                          sockfd,
                                          localAddr,
                                          peerAddr));
  connections_[connName] = conn;
  conn->setConnectionCallback(connectionCallback_);
  conn->setMessageCallback(messageCallback_);
  conn->setWriteCompleteCallback(writeCompleteCallback_);
  conn->setCloseCallback(
      boost::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafe
  ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));
}

void TcpServer::start()
{
  if (started_.getAndSet(1) == 0)
  {
    threadPool_->start(threadInitCallback_);

    assert(!acceptor_->listenning());
    loop_->runInLoop(
        boost::bind(&Acceptor::listen, get_pointer(acceptor_)));
  }
}
6. TcpConnection
Registers the various I/O event callbacks.
TcpConnection::TcpConnection(EventLoop* loop,
                             const string& nameArg,
                             int sockfd,
                             const InetAddress& localAddr,
                             const InetAddress& peerAddr)
  : loop_(CHECK_NOTNULL(loop)),
    name_(nameArg),
    state_(kConnecting),
    reading_(true),
    socket_(new Socket(sockfd)),
    channel_(new Channel(loop, sockfd)),
    localAddr_(localAddr),
    peerAddr_(peerAddr),
    highWaterMark_(64*1024*1024)
{
  channel_->setReadCallback(
      boost::bind(&TcpConnection::handleRead, this, _1));
  channel_->setWriteCallback(
      boost::bind(&TcpConnection::handleWrite, this));
  channel_->setCloseCallback(
      boost::bind(&TcpConnection::handleClose, this));
  channel_->setErrorCallback(
      boost::bind(&TcpConnection::handleError, this));
  LOG_DEBUG << "TcpConnection::ctor[" << name_ << "] at " << this
            << " fd=" << sockfd;
  socket_->setKeepAlive(true);
}
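The constructor wires channel_'s read callback to TcpConnection::handleRead, which is where messageCallback_ (the user's onMessage) is finally invoked. That function is not quoted above; the following is a simplified sketch of what it does, not a verbatim excerpt.

// Sketch of the read path, simplified from TcpConnection::handleRead.
void TcpConnection::handleRead(Timestamp receiveTime)
{
  loop_->assertInLoopThread();
  int savedErrno = 0;
  // Drain the readable bytes from the socket into the input buffer.
  ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
  if (n > 0)
  {
    // Hand the buffered bytes to the user callback (onMessage in pingpong).
    messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
  }
  else if (n == 0)
  {
    handleClose();            // peer closed the connection
  }
  else
  {
    errno = savedErrno;
    handleError();
  }
}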
7. Putting it together in main()
EventLoop loop;
TcpServer server(&loop, listenAddr, "PingPong");
server.start();
loop.loop();
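As the full listing at the top of this post shows, the whole user-facing API comes down to these four lines plus two callbacks. As a quick check that the pattern generalizes, here is a minimal "discard" server built from the same calls; the include paths follow muduo's layout, and Buffer::readableBytes()/retrieveAll() are standard Buffer methods that do not appear in the excerpts above.

#include <muduo/base/Logging.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>
#include <muduo/net/TcpServer.h>

using namespace muduo;
using namespace muduo::net;

// Discard whatever arrives instead of echoing it back like pingpong does.
void onDiscardMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp)
{
  LOG_INFO << conn->name() << " discarding " << buf->readableBytes() << " bytes";
  buf->retrieveAll();
}

int main()
{
  EventLoop loop;
  InetAddress listenAddr("0.0.0.0", 2007);    // same (ip, port) ctor as in server.cc
  TcpServer server(&loop, listenAddr, "Discard");
  server.setMessageCallback(onDiscardMessage);
  server.setThreadNum(2);                     // optional IO thread pool
  server.start();
  loop.loop();
}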