muduo Library

muduo是由陳碩(http://www.cnblogs.com/Solstice)開發的一個Linux多線程網絡庫,採用了不少新的Linux特性(例如eventfdtimerfd)和GCC內置函數。其主要特色爲: react

  • 線程安全,支持多核多線程
  • 不考慮可移植性,不跨平臺,只支持 Linux,不支持 Windows // 支持Windows有時候代價太大了
  • 在不增長複雜度的前提下能夠支持 FreeBSD/Darwin,方便未來用 Mac 做爲開發用機,但不爲它作性能優化。也就是說 IO multiplexing 使用 poll epoll
  • 主要支持 x86-64,兼顧 IA32
  • 不支持 UDP,只支持 TCP
  • 不支持 IPv6,只支持 IPv4
  • 不考慮廣域網應用,只考慮局域網 // 不會存在慢鏈接,因此即便是阻塞讀也不會花去太長時間用在阻塞上面
  • 只支持一種使用模式:non-blocking IO + one event loop per thread,不考慮阻塞 IO
  • API 簡單易用,只暴露具體類和標準庫裏的類,不使用 non-trivial templates,也不使用虛函數 // GP而非OO
  • 只知足經常使用需求的 90%,不面面俱到,必要的時候以 app 來適應 lib
  • 只作 library,不作成 framework
  • 爭取所有代碼在 5000 行之內(不含測試)
  • 以上條件都知足時,能夠考慮搭配 Google Protocol Buffers RPC // RPC能夠簡化不少東西
  • Base Class
    • Atomic

      調用了以下GCC提供的原子操做內建函數: 安全

      • __sync_lock_test_and_set
      • __sync_val_compare_and_swap
      • __sync_fetch_and_add
    • BlockingQueue

      線程安全的隊列,內部實現爲std::deque<T> 性能優化

    • BoundedBlockingQueue

      BlockingQueue相似,可是內部容器基於boost::circular_buffer<T> 服務器

    • Condition

      pthread_cond的封裝 網絡

    • CountDownLatch

      CountDownLatch,相似發令槍,對condition的再包裝,能夠保證全部線程同時啓動。 多線程

    • Exception

      backtrace_symbolsbacktrace的包裝類 app

    • Mutex

      MutexLockpthread_mutex_*的包裝類 socket

    • Singleton
      • 依舊是靜態成員變量做爲單例對象,可是用pthread_once保證多線程訪問的惟一性;
      • ::atexit(destroy),在進程退出時銷燬之。
    • Thread
      • syscall(SYS_gettid)等同於gettid
      • numCreated_,類靜態成員,最後調用__sync_fetch_and_add
      • __thread關鍵字表示線程本地存儲(TSS)
      • boost::functionboost::bind實現相似C#delegate

void* Thread::startThread(void* obj) ide

{ 函數

  Thread* thread = static_cast<Thread*>(obj);

  thread->runInThread();//func_的包裝,調用了func_

  return NULL;

}

   

void Thread::runInThread()

{

  tid_ = CurrentThread::tid();

  muduo::CurrentThread::t_threadName = name_.c_str();

  try

  {

    func_();

    muduo::CurrentThread::t_threadName = "finished";

  }

 …

}

   

typedef boost::function<void ()> ThreadFunc;

Thread::Thread(const ThreadFunc& func, const string& n): started_(false),  pthreadId_(0), tid_(0),

    //func_是實際上要在線程裏執行的函數,以boost::function生成了一個函數對象  (functor)

    func_(func), name_(n)

{

  numCreated_.increment();

}

  • int pthread_atfork(void (*prepare) (void), void (*parent) (void), void (*child) (void) );
    pthread_atfork() 函數聲明瞭在調用 fork() 的線程的上下文中的 fork() 先後調用的 fork() 處理程序。
        fork() 啓動前調用 prepare 處理程序。
        在父進程中返回 fork() 後調用 parent 處理程序。
        在子進程中返回 fork() 後調用 child 處理程序。
    能夠將任何處理程序參數都設置爲 NULL。對 pthread_atfork() 進行連續調用的順序很是重要。例如,prepare 處理程序可能會獲取全部須要的互斥。而後,parent child 處理程序可能會釋放互斥。獲取全部須要的互斥的 prepare 處理程序可確保在對進程執行 fork 以前,全部相關的鎖定都由調用 fork 函數的線程持有。此技術可防止子進程中出現死鎖。
  • ThreadLocal

    依舊用pthread_get/setspecificOP:爲什麼不用__thread關鍵字?)。

  • ThreadLocalSingleton

    線程單例模式,單例模板類的instance成員採用__thread關鍵字修飾,具備TLS屬性。

  • ThreadPool

