此文已由做者楊望暑受權網易雲社區發佈。
html
歡迎訪問網易雲社區,瞭解更多網易技術產品運營經驗。java
在服務端查看log會常常使用到tail -f命令實時跟蹤文件變化. 那麼問題來了, 若是本身寫一個一樣功能的, 該何處寫起呢? 若是你用過ELK裏的beats/filebeat的話, 應該知道filebeat作的事情就是監控日誌變化, 並把最新數據,按照自定義配置處理後, 發送給ElasticSearch/kafka/... 對, 本文就是想介紹如何本身實現一個簡易版filebeat, 只要日誌內容發生變化(append new line), 能觸發一個消息, 實現對這一行數據的預處理, 打印, 接入kafka等動做, 還有一個功能是, 當這個工具重啓後, 依然能從上次讀取的位置開始讀.linux
Golang IDEAgit
從流程圖中能夠看出, 咱們須要解決下面幾個問題
github
記錄上一次程序關閉前,文件讀取位置,下次程序啓動時候加載這個位置信息.redis
文件定位並按行讀取, 併發布讀取的行數據庫
監測文件內容變化,併發出通知編程
這個問題關鍵應該是何時記錄上次讀取的offset.
json
讀取併發布後記錄 若是發佈後,作記錄前,程序掛了,那麼重啓程序後,這行數據會從新被讀一次.bash
讀取後立刻記錄,記錄成功後,纔對外發布. 這樣會產生另外一個問題, 發佈前程序掛了, 重啓後, 那條未必發送的消息,外部是拿不到了.
若是沒理解錯, elastic的filebeat選的就是第一種,且沒作相應的異常處理, 他是設置一個channel池, 接收並異步寫入位置信息, 若是寫入失敗, 則打印一條error日誌就繼續走了
logp.Err("Writing of registry returned error: %v. Continuing...", err)複製代碼
要讀取一個文件, 首先要有一個reader
func (tail *Tailf) openReader() {
tail.file, _ = os.Open(tail.FileName)
tail.reader = bufio.NewReader(tail.file)
}複製代碼
對於從文件位置(offset)=0處開始讀一行, 這沒什麼問題, 直接用下面這個方法就能夠了.
func (tail *Tailf) readLine() (string, error) {
line, err := tail.reader.ReadString('\n') if err != nil { return line, err
}
line = strings.TrimRight(line, "\n") return line, err
}複製代碼
可是, 對於文件內容增長了, 可是還沒到一行,也就是沒出現\n 卻出現了EOF(end of file), 那這個狀況下, 咱們是要等待的,offset必須保持在這一行的行頭.
func (tail *Tailf) getOffset() (offset int64, err error) {
offset, err = tail.file.Seek(0, os.SEEK_CUR)
offset -= int64(tail.reader.Buffered()) return}func (tail *Tailf) beginWatch() {
tail.openReader() var offset int64
for { //取上一次讀取位置(行頭)
offset, _ = tail.getOffset()
line, err := tail.readLine() if err == nil {
tail.publishLine(line)
} else if err == io.EOF { //讀到了EOF, offset設置回到行頭
tail.seekTo(Seek{offset: offset, whence: 0}) //block and wait for changes
tail.waitChangeEvent()
} else {
fmt.Println(err) return
}
}
}func (tail *Tailf) seekTo(pos Seek) error {
tail.file.Seek(pos.offset, pos.whence) //一旦改變了offset, 這個reader必須reset一下才能生效
tail.reader.Reset(tail.file) return nil}// 這裏是發佈一個消息, 由於是demo,因此只是簡單的往channel裏一扔func (tail *Tailf) publishLine(line string) {
tail.Lines <- line
}複製代碼
下面說說waitChangeEvent
監測文件內容增長的方式大致有2種
監測文件最後修改時間以及文件大小的變化,俗稱poll--輪詢
利用linux的inotify命令實現監測,他會在文件發生狀態改變後觸發事件
這裏採用第一種方式, filebeat也用的第一種. 咱們本身怎麼實現呢?
//currReadPos: 文件末尾的offset,也就是當前文件大小func (w *PollWatcher) ChangeEvent(currReadPos int64) (*ChangeEvent, error) {
watchingFile, err := os.Stat(w.FileName) if err != nil { return nil, err
}
changes := NewChangeEvent() //當前的大小
w.FileSize = currReadPos //以前的修改時間
previousModTime := watchingFile.ModTime() //輪詢
go func() {
previousSize := w.FileSize for {
time.Sleep(POLL_DURATION) //這裏省略不少代碼, 假設文件是存在的,且沒被重命名,刪除之類的狀況, 文件是像日誌文件同樣不斷append的
file, _ := os.Stat(w.FileName) // ... 省略一大段代碼
if previousSize > 0 && previousSize < w.FileSize { //文件肥了
changes.NotifyModified()
previousSize = w.FileSize continue
}
previousSize = w.FileSize // 處理 本來沒內容, 可是加入了內容, 因此要用修改時間
modTime := file.ModTime() if modTime != previousModTime {
previousModTime = modTime
changes.NotifyModified()
}
}
}() return changes, nil}複製代碼
這裏的changes.NotifyModified方法只是往下面實例裏Modified Channel 放入 ce.Modified <- true
type ChangeEvent struct {
Modified chan bool
Truncated chan bool
Deleted chan bool}複製代碼
也正是這個動做, 在主線程中, 就能收到文件被修改的通知, 從而繼續出發readLine動做
// 上面有個beginWatch方法代碼,結合這個代碼來看func (tail *Tailf) waitChangeEvent() error { // ... 省略初始化動做
select { //只測試文件內容增長
case <-tail.changes.Modified:
fmt.Println(">> find Modified") return nil
// ... 省略其餘
}
}複製代碼
有了這個一連串的代碼後, 咱們就能在main裏監視文件變化了
func main() {
t, _ := tailf.NewTailf("/Users/yws/Desktop/test.log") for line := range t.Lines { //這裏會block住,有新行到來,就會輸出新行
fmt.Println(line)
}
}複製代碼
這個擴展點, 和filebeat同樣.
在讀取時候, 不必定是按行讀取,能夠讀多行,json解析等
發佈時候, 本文例子是直接寫console, 其實能夠接kafka, redis, 數據庫等
.... 想不出來了
雖然是一個很簡單的功能, 現代主流服務端編程語言基本都能實現, 但爲何用go來實現呢? 一大堆優勢和缺點就不列了..這不是軟文. 談談go初學者的見解
代碼很簡潔, 雖然不支持不少高級語言特性, 但看起來依然那麼爽, 除了那些過渡包裝的struct以及怪異的取名.
寫併發(goroutine)是那麼的簡單,那麼的優雅,但也很容易被我這樣的菜鳥濫用, 這語言debug目前有點肉痛
goroutine通訊也是那麼的簡單, channel設計的很棒, 用着很爽
不爽的地方, 多返回值的問題, 寫慣了java的xinstance.method(yInstance.method()), 當yInstance.method()是多返回值的時候,必須拆分紅2行或更多, 每次編譯器報錯時候就想砸鍵盤.
github.com/elastic/bea… filebeat只是其中一個feature
github.com/hpcloud/tai… 寫到一半發現原來別人也幹過同樣的事了, 代碼基本大同小異, 有興趣的能夠看他的代碼, 寫的更完善.
網易雲免費體驗館,0成本體驗20+款雲產品!
更多網易技術、產品、運營經驗分享請點擊。
相關文章:
【推薦】 Spark——爲數據分析處理提供更爲靈活的賦能