個人第一個python web開發框架(25)——定製ORM(一)

  在開始編寫ORM模塊以前,咱們須要先對db_helper進行重構,由於ORM最終生成的sql是須要轉給db_helper來執行的,因此擁有一個功能完善、健壯的數據庫操做類是很是必要的。前端

  這是項目原db_helper.py代碼python

#!/usr/bin/env python
# coding=utf-8

import psycopg2
from common import log_helper
from config import const

# 初始化數據庫參數
db_name = const.DB_NAME
db_host = const.DB_HOST
db_port = const.DB_PORT
db_user = const.DB_USER
db_pass = const.DB_PASS


def read(sql):
    """
    鏈接pg數據庫並進行數據查詢
    若是鏈接失敗,會把錯誤寫入日誌中,並返回false,若是sql執行失敗,也會把錯誤寫入日誌中,並返回false
    若是全部執行正常,則返回查詢到的數據,這個數據是通過轉換的,轉成字典格式,方便模板調用,其中字典的key是數據表裏的字段名
    """
    try:
        # 鏈接數據庫
        conn = psycopg2.connect(database=db_name, user=db_user, password=db_pass, host=db_host, port=db_port)
        # 獲取遊標
        cursor = conn.cursor()
    except Exception as e:
        print(e.args)
        log_helper.error('鏈接數據庫失敗:' + str(e.args))
        return False
    try:
        # 執行查詢操做
        cursor.execute(sql)
        # 將返回的結果轉換成字典格式
        data = [dict((cursor.description[i][0], value) for i, value in enumerate(row)) for row in cursor.fetchall()]
    except Exception as e:
        print(e.args)
        log_helper.error('sql執行失敗:' + str(e.args) + ' sql:' + str(sql))
        return False
    finally:
        # 關閉遊標和數據庫連接
        cursor.close()
        conn.close()
    # 返回結果(字典格式)
    return data


def write(sql, vars):
    """
    鏈接pg數據庫並進行寫的操做
    若是鏈接失敗,會把錯誤寫入日誌中,並返回false,若是sql執行失敗,也會把錯誤寫入日誌中,並返回false,若是全部執行正常,則返回true
    """
    try:
        # 鏈接數據庫
        conn = psycopg2.connect(database=db_name, user=db_user, password=db_pass, host=db_host, port=db_port)
        # 獲取遊標
        cursor = conn.cursor()
    except Exception as e:
        print(e.args)
        log_helper.error('鏈接數據庫失敗:' + str(e.args))
        return False
    try:
        # 執行sql語句
        cursor.execute(sql, vars)
        # 提交事務
        conn.commit()
    except Exception as e:
        print(e.args)
        # 若是出錯,則事務回滾
        conn.rollback()
        log_helper.error('sql執行失敗:' + str(e.args) + ' sql:' + str(sql))
        return False
    else:
        # 獲取數據
        try:
            data = [dict((cursor.description[i][0], value) for i, value in enumerate(row))
                         for row in cursor.fetchall()]
        except Exception as e:
            # 沒有設置returning或執行修改或刪除語句時,記錄不存在
            data = None
    finally:
        # 關閉遊標和數據庫連接
        cursor.close()
        conn.close()

    # 若是寫入數據後,將數據庫返回的數據返回給調用者
    return data
View Code

  經過對代碼的簡單分析,能夠看到整個模塊在初化時,載入數據庫連接配置,對數據庫的操做也只有簡單讀與寫操做。這樣的功能對於通常的數據庫增刪改查操做已經足夠了,但若是業務複雜,有多個庫、須要用到事務或者須要訪問不一樣類型數據庫時,它就不夠用了。因此首先要作的就是對它進行重構,功能進行完善。sql

  首先咱們須要將配置獨立出來,當有須要連接多個數據庫時,能夠讀取不一樣的配置文件,讓程序更加方便靈活。數據庫

  在config目錄下建立db_config.py文件(有多個庫時,能夠配置多個不一樣的參數來引用)ide

#!/usr/bin/env python
# coding=utf-8