void ThreadPool::run(const Task& task)

{

  //若是沒有線程,直接執行task定義的函數

  if (threads_.empty())

  {

    task();

  }

  else

  {

    MutexLockGuard lock(mutex_);

    //加入任務隊列

    queue_.push_back(task);

    cond_.notify();

  }

}

   

ThreadPool::Task ThreadPool::take()

{

  MutexLockGuard lock(mutex_);

  // always use a while-loop, due to spurious wakeup

  while (queue_.empty() && running_)

  {

    //若是沒有任務,則等待

    cond_.wait();

  }

  Task task;

  if(!queue_.empty())

  {

    task = queue_.front();

    queue_.pop_front();

  }

  return task;

}

   

//此函數就是線程函數

void ThreadPool::runInThread()

{

  try

  {

    while (running_)

    {

              //每一個線程都從這裏獲取任務

      Task task(take());

      if (task)

      {

                  //執行任務

        task();

      }

    }

  }

  …

}

  • Net Classes
    • Buffer

      • Buffervector<char>實現。
      • prependable是用來在屢次序列化消息後一次性在其前部寫入長度之用的。
      • 分別以readIndexwriteIndex表示可讀和可寫的緩衝區位置。要寫入x字節,則writeIndex += xreadIndex不變,此時可讀區域長度爲writeIndex - readIndex。若是readIndex == writeIndex,說明無數據供Upper Application讀取。
      • makeSpace用於擴展或者重整整個緩衝區,其邏輯以下:

        若是writable < datalen,可是prependable+writeable >= datalen,則將readIndex挪至最前,將prependable+writeable合併獲得一個足夠大的緩衝區(通常來講,這種狀況是因爲還有還沒有讀取的數據,readIndex向後移動位置形成的);若是prependable+writeable < datalen,說明所有可寫區域之和也不足,則vertor::resize()擴展緩衝區。

void makeSpace(size_t len)

{

    if (writableBytes() + prependableBytes() < len + kCheapPrepend)

    {

      // FIXME: move readable data

      buffer_.resize(writerIndex_+len);

    }

    else

    {

      // move readable data to the front, make space inside buffer

      assert(kCheapPrepend < readerIndex_);

      size_t readable = readableBytes();

      std::copy(begin()+readerIndex_,

                begin()+writerIndex_,

                begin()+kCheapPrepend);

      readerIndex_ = kCheapPrepend;

      writerIndex_ = readerIndex_ + readable;

      assert(readable == readableBytes());

    }

}

  • Channel

class Channel : boost::noncopyable

{

public:

  typedef boost::function<void()> EventCallback;

  typedef boost::function<void(Timestamp)> ReadEventCallback;

private:

  EventLoop* loop_; //屬於哪一個reactor

  const int  fd_; //關聯的FD

  int        events_; //關注事件

  int        revents_; //ready事件

  bool eventHandling_; //當前正在處理事件

  ReadEventCallback readCallback_;

  EventCallback writeCallback_; //如何寫數據

  EventCallback closeCallback_; //如何關閉連接

  EventCallback errorCallback_; //如何處理錯誤

};

若是loop有事件發生,將觸發handleEvent回調:

void Channel::handleEventWithGuard(Timestamp receiveTime)

{

  eventHandling_ = true;

  if ((revents_ & POLLHUP) && !(revents_ & POLLIN))

  {

    if (logHup_)

    {

      LOG_WARN << "Channel::handle_event() POLLHUP";

    }

    if (closeCallback_) closeCallback_();

  }

   

  if (revents_ & POLLNVAL)

  {

    LOG_WARN << "Channel::handle_event() POLLNVAL";

  }

   

  if (revents_ & (POLLERR | POLLNVAL))

  {

    if (errorCallback_) errorCallback_();

  }

  if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))

  {

    if (readCallback_) readCallback_(receiveTime);

  }

  if (revents_ & POLLOUT)

  {

    if (writeCallback_) writeCallback_();

  }

  eventHandling_ = false;

}

  • EventLoop

class EventLoop : boost::noncopyable

{

public:

  void loop();

  void quit();

   

  /// Runs callback immediately in the loop thread.

  /// It wakes up the loop, and run the cb.

  /// If in the same loop thread, cb is run within the function.

