c++ 高性能日誌庫(muduo_AsyncLogging)

c++ 高性能日誌庫(muduo_AsyncLogging)

實現一個高效的網絡日誌庫要解決那些問題?
首先明確一下問題的模型,這是一個典型的多生產者 單消費者問題,對於前端的日誌庫使用者來講,應該作到非阻塞添加,做爲後端的文件寫入,應該注意磁盤IO的瓶頸。前端

功能需求

  1. 日誌的級別分級
  2. 發生時間和具體線程信息
  3. 線程安全

實現思路

多個線程共有一個前端,經過後端寫入磁盤文件
異步日誌是必須的,因此須要一個緩衝區,在這裏咱們使用的是多緩衝技術,基本思路是準備多塊Buffer,前端負責向Buffer中填數據,後端負責將Buffer中數據取出來寫入文件,這種實現的好處在於在新建日誌消息的時候沒必要等待磁盤IO操做,前端寫的時候也不會阻塞。c++

實現結構

LogStream 負責寫入消息的格式化
LogFile 負責文件寫入
AsyncLogging 負責實現 多緩衝技術 協調先後端後端

LogFile

若是有必要就給日至文件加鎖安全

void LogFile::append(const char* logline, int len)
{
  if (mutex_)
  {
    MutexLockGuard lock(*mutex_);
    append_unlocked(logline, len);
  }
  else
  {
    append_unlocked(logline, len);
  }
}

LogStream

重寫流操做符
構造一個格式轉化類,給日誌中消息提供一個統一的格式網絡

AsyncLogging

是及實現採用了四個緩衝區,這樣能夠進一步減小前端等待,數據結構數據結構

typedef boost::ptr_vector<LargeBuffer> BufferVector;
typedef BufferVector::auto_type BufferPtr;
MutexLock lock;
Condition cond;
BufferPtr nextBuffer;
BufferVector buffers_;

append的具體實現
在當前的緩衝區和備用緩衝區中選擇一個足夠使用的進行寫入。app

void AsynLogging::append(const char* logline, int len){
    LockGuard(mutex);
    if(curbuf->avail() > len){//當前緩衝區足夠
        curbuf->append(logline,len);
    }
    else{
        buffers.push_back(curbuf->release());
        if(nextbuf){
            curbuf = std::move(nextbuf);
        }
        else{
            curbuf.reset(new LargeBuffer);  
        }
        curbuf->append(logline, len);
        cond.notify();
    }
}
  • 接收方的後端實現
    首先準備好兩塊空閒的buffer,已備在臨界區內交換,等待條件標量出發的條件又兩個,超時或者是前端寫滿了一個或者多個Buffer,當條件知足時,先將當前緩衝移入buffer,而且馬上將空閒的newBuffer1做爲當前緩衝,接下來將buffers和buffersToWrite交換,隨後將buffersToWrite寫入文件,從新設計設置Buffer。
void AsyncLogging::threadFunc(){
    BufferPtr nweBuffer1(new LargeBuffer);
    BufferPtr newBuffer2(new LargeBuffer);
    BufferVector bufferToWrite;
    while(running_){
    {
        MutexLockGuard lock(mutex);
        if(buffers.empty()){
            cond.wait_for(muted,flushInterval_);
        }
        buffers.push_back(currentBuffer_.release());
        currentBuffer = move(newBuffer1);
        buffersTowrite.swap(buffers_);
        if(!nextBuf){
            nextBuf = std::move(newBuffer2);
        }
    }
    }
}

交給後端去寫入,以及從新設置兩個緩衝區異步

for (size_t i = 0; i < buffersToWrite.size(); ++i)
      {
        // FIXME: use unbuffered stdio FILE ? or use ::writev ?
        output.append(buffersToWrite[i].data(), buffersToWrite[i].length());
      }

      if (buffersToWrite.size() > 2)
      {
        // drop non-bzero-ed buffers, avoid trashing
        buffersToWrite.resize(2);
      }

      if (!newBuffer1)
      {
        assert(!buffersToWrite.empty());
        newBuffer1 = buffersToWrite.pop_back();
        newBuffer1->reset();
      }

      if (!newBuffer2)
      {
        assert(!buffersToWrite.empty());
        newBuffer2 = buffersToWrite.pop_back();
        newBuffer2->reset();
      }

      buffersToWrite.clear();
      output.flush();
相關文章
相關標籤/搜索