Python--DBUtil

Python--DBUtil包

1 簡介

    DBUtils是一套Python數據庫鏈接池包,並容許對非線程安全的數據庫接口進行線程安全包裝。DBUtils來自Webware for Python。python

    DBUtils提供兩種外部接口:web

  • PersistentDB :提供線程專用的數據庫鏈接,並自動管理鏈接。
  • PooledDB :提供線程間可共享的數據庫鏈接,並自動管理鏈接。

實測證實 PersistentDB 的速度是最高的,可是在某些特殊狀況下,數據庫的鏈接過程可能異常緩慢,而此時的PooledDB則能夠提供相對來講平均鏈接時間比較短的管理方式。sql

另外,實際使用的數據庫驅動也有所依賴,好比SQLite數據庫只能使用PersistentDB做鏈接池。 下載地址:http://www.webwareforpython.org/downloads/DBUtils/數據庫

安裝編程

pip install DBUtils

 

2 使用方法

鏈接池對象只初始化一次,通常能夠做爲模塊級代碼來確保。 PersistentDB的鏈接例子:api

import DBUtils.PersistentDB 
persist=DBUtils.PersistentDB.PersistentDB(dbpai=MySQLdb,maxusage=1000,**kwargs)

這裏的參數dbpai指使用的底層數據庫模塊,兼容DB-API的。maxusage則爲一個鏈接最大使用次數,參考了官方例子。後面的**kwargs則爲實際傳遞給MySQLdb的參數。緩存

獲取鏈接: conn=persist.connection() 實際編程中用過的鏈接直接關閉 conn.close() 便可將鏈接交還給鏈接池。安全

PooledDB使用方法同PersistentDB,只是參數有所不一樣。session

  • * dbapi :數據庫接口,使用連接數據庫的模塊
  • * mincached :初始化時,連接池中至少建立的空閒的連接,0表示不建立
  • * maxcached :鏈接池最大可用鏈接數量
  • * maxshared :鏈接池最大可共享鏈接數量
  • * maxconnections : 鏈接池容許的最大鏈接數,0和None表示不限制鏈接數
  • * blocking :達到最大數量時是否阻塞
  • * maxusage :單個鏈接最大複用次數
  • * setsession :用於傳遞到數據庫的準備會話,如 [」set name UTF-8″] 。

一個使用過程:數據結構

import os
import cx_Oracle
# 用於以清晰、可讀的形式輸出 Python 數據結構
from pprint import pprint
from sys import modules
from DBUtils.PooledDB import PooledDB

pool= PooledDB(cx_Oracle,user='test',
                              password='test',dsn='testDB',mincached=5, maxcached=20)
print(pool.connection())
print(connection.version)

# 得到遊標對象
cursor = pool.connection().cursor ()

try:
    # 解析sql語句
    cursor.parse("select *  dual")
    # 捕獲SQL異常
except cx_Oracle.DatabaseError as e:
    print(e)   # ORA-00923: 未找到要求的 FROM 關鍵字

# 執行sql 語句
cursor.execute ("select * from dual")
# 提取一條數據,返回一個元祖
row = cursor.fetchone()
pprint(row)  # ('X',)

3.DBUtil功能

功能

SimplePooledDB
DBUtils.SimplePooledDB 是一個很是簡單的數據庫鏈接池實現。他比完善的 PooledDB 模塊缺乏不少功能。 DBUtils.SimplePooledDB 本質上相似於 MiscUtils.DBPool 這個Webware的組成部分。你能夠把它看做一種演示程序。

SteadyDB

DBUtils.SteadyDB 是一個模塊實現了」強硬」的數據庫鏈接,基於DB-API 2創建的原始鏈接。一個」強硬」的鏈接意味着在鏈接關閉以後,或者使用次數操做限制時會從新鏈接。

一個典型的例子是數據庫重啓時,而你的程序仍然在運行並須要訪問數據庫,或者當你的程序鏈接了一個防火牆後面的遠程數據庫,而防火牆重啓時丟失了狀態時。

通常來講你不須要直接使用 SteadyDB 它只是給接下來的兩個模塊提供基本服務, PersistentDB 和 PooledDB 。

PersistentDB

