muduo is a Linux multithreaded network library developed by Chen Shuo (http://www.cnblogs.com/Solstice). It makes use of many newer Linux features (e.g. eventfd, timerfd) and GCC built-in functions. Its main building blocks are:
Calls the following GCC-provided atomic built-in functions:
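The original list of built-ins was not preserved here. As a minimal sketch of what such an atomic integer wrapper looks like on top of GCC's __sync family (the exact member set is an assumption, not a quote from muduo):

template<typename T>
class AtomicIntegerSketch
{
 public:
  AtomicIntegerSketch() : value_(0) {}

  // CAS with (0, 0) is a common trick to read the value with full ordering
  T get()                 { return __sync_val_compare_and_swap(&value_, T(0), T(0)); }
  T getAndAdd(T x)        { return __sync_fetch_and_add(&value_, x); }   // atomic fetch-and-add
  T addAndGet(T x)        { return getAndAdd(x) + x; }
  T incrementAndGet()     { return addAndGet(1); }
  T getAndSet(T newValue) { return __sync_lock_test_and_set(&value_, newValue); }  // atomic exchange

 private:
  volatile T value_;
};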
A thread-safe queue, implemented internally on top of std::deque<T>.
Similar to BlockingQueue, but the internal container is based on boost::circular_buffer<T>.
A wrapper around pthread_cond.
CountDownLatch, which works like a starting gun: a further wrapper over Condition that can make all threads start at the same moment (see the usage sketch after this list of wrappers).
A wrapper class around backtrace and backtrace_symbols.
MutexLock: a wrapper class around pthread_mutex_*.
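The "starting gun" usage of CountDownLatch mentioned above looks roughly like this. The header paths and the CountDownLatch/Thread signatures are assumptions based on muduo's public headers; treat it as a sketch, not quoted code.

#include <muduo/base/CountDownLatch.h>
#include <muduo/base/Thread.h>

muduo::CountDownLatch latch(1);   // the shared "starting gun"

void worker()
{
  latch.wait();   // every worker blocks here until the count reaches zero
  // ... all workers proceed from this point at (roughly) the same time
}

int main()
{
  muduo::Thread t1(worker), t2(worker);
  t1.start();
  t2.start();
  latch.countDown();   // fire: releases every waiting worker at once
  t1.join();
  t2.join();
}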
void* Thread::startThread(void* obj)
{
  Thread* thread = static_cast<Thread*>(obj);
  thread->runInThread();   // wraps func_ and invokes it
  return NULL;
}
void Thread::runInThread()
{
  tid_ = CurrentThread::tid();
  muduo::CurrentThread::t_threadName = name_.c_str();
  try
  {
    func_();
    muduo::CurrentThread::t_threadName = "finished";
  }
  …
}
typedef boost::function<void ()> ThreadFunc;

Thread::Thread(const ThreadFunc& func, const string& n)
  : started_(false),
    pthreadId_(0),
    tid_(0),
    // func_ is the function that actually runs in the thread; boost::function turns it into a functor
    func_(func),
    name_(n)
{
  numCreated_.increment();
}
Still uses pthread_getspecific/pthread_setspecific (OP: why not use the __thread keyword?).
Thread-local singleton pattern: the singleton template class's instance member is declared with the __thread keyword, giving it thread-local storage.
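A minimal sketch of the idea (not muduo's exact ThreadLocalSingleton, which additionally registers a cleanup destructor through a pthread_key): because the pointer itself is __thread, each thread lazily builds and sees its own instance.

// Sketch: per-thread singleton; every thread that calls instance() gets its own T.
template<typename T>
class ThreadLocalSingletonSketch
{
 public:
  static T& instance()
  {
    if (!t_value_)          // first use in this thread
    {
      t_value_ = new T();   // NOTE: leaks unless freed at thread exit (muduo frees it via a pthread_key destructor)
    }
    return *t_value_;
  }

 private:
  static __thread T* t_value_;   // __thread gives the pointer TLS storage: one copy per thread
};

template<typename T>
__thread T* ThreadLocalSingletonSketch<T>::t_value_ = 0;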
void ThreadPool::run(const Task& task)
{
  // if there are no worker threads, execute the task directly
  if (threads_.empty())
  {
    task();
  }
  else
  {
    MutexLockGuard lock(mutex_);
    // enqueue the task
    queue_.push_back(task);
    cond_.notify();
  }
}
ThreadPool::Task ThreadPool::take()
{
  MutexLockGuard lock(mutex_);
  // always use a while-loop, due to spurious wakeup
  while (queue_.empty() && running_)
  {
    // no task available yet, wait
    cond_.wait();
  }
  Task task;
  if (!queue_.empty())
  {
    task = queue_.front();
    queue_.pop_front();
  }
  return task;
}
// this is the thread function of each worker
void ThreadPool::runInThread()
{
  try
  {
    while (running_)
    {
      // every worker thread fetches its tasks here
      Task task(take());
      if (task)
      {
        // execute the task
        task();
      }
    }
  }
  …
}
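Putting the three pieces together, typical use is roughly the following. The public API (start(int numThreads), run(Task), stop()) is assumed from muduo/base/ThreadPool.h; the sketch only illustrates the producer/worker relationship shown above.

#include <muduo/base/ThreadPool.h>
#include <cstdio>

void printTask()
{
  printf("task runs in some worker thread\n");
}

int main()
{
  muduo::ThreadPool pool("demo");
  pool.start(4);          // spawn 4 workers, each running runInThread()
  pool.run(printTask);    // enqueued via run(); one worker will take() it and execute it
  pool.stop();            // stop the workers
}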
If writable < datalen but prependable + writable >= datalen, readerIndex is moved back to the front so that the prependable and writable regions merge into one area large enough (generally this situation arises because there is still unread data and readerIndex has drifted toward the back). If prependable + writable < datalen, even the combined writable space is insufficient, so vector::resize() grows the buffer.
void makeSpace(size_t len)
{
  if (writableBytes() + prependableBytes() < len + kCheapPrepend)
  {
    // FIXME: move readable data
    buffer_.resize(writerIndex_+len);
  }
  else
  {
    // move readable data to the front, make space inside buffer
    assert(kCheapPrepend < readerIndex_);
    size_t readable = readableBytes();
    std::copy(begin()+readerIndex_,
              begin()+writerIndex_,
              begin()+kCheapPrepend);
    readerIndex_ = kCheapPrepend;
    writerIndex_ = readerIndex_ + readable;
    assert(readable == readableBytes());
  }
}
class Channel : boost::noncopyable
{
 public:
  typedef boost::function<void()> EventCallback;
  typedef boost::function<void(Timestamp)> ReadEventCallback;

 private:
  EventLoop* loop_;                // which reactor this channel belongs to
  const int  fd_;                  // the fd it wraps
  int        events_;              // events we are interested in
  int        revents_;             // events that are ready
  bool       eventHandling_;       // currently handling events
  ReadEventCallback readCallback_;
  EventCallback writeCallback_;    // how to write data
  EventCallback closeCallback_;    // how to close the connection
  EventCallback errorCallback_;    // how to handle errors
};
When the loop detects events on the fd, the handleEvent callback is triggered:
void Channel::handleEventWithGuard(Timestamp receiveTime)
{
  eventHandling_ = true;
  if ((revents_ & POLLHUP) && !(revents_ & POLLIN))
  {
    if (logHup_)
    {
      LOG_WARN << "Channel::handle_event() POLLHUP";
    }
    if (closeCallback_) closeCallback_();
  }

  if (revents_ & POLLNVAL)
  {
    LOG_WARN << "Channel::handle_event() POLLNVAL";
  }

  if (revents_ & (POLLERR | POLLNVAL))
  {
    if (errorCallback_) errorCallback_();
  }
  if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
  {
    if (readCallback_) readCallback_(receiveTime);
  }
  if (revents_ & POLLOUT)
  {
    if (writeCallback_) writeCallback_();
  }
  eventHandling_ = false;
}
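How a Channel's events_ get registered with the poller in the first place is not shown above. The mechanism is roughly the following sketch (in the spirit of Channel::enableReading/update; kReadEvent is assumed to be POLLIN | POLLPRI):

// Sketch: turning on read interest and pushing the change down to the Poller.
void Channel::enableReading()
{
  events_ |= kReadEvent;          // record that we now care about readability
  update();
}

void Channel::update()
{
  loop_->updateChannel(this);     // EventLoop forwards to Poller::updateChannel(),
                                  // which adds/modifies the fd in its pollfd/epoll set
}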
class EventLoop : boost::noncopyable
{
 public:
  void loop();
  void quit();

  /// Runs callback immediately in the loop thread.
  /// It wakes up the loop, and run the cb.
  /// If in the same loop thread, cb is run within the function.
  /// Safe to call from other threads.
  void runInLoop(const Functor& cb);

  /// Queues callback in the loop thread.
  /// Runs after finish polling.
  /// Safe to call from other threads.
  void queueInLoop(const Functor& cb);

  /// Runs callback at 'time'.
  /// Safe to call from other threads.
  TimerId runAt(const Timestamp& time, const TimerCallback& cb);

  /// Runs callback after @c delay seconds.
  /// Safe to call from other threads.
  TimerId runAfter(double delay, const TimerCallback& cb);

  /// Runs callback every @c interval seconds.
  /// Safe to call from other threads.
  TimerId runEvery(double interval, const TimerCallback& cb);

  /// Cancels the timer.
  /// Safe to call from other threads.
  void cancel(TimerId timerId);

  // internal usage
  void wakeup();
  void updateChannel(Channel* channel);
  void removeChannel(Channel* channel);

  bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }

 private:
  void handleRead();  // waked up
  void doPendingFunctors();

  typedef std::vector<Channel*> ChannelList;

  bool looping_; /* atomic */
  bool quit_; /* atomic */
  bool eventHandling_; /* atomic */
  bool callingPendingFunctors_; /* atomic */
  const pid_t threadId_;
  Timestamp pollReturnTime_;
  boost::scoped_ptr<Poller> poller_;
  boost::scoped_ptr<TimerQueue> timerQueue_;
  int wakeupFd_;
  // unlike in TimerQueue, which is an internal class,
  // we don't expose Channel to client.
  boost::scoped_ptr<Channel> wakeupChannel_;
  ChannelList activeChannels_;
  Channel* currentActiveChannel_;
  MutexLock mutex_;
  std::vector<Functor> pendingFunctors_; // @GuardedBy mutex_
};
__thread EventLoop* t_loopInThisThread = 0;
t_loopInThisThread is defined as a per-thread global variable and is initialized in EventLoop's constructor:
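The initialization itself is not quoted in these notes; it is presumably the usual one-loop-per-thread check, roughly as sketched below (the constructor excerpted further down only shows the wakeup-channel part and the real one initializes more members):

EventLoop::EventLoop()
  : threadId_(CurrentThread::tid())
{
  if (t_loopInThisThread)
  {
    // a second EventLoop in the same thread is a fatal error: one loop per thread
    LOG_FATAL << "Another EventLoop " << t_loopInThisThread
              << " exists in this thread " << threadId_;
  }
  else
  {
    t_loopInThisThread = this;   // register this loop as the thread's loop
  }
}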
epoll's default working mode is LT (level-triggered).
Looking at muduo's working model, one could use an IO thread per fd to handle each connection's read/write/encode/decode work: the threads of the compute thread pool wait on an eventfd, and once woken the connection is passed, together with the decoded packet, as arguments into the compute thread pool; when the computation finishes, the result is written directly to the IO thread's fd. The next compute thread is chosen in round-robin fashion.
An alternative solution: in fact these threads can be merged, and the choice depends only on the nature of the tasks, IO-bound or compute-bound. The only constraints are that, to avoid the performance loss of excessive thread context switching and the resource limits on the number of threads, a thread-per-fd model cannot be used; instead, the fds are divided into groups and distributed fairly evenly across the IO threads.
Cross-thread wakeup of an EventLoop:
EventLoop::EventLoop()
  : wakeupFd_(createEventfd()),
    wakeupChannel_(new Channel(this, wakeupFd_))
{
  wakeupChannel_->setReadCallback(
      boost::bind(&EventLoop::handleRead, this));   // the read callback is bound to handleRead
  // we are always reading the wakeupfd
  wakeupChannel_->enableReading();
}
The cross-thread wakeup function is wakeup():
void EventLoop::wakeup()
{
  uint64_t one = 1;
  ssize_t n = sockets::write(wakeupFd_, &one, sizeof one);   // a plain write, much like writing to a pipe
}
Once wakeup() completes, wakeupFd_ becomes readable, so the EventLoop is notified and immediately drops out of epoll_wait to start processing. Of course we need to read the data off wakeupFd_, otherwise it would be notified again next time; the reading function is handleRead:
void EventLoop::handleRead()
{
  uint64_t one = 1;
  ssize_t n = sockets::read(wakeupFd_, &one, sizeof one);
}
runInLoop and queueInLoop are how cross-thread tasks are submitted.
void EventLoop::runInLoop(const Functor& cb)
{
  // if called from the loop's own thread, execute the callback immediately
  if (isInLoopThread())
  {
    cb();
  }
  else
  {
    // called from another thread: append it to pendingFunctors_
    queueInLoop(cb);
    // and notify that thread's loop that a task has arrived
    wakeup();
  }
}
void EventLoop::queueInLoop(const Functor& cb)
{
  {
    MutexLockGuard lock(mutex_);
    pendingFunctors_.push_back(cb);
  }
  /* After queueing: if we are in the loop's own thread and it is currently running
     pendingFunctors_, wake it up so the new callback is not missed; otherwise the
     callback will be picked up in the next iteration anyway, so no wakeup is needed */
  if (isInLoopThread() && callingPendingFunctors_)
  {
    wakeup();
  }
}
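As a usage sketch, this is what handing work to an IO thread from another thread looks like. sendResult is a hypothetical helper, not part of muduo; only runInLoop/boost::bind are from the code above.

#include <boost/bind.hpp>
#include <string>

void sendResult(const std::string& result);   // hypothetical: writes the result on the connection

// Called from a compute/worker thread; 'loop' is the EventLoop owned by the IO thread.
void postResult(EventLoop* loop, const std::string& result)
{
  loop->runInLoop(boost::bind(&sendResult, result));
  // Since the caller is not the loop thread, the functor is appended to pendingFunctors_
  // via queueInLoop() and wakeup() writes to wakeupFd_; the IO thread falls out of
  // poll/epoll_wait and runs the functor in doPendingFunctors().
}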
Call chain:
addTimer(const TimerCallback& cb, Timestamp when, double interval) => addTimerInLoop(Timer* timer) => insert(timer), inside which:
typedef std::pair<Timestamp, Timer*> Entry;
typedef std::set<Entry> TimerList;

bool earliestChanged = false;
Timestamp when = timer->expiration();
TimerList::iterator it = timers_.begin();
if (it == timers_.end() || when < it->first)
{
  earliestChanged = true;
}
The subtle point here: for the very first timer, begin() == end(), so earliestChanged = true, which triggers resetTimerfd:
void TimerQueue::addTimerInLoop(Timer* timer)
{
  loop_->assertInLoopThread();
  bool earliestChanged = insert(timer);

  if (earliestChanged)
  {
    // calls ::timerfd_settime(timerfd, 0, &newValue, &oldValue) to arm the timer
    resetTimerfd(timerfd_, timer->expiration());
  }
}
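resetTimerfd itself boils down to turning the Timestamp difference into a struct itimerspec and arming the fd. A sketch under that assumption (the helper name and the 100-microsecond floor are illustrative, not quoted):

#include <sys/timerfd.h>
#include <string.h>
#include <stdint.h>
#include <muduo/base/Timestamp.h>

// Sketch: arm 'timerfd' so it fires when 'expiration' is reached (relative timeout form).
void resetTimerfdSketch(int timerfd, muduo::Timestamp expiration)
{
  int64_t micro = expiration.microSecondsSinceEpoch()
                  - muduo::Timestamp::now().microSecondsSinceEpoch();
  if (micro < 100) micro = 100;   // never arm with a zero/negative interval

  struct itimerspec newValue;
  struct itimerspec oldValue;
  memset(&newValue, 0, sizeof newValue);
  memset(&oldValue, 0, sizeof oldValue);
  newValue.it_value.tv_sec  = static_cast<time_t>(micro / 1000000);
  newValue.it_value.tv_nsec = static_cast<long>((micro % 1000000) * 1000);

  ::timerfd_settime(timerfd, 0, &newValue, &oldValue);   // flags = 0: relative timeout
}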
When the timer fires:
void TimerQueue::handleRead()
{
  loop_->assertInLoopThread();
  Timestamp now(Timestamp::now());
  readTimerfd(timerfd_, now);

  // find out which timers have expired
  std::vector<Entry> expired = getExpired(now);

  // safe to callback outside critical section
  for (std::vector<Entry>::iterator it = expired.begin();
       it != expired.end(); ++it)
  {
    // for each expired Timer, call run(), i.e. the callback we registered at the start
    it->second->run();
  }
  reset(expired, now);
}
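getExpired is not quoted in these notes. Conceptually it takes everything in the ordered set whose timestamp is not later than now; a sketch using a sentinel entry (the sentinel pointer value is only an upper bound, and the free-function form is for illustration):

#include <set>
#include <vector>
#include <algorithm>
#include <iterator>
#include <stdint.h>

// Sketch of getExpired(): the set is ordered by (Timestamp, Timer*), so every
// timer expired by 'now' forms a prefix of the set.
std::vector<Entry> getExpiredSketch(TimerList& timers, Timestamp now)
{
  std::vector<Entry> expired;
  // sentinel entry: any timer with timestamp <= now sorts before it
  Entry sentry(now, reinterpret_cast<Timer*>(UINTPTR_MAX));
  TimerList::iterator end = timers.lower_bound(sentry);
  std::copy(timers.begin(), end, std::back_inserter(expired));
  timers.erase(timers.begin(), end);
  return expired;
}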
TcpConnection's job is to handle reading from, writing to, and closing the socket once the TCP connection is established. As before, let's look at TcpConnection's structure:
class TcpConnection : boost::noncopyable,
                      public boost::enable_shared_from_this<TcpConnection>
{
 public:
  /// Constructs a TcpConnection with a connected sockfd
  ///
  /// User should not create this object.
  TcpConnection(EventLoop* loop,                // the reactor this connection belongs to
                const string& name,             // connection name
                int sockfd,                     // the connected fd
                const InetAddress& localAddr,   // local address
                const InetAddress& peerAddr);   // peer address

  // called when TcpServer accepts a new connection
  void connectEstablished();   // should be called only once
  // called when TcpServer has removed me from its map
  void connectDestroyed();     // should be called only once

 private:
  enum StateE { kDisconnected, kConnecting, kConnected, kDisconnecting };
  void sendInLoop(const void* message, size_t len);   // send a message
  void setState(StateE s) { state_ = s; }

  EventLoop* loop_;
  string name_;
  StateE state_;   // FIXME: use atomic variable
  // we don't expose those classes to client.
  boost::scoped_ptr<Socket> socket_;     // the socket
  boost::scoped_ptr<Channel> channel_;   // the connection's channel
  InetAddress localAddr_;
  InetAddress peerAddr_;
  ConnectionCallback connectionCallback_;         // fired on both connection establishment and teardown
  MessageCallback messageCallback_;               // fired when data is readable
  WriteCompleteCallback writeCompleteCallback_;   // fired when a write completes
  CloseCallback closeCallback_;                   // fired when the connection closes
  Buffer inputBuffer_;    // buffer for incoming data
  Buffer outputBuffer_;   // FIXME: use list<Buffer> as output buffer.
  boost::any context_;    // user-supplied context
  // FIXME: creationTime_, lastReceiveTime_
  //        bytesReceived_, bytesSent_
};
TcpConnection sets up its channel during initialization. Once TcpClient or TcpServer has established a connection, TcpConnection::connectEstablished is called; internally it enables reading on the channel. When the fd becomes readable, TcpConnection invokes handleRead, which takes care of reading the data; once the read completes, the data is handed to the messageCallback. If something needs to be written, sendInLoop is called: the message is placed into outputBuffer_ and writing is enabled on the channel. When the fd becomes writable, TcpConnection performs the write internally, and when the write finishes the writeCompleteCallback fires. Both the managed reads and writes are non-blocking. To close the connection, call shutdown; to tear the connection down, call TcpConnection::connectDestroyed, which essentially removes the channel from the reactor.
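The "managed read" described above is roughly the following sketch (error handling condensed; handleClose/handleError are assumed private helpers):

// Sketch of TcpConnection::handleRead(): drain the socket into inputBuffer_,
// then hand the buffer to the user's message callback.
void TcpConnection::handleRead(Timestamp receiveTime)
{
  int savedErrno = 0;
  ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
  if (n > 0)
  {
    // the user decides how much of inputBuffer_ to consume; framing lives above this layer
    messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
  }
  else if (n == 0)
  {
    handleClose();   // peer closed the connection
  }
  else
  {
    handleError();   // read error
  }
}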
At the TcpConnection layer there is no knowledge of how many bytes a single read should fetch; message framing is done by the layer above. TcpConnection reads at most 64KB in one go and hands it to the upper application; the application decides whether the data is sufficient, and if not it simply returns and lets the reactor keep waiting for more reads. Writes are likewise split into multiple passes internally. This requires the reactor to use level-triggered rather than edge-triggered notification.
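The "64KB per read" behaviour comes from how the input buffer reads from the fd: a scatter read into the Buffer's writable area plus a 64KB stack buffer, so one readv() usually suffices without pre-growing every Buffer. A sketch under that assumption:

#include <sys/uio.h>   // readv
#include <errno.h>

// Sketch of Buffer::readFd(): fill the writable area first, overflow into a
// 64KB stack buffer that is appended afterwards.
ssize_t Buffer::readFd(int fd, int* savedErrno)
{
  char extrabuf[65536];
  struct iovec vec[2];
  const size_t writable = writableBytes();
  vec[0].iov_base = begin() + writerIndex_;
  vec[0].iov_len  = writable;
  vec[1].iov_base = extrabuf;
  vec[1].iov_len  = sizeof extrabuf;

  const ssize_t n = ::readv(fd, vec, 2);
  if (n < 0)
  {
    *savedErrno = errno;
  }
  else if (static_cast<size_t>(n) <= writable)
  {
    writerIndex_ += n;                  // everything fit into the buffer
  }
  else
  {
    writerIndex_ = buffer_.size();      // the buffer is full
    append(extrabuf, n - writable);     // copy the overflow in (this grows the buffer)
  }
  return n;
}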
This class (TcpClient) mainly wraps the functionality of Connector.
TcpClient::TcpClient(EventLoop* loop,
                     const InetAddress& serverAddr,
                     const string& name)
  : loop_(CHECK_NOTNULL(loop)),
    connector_(new Connector(loop, serverAddr)),
    name_(name),
    connectionCallback_(defaultConnectionCallback),
    messageCallback_(defaultMessageCallback),
    retry_(false),
    connect_(true),
    nextConnId_(1)
{
  connector_->setNewConnectionCallback(
      boost::bind(&TcpClient::newConnection, this, _1));
  // FIXME setConnectFailedCallback
}
TcpServer::TcpServer(EventLoop* loop,
                     const InetAddress& listenAddr,
                     const string& nameArg)
  : loop_(CHECK_NOTNULL(loop)),
    hostport_(listenAddr.toHostPort()),
    name_(nameArg),
    acceptor_(new Acceptor(loop, listenAddr)),
    threadPool_(new EventLoopThreadPool(loop)),
    connectionCallback_(defaultConnectionCallback),
    messageCallback_(defaultMessageCallback),
    started_(false),
    nextConnId_(1)
{
  acceptor_->setNewConnectionCallback(
      boost::bind(&TcpServer::newConnection, this, _1, _2));
}
Likewise, the acceptor object is created and its callback is set to TcpServer::newConnection, while TcpConnection's various callbacks are set from the outside. start() is then called to launch the server; start() calls Acceptor::listen(), and once a connection comes in, newConnection is invoked. Here is the newConnection code:
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
  loop_->assertInLoopThread();
  EventLoop* ioLoop = threadPool_->getNextLoop();
  char buf[32];
  snprintf(buf, sizeof buf, ":%s#%d", hostport_.c_str(), nextConnId_);
  ++nextConnId_;
  string connName = name_ + buf;

  // FIXME poll with zero timeout to double confirm the new connection
  TcpConnectionPtr conn(
      new TcpConnection(ioLoop, connName, sockfd, localAddr, peerAddr));
  connections_[connName] = conn;
  conn->setConnectionCallback(connectionCallback_);
  conn->setMessageCallback(messageCallback_);
  conn->setWriteCompleteCallback(writeCompleteCallback_);
  conn->setCloseCallback(
      boost::bind(&TcpServer::removeConnection, this, _1));   // FIXME: unsafe
  ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));
}
對於服務端來講鏈接都被惟一化了而後映射爲字符串放在connections_這個容器內部。threadPool_->getNextLoop()能夠輪詢地將取出每個線程而後將 TcpConnection::connectEstablished輪詢地丟到每一個線程裏面去完成。存放在connections_是有緣由了,每一個TcpConnection有惟一一個名字,這樣Server 就能夠根據TcpConnection來從本身內部移除連接了。在析構函數裏面能夠遍歷connections_內容獲得全部創建的鏈接而且逐一釋放。