已經有兩個月沒有寫博客了,也有好幾個月沒有看go相關的內容了,因爲工做緣由最近在作java以及大數據相關的內容,致使最近工做較忙,博客中止了更新,正好想撿起以前go的東西,因此找了一個源碼學習java
這個也是以前用go寫日誌收集的時候用到的一個包 :github.com/hpcloud/tail, 此次就學習一下人家的源碼,爲了方便看這個代碼,我將這個包進行了簡化,也是用於方便理解,代碼放到了:https://github.com/pythonsite/tail, 這個代碼包可能沒法正經常使用,只是爲了方面理解tail這個包,以及學習人家的代碼python
│ tail.go │ └─watch filechanges.go inotify.go inotify_tracker.go watch.go
tail.go: 這裏包含着tail包的核心代碼,主要的邏輯處理時在這個裏面git
watch: 這個包主要用於對文件的監控,用於將文件的變化通知到tail.如:文件修改了,文件刪除了,文件內容追加了github
在tail.go中主要有幾下幾個結構體:學習
// Line 結構體用於存讀每行的時候的對象 type Line struct { Text string //當前行的內容 Time time.Time // 時間 Err error // Error from tail } type SeekInfo struct { Offset int64 Whence int } // 關於配置的結構體 type Config struct { Location *SeekInfo ReOpen bool MustExist bool // 要打開的文件是否必須存在 Poll bool Pipe bool Follow bool // 是否繼續讀取新的一行,能夠理解爲tail -f 命令 } // 核心的結構體Tail type Tail struct { Filename string // 要打開的文件名 Lines chan *Line // 用於存每行內容的Line結構體 Config watcher watch.FileWatcher changes *watch.FileChanges tomb.Tomb file *os.File reader *bufio.Reader lk sync.Mutex }
tail,err := tail.TailFile(conf.LogPath,tail.Config{ ReOpen:true, Follow:true, Location:&tail.SeekInfo{Offset:0,Whence:2}, MustExist:false, Poll:true, })
既然咱們使用的時候就會在最開始的時候調用tail.TailFile方法,就直接看這個方法:大數據
// 主要用於Tail結構體的初始化 func TailFile(filename string, config Config) (*Tail, error) { t := &Tail { Filename: filename, Lines: make(chan *Line), Config: config, } t.watcher = watch.NewInotifyFileWatcher(filename) if t.MustExist { var err error t.file, err = OpenFile(t.Filename) if err != nil { return nil, err } } go t.tailFileSync() return t, nil }
從這個代碼裏咱們就能夠看到它首先初始化了Tail結構體而且對Tail中的watcher進行的複製,先暫時不看watch相關的內容spa
而後就是關於文件是否必須存在的判斷處理,最後開啓了一個一個線程執行tailFileSync()方法,咱們接着看tailFileSync方法線程
func (tail *Tail) tailFileSync(){ defer tail.Done() defer tail.close() if !tail.MustExist { err := tail.reopen() if err != nil { if err != tomb.ErrDying { tail.Kill(err) } return } } tail.openReader() var offset int64 var err error // 一行行讀文件內容 for { if !tail.Pipe { offset,err = tail.Tell() if err != nil { tail.Kill(err) return } } line, err := tail.readLine() if err == nil { // 將讀取的一行內容放到chan中 tail.sendLine(line) } else if err == io.EOF { // 表示讀到文件的最後了 // 若是Follow 設置爲false的話就不會繼續讀文件 if !tail.Follow { if line != "" { tail.sendLine(line) } return } // 若是Follow設置爲True則會繼續讀 if tail.Follow && line != "" { err := tail.seekTo(SeekInfo{Offset: offset, Whence: 0}) if err != nil { tail.Kill(err) return } } // 若是讀到文件最後,文件並無新的內容增長 err := tail.waitForChanges() if err != nil { if err != ErrStop { tail.Kill(err) } return } } else { // 既不是文件結尾,也沒有error tail.Killf("error reading %s :%s", tail.Filename, err) return } select { case <- tail.Dying(): if tail.Err() == errStopAtEOF { continue } return default: } } }
這個方法裏主要是先調用了openReader方法,這個方法其實並無作什麼,只是對tail.reqader進行了賦值:tail.reader = bufio.NewReader(tail.file)指針
接着就是循環一行行的讀文件日誌
在循環裏最開始判斷了tail.Pipe的值,這個值通常開始我也並不會設置,因此默認就是false,因此就會執行tail.Tell()方法,這個方法主要是用於獲取文件當前行的位置信息,下面是Tell的代碼內容:
// 獲取文件當前行的位置信息 func (tail *Tail) Tell()(offset int64, err error) { if tail.file == nil { return } offset, err = tail.file.Seek(0, os.SEEK_CUR) if err != nil { return } tail.lk.Lock() defer tail.lk.Unlock() if tail.reader == nil { return } offset -= int64(tail.reader.Buffered()) return }
接着會調用tail.readLine()方法,這個方法就是用於獲取文件的一行內容,同時將一行內容實例化爲Line對象,而後扔到管道tail.Lines中
//將讀取的文件的每行內容存入到Line結構體中,並最終存入到tail.Lines的chan中 func (tail *Tail) sendLine(line string) bool { now := time.Now() lines := []string{line} for _, line := range lines { tail.Lines <- &Line { line, now, nil, } } return true }
最後的大量if 判斷其實主要是針對讀到文件末尾後的一些操做,
Tail結構體在最後定義的時候有一個參數:Follow, 這個參數的目的就是當讀到文件最後的時候是否繼續讀文件, 若是最開始設置了false,那麼讀到最後以後就不會在讀文件了
若是設置爲True,那麼讀到文件最後以後會保存文件的位置信息,並執行waitForChanges() 去等待文件的變化,waitForChanges()代碼內容以下:
// 等待文件的變化事件 func (tail *Tail) waitForChanges() error { if tail.changes == nil { // 這裏是獲取文件指針的當前位置 pos, err := tail.file.Seek(0,os.SEEK_CUR) if err != nil { return err } tail.changes, err = tail.watcher.ChangeEvents(&tail.Tomb, pos) if err != nil { return err } } // 和inotify中進行很巧妙的配合,這裏經過select 來進行查看那個chan變化了,來知道文件的變化 select { case <- tail.changes.Modified: // 文件被修改 return nil case <- tail.changes.Deleted: // 文件被刪除或者移動到其餘目錄 tail.changes = nil // 若是文件被刪除或者被移動到其餘目錄,則會嘗試從新打開文件 if tail.ReOpen { fmt.Printf("Re-opening moved/deleted file %s...",tail.Filename) if err := tail.reopen();err != nil { return err } fmt.Printf("Successfully reopened %s", tail.Filename) tail.openReader() return nil } else { fmt.Printf("Stoping tail as file not longer exists: %s", tail.Filename) return ErrStop } case <- tail.changes.Truncated: // 文件被追加新的內容 fmt.Printf("Re-opening truncated file %s....", tail.Filename) if err := tail.reopen();err != nil { return err } fmt.Printf("SuccessFuly reopend truncated %s", tail.Filename) tail.openReader() return nil case <- tail.Dying(): return nil } panic("unreachable") }
看到這裏的時候其實就能感受到,別人寫的代碼其實也並非很是複雜,也是很普通的代碼,可是你會以爲人家不少地方用的很是巧妙,
這段代碼中主要的是的內容就是select部分,這個部分經過select監控
從而知道文件的變化,是修改了,仍是刪除了,仍是追加內容了,這幾個其實都是一個channel,這幾個channel中的內容是怎麼放進去的呢,接下來看watch包中的內容
首先先看一下watch包中的watch.go,這個裏面其實就是定一個了一個FileWatcher的接口
type FileWatcher interface { BlockUntilExists(*tomb.Tomb) error ChangeEvents(*tomb.Tomb, int64) (*FileChanges, error) }
接着咱們看一下inotify.go文件,這個裏面咱們就能夠看到定一個InotifyFileWatcher結構體,而且實現了FileWatcher 這個接口
type InotifyFileWatcher struct { Filename string Size int64 } func NewInotifyFileWatcher(filename string) *InotifyFileWatcher { fw := &InotifyFileWatcher { filepath.Clean(filename), 0, } return fw } // 關於文件改變事件的處理,當文件被修改了或者文件內容被追加了,進行通知 func (fw *InotifyFileWatcher) ChangeEvents(t *tomb.Tomb, pos int64) (*FileChanges, error) { err := Watch(fw.Filename) if err != nil { return nil, err } changes := NewFileChanges() fw.Size = pos go func() { events := Events(fw.Filename) for { prevSize := fw.Size var evt fsnotify.Event var ok bool select { case evt, ok = <- events: if !ok { RemoveWatch(fw.Filename) return } case <- t.Dying(): RemoveWatch(fw.Filename) return } switch { case evt.Op & fsnotify.Remove == fsnotify.Remove: fallthrough case evt.Op & fsnotify.Rename == fsnotify.Rename: RemoveWatch(fw.Filename) changes.NotifyDeleted() return case evt.Op & fsnotify.Chmod == fsnotify.Chmod: fallthrough case evt.Op & fsnotify.Write == fsnotify.Write: fi, err := os.Stat(fw.Filename) if err != nil { // 文件若是被刪除了通知文件刪除到chan if os.IsNotExist(err) { RemoveWatch(fw.Filename) changes.NotifyDeleted() return } } fw.Size = fi.Size() if prevSize > 0 && prevSize > fw.Size { // 表示文件內容增長了 changes.NotifyTruncated() } else { // 表示文件被修改了 changes.NotifyModified() } prevSize = fw.Size } } }() return changes, nil } func (fw *InotifyFileWatcher) BlockUntilExists(t *tomb.Tomb) error { err := WatchCreate(fw.Filename) if err != nil { return err } defer RemoveWatchCreate(fw.Filename) if _, err := os.Stat(fw.Filename);!os.IsNotExist(err) { return err } events := Events(fw.Filename) for { select { case evt, ok := <- events: if !ok { return fmt.Errorf("inotify watcher has been closed") } evtName, err := filepath.Abs(evt.Name) if err != nil { return err } fwFilename, err := filepath.Abs(fw.Filename) if err != nil { return err } if evtName == fwFilename { return nil } case <- t.Dying(): return tomb.ErrDying } } panic("unreachable") }
實現的接口就兩個方法:
type FileChanges struct { Modified chan bool // 修改 Truncated chan bool // 增長 Deleted chan bool // 刪除 } func NewFileChanges() *FileChanges { return &FileChanges{ make(chan bool, 1), make(chan bool, 1), make(chan bool, 1), } } func (fc *FileChanges) NotifyModified() { sendOnlyIfEmpty(fc.Modified) } func (fc *FileChanges) NotifyTruncated() { sendOnlyIfEmpty(fc.Truncated) } func (fc *FileChanges) NotifyDeleted() { sendOnlyIfEmpty(fc.Deleted) } func sendOnlyIfEmpty(ch chan bool) { select { case ch <- true: default: } }
在這個裏面也是能夠學習到人家寫的這個地方很是巧妙,雖然談不上代碼高達上,可是看着會讓你很舒服,經過這個結構體,當文件被刪除,修改和增長的時候就會讓對應的channel中插入一個true,而且這裏
的channel都是不帶緩衝區的,只有當tail中觸發一次以後,channel中的內容就會被獲取出來,從而觸發tail繼續讀文件的內容