### 數據庫連接參數 ###
DB = {
    'db_host': '127.0.0.1',
    'db_port': 5432,
    'db_name': 'simple_db',
    'db_user': 'postgres',
    'db_pass': '123456'
}
# 是否將全部要執行的Sql語句輸出到日誌裏
IS_OUTPUT_SQL = False

  在配置中,咱們一樣定義了數據庫鏈接地址、端口、數據庫名稱、用戶名與密碼。函數

  另外,爲了方便咱們進行排錯,檢查sql的生成狀況,添加了IS_OUTPUT_SQL是否輸出執行的sql語句到日誌中這一個開關項,設置爲True時,全部被執行的sql語句都會被寫到日誌中,方便下載日誌下來進行分析。post

 

  對於數據庫操做模塊,咱們須要封裝成一個類,在有須要調用時,就能夠經過with語句進行初始化操做,設置對應的數據庫連接配置,靈活的鏈接不一樣的數據庫。測試

  在設計操做類時,咱們須要思考幾個問題:fetch

  1.它能夠支持多數據庫操做,即讀取不一樣的配置能鏈接操做不一樣的數據庫(能夠經過類初始化時進行注入配置信息)優化

  2.它須要支持with語句,當咱們忘記關閉數據庫遊標和鏈接時,自動幫咱們關閉(須要實現__enter__()與__exit__()方法)

  3.它須要支持數據庫事務,當執行失敗時,能夠回滾數據,當全部sql執行都成功時,能夠統一提交事務(須要建立rollback()與commit()方法)

  4.它須要支持查詢、添加、修改、刪除等操做,方便咱們操做關係型數據庫記錄(須要建立sql執行方法)

  5.它須要支持sql執行優化,將超出指定執行時間的sql語句記錄到日誌中,方便開發人員進行分析(須要記錄sql執行起始時間與結束時間,並進行計算,當這個時間大於指定值時執行日誌寫入程序)

  根據這些要求,咱們初步設計出數據庫操做類的基本模型

class PgHelper(object):
    """postgresql數據庫操做類"""

    def __init__(self, db, is_output_sql):
        """初始化數據庫操做類配置信息"""

    def open_conn(self):
        """鏈接數據庫,並創建遊標"""

    def close_conn(self):
        """關閉postgresql數據庫連接"""

    def __enter__(self):
        """初始化數據庫連接"""

    def __exit__(self, type, value, trace):
        """關閉postgresql數據庫連接"""

    def rollback(self):
        """回滾操做"""

    def commit(self):
        """提交事務"""

    def execute(self, query, vars=None):
        """執行sql語句查詢,返回結果集或影響行數"""

    def write_log(self, start_time, end_time, sql):
        """記錄Sql執行超時日誌"""

 

 

  接下來,咱們來一一實現上面的各個方法

  首先是完成初始化方法,咱們能夠經過注入的方法,將db_config配置信息裏的參數注入進來初始化。鏈接不一樣的數據庫時,能夠注入不一樣的配置信息。

class PgHelper(object):
    """postgresql數據庫操做類"""

    def __init__(self, db, is_output_sql):
        self.connect = None
        self.cursor = None
        # 初始化數據庫參數
        self.db_name = db.get('db_name')
        self.db_user = db.get('db_user')
        self.db_pass = db.get('db_pass')
        self.db_host = db.get('db_host')
        self.db_port = db.get('db_port')
        # 是否將全部要執行的Sql語句輸出到日誌裏
        self.is_output_sql = is_output_sql

 

  而後咱們來建立數據庫打開鏈接方法與關閉鏈接的方法,當數據庫鏈接失敗時會拋出異常,程序會自動調用log_helper.error()方法,將異常寫入日誌當中,並第一時間發送郵件通知開發人員,方便開發人員即時排錯。

    def open_conn(self):
        """鏈接數據庫,並創建遊標"""
        try:
            if not self.connect:
                self.connect = psycopg2.connect(database=self.db_name, user=self.db_user, password=self.db_pass, host=self.db_host, port=self.db_port)
            return self.connect
        except Exception as e:
            log_helper.error('鏈接數據庫失敗:' + str(e.args))
            return False

    def close_conn(self):
        """關閉postgresql數據庫連接"""
        # 關閉遊標
        try:
            if self.cursor:
                self.cursor.close()
        except Exception:
            pass
        # 關閉數據庫連接
        try:
            if self.connect:
                self.connect.close()
        except Exception:
            pass

 

  經過重寫內置__enter__()與__exit__()方法,來實現with語句調用本類時,會自動對類進行初始化操做,自動建立數據庫鏈接。當代碼執行完畢後(程序退出with語句時),程序會自動調用對應的方法,將遊標與數據庫鏈接的關閉,避免手動操做時,忘記關閉鏈接出現異常。

    def __enter__(self):
        """初始化數據庫連接"""
        self.open_conn()
        return self

    def __exit__(self, type, value, trace):
        """關閉postgresql數據庫連接"""
        self.close_conn()

 

  爲了方便事務處理,增長回滾方法。用於事務中執行操做失敗時,調用回滾方法執行回滾操做。

    def rollback(self):
        """回滾操做"""
        try:
            # 異常時,進行回滾操做
            if self.connect:
                self.connect.rollback()
                self.close_conn()
        except Exception as e:
            log_helper.error('回滾操做失敗:' + str(e.args))

 

  還須要增長事務提交方法,方便sql執行增刪改爲功之後,提交事務更新數據。在開發中不少朋友常常會忘記執行提交事務操做,一直覺得代碼有問題沒有執行成功。

    def commit(self):
        """提交事務"""
        try:
            if self.connect:
                self.connect.commit()
                self.close_conn()
        except Exception as e:
            log_helper.error('提交事務失敗:' + str(e.args))

 

  爲了方便查看sql語句轉換效果,咱們還能夠增長獲取sql語句生成方法,固然這個方法並無太大的用途。

    def get_sql(self, query, vars=None):
        """獲取編譯後的sql語句"""
        # 記錄程序執行開始時間
        start_time = time.clock()
        try:
            # 判斷是否記錄sql執行語句
            if self.is_output_sql:
                log_helper.info('sql:' + str(query))
            # 創建遊標
            self.cursor = self.connect.cursor()
            # 執行SQL
            self.data = self.cursor.mogrify(query, vars)
        except Exception as e:
            # 將異常寫入到日誌中
            log_helper.error('sql生成失敗:' + str(e.args) + ' query:' + str(query))
            self.data = '獲取編譯sql失敗'
        finally:
            # 關閉遊標
            self.cursor.close()
        # 記錄程序執行結束時間
        end_time = time.clock()
        # 寫入日誌
        self.write_log(start_time, end_time, query)

        return self.data

  由於,當你直接使用完整的sql語句執行時,並不須要這個方法。可是,你使用的是下面方式,執行後就會生成組合好的sql語句,幫助咱們分析sql語句生成狀況

