Beego Logs 源碼分析 中篇

文件輸出引擎使用到的讀寫鎖 sync.RWMutex

讀寫鎖是一種同步機制,容許多個讀操做同時讀取數據,可是隻容許一個寫操做寫數據。鎖的狀態有三種:讀模式加鎖、寫模式加鎖、無鎖。
  • 無鎖。讀/寫進程均可以進入。
  • 讀模式鎖。讀進程能夠進入。寫進程不能夠進入。
  • 寫模式鎖。讀/寫進程都不能夠進入。

就拿文件行數這個變量來看,若是開啓了日誌文件按小時按行數切割的功能,要先讀取當前文件行數變量值。當併發狀況下,多個 goroutine 在打日誌,讀取文件行數和修改文件行數便成爲一對「讀寫」操做,因此須要用讀寫鎖,讀寫鎖對於讀操做不會致使鎖競爭和 goroutine 阻塞。緩存

// WriteMsg write logger message into file.
func (w *fileLogWriter) WriteMsg(when time.Time, msg string, level int) error {
    ···
    if w.Rotate {
        w.RLock()
        if w.needRotateHourly(len(msg), h) {
            w.RUnlock()
            w.Lock()
            if w.needRotateHourly(len(msg), h) {
                if err := w.doRotate(when); err != nil {
                    fmt.Fprintf(os.Stderr, "FileLogWriter(%q): %s\n", w.Filename, err)
                }
            }
            w.Unlock()
        } else if w.needRotateDaily(len(msg), d) {
            w.RUnlock()
            w.Lock()
            if w.needRotateDaily(len(msg), d) {
                if err := w.doRotate(when); err != nil {
                    fmt.Fprintf(os.Stderr, "FileLogWriter(%q): %s\n", w.Filename, err)
                }
            }
            w.Unlock()
        } else {
            w.RUnlock()
        }
    }

    w.Lock()
    _, err := w.fileWriter.Write([]byte(msg))
    if err == nil {
        w.maxLinesCurLines++
        w.maxSizeCurSize += len(msg)
    }
    w.Unlock()
    ···
}

總結下 Goroutine 的使用

監聽 msgChan

第一處是開啓異步選項時,啓動一個 goroutine 監聽 msgChan 是否爲空,發現不爲空便取走日誌信息進行輸出。併發

// Async set the log to asynchronous and start the goroutine
func (bl *BeeLogger) Async(msgLen ...int64) *BeeLogger {
    ···
    go bl.startLogger()
    ···
}

// start logger chan reading.
// when chan is not empty, write logs.
func (bl *BeeLogger) startLogger() {
    gameOver := false
    for {
        select {
        case bm := <-bl.msgChan:
            bl.writeToLoggers(bm.when, bm.msg, bm.level)
            logMsgPool.Put(bm)
        ···
        }
        ···
    }
}

監聽計時器實現日誌文件按日期分割

文件輸出引擎 file.go 文件中,初始化 fileWriter *os.File 時啓動一個 goroutine 執行 dailyRotate() :異步

func (w *fileLogWriter) initFd() error {
    fd := w.fileWriter
    fInfo, err := fd.Stat()
    if err != nil {
        return fmt.Errorf("get stat err: %s", err)
    }
    w.maxSizeCurSize = int(fInfo.Size())
    w.dailyOpenTime = time.Now()
    w.dailyOpenDate = w.dailyOpenTime.Day()
    w.maxLinesCurLines = 0
    if w.Daily {
        go w.dailyRotate(w.dailyOpenTime) //   <------
    }
    if fInfo.Size() > 0 && w.MaxLines > 0 {
        count, err := w.lines()
        if err != nil {
            return err
        }
        w.maxLinesCurLines = count
    }
    return nil
}

dailyRotate() 方法中,tm 定時器時間一到,便會往 tm.C 通道發送當前時間,此時 a 語句便中止阻塞,能夠繼續往下執行。async