DBUtils.PersistentDB 實現了強硬的、線程安全的、頑固的數據庫鏈接,使用DB-API 2模塊。以下圖展現了使用 PersistentDB 時的鏈接層步驟:

persist.gif當一個線程首次打開一個數據庫鏈接時,一個鏈接會打開並僅供這個線程使用。當線程關閉鏈接時,鏈接仍然持續打開供這個線程下次請求時使用這個已經打開的鏈接。鏈接在線程死亡時自動關閉。

簡單的來講 PersistentDB 嘗試重用數據庫鏈接來提升線程化程序的數據庫訪問性能,而且他確保鏈接不會被線程之間共享。

所以, PersistentDB 能夠在底層DB-API模塊並不是線程安全的時候一樣工做的很好,而且他會在其餘線程改變數據庫會話或者使用多語句事務時一樣避免問題的發生。

PooledDB

DBUtils.PooledDB 實現了一個強硬的、線程安全的、有緩存的、可複用的數據庫鏈接,使用任何DB-API 2模塊。以下圖展現了使用 PooledDB 時的工做流程:

pool.gif如圖所示 PooledDB 能夠在不一樣線程之間共享打開的數據庫鏈接。這在你鏈接並指定 maxshared 參數,而且底層的DB-API 2接口是線程安全才能夠,可是你仍然可使用專用數據庫鏈接而不在線程之間共享鏈接。除了共享鏈接之外,還能夠設立一個至少 mincached 的鏈接池,而且最多容許使用 maxcached 個鏈接,這能夠同時用於專用和共享鏈接池。當一個線程關閉了一個非共享鏈接,則會返還到空閒鏈接池中等待下次使用。

若是底層DB-API模塊是非線程安全的,線程鎖會確保使用 PooledDB 是線程安全的。因此你並不須要爲此擔憂,可是你在使用專用鏈接來改變數據庫會話或執行多命令事務時必須當心。
該選擇哪個?

PersistentDB 和 PooledDB 都是爲了重用數據庫鏈接來提升性能,並保持數據庫的穩定性。

因此選擇何種模塊,能夠參考上面的解釋。 PersistentDB 將會保持必定數量的鏈接供頻繁使用。在這種狀況下你老是保持固定數量的鏈接。若是你的程序頻繁的啓動和關閉線程,最好使用 PooledDB 。後面將會提到更好的調整,尤爲在使用線程安全的DB-API 2模塊時。

固然,這兩個模塊的接口是很類似的,你能夠方便的在他們之間轉換,並查看哪一個更好一些。

 

4.使用PooledDB 操做ORALCE數據庫案例

settings.py
import os

'''
日誌文件設置
'''
LOG_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
LOG_LEVEL = 'DEBUG'
LOG_FILE = 'ops.log'

"""
數據庫設置
"""
DB_USER = 'XXX'
DB_PASSWORD = 'XXX'
DB_SID = 'XXX'


print(LOG_DIR)

 

 my_logset.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
# @Time    : 2018/4/23 8:55
# @Author  : hyang
# @File    : my_logset.py
# @Software: PyCharm

import logging
import os
import sys
from logging import handlers
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)  # 加入環境變量

from utils import settings

# 日誌格式
log_format = '[%(asctime)s - %(levelname)s - %(name)s - %(filename)s - %(funcName)s- %(lineno)d ] %(message)s '

def get_mylogger(name):
    """
    get log
    :param name:
    :return:
    """
    logger = logging.getLogger(name)
    logger.setLevel(settings.LOG_LEVEL)

    console_handler = logging.StreamHandler()
    # 文件絕對路徑
    logfile_path = os.path.join(settings.LOG_DIR, "log", settings.LOG_FILE)
    if not os.path.exists(logfile_path):
        # 建立log目錄
        os.mkdir(os.path.join(settings.LOG_DIR, "log"))
    # 天天建立一個日誌文件,文件數不超過20個
    file_handler = handlers.TimedRotatingFileHandler(logfile_path, when="D", interval=1, backupCount=25)

    logger.addHandler(console_handler)
    logger.addHandler(file_handler)

    file_format = logging.Formatter(fmt=log_format)
    console_format = logging.Formatter(fmt=log_format, datefmt='%Y-%m-%d %H:%M:%S ')

    console_handler.setFormatter(console_format)
    file_handler.setFormatter(file_format)

    return logger