# 使用with方法,初始化數據庫鏈接
with db_helper.PgHelper(db_config.DB, db_config.IS_OUTPUT_SQL) as db:
    # 設置sql執行語句
    sql = """insert into product (name, code) values (%s, %s) returning id"""
    # 設置提交參數
    vars = ('zhangsan', '201807251234568')
    # 生成sql語句,並打印到控制檯
    print(db.get_sql(sql, vars))

  輸出結果:

b"insert into product (name, code) values ('zhangsan', '201807251234568') returning id"

 

  數據庫最多見的操做就是增刪改查操做,因爲postgresql有個很是好用的特殊參數:returning,它能夠在sql執行增改刪結束後,返回咱們想要的字段值,方便咱們進行相應的判斷與操做,因此增改刪操做咱們不須要將它與查詢操做分離成兩個方法,統一使用這個方法來獲取數據庫中返回的記錄值。

  在實現這個方法以前,咱們設計時要思考這幾個問題:

  1.須要記錄程序執行的起始與結束時間,計算sql語句執行時長,用來判斷是否記錄到日誌中,方便開發人員進行分析優化sql語句

  2.須要根據參數判斷,是否須要將全部執行的sql語句記錄到日誌中,方便開發人員有須要時,查看執行了哪些sql語句,進行數據與功能分析

  3.因爲類在加載時就已經自動鏈接數據庫了,因此在方法中不須要進行打開數據庫連接操做

  5.在執行sql語句時,須要建立遊標,而後執行sql語句

  6.爲了讓用戶更好的體驗,減小異常的直接拋出,須要進行異常捕捉,並將捕捉到的異常進行處理,記錄到日誌中方便開發人員分析錯誤,同時同步發送推送給相關人員,即時提醒錯誤

  7.sql執行成功之後,須要對返回的數據進行處理,組合成字典類型,方便前端使用

  8.完成數據處理後,須要及時關閉遊標

  9.對返回的數據須要進行處理後,返回給上一級程序

 1     def execute(self, query, vars=None):
 2         """執行sql語句查詢,返回結果集或影響行數"""
 3         if not query:
 4             return None
 5         # 記錄程序執行開始時間
 6         start_time = time.clock()
 7         try:
 8             # 判斷是否記錄sql執行語句
 9             if self.is_output_sql:
