歡迎你們前往騰訊雲+社區,獲取更多騰訊海量技術實踐乾貨哦~編程
這裏有一些技巧來處理日誌文件提取。假設咱們正在查看一些Enterprise Splunk提取。咱們能夠用Splunk來探索數據。或者咱們能夠獲得一個簡單的提取並在Python中擺弄這些數據。微信
在Python中運行不一樣的實驗彷佛比試圖在Splunk中進行這種探索性的操做更有效。主要是由於咱們能夠無所限制地對數據作任何事。咱們能夠在一個地方建立很是複雜的統計模型。app
理論上,咱們能夠在Splunk中作不少的探索。它有各類報告和分析功能。框架
可是,使用Splunk須要假設咱們知道咱們正在尋找什麼。在不少狀況下,咱們不知道咱們在尋找什麼:咱們正在探索。可能會有一些跡象代表,一些RESTful API處理速度很慢,但還不止於此。咱們如何繼續?函數式編程
第一步是獲取CSV格式的原始數據。怎麼辦?函數
咱們將首先用一些附加函數來包裝一個CSV.DictReader對象。單元測試
面向對象的純粹主義者會反對這個策略。 「爲何不擴展DictReader?」他們問。我沒有一個很好的答案。我傾向於函數式編程和組件的正交性。對於一個純粹的面向對象的方法,咱們不得不使用更復雜的混合來實現這一點。學習
咱們處理日誌的通常框架是這樣的。測試
with open("somefile.csv") as source: rdr = csv.DictReader(source)
這使咱們能夠讀取CSV格式的Splunk提取物。咱們能夠迭代閱讀器中的行。這是訣竅#1。這不是很是棘手,但我喜歡它。ui
with open("somefile.csv") as source: rdr = csv.DictReader(source) for row in rdr: print( "{host} {ResponseTime} {source}{Service}".format_map(row) )
咱們能夠 - 在必定程度上 - 以有用的格式報告原始數據。若是咱們想粉飾一下輸出,咱們能夠改變格式字符串。那就多是「{主機:30s} {回覆時間:8s} {來源:s}」或相似的東西。
常見的狀況是咱們提取了太多,但其實只須要看一個子集。咱們能夠更改Splunk過濾器,可是,在完成咱們的探索以前,過量使用過濾器使人討厭。在Python中過濾要容易得多。一旦咱們瞭解到須要什麼,就能夠在Splunk中完成。
with open("somefile.csv") as source: rdr = csv.DictReader(source) rdr_perf_log = (row for row in rdr if row['source'] == 'perf_log') for row in rdr_perf_log: print( "{host} {ResponseTime} {Service}".format_map(row) )
咱們已經加入了一個生成器表達式來過濾源行,可以處理一個有意義的子集。
在某些狀況下,咱們會添加額外的源數據列,這些列咱們並不想使用。因此將經過對每一行進行投影來消除這些數據。
原則上,Splunk從不產生空列。可是,RESTful API日誌可能會致使數據集中包含大量列標題,這些列標題是基於請求URI一部分的代理鍵。這些列將包含來自使用該代理鍵的一個請求的一行數據。對於其餘行,在這一列中沒有任何用處。因此要刪除這些空列。
咱們也能夠用一個生成器表達式來作到這一點,可是它會變得有點長。生成器函數更容易閱讀。
def project(reader): for row in reader: yield {k:v for k,v in row.items() if v}
咱們已經從原始閱讀器中的一部分項目構建了一個新的行字典。咱們可使用它來包裝咱們的過濾器的輸出。
with open("somefile.csv") as source: rdr = csv.DictReader(source) rdr_perf_log = (row for row in rdr if row['source'] == 'perf_log') for row in project(rdr_perf_log): print( "{host} {ResponseTime} {Service}".format_map(row) )
這將減小在for語句內部可見的未使用的列。
row['source']符號會變得比較笨重。使用types.SimpleNamespace比用字典更好。這使得咱們可使用row.source。
這是一個很酷的技巧來創造更有用的東西。
rdr_ns= (types.SimpleNamespace(**row) forrowinreader)
咱們能夠將其摺疊成這樣的步驟序列。
with open("somefile.csv") as source: rdr = csv.DictReader(source) rdr_perf_log = (row for row in rdr if row['source'] == 'perf_log') rdr_proj = project(rdr_perf_log) rdr_ns = (types.SimpleNamespace(**row) for row in rdr_proj) for row in rdr_ns: print( "{host} {ResponseTime} {Service}".format_map(vars(row)) )
請注意咱們對format_map()方法的小改動。從SimpleNamespace的屬性中,咱們添加了vars()函數來提取字典 。
咱們能夠用其餘函數把它寫成一個函數來保留句法對稱性。
def ns_reader(reader): return (types.SimpleNamespace(**row) for row in reader)
的確,咱們能夠把它寫成一個像函數同樣使用的lambda結構
ns_reader = lambda reader: (types.SimpleNamespace(**row) for row in reader)
雖然ns_reader()函數和ns_reader()lambda的使用方式相同,但爲lambda編寫文檔字符串和doctest單元測試稍微困難一些。出於這個緣由,應該避免使用lambda結構。
咱們可使用map(lambda row:types.SimpleNamespace(** row),reader)。有些人喜歡這個發生器表達式。
咱們能夠用一個適當的for語句和一個內部的yield語句,可是從一個小的東西里寫大的語句彷佛沒有什麼好處。
咱們有不少選擇,由於Python提供瞭如此多的函數式編程功能。雖然咱們不會常常把Python視做一種功能性語言。但咱們有多種方法來處理簡單的映射。
咱們常常會有一個很是明顯的數據轉換列表。此外,咱們將有一個衍生的數據項目愈來愈多的列表。衍生項目將是動態的,並基於咱們正在測試的不一樣假設。每當咱們有一個實驗或問題,咱們可能會改變派生的數據。
這些步驟中的每個:過濾,投影,轉換和派生都是map-reduce管道的「map」部分的階段。咱們能夠建立一些較小的函數,並將其應用於map()。由於咱們正在更新一個有狀態的對象,因此咱們不能使用通常的map()函數。若是咱們想實現一個更純粹的函數式編程風格,咱們將使用一個不可變的namedtuple而不是一個可變的SimpleNamespace。
def convert(reader): for row in reader: row._time = datetime.datetime.strptime(row.Time, "%Y-%m-%dT%H:%M:%S.%F%Z") row.response_time = float(row.ResponseTime) yield row
在咱們探索的過程當中,咱們將調整這個轉換函數的主體。也許咱們將從一些最小的轉換和派生開始。咱們將用一些「這些是正確的?」的問題來繼續探索。當咱們發現不工做時,咱們會從中取出一些。
咱們的總體處理過程以下所示:
with open("somefile.csv") as source: rdr = csv.DictReader(source) rdr_perf_log = (row for row in rdr if row['source'] == 'perf_log') rdr_proj = project(rdr_perf_log) rdr_ns = (types.SimpleNamespace(**row) for row in rdr_proj) rdr_converted = convert(rdr_ns) for row in rdr_converted: row.start_time = row._time - datetime.timedelta(seconds=row.response_time) row.service = some_mapping(row.Service) print( "{host:30s} {start_time:%H:%M:%S} {response_time:6.3f} {service}".format_map(vars(row)) )
請注意語句主體的變化。convert()函數產生咱們肯定的值。咱們已經在for循環中添加了一些額外的變量,咱們不能100%肯定。在更新convert()函數以前,咱們會看看它們是否有用(甚至是正確的)。
在減量方面,咱們能夠採起稍微不一樣的加工方式。咱們須要重構咱們以前的例子,並把它變成一個生成器函數。
def converted_log(some_file): with open(some_file) as source: rdr = csv.DictReader(source) rdr_perf_log = (row for row in rdr if row['source'] == 'perf_log') rdr_proj = project(rdr_perf_log) rdr_ns = (types.SimpleNamespace(**row) for row in rdr_proj) rdr_converted = convert(rdr_ns) for row in rdr_converted: row.start_time = row._time - datetime.timedelta(seconds=row.response_time) row.service = some_mapping(row.Service) yield row
接着用一個yield代替了print()。
這是重構的另外一部分。
for row in converted_log("somefile.csv"): print( "{host:30s} {start_time:%H:%M:%S} {response_time:6.3f} {service}".format_map(vars(row)) )
理想狀況下,咱們全部的編程都是這樣的。咱們使用生成器函數來生成數據。數據的最終顯示保持徹底分離。這使咱們能夠更自由地重構和改變處理。
如今咱們能夠作一些事情,例如將行收集到Counter()對象中,或者可能計算一些統計信息。咱們可使用defaultdict(list)按服務對行進行分組。
by_service= defaultdict(list) for row in converted_log("somefile.csv"): by_service[row.service] = row.response_time for svc in sorted(by_service): m = statistics.mean( by_service[svc] ) print( "{svc:15s} {m:.2f}".format_map(vars()) )
咱們決定在這裏建立具體的列表對象。咱們可使用itertools按服務分組響應時間。它看起來像是正確的函數式編程,可是這種實施在Pythonic函數式編程形式中指出了一些限制。要麼咱們必須對數據進行排序(建立列表對象),要麼在分組數據時建立列表。爲了作好幾個不一樣的統計,經過建立具體的列表來分組數據一般更容易。
咱們如今正在作兩件事情,而不是簡單地打印行對象。
建立一些局部變量,如svc和m。咱們能夠很容易地添加變化或其餘措施。
使用沒有參數的vars()函數,它會從局部變量中建立一個字典。
這個使用vars()而沒有參數的行爲就像locals()同樣是一個方便的技巧。它容許咱們簡單地建立咱們想要的任何局部變量,並將它們包含在格式化輸出中。咱們能夠侵入咱們認爲可能相關的各類統計方法中。
既然咱們的基本處理循環是針對converted_log(「somefile.csv」)中的行,咱們能夠經過一個小小的,易於修改的腳本探索不少處理選擇。咱們能夠探索一些假設來肯定爲何某些RESTful API處理速度慢,而其餘處理速度則很快。
問答
如何在Python中分析內存使用狀況?
相關閱讀
基於Python實現的微信好友數據分析
Python數據分析和數據挖掘學習路線圖
一文入門Python數據分析庫Pandas
此文已由做者受權騰訊雲+社區發佈,原文連接:https://cloud.tencent.com/dev...