python10min系列之面試題解析:python實現tail -f功能

同步發佈在github上,跪求starpython

這篇文章最初是由於reboot的羣裏,有人去面試,筆試題有這個題,不知道怎麼作,什麼思路,就發羣裏你們討論git

我想了一下,簡單說一下個人想法吧,固然,也有很好用的pyinotify模塊專門監聽文件變化,不過我更想介紹的,是解決的思路,畢竟做爲面試官,仍是想看到一下解決問題的思路,並且我以爲這一題的難點不在於監控文件增量,而在於怎麼打印最後面10行github

但願你們讀這篇文章前,對python基礎、處理文件和經常使用模塊有一個簡單的瞭解,知道下面幾個名詞是啥面試

open('a.txt')
file.seek
file.tell
time.sleep()

下面思路限於我我的知識,免不了有錯誤和考慮不周的,但願你們有更好的方法提出來,我隨時優化代碼,題目的感受沒啥太多的坑,下面讓天真爛漫的蝸牛教你們用python的思路瀏覽器

怎麼用python實現

其實思路也不難啦函數

  • 打開這個文件,指針移到最後
  • 每隔一秒就嘗試readline一下,有內容就打印出來,沒內容就sleep
  • 大概就是這麼個意思

監聽文件

思路以下:性能

  • 用open打開文件
  • 用seek文件指針,給大爺把跳到文件最後面
  • while True進行循環
    • 持續不停的readline,若是能讀到內容,打印出來便可

代碼呼之欲出優化

with open('test.txt') as f:
    f.seek(0,2)
    while True:
        last_pos = f.tell()
        line = f.readline()
        if line:
            print line

效果圖以下指針

代碼說明日誌

  • seek第二個參數2,意思就是從文件結尾處開始seek,更標準的寫法使用os模塊下面的SEEK_END,可讀性更好
  • 只寫出了簡單的邏輯,代碼簡單粗暴,若是這個題目是10分的話,最多也就拿4分吧,不能再多了

優化點

  • print有缺陷,每次都是新的一行,換成sys.stdout.write(line)更和諧
  • 文件名傳參,不能寫死
  • 直接打印能夠當成默認行爲,具體要作什麼,能夠寫成函數處理,這樣咱們就能夠把新行作其餘處理,好比展現在瀏覽器裏
  • 加上容錯處理,好比文件不存在會報錯
  • while True一直都文件,比較耗性能,每讀一次,間隔一秒比較靠譜
    • 調用time.sleep(1)
  • 用類來組織代碼

實例代碼以下

# coding=utf-8
import sys
import time

class Tail():
    def __init__(self,file_name,callback=sys.stdout.write):
        self.file_name = file_name
        self.callback = callback
    def follow(self):

        try:
            with open(self.file_name) as f:
                f.seek(0,2)
                while True:
                    line = f.readline()
                    if line:
                        self.callback(line)
                    time.sleep(1)
        except Exception,e:
            print '打開文件失敗,囧,看看文件是否是不存在,或者權限有問題'
            print e

使用方法:

# 使用默認的sys.stdout.write打印到屏幕
py_tail = Tail('test.txt')
py_tail.follow()

# 本身定義處理函數

def test_tail(line):
    print 'xx'+line+'xx'

py_tail1 = Tail('test.txt', test_tail)
py_tail1.follow()

咦 等等,tail -f 默認還會打印最後10行,這個好像纔是這個題目的難點所在,衆所周知,python裏讀文件指針,只能移動到固定位置,不能判斷是哪一行,囧,先實現簡單的,逐漸增強吧

默認打印最後10行

如今這個代碼,大概能拿6分啦,咱們還有一個功能沒作,那就是打印最後n行,默認是10行,如今把這個功能加上,加一個函數就行啦

文件小的時候

咱們知道,readlines能夠獲取全部內容,而且分行,代碼呼之欲出,獲取list最後10行很簡單有麼有,切片妥妥的

# 演示代碼,說明核心邏輯,完整代碼在下面
last_lines = f.readlines()[-10:]
for line in last_lines:
    self.callback(line)

此時代碼變成這樣了

import sys
import time

class Tail():
    def __init__(self,file_name,callback=sys.stdout.write):
        self.file_name = file_name
        self.callback = callback
    def follow(self,n=10):
        try:
            with open(self.file_name) as f:
                self._file = f
                self.showLastLine(n)
                self._file.seek(0,2)
                while True:
                    line = self._file.readline()
                    if line:
                        self.callback(line)
        except Exception,e:
            print '打開文件失敗,囧,看看文件是否是不存在,或者權限有問題'
            print e
    def showLastLine(self, n):
        last_lines = self._file.readlines()[-10:]
        for line in last_lines:
            self.callback(line)

更進一步:大的日誌怎麼辦

此時代碼有7分時很隨意啦,可是若是文件特別大呢,特別是日誌文件,很容易幾個G,咱們只須要最後幾行,所有讀出來內存受不了,因此咱們要繼續優化showLastLine函數,我以爲這纔是這題的難點所在