  /// Safe to call from other threads.

  void runInLoop(const Functor& cb);

   

  /// Queues callback in the loop thread.

  /// Runs after finish pooling.

  /// Safe to call from other threads.

  void queueInLoop(const Functor& cb);

   

  /// Runs callback at 'time'.

  /// Safe to call from other threads.

  TimerId runAt(const Timestamp& timeconst TimerCallback& cb);

   

  /// Runs callback after @c delay seconds.

  /// Safe to call from other threads.

  TimerId runAfter(double delay, const TimerCallback& cb);

   

  /// Runs callback every @c interval seconds.

  /// Safe to call from other threads.

  TimerId runEvery(double interval, const TimerCallback& cb);

   

  /// Cancels the timer.

  /// Safe to call from other threads.

  void cancel(TimerId timerId);

   

  // internal usage

  void wakeup();

  void updateChannel(Channel* channel);

  void removeChannel(Channel* channel);

  bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }

private:

  void handleRead();  // waked up

  void doPendingFunctors();

  typedef std::vector<Channel*> ChannelList;

   

  bool looping_; /* atomic */

  bool quit_; /* atomic */

  bool eventHandling_; /* atomic */

  bool callingPendingFunctors_; /* atomic */

  const pid_t threadId_;

  Timestamp pollReturnTime_;

  boost::scoped_ptr<Poller> poller_;

  boost::scoped_ptr<TimerQueue> timerQueue_;

  int wakeupFd_;

  // unlike in TimerQueue, which is an internal class,

  // we don't expose Channel to client.

  boost::scoped_ptr<Channel> wakeupChannel_;

  ChannelList activeChannels_;

  Channel* currentActiveChannel_;

  MutexLock mutex_;

  std::vector<Functor> pendingFunctors_; // @BuardedBy mutex_

};

   

__thread EventLoop* t_loopInThisThread = 0;

t_loopInThisThread被定義爲per thread的全局變量,並在EventLoop的構造函數中初始化:

   

epoll默認工做方式是LT

   

從這個muduo的工做模型來看,能夠採用an IO thread per fd的形式處理各connection的讀//encode/decode等工做,計算線程池中的線程在一個eventfd上監聽,激活後就將connection做爲參數與decoded packet一塊兒傳遞到計算線程池中,並在計算完成後將結果直接寫入IO threadfd。並採用round-robin的方式選出下一個計算線程。

不一樣的解決方案:實際上這些線程是能夠歸併的,僅僅取決於任務的性質:IO密集型或是計算密集型。限制僅僅在於:出於避免過多thread context切換形成性能降低和資源對thread數量的約束,不能採用a thread per fd的模型,而是將fd分爲若干組比較均衡的分配到IO線程中。

   

EventLoop的跨線程激活:

EventLoop::EventLoop()

  : wakeupFd_(createEventfd()),

    wakeupChannel_(new Channel(this, wakeupFd_))

{

  wakeupChannel_->setReadCallback(

      boost::bind(&EventLoop::handleRead, this)); // 綁定到handleRead上面了

  // we are always reading the wakeupfd

  wakeupChannel_->enableReading();

}

跨線程激活的函數是wakeUp

void EventLoop::wakeup()

{

  uint64_t one = 1;

  ssize_t n = sockets::write(wakeupFd_, &one, sizeof one); // 相似於管道直接寫

}

一旦wakeup完成以後那麼wakeUpFd_就是可讀的,這樣EventLoop就會被通知到而且馬上跳出epoll_wait開始處理。固然咱們須要將這個wakeupFd_ 上面數據讀出來,否則的話下一次又會被通知到,讀取函數就是handleRead

void EventLoop::handleRead()

{

  uint64_t one = 1;

  ssize_t n = sockets::read(wakeupFd_, &one, sizeof one);

}

runInLoopqueueInLoop就是跨線程任務。

void EventLoop::runInLoop(const Functor& cb){

     //若是這個函數在本身的線程調用,那麼就能夠當即執行

  if (isInLoopThread()){ 

    cb();

  }else{

          //若是是其餘線程調用,那麼加入到pendingFunctors裏面去

     queueInLoop(cb);

          //而且通知這個線程,有任務到來

     wakeup(); 

  }

}

   