func (w *fileLogWriter) dailyRotate(openTime time.Time) {
    y, m, d := openTime.Add(24 * time.Hour).Date()
    nextDay := time.Date(y, m, d, 0, 0, 0, 0, openTime.Location())
    tm := time.NewTimer(time.Duration(nextDay.UnixNano() - openTime.UnixNano() + 100))
    <-tm.C // <--- a 語句
    w.Lock()
    if w.needRotate(0, time.Now().Day()) {
        if err := w.doRotate(time.Now()); err != nil {
            fmt.Fprintf(os.Stderr, "FileLogWriter(%q): %s\n", w.Filename, err)
        }
    }
    w.Unlock()
}

開啓新的 goroutine 刪除失效的日誌文件

由於刪除文件涉及文件 IO 處理,爲了不阻塞主線程,便交由另外 goroutine 去作。,go w.deleteOldLog(),超過 MaxDays 的日誌文件即是失效的。url

// DoRotate means it need to write file in new file.
// new file name like xx.2013-01-01.log (daily) or xx.001.log (by line or size)
func (w *fileLogWriter) doRotate(logTime time.Time) error {
    ···
    err = os.Rename(w.Filename, fName)
    ···
    startLoggerErr := w.startLogger()
    go w.deleteOldLog()
    ···
}

func (w *fileLogWriter) deleteOldLog() {
    dir := filepath.Dir(w.Filename)
    filepath.Walk(dir, func(path string, info os.FileInfo, err error) (returnErr error) {
        defer func() {
            if r := recover(); r != nil {
                fmt.Fprintf(os.Stderr, "Unable to delete old log '%s', error: %v\n", path, r)
            }
        }()

        if info == nil {
            return
        }

        if !info.IsDir() && info.ModTime().Add(24*time.Hour*time.Duration(w.MaxDays)).Before(time.Now()) {
            if strings.HasPrefix(filepath.Base(path), filepath.Base(w.fileNameOnly)) &&
                strings.HasSuffix(filepath.Base(path), w.suffix) {
                os.Remove(path)
            }
        }
        return
    })
}

使用 goto 語句保證即便發生錯誤也要重啓 Logger

doRotate() 方法大致邏輯:線程

  • 重命名以前寫入的日誌文件,err = os.Rename(w.Filename, fName)3d

    • 首先找到 一個可用的 filename ,循環遍歷1-999,若是找不到報錯;
    • _,err:=os.Lstat(fName) :若以 fName 爲名的文件不存在則返回 err 不爲空。
    • os.Chmod(fName, os.FileMode(rotatePerm)) 修改文件權限。
  • 從新啓動 Logger :rest

    • 一是啓動 Logger ,w.startLogger()
    • 二是開啓一個 goroutine 刪除失效的日誌文件。

注意到下面代碼段中的 a 語句和 b 語句,它們並非返回錯誤阻止代碼繼續執行,而是即便發生錯誤也會保證重啓一個新的 Logger。若是是執行到 a 語句這種狀況,有多是該日誌文件已經被別的程序刪除或者其餘緣由致使文件不存在,但大可沒必要由於一個日誌文件的丟失而阻止了新 Logger 的啓動,簡而言之,這個錯誤是能夠忽略的。日誌

