最近在作離線數據導入HBase項目,涉及將存儲在Mysql中的歷史數據經過bulkload的方式導入HBase。因爲源數據已經不在DB中,而是以文件形式存儲在機器磁盤,此文件是mysqldump導出的格式。如何將mysqldump格式的文件轉換成實際的數據文件提供給bulkload做轉換,是須要考慮的一個問題。python
咱們知道mysqldump導出的文件主要是Insert,數據庫表結構定義語句。而要解析的對象也主要是包含INSERT關鍵字記錄,這樣咱們就把問題轉換成如何從dmp文件解析Insert語句。接觸過dmp文件的同窗應該瞭解,其INSERT語句的結構,主要包含表名、字段名、字段值, 這裏面主要包含幾個關鍵字:INSERT INTO, VALUES。咱們要作的就是把Values括號後的字段值給解析出來,這個過程須要考慮VALUES後面包含的是多少行的記錄,有可能導出的記錄Values後面包含多行對應mysql中存儲的記錄。mysql
在解析文件過程當中,我天然想到用Python來寫,由於Python在處理文件方面有不少優點,也比較簡單。在處理DMP文件這塊,考慮到字段值間是用逗號分割的,在python中正好一個模塊能夠很好的來處理此類格式 ,即你們很熟悉的CSV模塊,在處理CSV類型的文件有不少優點。在這裏咱們把CSV模塊有在解析dmp文件,同時加一些解析邏輯,能夠很好解決此類問題。sql
同時,咱們要處理的dmp文件是通過壓縮的,而且單個文件都比較大,都是Gigbytes的,在讀取時須要注意機器內存大小,不能一次讀出全部的數據,python也考慮到此類問題,採用的方法是惰性取值,即在真正使用時才從磁盤中加載相應的文件數據。若是想加塊解析,還能夠採集多進程或多線程的方法。數據庫
處理流程圖以下所示:多線程
代碼以下圖所示:app
1 #!/usr/bin/env python 2 import fileinput 3 import csv 4 import sys 5 import gzip 6 7 8 # 設定CSV讀取的最大容量 9 csv.field_size_limit(sys.maxsize) 10 11 def check_insert(line): 12 """ 13 返回語句是否以insert into開頭,若是是返回true,不然返回false 14 """ 15 return line.startswith('INSERT INTO') or False 16 17 18 def get_line_values(line): 19 """ 20 返回Insert語句中包含Values的部分 21 """ 22 return line.partition('VALUES ')[2] 23 24 25 def check_values_style(values): 26 """ 27 保證INSERT語句知足基本的條件,即包含(右括號 28 """ 29 30 if values and values[0] == '(': 31 return True 32 return False 33 34 def parse_line(values): 35 """ 36 建立csv對象,讀取INSERT VALUES 字段值 37 """ 38 latest_row = [] 39 40 reader = csv.reader([values], delimiter=',', 41 doublequote=False, 42 escapechar='\\', 43 quotechar="'", 44 strict=True 45 ) 46 47 48 for reader_row in reader: 49 for column in reader_row: 50 # 判斷字段值是否爲空或爲NULL 51 if len(column) == 0 or column == 'NULL': 52 latest_row.append("") 53 continue 54 55 # 判斷字段開頭是否以(開頭,若是是則說明此VALUES後面不僅包含一行數據,可能有多行,要分別解析 56 if column[0] == "(": 57 new_row = False 58 if len(latest_row) > 0: 59 #判斷行是否包含),若是包含則說明一行數據完畢 60 if latest_row[-1][-1] == ")": 61 # 移除) 62 latest_row[-1] = latest_row[-1][:-1] 63 if latest_row[-1] == "NULL": 64 latest_row[-1] = "" 65 new_row = True 66 # 若是是新行,則打印該行 67 if new_row: 68 line="}}}{{{".join(latest_row) 69 print "%s<{||}>" % line 70 latest_row = [] 71 72 if len(latest_row) == 0: 73 column = column[1:] 74 75 latest_row.append(column) 76 # 判斷行結束符 77 if latest_row[-1][-2:] == ");": 78 latest_row[-1] = latest_row[-1][:-2] 79 if latest_row[-1] == "NULL": 80 latest_row[-1] = "" 81 82 line="}}}{{{".join(latest_row) 83 print "%s<{||}>" % line 84 85 def main(): 86 87 filename=sys.argv[1] 88 try: 89 #惰性取行 90 with gzip.open(filename,"rb") as f: 91 for line in f: 92 if check_insert(line): 93 values = get_line_values(line) 94 if check_values_style(values): 95 parse_line(values) 96 except KeyboardInterrupt: 97 sys.exit(0) 98 99 if __name__ == "__main__": 100 main()
總的說來,主要是利用Python的CSV模塊來解析DMP文件的INSERT語句,若是DMP文件不規整,可能仍是有些問題。對於dmp文件很大狀況,也是須要考慮解析時間效率問題,能夠考慮增長多進程或多線程機制。ide