在開始編寫ORM模塊以前,咱們須要先對db_helper進行重構,由於ORM最終生成的sql是須要轉給db_helper來執行的,因此擁有一個功能完善、健壯的數據庫操做類是很是必要的。前端
這是項目原db_helper.py代碼python
#!/usr/bin/env python # coding=utf-8 import psycopg2 from common import log_helper from config import const # 初始化數據庫參數 db_name = const.DB_NAME db_host = const.DB_HOST db_port = const.DB_PORT db_user = const.DB_USER db_pass = const.DB_PASS def read(sql): """ 鏈接pg數據庫並進行數據查詢 若是鏈接失敗,會把錯誤寫入日誌中,並返回false,若是sql執行失敗,也會把錯誤寫入日誌中,並返回false 若是全部執行正常,則返回查詢到的數據,這個數據是通過轉換的,轉成字典格式,方便模板調用,其中字典的key是數據表裏的字段名 """ try: # 鏈接數據庫 conn = psycopg2.connect(database=db_name, user=db_user, password=db_pass, host=db_host, port=db_port) # 獲取遊標 cursor = conn.cursor() except Exception as e: print(e.args) log_helper.error('鏈接數據庫失敗:' + str(e.args)) return False try: # 執行查詢操做 cursor.execute(sql) # 將返回的結果轉換成字典格式 data = [dict((cursor.description[i][0], value) for i, value in enumerate(row)) for row in cursor.fetchall()] except Exception as e: print(e.args) log_helper.error('sql執行失敗:' + str(e.args) + ' sql:' + str(sql)) return False finally: # 關閉遊標和數據庫連接 cursor.close() conn.close() # 返回結果(字典格式) return data def write(sql, vars): """ 鏈接pg數據庫並進行寫的操做 若是鏈接失敗,會把錯誤寫入日誌中,並返回false,若是sql執行失敗,也會把錯誤寫入日誌中,並返回false,若是全部執行正常,則返回true """ try: # 鏈接數據庫 conn = psycopg2.connect(database=db_name, user=db_user, password=db_pass, host=db_host, port=db_port) # 獲取遊標 cursor = conn.cursor() except Exception as e: print(e.args) log_helper.error('鏈接數據庫失敗:' + str(e.args)) return False try: # 執行sql語句 cursor.execute(sql, vars) # 提交事務 conn.commit() except Exception as e: print(e.args) # 若是出錯,則事務回滾 conn.rollback() log_helper.error('sql執行失敗:' + str(e.args) + ' sql:' + str(sql)) return False else: # 獲取數據 try: data = [dict((cursor.description[i][0], value) for i, value in enumerate(row)) for row in cursor.fetchall()] except Exception as e: # 沒有設置returning或執行修改或刪除語句時,記錄不存在 data = None finally: # 關閉遊標和數據庫連接 cursor.close() conn.close() # 若是寫入數據後,將數據庫返回的數據返回給調用者 return data
經過對代碼的簡單分析,能夠看到整個模塊在初化時,載入數據庫連接配置,對數據庫的操做也只有簡單讀與寫操做。這樣的功能對於通常的數據庫增刪改查操做已經足夠了,但若是業務複雜,有多個庫、須要用到事務或者須要訪問不一樣類型數據庫時,它就不夠用了。因此首先要作的就是對它進行重構,功能進行完善。sql
首先咱們須要將配置獨立出來,當有須要連接多個數據庫時,能夠讀取不一樣的配置文件,讓程序更加方便靈活。數據庫
在config目錄下建立db_config.py文件(有多個庫時,能夠配置多個不一樣的參數來引用)ide
#!/usr/bin/env python # coding=utf-8 ### 數據庫連接參數 ### DB = { 'db_host': '127.0.0.1', 'db_port': 5432, 'db_name': 'simple_db', 'db_user': 'postgres', 'db_pass': '123456' } # 是否將全部要執行的Sql語句輸出到日誌裏 IS_OUTPUT_SQL = False
在配置中,咱們一樣定義了數據庫鏈接地址、端口、數據庫名稱、用戶名與密碼。函數
另外,爲了方便咱們進行排錯,檢查sql的生成狀況,添加了IS_OUTPUT_SQL是否輸出執行的sql語句到日誌中這一個開關項,設置爲True時,全部被執行的sql語句都會被寫到日誌中,方便下載日誌下來進行分析。post
對於數據庫操做模塊,咱們須要封裝成一個類,在有須要調用時,就能夠經過with語句進行初始化操做,設置對應的數據庫連接配置,靈活的鏈接不一樣的數據庫。測試
在設計操做類時,咱們須要思考幾個問題:fetch
1.它能夠支持多數據庫操做,即讀取不一樣的配置能鏈接操做不一樣的數據庫(能夠經過類初始化時進行注入配置信息)優化
2.它須要支持with語句,當咱們忘記關閉數據庫遊標和鏈接時,自動幫咱們關閉(須要實現__enter__()與__exit__()方法)
3.它須要支持數據庫事務,當執行失敗時,能夠回滾數據,當全部sql執行都成功時,能夠統一提交事務(須要建立rollback()與commit()方法)
4.它須要支持查詢、添加、修改、刪除等操做,方便咱們操做關係型數據庫記錄(須要建立sql執行方法)
5.它須要支持sql執行優化,將超出指定執行時間的sql語句記錄到日誌中,方便開發人員進行分析(須要記錄sql執行起始時間與結束時間,並進行計算,當這個時間大於指定值時執行日誌寫入程序)
根據這些要求,咱們初步設計出數據庫操做類的基本模型
class PgHelper(object): """postgresql數據庫操做類""" def __init__(self, db, is_output_sql): """初始化數據庫操做類配置信息""" def open_conn(self): """鏈接數據庫,並創建遊標""" def close_conn(self): """關閉postgresql數據庫連接""" def __enter__(self): """初始化數據庫連接""" def __exit__(self, type, value, trace): """關閉postgresql數據庫連接""" def rollback(self): """回滾操做""" def commit(self): """提交事務""" def execute(self, query, vars=None): """執行sql語句查詢,返回結果集或影響行數""" def write_log(self, start_time, end_time, sql): """記錄Sql執行超時日誌"""
接下來,咱們來一一實現上面的各個方法
首先是完成初始化方法,咱們能夠經過注入的方法,將db_config配置信息裏的參數注入進來初始化。鏈接不一樣的數據庫時,能夠注入不一樣的配置信息。
class PgHelper(object): """postgresql數據庫操做類""" def __init__(self, db, is_output_sql): self.connect = None self.cursor = None # 初始化數據庫參數 self.db_name = db.get('db_name') self.db_user = db.get('db_user') self.db_pass = db.get('db_pass') self.db_host = db.get('db_host') self.db_port = db.get('db_port') # 是否將全部要執行的Sql語句輸出到日誌裏 self.is_output_sql = is_output_sql
而後咱們來建立數據庫打開鏈接方法與關閉鏈接的方法,當數據庫鏈接失敗時會拋出異常,程序會自動調用log_helper.error()方法,將異常寫入日誌當中,並第一時間發送郵件通知開發人員,方便開發人員即時排錯。
def open_conn(self): """鏈接數據庫,並創建遊標""" try: if not self.connect: self.connect = psycopg2.connect(database=self.db_name, user=self.db_user, password=self.db_pass, host=self.db_host, port=self.db_port) return self.connect except Exception as e: log_helper.error('鏈接數據庫失敗:' + str(e.args)) return False def close_conn(self): """關閉postgresql數據庫連接""" # 關閉遊標 try: if self.cursor: self.cursor.close() except Exception: pass # 關閉數據庫連接 try: if self.connect: self.connect.close() except Exception: pass
經過重寫內置__enter__()與__exit__()方法,來實現with語句調用本類時,會自動對類進行初始化操做,自動建立數據庫鏈接。當代碼執行完畢後(程序退出with語句時),程序會自動調用對應的方法,將遊標與數據庫鏈接的關閉,避免手動操做時,忘記關閉鏈接出現異常。
def __enter__(self): """初始化數據庫連接""" self.open_conn() return self def __exit__(self, type, value, trace): """關閉postgresql數據庫連接""" self.close_conn()
爲了方便事務處理,增長回滾方法。用於事務中執行操做失敗時,調用回滾方法執行回滾操做。
def rollback(self): """回滾操做""" try: # 異常時,進行回滾操做 if self.connect: self.connect.rollback() self.close_conn() except Exception as e: log_helper.error('回滾操做失敗:' + str(e.args))
還須要增長事務提交方法,方便sql執行增刪改爲功之後,提交事務更新數據。在開發中不少朋友常常會忘記執行提交事務操做,一直覺得代碼有問題沒有執行成功。
def commit(self): """提交事務""" try: if self.connect: self.connect.commit() self.close_conn() except Exception as e: log_helper.error('提交事務失敗:' + str(e.args))
爲了方便查看sql語句轉換效果,咱們還能夠增長獲取sql語句生成方法,固然這個方法並無太大的用途。
def get_sql(self, query, vars=None): """獲取編譯後的sql語句""" # 記錄程序執行開始時間 start_time = time.clock() try: # 判斷是否記錄sql執行語句 if self.is_output_sql: log_helper.info('sql:' + str(query)) # 創建遊標 self.cursor = self.connect.cursor() # 執行SQL self.data = self.cursor.mogrify(query, vars) except Exception as e: # 將異常寫入到日誌中 log_helper.error('sql生成失敗:' + str(e.args) + ' query:' + str(query)) self.data = '獲取編譯sql失敗' finally: # 關閉遊標 self.cursor.close() # 記錄程序執行結束時間 end_time = time.clock() # 寫入日誌 self.write_log(start_time, end_time, query) return self.data
由於,當你直接使用完整的sql語句執行時,並不須要這個方法。可是,你使用的是下面方式,執行後就會生成組合好的sql語句,幫助咱們分析sql語句生成狀況
# 使用with方法,初始化數據庫鏈接 with db_helper.PgHelper(db_config.DB, db_config.IS_OUTPUT_SQL) as db: # 設置sql執行語句 sql = """insert into product (name, code) values (%s, %s) returning id""" # 設置提交參數 vars = ('zhangsan', '201807251234568') # 生成sql語句,並打印到控制檯 print(db.get_sql(sql, vars))
輸出結果:
b"insert into product (name, code) values ('zhangsan', '201807251234568') returning id"
數據庫最多見的操做就是增刪改查操做,因爲postgresql有個很是好用的特殊參數:returning,它能夠在sql執行增改刪結束後,返回咱們想要的字段值,方便咱們進行相應的判斷與操做,因此增改刪操做咱們不須要將它與查詢操做分離成兩個方法,統一使用這個方法來獲取數據庫中返回的記錄值。
在實現這個方法以前,咱們設計時要思考這幾個問題:
1.須要記錄程序執行的起始與結束時間,計算sql語句執行時長,用來判斷是否記錄到日誌中,方便開發人員進行分析優化sql語句
2.須要根據參數判斷,是否須要將全部執行的sql語句記錄到日誌中,方便開發人員有須要時,查看執行了哪些sql語句,進行數據與功能分析
3.因爲類在加載時就已經自動鏈接數據庫了,因此在方法中不須要進行打開數據庫連接操做
5.在執行sql語句時,須要建立遊標,而後執行sql語句
6.爲了讓用戶更好的體驗,減小異常的直接拋出,須要進行異常捕捉,並將捕捉到的異常進行處理,記錄到日誌中方便開發人員分析錯誤,同時同步發送推送給相關人員,即時提醒錯誤
7.sql執行成功之後,須要對返回的數據進行處理,組合成字典類型,方便前端使用
8.完成數據處理後,須要及時關閉遊標
9.對返回的數據須要進行處理後,返回給上一級程序
1 def execute(self, query, vars=None): 2 """執行sql語句查詢,返回結果集或影響行數""" 3 if not query: 4 return None 5 # 記錄程序執行開始時間 6 start_time = time.clock() 7 try: 8 # 判斷是否記錄sql執行語句 9 if self.is_output_sql: 10 log_helper.info('sql:' + str(query)) 11 # 創建遊標 12 self.cursor = self.connect.cursor() 13 # 執行SQL 14 result = self.cursor.execute(query, vars) 15 print(str(result)) 16 except Exception as e: 17 # 將異常寫入到日誌中 18 log_helper.error('sql執行失敗:' + str(e.args) + ' query:' + str(query)) 19 self.data = None 20 else: 21 # 獲取數據 22 try: 23 if self.cursor.description: 24 # 在執行insert/update/delete等更新操做時,若是添加了returning,則讀取返回數據組合成字典返回 25 self.data = [dict((self.cursor.description[i][0], value) for i, value in enumerate(row)) for row in self.cursor.fetchall()] 26 else: 27 # 若是執行insert/update/delete等更新操做時沒有添加returning,則返回影響行數,值爲0時表時沒有數據被更新 28 self.data = self.cursor.rowcount 29 except Exception as e: 30 # 將異常寫入到日誌中 31 log_helper.error('數據獲取失敗:' + str(e.args) + ' query:' + str(query)) 32 self.data = None 33 finally: 34 # 關閉遊標 35 self.cursor.close() 36 # 記錄程序執行結束時間 37 end_time = time.clock() 38 # 寫入日誌 39 self.write_log(start_time, end_time, query) 40 41 # 若是有返回數據,則把該數據返回給調用者 42 return self.data
最後一個是記錄超時sql語句到日誌方法,這裏我將大於0.1秒的sql語句都記錄下來
def write_log(self, start_time, end_time, sql): """記錄Sql執行超時日誌""" t = end_time - start_time if (t) > 0.1: content = ' '.join(('run time:', str(t), 's sql:', sql)) log_helper.info(content)
完成的db_helper.py代碼
1 #!/usr/bin/env python 2 # coding=utf-8 3 4 import psycopg2 5 import time 6 from io import StringIO 7 from common import log_helper, file_helper 8 9 10 class PgHelper(object): 11 """postgresql數據庫操做類""" 12 13 def __init__(self, db, is_output_sql): 14 self.connect = None 15 self.cursor = None 16 # 初始化數據庫參數 17 self.db_name = db.get('db_name', '') 18 self.db_user = db.get('db_user', '') 19 self.db_pass = db.get('db_pass', '') 20 self.db_host = db.get('db_host', '') 21 self.db_port = db.get('db_port', '') 22 # 是否將全部要執行的Sql語句輸出到日誌裏 23 self.is_output_sql = is_output_sql 24 25 def open_conn(self): 26 """鏈接數據庫,並創建遊標""" 27 try: 28 if not self.connect: 29 self.connect = psycopg2.connect(database=self.db_name, user=self.db_user, password=self.db_pass, 30 host=self.db_host, port=self.db_port) 31 return self.connect 32 except Exception as e: 33 log_helper.error('鏈接數據庫失敗:' + str(e.args)) 34 return False 35 36 def close_conn(self): 37 """關閉postgresql數據庫連接""" 38 # 關閉遊標 39 try: 40 if self.cursor: 41 self.cursor.close() 42 except Exception: 43 pass 44 # 關閉數據庫連接 45 try: 46 if self.connect: 47 self.connect.close() 48 except Exception: 49 pass 50 51 def __enter__(self): 52 """初始化數據庫連接""" 53 self.open_conn() 54 return self 55 56 def __exit__(self, type, value, trace): 57 """關閉postgresql數據庫連接""" 58 self.close_conn() 59 60 def rollback(self): 61 """回滾操做""" 62 try: 63 # 異常時,進行回滾操做 64 if self.connect: 65 self.connect.rollback() 66 except Exception as e: 67 log_helper.error('回滾操做失敗:' + str(e.args)) 68 69 def commit(self): 70 """提交事務""" 71 try: 72 if self.connect: 73 self.connect.commit() 74 self.close_conn() 75 except Exception as e: 76 log_helper.error('提交事務失敗:' + str(e.args)) 77 78 def get_sql(self, query, vars=None): 79 """獲取編譯後的sql語句""" 80 # 記錄程序執行開始時間 81 start_time = time.clock() 82 try: 83 # 判斷是否記錄sql執行語句 84 if self.is_output_sql: 85 log_helper.info('sql:' + str(query)) 86 # 創建遊標 87 self.cursor = self.connect.cursor() 88 # 執行SQL 89 self.data = self.cursor.mogrify(query, vars) 90 except Exception as e: 91 # 將異常寫入到日誌中 92 log_helper.error('sql生成失敗:' + str(e.args) + ' query:' + str(query)) 93 self.data = '獲取編譯sql失敗' 94 finally: 95 # 關閉遊標 96 self.cursor.close() 97 # 記錄程序執行結束時間 98 end_time = time.clock() 99 # 寫入日誌 100 self.write_log(start_time, end_time, query) 101 102 return self.data 103 104 def copy(self, values, table_name, columns): 105 """ 106 百萬級數據更新函數 107 :param values: 更新內容,字段之間用\t分隔,記錄之間用\n分隔 "1\taaa\tabc\n2\bbb\abc\n" 108 :param table_name: 要更新的表名稱 109 :param columns: 須要更新的字段名稱:例:('id','userame','passwd') 110 :return: 111 """ 112 try: 113 # 創建遊標 114 self.cursor = self.connect.cursor() 115 self.cursor.copy_from(StringIO(values), table_name, columns=columns) 116 self.connect.commit() 117 return True 118 except Exception as e: 119 # 將異常寫入到日誌中 120 log_helper.error('批量更新失敗:' + str(e.args) + ' table:' + table_name) 121 finally: 122 # 關閉遊標 123 self.cursor.close() 124 125 def execute(self, query, vars=None): 126 """執行sql語句查詢,返回結果集或影響行數""" 127 if not query: 128 return None 129 # 記錄程序執行開始時間 130 start_time = time.clock() 131 try: 132 # 判斷是否記錄sql執行語句 133 if self.is_output_sql: 134 log_helper.info('sql:' + str(query)) 135 # 創建遊標 136 self.cursor = self.connect.cursor() 137 # 執行SQL 138 result = self.cursor.execute(query, vars) 139 print(str(result)) 140 except Exception as e: 141 # 將異常寫入到日誌中 142 log_helper.error('sql執行失敗:' + str(e.args) + ' query:' + str(query)) 143 self.data = None 144 else: 145 # 獲取數據 146 try: 147 if self.cursor.description: 148 # 在執行insert/update/delete等更新操做時,若是添加了returning,則讀取返回數據組合成字典返回 149 self.data = [dict((self.cursor.description[i][0], value) for i, value in enumerate(row)) for row in self.cursor.fetchall()] 150 else: 151 # 若是執行insert/update/delete等更新操做時沒有添加returning,則返回影響行數,值爲0時表時沒有數據被更新 152 self.data = self.cursor.rowcount 153 except Exception as e: 154 # 將異常寫入到日誌中 155 log_helper.error('數據獲取失敗:' + str(e.args) + ' query:' + str(query)) 156 self.data = None 157 finally: 158 # 關閉遊標 159 self.cursor.close() 160 # 記錄程序執行結束時間 161 end_time = time.clock() 162 # 寫入日誌 163 self.write_log(start_time, end_time, query) 164 165 # 若是有返回數據,則把該數據返回給調用者 166 return self.data 167 168 169 def write_log(self, start_time, end_time, sql): 170 """記錄Sql執行超時日誌""" 171 t = end_time - start_time 172 if (t) > 0.1: 173 content = ' '.join(('run time:', str(t), 's sql:', sql)) 174 log_helper.info(content)
測試代碼
1 #!/usr/bin/evn python 2 # coding=utf-8 3 4 import unittest 5 from common import db_helper 6 from config import db_config 7 8 class DbHelperTest(unittest.TestCase): 9 """數據庫操做包測試類""" 10 11 def setUp(self): 12 """初始化測試環境""" 13 print('------ini------') 14 15 def tearDown(self): 16 """清理測試環境""" 17 print('------clear------') 18 19 def test(self): 20 # 使用with方法,初始化數據庫鏈接 21 with db_helper.PgHelper(db_config.DB, db_config.IS_OUTPUT_SQL) as db: 22 # 設置sql執行語句 23 sql = """insert into product (name, code) values (%s, %s) returning id""" 24 # 設置提交參數 25 vars = ('張三', '201807251234568') 26 # 生成sql語句,並打印到控制檯 27 print(db.get_sql(sql, vars)) 28 29 db.execute('select * from product where id=1000') 30 db.execute('insert into product (name, code) values (%s, %s) returning id', ('張三', '201807251234568')) 31 db.commit() 32 33 if __name__ == '__main__': 34 unittest.main()
版權聲明:本文原創發表於 博客園,做者爲 AllEmpty 本文歡迎轉載,但未經做者贊成必須保留此段聲明,且在文章頁面明顯位置給出原文鏈接,不然視爲侵權。
python開發QQ羣:669058475(本羣已滿)、733466321(能夠加2羣) 做者博客:http://www.cnblogs.com/EmptyFS/