void EventLoop::queueInLoop(const Functor& cb){

  {

  MutexLockGuard lock(mutex_);

  pendingFunctors_.push_back(cb);

  }

  /*被排上隊以後若是是在本身線程而且正在執行pendingFunctors的話,那麼就能夠激活

  不然下一輪徹底能夠被排上,因此沒有必要激活*/

  if (isInLoopThread() && callingPendingFunctors_){

    wakeup(); 

  }

}

  • muduo的工做方式:
    • 一個主線程,處理IO相關事宜。loop放在主線程中,其pendingFunctors_成員是IO/timer相關任務的集合,包括:
      • Connector::startInLoop
      • TcpConnection::setCloseCallback
      • TcpConnection::sendInLoop
      • TcpConnection::shutdownInLoop
      • TcpConnection::connectDestroyed
      • Acceptor::listen
      • TcpConnection::connectEstablished
      • TcpServer::removeConnectionInLoop
      • TimerQueue::addTimerInLoop
      • TimerQueue::cancelInLoop
    • ThreadPool用於處理計算任務,調用棧爲:ThreadPool::run(const Task& task) => queue_.push_back(task),而後在ThreadPool::runInThread會做爲每一個線程的runner,不停的去take()任務並執行。若是須要輸出,根據對應的connection,將結果提交至EventLoop的發送隊列。

         

  • 定時器

    調用棧:

    addTimer(const TimerCallback& cb,Timestamp when, double interval) => addTimerInLoop(Timer* timer) =>insert(timer)中:

typedef std::pair<Timestamp, Timer*> Entry;

typedef std::set<Entry> TimerList;

bool earliestChanged = false;

Timestamp when = timer->expiration();

TimerList::iterator it = timers_.begin();

if (it == timers_.end() || when < it->first)

{

  earliestChanged = true;

}

這裏的微妙之處在於:若是是第一個定時器,begin()=end(),那麼earliestChanged = true;會觸發resetTimerfd

void TimerQueue::addTimerInLoop(Timer* timer)

{

  loop_->assertInLoopThread();

  bool earliestChanged = insert(timer);

   

  if (earliestChanged)

  {

    //調用::timerfd_settime(timerfd, 0, &newValue, &oldValue)啓動定時器

    resetTimerfd(timerfd_, timer->expiration());

  }

}

當定時器觸發後:

void TimerQueue::handleRead()

{

  loop_->assertInLoopThread();

  Timestamp now(Timestamp::now());

  readTimerfd(timerfd_, now);

  //咱們能夠知道有哪些計時器超時

  std::vector<Entry> expired = getExpired(now); 

  // safe to callback outside critical section

  for (std::vector<Entry>::iterator it = expired.begin();

      it != expired.end(); ++it)

  {

     //對於這些超時的Timer,執行run()函數,對應也就是咱們一開始註冊的回調函數

     it->second->run(); 

  }

  reset(expired, now);

}

  • TcpConnection Class

    TcpConnection完成的工做就是當TCP鏈接創建以後處理socket的讀寫以及關閉。一樣咱們看看TcpConnection的結構

class TcpConnection : boost::noncopyable, public boost::enable_shared_from_this<TcpConnection>

{

  public:

    /// Constructs a TcpConnection with a connected sockfd

    ///

    /// User should not create this object.

    TcpConnection(EventLoop* loop, // 創建鏈接須要一個Reactor

                  const string& name, // 鏈接名稱

                  int sockfd, // 鏈接fd

                  const InetAddress& localAddr, // 本地IP@

                  const InetAddress& peerAddr); //對端IP@

    // called when TcpServer accepts a new connection

    void connectEstablished();   // should be called only once

    // called when TcpServer has removed me from its map

    void connectDestroyed();  // should be called only once

  private:

    enum StateE { kDisconnected, kConnecting, kConnected, kDisconnecting };

    void sendInLoop(const void* message, size_t len); // 發送消息

    void setState(StateE s) { state_ = s; }

   

    EventLoop* loop_;

    string name_;

    StateE state_;  // FIXME: use atomic variable

    // we don't expose those classes to client.

    boost::scoped_ptr<Socket> socket_; // socket.

    boost::scoped_ptr<Channel> channel_; // 鏈接channel

    InetAddress localAddr_;

    InetAddress peerAddr_;

    ConnectionCallback connectionCallback_; // 鏈接回調,這個觸發包括在鏈接創建和斷開都會觸發

    MessageCallback messageCallback_; // 有數據可讀的回調

    WriteCompleteCallback writeCompleteCallback_; // 寫完畢的回調

    CloseCallback closeCallback_; // 鏈接關閉回調

    Buffer inputBuffer_; // 數據讀取buffer.