大概的思路以下

  • 我先估計日誌一行大概是100個字符,注意,我只是估計一個大概,多了也不要緊,咱們只是須要一個初始值,後面會修正

  • 我要讀十行,因此一開始就seek到離結尾1000的位置seek(-1000,2),把最後1000個字符讀出來,而後有一個判斷

  • 若是這1000個字符長度大於文件長度,不用管了 屬於級別1的狀況,直接read而後split

  • 若是1000個字符裏面的換行符大於10,說明1000個字符裏,大於10行,那也好辦,處理思路相似級別1,直接split取後面十個

  • 問題就在於 若是小於10怎麼處理
    • 好比1000個裏,只有四個換行符,說明一行大概是1000/4=250個字符,咱們還缺6行,因此咱們再讀250*5=1500,一共有2500的大概比較靠譜,而後在對2500個字符進行相同的邏輯判斷,直到讀出的字符裏,換行符大於10,讀取結束

邏輯清晰之後,代碼就呼之欲出啦

# coding=utf-8
import sys
import time

class Tail():
    def __init__(self,file_name,callback=sys.stdout.write):
        self.file_name = file_name
        self.callback = callback
    def follow(self,n=10):
        try:
            with open(self.file_name) as f:
                self._file = f
                self._file.seek(0,2)
                self.file_length = self._file.tell()
                self.showLastLine(n)
                while True:
                    line = self._file.readline()
                    if line:
                        self.callback(line)
                    time.sleep(1)
        except Exception,e:
            print '打開文件失敗,囧,看看文件是否是不存在,或者權限有問題'
            print e
    def showLastLine(self, n):
        len_line = 100
        read_len = len_line*n
        while True:
            if read_len>self.file_length:
                self._file.seek(0)
                last_lines = self._file.read().split('\n')[-n:]
                break
            self._file.seek(-read_len, 2)
            last_words = self._file.read(read_len)
            count = last_words.count('\n')          
            if count>=n:
                last_lines = last_words.split('\n')[-n:]
                break
            else:
                if count==0:
                    len_perline = read_len
                else:
                    len_perline = read_len/count
                read_len = len_perline * n
        for line in last_lines:
            self.callback(line+'\n')
if __name__ == '__main__':
    py_tail = Tail('test.txt')
    py_tail.follow()

加上註釋的版本

# coding=utf-8
import sys
import time

class Tail():
    def __init__(self,file_name,callback=sys.stdout.write):
        self.file_name = file_name
        self.callback = callback
    def follow(self,n=10):
        try:
            # 打開文件
            with open(self.file_name) as f:
                self._file = f
                self._file.seek(0,2)
                # 存儲文件的字符長度
                self.file_length = self._file.tell()
                # 打印最後10行
                self.showLastLine(n)
                # 持續讀文件 打印增量
                while True:
                    line = self._file.readline()
                    if line:
                        self.callback(line)
                    time.sleep(1)
        except Exception,e:
            print '打開文件失敗,囧,看看文件是否是不存在,或者權限有問題'
            print e
    def showLastLine(self, n):
        # 一行大概100個吧 這個數改爲1或者1000都行
        len_line = 100
        # n默認是10,也能夠follow的參數傳進來
        read_len = len_line*n
        # 用last_lines存儲最後要處理的內容
        while True:
            # 若是要讀取的1000個字符,大於以前存儲的文件長度
            # 讀完文件,直接break
            if read_len>self.file_length:
                self._file.seek(0)
                last_lines = self._file.read().split('\n')[-n:]
                break
            # 先讀1000個 而後判斷1000個字符裏換行符的數量
            self._file.seek(-read_len, 2)
            last_words = self._file.read(read_len)
            # count是換行符的數量
            count = last_words.count('\n')
            
            if count>=n:
                # 換行符數量大於10 很好處理,直接讀取
                last_lines = last_words.split('\n')[-n:]
                break
            # 換行符不夠10個
            else:
                # break
                #不夠十行
                # 若是一個換行符也沒有,那麼咱們就認爲一行大概是100個
                if count==0:

                    len_perline = read_len
                # 若是有4個換行符,咱們認爲每行大概有250個字符
                else:
                    len_perline = read_len/count
                # 要讀取的長度變爲2500,繼續從新判斷
                read_len = len_perline * n
        for line in last_lines:
            self.callback(line+'\n')
if __name__ == '__main__':
    py_tail = Tail('test.txt')
    py_tail.follow(20)

效果以下,終於能夠打印最後幾行了,你們能夠試一下,不管日誌每行只有1個,仍是每行有300個字符,都是能夠打印最後10行的

能作到這個地步,通常的面試官就直接被你搞定了,這個代碼大概8分吧,若是還想再進一步,還有一些能夠優化的地方,代碼放github上了,有興趣的能夠拿去研究

待優化:留給你們做爲擴展

  • 若是你tail -f的過程當中,日誌被備份清空或者切割了,怎麼處理
    • 其實很簡單,你維護一個指針位置,若是下次循環發現文件指針位置變了,從最新的指針位置開始讀就行
  • 具體每種錯誤的類型
    • 我這裏只是簡單的try 能夠寫個函數,把文件不存在,文件沒權限這些報錯信息分開
  • 性能小優化,好比我第一次讀了1000,只有4行,我大概估計每行250個字符,整體須要2500行,下次不必直接讀2500個,而是讀1500行和以前1000行拼起來便可

最後大殺器 若是寫出來這個,基本面試官會直接

import os
def tail(file_name):
    os.system('tail -f '+file_name)

tail('log.log')

打死你

以上就是我對這個題的想法,實際開發中想監控文件變化,其實仍是pyinotify好用,跪求你們star

相關文章
相關標籤/搜索