10                 log_helper.info('sql:' + str(query))
11             # 創建遊標
12             self.cursor = self.connect.cursor()
13             # 執行SQL
14             result = self.cursor.execute(query, vars)
15             print(str(result))
16         except Exception as e:
17             # 將異常寫入到日誌中
18             log_helper.error('sql執行失敗:' + str(e.args) + ' query:' + str(query))
19             self.data = None
20         else:
21             # 獲取數據
22             try:
23                 if self.cursor.description:
24                     # 在執行insert/update/delete等更新操做時,若是添加了returning,則讀取返回數據組合成字典返回
25                     self.data = [dict((self.cursor.description[i][0], value) for i, value in enumerate(row)) for row in self.cursor.fetchall()]
26                 else:
27                     # 若是執行insert/update/delete等更新操做時沒有添加returning,則返回影響行數,值爲0時表時沒有數據被更新
28                     self.data = self.cursor.rowcount
29             except Exception as e:
30                 # 將異常寫入到日誌中
31                 log_helper.error('數據獲取失敗:' + str(e.args) + ' query:' + str(query))
32                 self.data = None
33         finally:
34             # 關閉遊標
35             self.cursor.close()
36         # 記錄程序執行結束時間
37         end_time = time.clock()
38         # 寫入日誌
39         self.write_log(start_time, end_time, query)
40 
41         # 若是有返回數據,則把該數據返回給調用者
42         return self.data

 

   最後一個是記錄超時sql語句到日誌方法,這裏我將大於0.1秒的sql語句都記錄下來

    def write_log(self, start_time, end_time, sql):
        """記錄Sql執行超時日誌"""
        t = end_time - start_time
        if (t) > 0.1:
            content = ' '.join(('run time:', str(t), 's sql:', sql))
            log_helper.info(content)

 

 

  完成的db_helper.py代碼

  1 #!/usr/bin/env python
  2 # coding=utf-8
  3 
  4 import psycopg2
  5 import time
  6 from io import StringIO
  7 from common import log_helper, file_helper
  8 
  9 
 10 class PgHelper(object):
 11     """postgresql數據庫操做類"""
 12 
 13     def __init__(self, db, is_output_sql):
 14         self.connect = None
 15         self.cursor = None
 16         # 初始化數據庫參數
 17         self.db_name = db.get('db_name', '')
 18         self.db_user = db.get('db_user', '')
 19         self.db_pass = db.get('db_pass', '')
 20         self.db_host = db.get('db_host', '')
 21         self.db_port = db.get('db_port', '')
 22         # 是否將全部要執行的Sql語句輸出到日誌裏
 23         self.is_output_sql = is_output_sql
 24 
 25     def open_conn(self):
 26         """鏈接數據庫,並創建遊標"""
 27         try:
 28             if not self.connect:
 29                 self.connect = psycopg2.connect(database=self.db_name, user=self.db_user, password=self.db_pass,
 30                                                 host=self.db_host, port=self.db_port)
 31             return self.connect
 32         except Exception as e:
 33             log_helper.error('鏈接數據庫失敗:' + str(e.args))
 34             return False
 35 
 36     def close_conn(self):
 37         """關閉postgresql數據庫連接"""
 38         # 關閉遊標
 39         try:
 40             if self.cursor:
 41                 self.cursor.close()
 42         except Exception:
 43             pass
 44         # 關閉數據庫連接
 45         try:
 46             if self.connect:
 47                 self.connect.close()
 48         except Exception:
 49             pass
 50 
 51     def __enter__(self):
 52         """初始化數據庫連接"""
 53         self.open_conn()
 54         return self
 55 
 56     def __exit__(self, type, value, trace):
 57         """關閉postgresql數據庫連接"""
 58         self.close_conn()
 59 
 60     def rollback(self):
 61         """回滾操做"""
 62         try:
 63             # 異常時,進行回滾操做
 64             if self.connect:
 65                 self.connect.rollback()
 66         except Exception as e:
 67             log_helper.error('回滾操做失敗:' + str(e.args))
 68 
 69     def commit(self):
 70         """提交事務"""
 71         try:
 72             if self.connect:
 73                 self.connect.commit()
 74                 self.close_conn()
 75         except Exception as e:
 76             log_helper.error('提交事務失敗:' + str(e.args))
 77 
 78     def get_sql(self, query, vars=None):
 79         """獲取編譯後的sql語句"""
 80         # 記錄程序執行開始時間
 81         start_time = time.clock()
 82         try:
 83             # 判斷是否記錄sql執行語句
 84             if self.is_output_sql:
 85                 log_helper.info('sql:' + str(query))
 86             # 創建遊標
 87             self.cursor = self.connect.cursor()
 88             # 執行SQL
 89             self.data = self.cursor.mogrify(query, vars)
 90         except Exception as e:
 91             # 將異常寫入到日誌中
 92             log_helper.error('sql生成失敗:' + str(e.args) + ' query:' + str(query))
 93             self.data = '獲取編譯sql失敗'
 94         finally:
 95             # 關閉遊標
 96             self.cursor.close()
 97         # 記錄程序執行結束時間
 98         end_time = time.clock()
 99         # 寫入日誌
