摘要: 1. 本文背景 不少行業的信息系統中,例如金融行業的信息系統,至關多的數據交互工做是經過傳統的文本文件進行交互的。此外,不少系統的業務日誌和系統日誌因爲各類緣由並無進入ELK之類的日誌分析系統,也是以文本文件的形式存在的。java
不少行業的信息系統中,例如金融行業的信息系統,至關多的數據交互工做是經過傳統的文本文件進行交互的。此外,不少系統的業務日誌和系統日誌因爲各類緣由並無進入ELK之類的日誌分析系統,也是以文本文件的形式存在的。隨着數據量的指數級增加,對超大文本文件的分析愈來愈成爲挑戰。好在阿里雲的MaxCompute產品從2.0版本開始正式支持了直接讀取並分析存儲在OSS上的文本文件,能夠用結構化查詢的方式去分析非結構化的數據。正則表達式
本文對使用MaxCompute分析OSS文本數據的實踐過程當中遇到的一些問題和優化經驗進行了總結。做爲前提,讀者須要詳細瞭解MaxCompute讀取OSS文本數據的一些基礎知識,對這篇官方文檔 《訪問 OSS 非結構化數據》最好有過實踐經驗。本文所描述的內容主要是針對這個文檔中提到的自定義Extractor作出的一些適配和優化。sql
2.1 場景一:分析zip壓縮後的文本文件
場景說明
不少時候咱們會對歷史的文本數據進行壓縮,而後上傳到OSS上進行歸檔,那麼若是要對這部分數據導入MaxCompute進行離線分析,咱們能夠自定義Extractor讓MaxCompute直接讀取OSS上的歸檔文件,避免了把歸檔文件下載到本地、解壓縮、再上傳回OSS這樣冗長的鏈路。app
實現思路
如 《訪問 OSS 非結構化數據》文檔中所述,MaxCompute讀取OSS上的文本數據本質上是讀取一個InputStream流,那麼咱們只要構造出適當的歸檔字節流,就能夠直接獲取這個InputStream中的數據了。ide
以Zip格式的歸檔文件爲例,咱們能夠參考 DataX 中關於讀取OSS上Zip文件的源碼,構造一個Zip格式的InputStream,代碼見 ZipCycleInputStream.java 。構造出這個Zip格式的InputStream後,在自定義Extractor中獲取文件流的部分就能夠直接使用了,例如:優化
private BufferedReader moveToNextStream() throws IOException { SourceInputStream stream = inputs.next(); // ...... ZipCycleInputStream zipCycleInputStream = new ZipCycleInputStream(stream); return new BufferedReader(new InputStreamReader(zipCycleInputStream, "UTF-8"), 8192); // ...... }
優化經驗
你們可能知道,MaxCompute中進行批量計算的時候,能夠經過設置 odps.stage.mapper.split.size 這個參數來調整數據分片的大小,從而影響到執行計算任務的Mapper的個數,在必定程度上提升Mapper的個數能夠增長計算的並行度,進而提升計算效率 (但也不是說Mapper個數越多越好,由於這樣可能會形成較長時間的資源等待,或者可能會形成長尾的後續Reducer任務,反而下降總體的計算效率) 。this
一樣道理,對OSS上的文本文件進行解析的時候,也能夠經過設置 odps.sql.unstructured.data.split.size 這個參數來達到調整Mapper個數的目的 (注意這個參數可能須要提工單開通使用權限):阿里雲
set odps.sql.unstructured.data.split.size=16;
上述設定的含義是,將OSS上的文件拆分爲若干個16M左右大小的分片,讓MaxCompute盡力作到每一個分片啓動一個Mapper任務進行計算——之因此說是「盡力作到」,是由於MaxCompute默認不會對單個文件進行拆分及分片處理(除非設定了其餘參數,咱們後面會講到),也就是說,若是把單個分片按照上面的設定爲16M,而OSS上某個文件大小假設爲32M,則MaxCompute仍然會把這個文件總體(即32M)的數據量做爲一個分片進行Mapper任務計算。調試
注意點
咱們在這個場景中處理的是壓縮後的文件,而InputStream處理的字節量大小是不會因壓縮而變小的。舉個例子,假設壓縮比爲1:10,則上述這個32M的壓縮文件實際表明了320M的數據量,即MaxCompute會把1個Mapper任務分配給這320M的數據量進行處理;同理假設壓縮比爲1:20,則MaxCompute會把1個Mapper任務分配給640M的數據量進行處理,這樣就會較大的影響計算效率。所以,咱們須要根據實際狀況調整分片參數的大小,並儘可能把OSS上的壓縮文件大小控制在一個比較小的範圍內,從而能夠靈活配置分片參數,不然分片參數的值會由於文件太大而且文件不會被拆分而失效。日誌
2.2 場景二:過濾文本文件中的特定行
場景說明
對於一些業務數據文件,特別是金融行業的數據交換文件,一般會有文件頭或文件尾的設定要求,即文件頭部的若干行數據是一些元數據信息,真正要分析的業務數據須要把這些元信息的行過濾掉,只分析業務數據部分的行,不然執行結構化查詢的SQL語句的時候必然會形成任務失敗。
實現思路
在 《訪問 OSS 非結構化數據》文檔中提到的 代碼示例 中,對 readNextLine() 方法進行一些改造,對讀取的每個文件,即每一個 currentReader 讀取下一行的時候,記錄下來當前處理的行數,用這個行數判斷是否到達了業務數據行,若是未到業務數據行,則繼續讀取下一條記錄,若是已經到達數據行,則將該行內容返回處理;而當跳轉到下一個文件的時候,將 該行數值重置。
代碼示例:
private String readNextLine() throws IOException { if (firstRead) { firstRead = false; currentReader = moveToNextStream(); if (currentReader == null) { return null; } } // 讀取行級數據 while (currentReader != null) { String line = currentReader.readLine(); if (line != null) { if (currentLine < dataLineStart) { // 若當前行小於數據起始行,則繼續讀取下一條記錄 currentLine++; continue; } if (!"EOF".equals(line)) { // 若未到達文件尾則將該行內容返回,若到達文件尾則直接跳到下個文件 return line; } } currentReader = moveToNextStream(); currentLine = 1; } return null; }
此處 dataLineStart 表示業務數據的起始行,能夠經過 DataAttributes 在創建外部表的時候從外部做爲參數傳入。固然也能夠隨便定義其餘邏輯來過濾掉特定行,好比本例中的對文件尾的「EOF」行進行了簡單的丟棄處理。
2.3 場景三:忽略文本中的空行
場景說明
在 《訪問 OSS 非結構化數據》文檔中提到的 代碼示例 中,已能夠應對大多數場景下的文本數據處理,但有時候在業務數據文本中會存在一些空行,這些空行可能會形成程序的誤判,所以咱們須要忽略掉這些空行,讓程序繼續分析處理後面有內容的行。
實現思路
相似於上述 場景二 ,只須要判斷爲空行後,讓程序繼續讀取下一行文本便可。
代碼示例:
public Record extract() throws IOException {
String line = readNextLine(); if (line == null) { return null;// 返回null標誌已經讀取完成 } while ("".equals(line.trim()) || line.length() == 0 || line.charAt(0) == '\r' // 遇到空行則繼續處理 || line.charAt(0) == '\n') { line = readNextLine(); if (line == null) return null; } return textLineToRecord(line);
}
2.4 場景四:選擇OSS上文件夾下的部分文件進行處理
場景說明
閱讀 《訪問 OSS 非結構化數據》文檔可知,一張MaxCompute的外部錶鏈接的是OSS上的一個文件夾(嚴格來講OSS沒有「文件夾」這個概念,全部對象都是以Object來存儲的,所謂的文件夾其實就是在OSS建立的一個字節數爲0且名稱以「/」結尾的對象。MaxCompute創建外部表時鏈接的是OSS上這樣的以「/」結尾的對象,即鏈接一個「文件夾」),在處理外部表時,默認會對該文件夾下 全部的文件 進行解析處理。該文件夾下全部的文件集合即被封裝爲 InputStreamSet ,而後經過其 next() 方法來依次得到每個InputStream流、即每一個文件流。
但有時咱們可能會但願只處理OSS上文件夾下的 部分 文件,而不是所有文件,例如只分析那些文件名中含有「2018_」字樣的文件,表示只分析2018年以來的業務數據文件。
實現思路
在獲取到每個InputStream的時候,經過 SourceInputStream 類的 getFileName() 方法獲取正在處理的文件流所表明的文件名,而後能夠經過正則表達式等方式判斷該文件流是否爲所須要處理的文件,若是不是則繼續調用 next() 方法來獲取下一個文件流。
代碼示例:
private BufferedReader moveToNextStream() throws IOException { SourceInputStream stream = null; while ((stream = inputs.next()) != null) { String fileName = stream.getFileName(); System.out.println("========inputs.next():" + fileName + "========"); if (patternModel.matcher(fileName).matches()) { System.out.println(String .format("- match fileName:[%s], pattern:[%s]", fileName, patternModel .pattern())); ZipCycleInputStream zipCycleInputStream = new ZipCycleInputStream(stream); return new BufferedReader(new InputStreamReader(zipCycleInputStream, "UTF-8"), 8192); } else { System.out.println(String.format( "-- discard fileName:[%s], pattern:[%s]", fileName, patternModel.pattern())); continue; } } return null; }
本例中的 patternModel 爲經過 DataAttributes 在創建外部表的時候從外部做爲參數傳入的正則規則。
寫到這裏可能有讀者會問,若是一個文件夾下有不少文件,好比上萬個文件,整個遍歷一遍後只選擇一小部分文件進行處理這樣的方式會不會效率過低了?其實大可沒必要擔憂,由於相對於MaxCompute對外部表執行批量計算的過程,循環遍歷文件流的時間消耗是很是小的,一般狀況下是不會影響批量計算任務的。
2.5 場景五:針對單個大文件進行拆分
場景說明
在 場景一 中提到,要想提升計算效率,咱們須要調整 odps.sql.unstructured.data.split.size 參數值來增長Mapper的並行度,可是對於單個大文件來說,MaxCompute默認是不進行拆分的,也就是說OSS上的單個大文件只會被分配給一個Mapper任務進行處理,若是這個文件很是大的話,處理效率將會及其低下,咱們須要一種方式來實現對單個文件進行拆分,使其能夠被多個Mapper任務進行並行處理。
實現思路
仍然是要依靠調整 odps.sql.unstructured.data.split.size 參數來增長Mapper的並行度,而且設定 odps.sql.unstructured.data.single.file.split.enabled 參數來容許拆分單個文件 (同odps.sql.unstructured.data.split.size,該參數也可能須要提工單申請使用權限) ,例如:
set odps.sql.unstructured.data.split.size=128; set odps.sql.unstructured.data.single.file.split.enabled=true;
設置好這些參數後,就須要編寫特定的Reader類來進行單個大文件的拆分了。
核心的思路是,根據 odps.sql.unstructured.data.split.size 所設定的值,大概將文件按照這個大小拆分開,可是拆分點極大可能會切在一條記錄的中間,這時就須要調整字節數,向前或向後尋找換行符,來保證最終的切分點落在一整條記錄的尾部。具體的實現細節相對來說比較複雜,能夠參考在 《訪問 OSS 非結構化數據》文檔中提到的 代碼示例 來進行分析。
注意點
在計算字節數的過程當中,可能會遇到非英文字符形成計算切分點的位置計算不許確,進而出現讀取的字節流仍然沒有把一整行覆蓋到的狀況。這須要針對含有非英文字符的文本數據作一些特殊處理。
代碼示例:
@Override public int read(char[] cbuf, int off, int len) throws IOException { if (this.splitReadLen >= this.splitSize) { return -1; } if (this.splitReadLen + len >= this.splitSize) { len = (int) (this.splitSize - this.splitReadLen); } int readSize = this.internalReader.read(cbuf, off, len); int totalBytes = 0; for (char ch : cbuf) { String str = String.valueOf(ch); byte[] bytes = str.getBytes(charset); totalBytes += bytes.length; } this.splitReadLen += totalBytes; return readSize; }
在編寫自定義Extractor的程序中,適當加入System.out做爲日誌信息輸出,這些日誌信息會在MaxCompute執行時輸出在LogView的視圖中,對於調試過程和線上問題排查過程很是有幫助。
上文中提到經過調整 odps.sql.unstructured.data.split.size 參數值來適當提升Mapper任務的並行度,可是並行度並非越高越好,具體什麼值最合適是與OSS上的文件大小、總數據量、MaxCompute產品自身的集羣狀態緊密聯繫在一塊兒的,須要屢次調試,而且可能須要與 odps.stage.reducer.num、odps.sql.reshuffle.dynamicpt、odps.merge.smallfile.filesize.threshold 等參數配合使用才能找到最優值。而且因爲MaxCompute產品自身的集羣狀態也是很重要的因素,可能今天申請500個Mapper資源是很容易的事情,過幾個月就變成常常須要等待很長時間才能申請到,這就須要持續關注任務的執行時間並及時調整參數設定。
外部表的讀取和解析是依靠Extractor對文本的解析來實現的,所以在執行效率上是遠不能和MaxCompute的普通表相比的,因此在須要頻繁讀取和分析OSS上的文本文件的狀況下,建議將OSS文件先 INSERT OVERWRITE 到MaxCompute中字段徹底對等的一張普通表中,而後針對普通表進行分析計算,這樣一般會得到更好的計算效率。