if __name__ == '__main__':
    log = get_mylogger('access')
    log.info('access')
    log.error('Error')

    #
    # log1 = get_mylogger('trans')
    # log1.info('trans')

 

Oracle_util.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
# @Time    : 2018/5/22 13:17
# @Author  : hyang
# @File    : Oracle_utils.py
# @Software: PyCharm

# 用於以清晰、可讀的形式輸出 Python 數據結構
from sys import modules
import sys
import os
import cx_Oracle
from DBUtils.PooledDB import PooledDB

BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)  # 加入環境變量

from utils import settings
from utils import my_logset

"""
經過PooledDB鏈接Oracle,並完成經常使用一些操做
"""
class Oracle_util(object):

    __pool = None  # 鏈接池對象
    _db_info = {
        'user': settings.DB_USER,
        'pwd': settings.DB_PASSWORD,
        'dsn': settings.DB_SID
    }

    def __init__(self, db_info=_db_info, arraysize = 500):
        # 日誌
        self.db_log = my_logset.get_mylogger("oradb_access")
        self.db_info = db_info
        self.conn = Oracle_util.__getConn(db_info)
        self.cursor = self.conn.cursor()
        # 每次從數據庫向Python的緩存返回arraysize=100條記錄
        self.cursor.arraysize = arraysize


    @staticmethod
    def __getConn(db_info):
        # 靜態方法,從鏈接池中取出鏈接
        if Oracle_util.__pool is None:
            __pool = PooledDB(cx_Oracle,
                              user=db_info['user'],
                              password=db_info['pwd'],
                              dsn=db_info['dsn'],
                              mincached=20,
                              maxcached=50)
        return __pool.connection()


    def get_columns(self, table):
        # 查詢表的全部列
        sql = ["select lower(column_name) column_name \
        from user_tab_columns where table_name=upper('%(table)s')"]
        rows = self.queryBySql(''.join(sql) % locals())
        col_list = [k["column_name"] for k in rows]
        # ['sjhm', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'status']

        return col_list

    # 根據表自動建立參數字典
    def create_params(self, table, args={}):
        col_list = self.get_columns(table)
        params = {}
        for k in col_list:
            if args.__contains__(k):
                params[k] = args[k]
        return params

    # 執行sql
    def execute(self, sql, args={}):
        try:
            self.db_log.debug('execute sql:{}'.format(sql))
            return self.cursor.execute(sql, args)
        except Exception as e:
            self.close()
            raise e

    # 調用函數 函數名,返回類型, 參數('1',2)元祖類型
    def callfunc(self, func, ret_type=cx_Oracle.NUMBER, args=()):
        try:
            self.db_log.debug('call func:{} {}'.format(func, args))
            return self.cursor.callfunc(func,ret_type,args)
        except Exception as e:
            self.close()
            raise e

    # 調用過程 過程名,輸入參數('1',2)元祖類型
    def callproc(self, proc, in_val=()):
        try:
            self.db_log.debug('call proc:{} {}'.format(proc,in_val))
            return self.cursor.callproc(proc, in_val)
        except Exception as e:
            self.close()
            raise e

    # 解析sql
    def parse(self,sql,args={}):
        try:
            # 解析sql語句
            return self.cursor.parse(sql,args)
            # 捕獲SQL異常
        except Exception as e:
            self.close()
            raise e

    # 批量執行
    def executemany(self, sql, args):
        try:
            self.db_log.debug('executemany sql:{}'.format(sql))
            return self.cursor.executemany(sql, args)
        except Exception as e:
            self.close()
            raise e


    # 執行sql,參數一:table,參數二:查詢列'col1,col2' 參數三:參數字典{'字段1':'值1','字段2':'值2'}
    def queryByTable(self, table, column='*',cond_dict={}):
        # self.execute(sql, args)
        cond_dict = self.create_params(table, cond_dict)
        cond_stmt = ' and '.join(['%s=:%s' % (k, k) for k in cond_dict.keys()])
        # del_sql = 'DELETE FROM %(table)s where %(cond_stmt)s'
        if not cond_dict:
            query_sql = 'select %(column)s FROM %(table)s'
        else:
            query_sql = 'select %(column)s FROM %(table)s where %(cond_stmt)s'

        self.execute(query_sql % locals(), cond_dict)
        return self.get_rows()

        # 執行sql,參數一:sql語句,如select * from python_modules where module_name=:module_name
        # 參數二:參數字典{'字段1':'值1','字段2':'值2'} 如{module_name:Oracle}
    def queryBySql(self, sql, args={}):
        self.execute(sql, args)
        return self.get_rows()

    # 導出結果爲文件
    def exportTxt(self,file_name, sql, args={}, col_split='|', col_flg=True):
        """
        :param file_name: 文件位置
        :param sql:  sql語句 如select module_name,china_name from python_modules where module_name=:module_name
        :param args:  參數 如{'module_name':'oracle'}
        :param col_split: 列分隔符
        :param col_flg: 是否輸出列名字段col1|col2
        :return:
        """
        rt = self.queryBySql(sql, args)
        if rt:
            with open(file_name, 'w',encoding="utf-8") as fd:
                for row in rt:
                    col_info = col_split.join(row.keys())
                    val_info = ''
                    if col_flg:
                        fd.write(col_info+"\n")
                        col_flg = False
                    val_info += col_split.join(row.values())
                    val_info += '\n'
                    fd.write(val_info)

    # 分頁查詢,參數一:sql語句,參數二:參數字典{'字段1':'值1','字段2':'值2'},參數三:頁碼,參數四:分頁大小
    def query_pages(self, sql, args={}, page=1, page_size=30):
        _args, count_args = args, args
        page = int(page)
        # print "page:%s" %(page,)
        # 下一頁
        next_page = page_size * page
        # 當前頁
        cur_page = page_size * (page - 1)
        if page == 1 or cur_page < 0:
            cur_page = 0
            next_page = page_size
        sql = """SELECT * FROM(
            SELECT ROWNUM RN,T.* FROM(""" + sql + """)T 
            WHERE ROWNUM<=:next_page
            )WHERE RN >=:cur_page """
        count_sql = """
            SELECT COUNT(1)CNT FROM (""" + sql + """)"""
        _args["cur_page"] = cur_page
        _args["next_page"] = next_page
        rows = self.queryBySql(sql, _args)
        countrows = self.queryBySql(count_sql, count_args)
        return rows, countrows[0]['cnt']

    # oracle的參數名必須使用:代替,如 userid = :userid
    def insertOne(self, table, column_dict):
        column_dict = self.create_params(table, column_dict)
        keys = ','.join(column_dict.keys())
        values = column_dict.values()
        placeholder = ','.join([':%s' % (v) for v in column_dict.keys()])
        ins_sql = 'INSERT INTO %(table)s (%(keys)s) VALUES (%(placeholder)s)'
        # print(ins_sql % locals())
        self.execute(ins_sql % locals(), column_dict)

    # 獲取序列的下一個值,傳入sequence的名稱
    def nextval(self, seq):
        self.cursor.execute("SELECT %(seq)s.nextval from dual " % locals())
        result = self.cursor.fetchall()
        return result[0][0]

    # 批量插入數據庫,參數一:表名,參數二:['字段1','字段2',...],參數二:[('值1','值2',...),('值1','值2',...)]
    def insertMany(self, table, columns=[], values=[]):
        keys = ','.join(columns)
        placeholder = ','.join([':%s' % (v) for v in columns])
        ins_sql = 'INSERT INTO %(table)s (%(keys)s) VALUES(%(placeholder)s)'
        self.executemany(ins_sql % locals(), values)
        return self._get_rows_num()

    # 更新,參數一:表名,參數二用於set 字段1=值1,字段2=值2...格式:{'字段1':'值1','字段2':'值2'},
    # 參數三:用於where條件,如 where 字段3=值3 and 字段4=值4,格式{'字段3':'值3','字段4':'值4'}
    def updateByTable(self, table, column_dict={}, cond_dict={}):
        column_dict = self.create_params(table, column_dict)
        cond_dict = self.create_params(table, cond_dict)
        set_stmt = ','.join(['%s=:%s' % (k, k) for k in column_dict.keys()])
        cond_stmt = ' and '.join(['%s=:%s' % (k, k) for k in cond_dict.keys()])
        if not cond_dict:
            upd_sql = 'UPDATE %(table)s set %(set_stmt)s'
        else:
            upd_sql = 'UPDATE %(table)s set %(set_stmt)s where %(cond_stmt)s'
        args = dict(column_dict, **cond_dict)  # 合併成1個
        self.execute(upd_sql % locals(), args)
        return self._get_rows_num()

    # 刪除,參數一:表名,#參數二:用於where條件,如 where 字段3=值3 and 字段4=值4,格式{'字段3':'值3','字段4':'值4'}
    def deleteByTable(self, table, cond_dict={}):
        cond_dict = self.create_params(table, cond_dict)
        cond_stmt = ' and '.join(['%s=:%s' % (k, k) for k in cond_dict.keys()])
        # del_sql = 'DELETE FROM %(table)s where %(cond_stmt)s'
        if not cond_dict:
            del_sql = 'DELETE FROM %(table)s'
        else:
            del_sql = 'DELETE FROM %(table)s where %(cond_stmt)s'
        self.execute(del_sql % locals(), cond_dict)
        return self._get_rows_num()

    # 提取數據,參數一提取的記錄數,參數二,是否以字典方式提取。爲true時返回:{'字段1':'值1','字段2':'值2'}
    def get_rows(self, size=None, is_dict=True):
        if size is None:
            rows = self.cursor.fetchall()
        else:
            rows = self.cursor.fetchmany(size)
        if rows is None:
            rows = []
        if is_dict:
            dict_rows = []
            dict_keys = [r[0].lower() for r in self.cursor.description]
            for row in rows:
                dict_rows.append(dict(zip(dict_keys, row)))
            rows = dict_rows
        return rows

    # 獲取更改記錄數
    def _get_rows_num(self):
        return self.cursor.rowcount

    # 提交
    def commit(self):
        self.conn.commit()

    # 回滾
    def rollback(self):
        self.conn.rollback();

    # 銷燬
    def __del__(self):
        self.close()

    # 關閉鏈接
    def close(self):
        # self.commit()
        self.cursor.close()
        self.conn.close()