    Buffer outputBuffer_; // FIXME: use list<Buffer> as output buffer.

    boost::any context_; // 上下文環境

    // FIXME: creationTime_, lastReceiveTime_

    //        bytesReceived_, bytesSent_

};

首先TcpConnection在初始化的時候會創建好channel。而後一旦TcpClient或者是TcpServer創建鏈接以後的話,那麼調用TcpConnection::connectEstablished。這個函數內部的話就會將channel設置成爲可讀。一旦可讀的話那麼TcpConnection內部就會調用handleRead這個動做,內部託管了讀取數據這個操做。 讀取完畢以後而後交給MessageBack這個回調進行操做。若是須要寫的話調用sendInLoop,那麼會將message放在outputBuffer裏面,而且設置可寫。當可寫的話TcpConnection內部就託管寫,而後寫完以後的話會發生writeCompleteCallback這個回調。託管的讀寫操做都是非阻塞的。若是但願斷開的話調用 shutdown。解除這個鏈接的話那麼能夠調用TcpConnection::connectDestroyed,內部大體操做就是從reactor移除這個channel

TcpConnection這層並不知道一次須要讀取多少個字節,這個是在上層進行消息拆分的。TcpConnection一次最多讀取64K字節的內容,而後交給Upper App。後者決定這些內容是否足夠,若是不夠的話那麼直接返回讓Reactor繼續等待讀。 一樣寫的話內部也是會分屢次寫。這樣就要求reactor內部必須使用水平觸發而不是邊緣觸發。

  • TcpClient Class

    這個類主要包裝了TcpConnector的功能。

TcpClient::TcpClient(EventLoop* loop,

                     const InetAddress& serverAddr,

                     const string& name)

        : loop_(CHECK_NOTNULL(loop)),

          connector_(new Connector(loop, serverAddr)),

          name_(name),

          connectionCallback_(defaultConnectionCallback),

          messageCallback_(defaultMessageCallback),

          retry_(false),

          connect_(true),

          nextConnId_(1)

{

    connector_->setNewConnectionCallback(

        boost::bind(&TcpClient::newConnection, this, _1));

    // FIXME setConnectFailedCallback

}

  • TcpServer Class

TcpServer::TcpServer(EventLoop* loop,

                     const InetAddress& listenAddr,

                     const string& nameArg)

  : loop_(CHECK_NOTNULL(loop)),

    hostport_(listenAddr.toHostPort()),

    name_(nameArg),

    acceptor_(new Acceptor(loop, listenAddr)),

    threadPool_(new EventLoopThreadPool(loop)),

    connectionCallback_(defaultConnectionCallback),

    messageCallback_(defaultMessageCallback),

    started_(false),

    nextConnId_(1)

{

  acceptor_->setNewConnectionCallback(

      boost::bind(&TcpServer::newConnection, this, _1, _2));

}

一樣是創建好acceptor這個對象而後設置好回調爲TcpServer::newConnection,同時在外部設置好TcpConnection的各個回調。而後調用start來啓動服務器,start 會調用acceptor::listen這個方法,一旦有鏈接創建的話那麼會調用newConnection。下面是newConnection代碼:

void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)

{

    loop_->assertInLoopThread();

    EventLoop* ioLoop = threadPool_->getNextLoop();

    char buf[32];

    snprintf(buf, sizeof buf, ":%s#%d", hostport_.c_str(), nextConnId_);

    ++nextConnId_;

    string connName = name_ + buf;

    // FIXME poll with zero timeout to double confirm the new connection

    TcpConnectionPtr conn(

        new TcpConnection(ioLoop, connName, sockfd, localAddr, peerAddr));

    connections_[connName] = conn;

    conn->setConnectionCallback(connectionCallback_);

    conn->setMessageCallback(messageCallback_);

    conn->setWriteCompleteCallback(writeCompleteCallback_);

    conn->setCloseCallback(

        boost::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafe

    ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));

}

對於服務端來講鏈接都被惟一化了而後映射爲字符串放在connections_這個容器內部。threadPool_->getNextLoop()能夠輪詢地將取出每個線程而後將 TcpConnection::connectEstablished輪詢地丟到每一個線程裏面去完成。存放在connections_是有緣由了,每一個TcpConnection有惟一一個名字,這樣Server 就能夠根據TcpConnection來從本身內部移除連接了。在析構函數裏面能夠遍歷connections_內容獲得全部創建的鏈接而且逐一釋放。

相關文章
相關標籤/搜索