做一篇文章記錄實現,驅動優化迭代。 Github前端
好久以前就想寫一個日誌庫了,受限制於本身水平這個想法一直沒有完成。在上次寫一個 TCPServer 的時候,寫了一個錯誤的日誌庫,在多線程的狀況對速度影響很是大,而且最重要的正確性也沒有保證。linux
近期又有了點時間,決定從新實現,對其 特色指望:git
日誌格式和日誌文件格式指望以下:github
test_log_file.20191224.160354.7.log
20191224 16:04:05.125049 2970 ERROR 5266030 chello, world - LogTest.cpp:thr_1():20
以前看的兩個日誌庫,剛好都叫作 NanoLog:windows
前一個日誌庫學到了用戶接口宏的定義方式,後一個 nanolog 則是吸取了其後端的設計部分,尤爲是 thread_local 的使用,但我以爲它的精華在於前端部分,充斥着大量的模板。嗯,沒太看懂。後端
先後端分離實現數組
採用單例模式,全局只有一個 LimLog
對象,在這個對象構造時也建立一個後臺線程,經過無限循環掃描是否有數據能夠寫入至文件中。經過條件變量控制該循環的終止及 LimLog
對象的銷燬。多線程
使用 C++ 11 的關鍵字 thread_local 爲每一個線程都建立一個線程局部緩衝區,這樣咱們的前端的日誌寫入就能夠不用加鎖(雖然不用加鎖,可是有另外的處理,後說明),每次都寫入到各自線程的緩衝區中。而後在上面的循環中掃描這些緩衝區,將其中的日誌數據讀取至內部的輸出緩衝區中,再將其寫入到文件中。前後端分離
每一個緩衝區內存只建立一次使用,避免每次使用建立帶來沒必要要的申請消耗。每一個線程局部緩衝區的大小爲 1M(1 << 20 bytes), LimLog
對象內的輸出緩衝區大小爲 16M(1 << 24 bytes), 這個大小很是充足,實際測試 大概每一個線程的內存使用爲 30k(1M) 和 60-150k(16M) 左右,這個地方須要IO壓測一下。異步
後端日誌類的成員變量以下,
class LimLog{ ··· private: LogSink logSink_; bool threadSync_; // 先後端同步的標示 bool threadExit_; // 後臺線程標示 bool outputFull_; // 輸出緩衝區滿的標示 LogLevel level_; uint32_t bufferSize_; // 輸出緩衝區的大小 uint32_t sinkCount_; // 寫入文件的次數 uint32_t perConsumeBytes_; // 每輪循環讀取的字節數 uint64_t totalConsumeBytes_; // 總讀取的字節數 char *outputBuffer_; // first internal buffer. char *doubleBuffer_; // second internal buffer, unused. std::vector<BlockingBuffer *> threadBuffers_; std::thread sinkThread_; std::mutex bufferMutex_; // internel buffer mutex. std::mutex condMutex_; std::condition_variable proceedCond_; // 後臺線程是否繼續運行的標條件變量 std::condition_variable hitEmptyCond_; // 前端緩衝區爲空的條件變量 static LimLog singletonLog; static thread_local BlockingBuffer *blockingBuffer_; // 線程局部緩衝區 };
對 LimLog
對象的析構和後臺線程的退出須要十分的謹慎,這裏程序出問題的重災區,前一個日誌庫就是這個地方沒有處理好,致使有的時候日誌尚未寫完程序就退出了。爲了保證這個邏輯的正確運行,採用了兩個條件變量 proceedCond_
和 hitEmptyCond_
。在執行對象的析構函數時,須要等待無可讀數據的條件變量 hitEmptyCond_
, 保證這個時候線程局部緩衝區的數據都已經讀取完成,而後在析構函數中設置後臺線程循環退出的條件,而且使用 proceedCond_
通知後臺線程運行。
以上邏輯在析構函數中的邏輯以下:
LimLog::~LimLog() { { // notify background thread befor the object detoryed. std::unique_lock<std::mutex> lock(singletonLog.condMutex_); singletonLog.threadSync_ = true; singletonLog.proceedCond_.notify_all(); singletonLog.hitEmptyCond_.wait(lock); } { // stop sink thread. std::lock_guard<std::mutex> lock(condMutex_); singletonLog.threadExit_ = true; singletonLog.proceedCond_.notify_all(); } ··· }
以後就轉入到後臺線程的循環處理中,如下是全部後臺線程處理的邏輯:
// Sink log info to file with async. void LimLog::sinkThreadFunc() { // \fixed me, it will enter infinity if compile with -O3 . while (!threadExit_) { // move front-end data to internal buffer. { std::lock_guard<std::mutex> lock(bufferMutex_); uint32_t bbufferIdx = 0; // while (!threadExit_ && !outputFull_ && !threadBuffers_.empty()) { while (!threadExit_ && !outputFull_ && (bbufferIdx < threadBuffers_.size())) { BlockingBuffer *bbuffer = threadBuffers_[bbufferIdx]; uint32_t consumableBytes = bbuffer->consumable(); // 若是這個地方使用 used() 代替,就會出現日誌行截斷的現象 if (bufferSize_ - perConsumeBytes_ < consumableBytes) { outputFull_ = true; break; } if (consumableBytes > 0) { uint32_t n = bbuffer->consume( outputBuffer_ + perConsumeBytes_, consumableBytes); perConsumeBytes_ += n; } else { // threadBuffers_.erase(threadBuffers_.begin() + // bbufferIdx); } bbufferIdx++; } } // not data to sink, go to sleep, 50us. if (perConsumeBytes_ == 0) { std::unique_lock<std::mutex> lock(condMutex_); // if front-end generated sync operation, consume again. if (threadSync_) { threadSync_ = false; continue; } hitEmptyCond_.notify_one(); proceedCond_.wait_for(lock, std::chrono::microseconds(50)); } else { logSink_.sink(outputBuffer_, perConsumeBytes_); sinkCount_++; totalConsumeBytes_ += perConsumeBytes_; perConsumeBytes_ = 0; outputFull_ = false; } } }
當掃描發現無數據可取時,這個時候咱們再循環讀取一下,防止析構函數在掃描數據和判斷 threadSync_
執行的時間差以內又有日誌產生,在又一輪循環後,確保全部的數據都已經讀取,通知析構函數繼續執行,而後退出循環,該後臺線程函數退出,線程銷燬。 wait_for 是節約系統資源,出現無數據時當前線程就直接休眠 50us 避免沒必要要的循環消耗.
以上是對後臺線程的處理,對數據的處理其實仍是比較簡單的,後端提供了一個寫入數據的接口,該接口將前端產生的數據統一寫入至線程局部緩衝區中。
void LimLog::produce(const char *data, size_t n) { if (!blockingBuffer_) { std::lock_guard<std::mutex> lock(bufferMutex_); blockingBuffer_ = static_cast<BlockingBuffer *>(malloc(sizeof(BlockingBuffer))); threadBuffers_.push_back(blockingBuffer_); // 添加到後端的緩衝區數組中 } blockingBuffer_->produce(data, n); // 將數據添加到線程局部緩衝區中 }
後臺寫入線程這個時候,遍歷緩衝數組,讀取數據至內部的輸出緩衝區中,當出現可讀取的數據大於緩衝區時,直接寫入文件,剩餘的數據下一路再讀取;當無可讀取的數據時,判斷是對象析構了還只是簡單的沒有數據讀取,而且休眠 50us;有數據可讀則寫入至文件,重置相關數據。
線程局部緩衝區爲一個阻塞環形生產者消費者隊列,設計爲阻塞態的緣由爲緩衝區的大小足夠大了,若是改成非阻塞態,當緩衝區的滿了的時候從新分配內存,還要管理一次內存的釋放操做。雖然線程內的操做不須要使用鎖來同步,在後臺線程的循環範圍內,每次都要獲取數據的可讀大小,這裏可能就涉及多個線程對數據的訪問了。最初的版本測試在沒有同步而且開啓優化的狀況下,有必定機率在 consume()
操做陷入死循環,緣由是該線程的 cache 的變量未及時獲得更新。如今使用內存屏障來解決這個問題,避免編譯器優化。
對於後臺線程讀取各線程內的數據有一個隨機性的問題,若是使用 BlockingBuffer::unsed()
獲取已讀數據長度,可能會出現日誌行只被讀取了一半,而後馬上被另外的一個線程的日誌截斷的狀況,這不是咱們指望的。爲了解決這個問題,咱們又提供了一個接口 BlockingBuffer::incConsumablePos()
來移動能夠讀取的位置,該接口每次遞增的長度爲一條日誌行的長度,在一日誌行數據寫入到局部緩衝區後調用該接口來移動能夠讀取到的位置。而又提供接口 BlockingBuffer::consumable()
來獲取能夠讀取的長度,這樣就避免了一條日誌行被截斷的狀況。
一條日誌信息在內存中的佈局以下: +------+-----------+-------+------+------+----------+------+ | time | thread id | level | logs | file | function | line | +------+-----------+-------+------+------+----------+------+ 20191224 16:04:05.125044 2970 ERROR 5266025chello, world - LogTest.cpp:thr_1():20 20191224 16:04:05.125044 2970 ERROR 5266026chello, world - LogTest.cpp:thr_1():20 20191224 16:04:05.125045 2970 ERROR 5266027chello, world - LogTest.cpp:thr_1():20
前端實現雖然簡單,可是優化的空間很大,使用 perf 作性能分析,性能熱點集中在參數格式化爲字符串和時間的處理上面。
咱們用 LogLine
來表示一個日誌行,這個類很是簡單,重載了多個參數類型的操做符 <<
, 及保存一些日誌行相關信息,包括 文件名稱,函數,行和最重要的寫入字節數。
LogLine
不包含緩衝區,直接調用後端提供的接口 LimLog::produce()
將數據寫入到局部緩衝區內。
在 limlog 中,屢次使用到了 thread_local 關鍵字,在前端實現部分也是如此。
在 Linux 中使用 gettimeofday(2)
來獲取當前的時間戳,其餘平臺使用 std::chrono
來獲取時間戳,用 localtime()
和 strftime()
獲取本地的格式化時間 %Y%m%d %H:%M:%S, 合併微秒 timestamp % 1000000
組成 time.
這裏利用 thread_local 作一個優化,格式化時間的函數調用次數。定義一個線程局部變量來存儲時間戳對應的秒數 t_second
和 字符數組來存儲格式化時間 t_datetime[24]
,當秒數未發生改變時,直接取線程局部字符數組而不用再調用格式化函數。
在 x86-64 體系下,gettimeofday(2)
在 vdso 機制下已經不是系統調用了,經測試發現直接調用 gettimeofday(2)
比 std::chrono
的高精度時鐘快 15% 左右,雖然不是系統調用可是耗時仍是大頭,一個調用大概須要 600ns 左右的時間。
在 Linux 下,使用 gettid()
而不是 pthread_self()
來獲取線程id,windows 暫時未支持,這個 std::this_thread::get_id()
取出裏面的整形id須要hack一下。
這裏繼續使用 thread_local 來對線程id優化,每一個線程的id調用一次線程id獲取函數。建立一個線程局部線程id變量,判斷線程id存在時,就再也不調用 gettid() 了。
日誌等級、文件、函數和行數是簡單的字符串寫入操做了。
#define LOG(level) \ if (limlog::getLogLevel() <= level) \ LogLine(level, __FILE__, __FUNCTION__, __LINE__) #define LOG_TRACE LOG(limlog::LogLevel::TRACE) #define LOG_DEBUG LOG(limlog::LogLevel::DEBUG) #define LOG_INFO LOG(limlog::LogLevel::INFO) #define LOG_WARN LOG(limlog::LogLevel::WARN) #define LOG_ERROR LOG(limlog::LogLevel::ERROR) #define LOG_FATAL LOG(limlog::LogLevel::FATAL)
如上所示,定義了一些宏給調用者使用,每一條日誌都會建立一個 LogLine
對象,這個對象很是小,用完就銷燬,幾乎沒有開銷。用法同 std::cout
std::string str("std::string"); LOG_DEBUG << 'c' << 1 << 1.3 << "c string" << str;
LogLine::operator<<()
參數轉字符串作優化