if __name__ == '__main__':
   # example
    ora = Oracle_util()
    create_table = """
    CREATE TABLE python_modules (
    module_name VARCHAR2(50) NOT NULL,
    file_path VARCHAR2(300) NOT NULL,
    china_name VARCHAR2(300) NOT NULL
    )
    """
    # 執行建立表
    create_flag = ora.execute(create_table)

    # 獲得表全部列
    print(ora.get_columns('python_modules'))

    # 添加模塊信息
    M = []
    for m_name, m_info in modules.items():
        try:
            M.append((m_name, m_info.__file__, '中國'))
        except AttributeError:
            pass

    print(len(M))

    print(ora.insertMany('python_modules',['module_name', 'file_path','china_name'],M))
    ora.commit()

    print(ora.queryBySql(sql="select * from python_modules where module_name=:module_name", args={'module_name':'DBUtils.PooledDB'}))

    print(ora.updateByTable(table='python_modules',column_dict={'china_name':'北京'},cond_dict={'module_name':'DBUtils.PooledDB'}))
    ora.commit()

    print(ora.queryBySql(sql="select * from python_modules where module_name=:module_name", args={'module_name':'DBUtils.PooledDB'}))

    print(ora.deleteByTable(table='python_modules', cond_dict={'module_name': 'DBUtils.PooledDB'}))
    ora.commit()

    print(ora.queryBySql(sql="select module_name,china_name from python_modules where module_name=:module_name", args={'module_name':'DBUtils.PooledDB'}))

    ora.updateByTable(table='python_modules', column_dict={'china_name': '河北'})
    ora.commit()
    ora.exportTxt("a.txt", sql="select * from python_modules", )

    print(ora.deleteByTable(table='python_modules'))
    ora.commit()
    print(ora.queryByTable(table="python_modules"))

    ora.execute("DROP TABLE python_modules PURGE")

    print(ora.callfunc('myfunc', cx_Oracle.NUMBER, ('abc', 2)))

    print(ora.callproc('myproc',  (3,)))

    print(ora.queryByTable(table="ptab",column='mydata, myid',cond_dict={'myid':2}))
相關文章
相關標籤/搜索
本站公眾號
   歡迎關注本站公眾號,獲取更多信息