100         self.write_log(start_time, end_time, query)
101 
102         return self.data
103 
104     def copy(self, values, table_name, columns):
105         """
106         百萬級數據更新函數
107         :param values: 更新內容,字段之間用\t分隔,記錄之間用\n分隔 "1\taaa\tabc\n2\bbb\abc\n"
108         :param table_name: 要更新的表名稱
109         :param columns: 須要更新的字段名稱:例:('id','userame','passwd')
110         :return:
111         """
112         try:
113             # 創建遊標
114             self.cursor = self.connect.cursor()
115             self.cursor.copy_from(StringIO(values), table_name, columns=columns)
116             self.connect.commit()
117             return True
118         except Exception as e:
119             # 將異常寫入到日誌中
120             log_helper.error('批量更新失敗:' + str(e.args) + ' table:' + table_name)
121         finally:
122             # 關閉遊標
123             self.cursor.close()
124 
125     def execute(self, query, vars=None):
126         """執行sql語句查詢,返回結果集或影響行數"""
127         if not query:
128             return None
129         # 記錄程序執行開始時間
130         start_time = time.clock()
131         try:
132             # 判斷是否記錄sql執行語句
133             if self.is_output_sql:
134                 log_helper.info('sql:' + str(query))
135             # 創建遊標
136             self.cursor = self.connect.cursor()
137             # 執行SQL
138             result = self.cursor.execute(query, vars)
139             print(str(result))
140         except Exception as e:
141             # 將異常寫入到日誌中
142             log_helper.error('sql執行失敗:' + str(e.args) + ' query:' + str(query))
143             self.data = None
144         else:
145             # 獲取數據
146             try:
147                 if self.cursor.description:
148                     # 在執行insert/update/delete等更新操做時,若是添加了returning,則讀取返回數據組合成字典返回
149                     self.data = [dict((self.cursor.description[i][0], value) for i, value in enumerate(row)) for row in self.cursor.fetchall()]
150                 else:
151                     # 若是執行insert/update/delete等更新操做時沒有添加returning,則返回影響行數,值爲0時表時沒有數據被更新
152                     self.data = self.cursor.rowcount
153             except Exception as e:
154                 # 將異常寫入到日誌中
155                 log_helper.error('數據獲取失敗:' + str(e.args) + ' query:' + str(query))
156                 self.data = None
157         finally:
158             # 關閉遊標
159             self.cursor.close()
160         # 記錄程序執行結束時間
161         end_time = time.clock()
162         # 寫入日誌
163         self.write_log(start_time, end_time, query)
164 
165         # 若是有返回數據,則把該數據返回給調用者
166         return self.data
167 
168 
169     def write_log(self, start_time, end_time, sql):
170         """記錄Sql執行超時日誌"""
171         t = end_time - start_time
172         if (t) > 0.1:
173             content = ' '.join(('run time:', str(t), 's sql:', sql))
174             log_helper.info(content)
View Code

  測試代碼

 1 #!/usr/bin/evn python
 2 # coding=utf-8
 3 
 4 import unittest
 5 from common import db_helper
 6 from config import db_config
 7 
 8 class DbHelperTest(unittest.TestCase):
 9     """數據庫操做包測試類"""
10 
11     def setUp(self):
12         """初始化測試環境"""
13         print('------ini------')
14 
15     def tearDown(self):
16         """清理測試環境"""
17         print('------clear------')
18 
19     def test(self):
20         # 使用with方法,初始化數據庫鏈接
21         with db_helper.PgHelper(db_config.DB, db_config.IS_OUTPUT_SQL) as db:
22             # 設置sql執行語句
23             sql = """insert into product (name, code) values (%s, %s) returning id"""
24             # 設置提交參數
25             vars = ('張三', '201807251234568')
26             # 生成sql語句,並打印到控制檯
27             print(db.get_sql(sql, vars))
28 
29             db.execute('select * from product where id=1000')
30             db.execute('insert into product (name, code) values (%s, %s) returning id', ('張三', '201807251234568'))
31             db.commit()
32 
33 if __name__ == '__main__':
34     unittest.main()
View Code

 

 

 

版權聲明:本文原創發表於 博客園,做者爲 AllEmpty 本文歡迎轉載,但未經做者贊成必須保留此段聲明,且在文章頁面明顯位置給出原文鏈接,不然視爲侵權。

python開發QQ羣:669058475(本羣已滿)、733466321(能夠加2羣)    做者博客:http://www.cnblogs.com/EmptyFS/

相關文章
相關標籤/搜索