// DoRotate means it need to write file in new file.
// new file name like xx.2013-01-01.log (daily) or xx.001.log (by line or size)
func (w *fileLogWriter) doRotate(logTime time.Time) error {
    // file exists
    // Find the next available number
    num := 1
    fName := ""
    rotatePerm, err := strconv.ParseInt(w.RotatePerm, 8, 64)
    if err != nil {
        return err
    }

    _, err = os.Lstat(w.Filename)
    if err != nil {
        //even if the file is not exist or other ,we should RESTART the logger
        goto RESTART_LOGGER // <------- a 語句
    }

    if w.MaxLines > 0 || w.MaxSize > 0 {
        for ; err == nil && num <= 999; num++ {
            fName = w.fileNameOnly + fmt.Sprintf(".%s.%03d%s", 
                    logTime.Format("2006-01-02"), num, w.suffix)
            _, err = os.Lstat(fName)
        }
    } else {
        fName = fmt.Sprintf("%s.%s%s", w.fileNameOnly, 
                w.dailyOpenTime.Format("2006-01-02"), w.suffix)
        _, err = os.Lstat(fName)
        for ; err == nil && num <= 999; num++ {
            fName = w.fileNameOnly + fmt.Sprintf(".%s.%03d%s", 
                    w.dailyOpenTime.Format("2006-01-02"), num, w.suffix)
            _, err = os.Lstat(fName)
        }
    }
    // return error if the last file checked still existed
    if err == nil { 
        return fmt.Errorf(
        "Rotate: Cannot find free log number to rename %s", w.Filename)
    }

    // close fileWriter before rename
    w.fileWriter.Close()

    // Rename the file to its new found name
    // even if occurs error,we MUST guarantee to  restart new logger
    err = os.Rename(w.Filename, fName)
    if err != nil {
        goto RESTART_LOGGER  // <------- b 語句 
    }

    err = os.Chmod(fName, os.FileMode(rotatePerm))

RESTART_LOGGER:   // <-------

    startLoggerErr := w.startLogger()
    go w.deleteOldLog()

    if startLoggerErr != nil {
        return fmt.Errorf("Rotate StartLogger: %s", startLoggerErr)
    }
    if err != nil {
        return fmt.Errorf("Rotate: %s", err)
    }
    return nil
}

涉及到 sync.WaitGroup 的使用

a 語句處,開啓 goroutine 前計數器加一,執行完該 goroutine 後計數器減一,即 b 語句。code

// Async set the log to asynchronous and start the goroutine
func (bl *BeeLogger) Async(msgLen ...int64) *BeeLogger {
    ···
    bl.wg.Add(1) // <----- a 語句
    go bl.startLogger()
    return bl
}

// start logger chan reading.
// when chan is not empty, write logs.
func (bl *BeeLogger) startLogger() {
    gameOver := false
    for {
        select {
        case bm := <-bl.msgChan:
            bl.writeToLoggers(bm.when, bm.msg, bm.level)
            logMsgPool.Put(bm)
        case sg := <-bl.signalChan:
            // Now should only send "flush" or "close" to bl.signalChan
            bl.flush()
            if sg == "close" {
                for _, l := range bl.outputs {
                    l.Destroy()
                }
                bl.outputs = nil
                gameOver = true
            }
            bl.wg.Done()  // <------ b 語句
        }
        if gameOver {
            break
        }
    }
}

分析併發執行下面 Flush() 方法的狀況。假設有 A , B , C 三個 goroutine,而且假設 A 先執行到 e 語句,從
a 語句知道初始計數器爲 1 ,因此 e 語句必須等到上述 startLogger-goroutine 執行 b 語句完畢後才中止阻塞。然後 A 再讓計數器加一。由於 bl.signalChan 的緩存大小爲1,因此 B,C 阻塞在 d 語句,等到 B,C 其中之一能執行 e 語句的時候計數器必然大於0,纔不會致使永久阻塞。因此 f 語句要放在 e 語句以後。

// Flush flush all chan data.
func (bl *BeeLogger) Flush() {
    if bl.asynchronous {
        bl.signalChan <- "flush"  // <------ d 語句
        bl.wg.Wait()  // <------ e 語句
        bl.wg.Add(1)  // <------ f 語句
        return
    }
    bl.flush()
}

所以再看下面的 Close() 方法,它是不能併發執行的,會致使 "panic: close of closed channel"錯誤。不過筆者暫時沒懂爲何 beego logs 不把這裏作一下改進,讓 Close() 也支持併發調用很差嗎?

// Close close logger, flush all chan data and destroy all adapters in BeeLogger.
func (bl *BeeLogger) Close() {
    if bl.asynchronous {
        bl.signalChan <- "close"
        bl.wg.Wait()   // <------ g 語句
        close(bl.msgChan)
    } else {
        bl.flush()
        for _, l := range bl.outputs {
            l.Destroy()
        }
        bl.outputs = nil
    }
    close(bl.signalChan)
}
相關文章
相關標籤/搜索