Python數據庫鏈接池DBUtils詳解

what's the DBUtils

  DBUtils 是一套用於管理數據庫鏈接池的Python包,爲高頻度高併發的數據庫訪問提供更好的性能,能夠自動管理鏈接對象的建立和釋放。並容許對非線程安全的數據庫接口進行線程安全包裝。python

DBUtils提供兩種外部接口:mysql

  • PersistentDB :提供線程專用的數據庫鏈接,並自動管理鏈接。
  • PooledDB :提供線程間可共享的數據庫鏈接,並自動管理鏈接。

實測證實 PersistentDB 的速度是最高的,可是在某些特殊狀況下,數據庫的鏈接過程可能異常緩慢,而此時的PooledDB則能夠提供相對來講平均鏈接時間比較短的管理方式。web

另外,實際使用的數據庫驅動也有所依賴,好比SQLite數據庫只能使用PersistentDB做鏈接池。 下載地址:http://www.webwareforpython.org/downloads/DBUtils/sql

 

DBUtils使用方法

  鏈接池對象只初始化一次,通常能夠做爲模塊級代碼來確保。 PersistentDB的鏈接例子:數據庫

import DBUtils.PersistentDB 
persist=DBUtils.PersistentDB.PersistentDB(dbpai=MySQLdb,maxusage=1000,**kwargs)

 

  這裏的參數dbpai指使用的底層數據庫模塊,兼容DB-API的。maxusage則爲一個鏈接最大使用次數,參考了官方例子。後面的**kwargs則爲實際傳遞給MySQLdb的參數。編程

  獲取鏈接: conn=persist.connection() 實際編程中用過的鏈接直接關閉 conn.close() 便可將鏈接交還給鏈接池。json

PooledDB使用方法同PersistentDB,只是參數有所不一樣。api

  • dbapi :數據庫接口
  • mincached :啓動時開啓的空鏈接數量
  • maxcached :鏈接池最大可用鏈接數量
  • maxshared :鏈接池最大可共享鏈接數量
  • maxconnections :最大容許鏈接數量
  • blocking :達到最大數量時是否阻塞
  • maxusage :單個鏈接最大複用次數
  • setsession :用於傳遞到數據庫的準備會話,如 [」set name UTF-8″] 。
db=pooled.connection()
cur=db.cursor()
cur.execute(sql)
res=cur.fetchone()
cur.close() # or del cur
db.close() # or del db
使用栗子

 

python不用鏈接池的MySQL鏈接方法安全

import MySQLdb
conn= MySQLdb.connect(host='localhost',user='root',passwd='pwd',db='myDB',port=3306)  
#import pymysql
#conn = pymysql.connect(host='localhost', port='3306', db='game', user='root', password='123456', charset='utf8')
cur=conn.cursor()
SQL="select * from table1"
r=cur.execute(SQL)
r=cur.fetchall()
cur.close()
conn.close()

 

用鏈接池後的鏈接方法session

import MySQLdb
from DBUtils.PooledDB import PooledDB
pool = PooledDB(MySQLdb,5,host='localhost',user='root',passwd='pwd',db='myDB',port=3306) #5爲鏈接池裏的最少鏈接數

conn = pool.connection()  #之後每次須要數據庫鏈接就是用connection()函數獲取鏈接就行了
cur=conn.cursor()
SQL="select * from table1"
r=cur.execute(SQL)
r=cur.fetchall()
cur.close()
conn.close()

 

DBUtils下載地址:https://pypi.python.org/pypi/DBUtils/

 

import sys
import threading
import MySQLdb
import DBUtils.PooledDB

connargs = { "host":"localhost", "user":"user1", "passwd":"123456", "db":"test" }
def test(conn):
    try:
        cursor = conn.cursor()
        count = cursor.execute("select * from users")
        rows = cursor.fetchall()
        for r in rows: pass
    finally:
        conn.close()
        
def testloop():
    print ("testloop")
    for i in range(1000):
        conn = MySQLdb.connect(**connargs)
        test(conn)
        
def testpool():
    print ("testpool")
    pooled = DBUtils.PooledDB.PooledDB(MySQLdb, **connargs)
    for i in range(1000):
        conn = pooled.connection()
        test(conn)
        
def main():
    t = testloop if len(sys.argv) == 1 else testpool
    for i in range(10):
        threading.Thread(target = t).start()
        
if __name__ == "__main__":
    main()
測試代碼

看看 10 線程的測試結果。

$ time ./main.py  
testloop  
testloop  
testloop  
testloop  
testloop  
testloop  
testloop  
testloop  
testloop  
testloop  
real    0m4.471s  
user    0m0.570s  
sys     0m4.670s  
$ time ./main.py -l  
testpool  
testpool  
testpool  
testpool  
testpool  
testpool  
testpool  
testpool  
testpool  
testpool  
real    0m2.637s  
user    0m0.320s  
sys     0m2.750s  
View Code

 

  雖然測試方式不是很嚴謹,但從測試結果仍是能感覺到 DBUtils 帶來的性能提高。固然,咱們咱們也能夠在 testloop() 中一直重複使用一個不關閉的 Connection,但這卻不適合實際開發時的情形。

 

DBUtils 提供了幾個參數,便於咱們更好地調整資源利用。

DBUtils.PooledDB.PooledDB(self, creator,   
    mincached=0, maxcached=0, maxshared=0, maxconnections=0, blocking=False, maxusage=None,   
    setsession=None, failures=None, *args, **kwargs)  
Docstring:  
    Set up the DB-API 2 connection pool.  
    creator: either an arbitrary function returning new DB-API 2  
        connection objects or a DB-API 2 compliant database module  
    mincached: initial number of idle connections in the pool  
        (0 means no connections are made at startup)  
    maxcached: maximum number of idle connections in the pool  
        (0 or None means unlimited pool size)  
    maxshared: maximum number of shared connections  
        (0 or None means all connections are dedicated)  
        When this maximum number is reached, connections are  
        shared if they have been requested as shareable.  
    maxconnections: maximum number of connections generally allowed  
        (0 or None means an arbitrary number of connections)  
    blocking: determines behavior when exceeding the maximum  
        (if this is set to true, block and wait until the number of  
        connections decreases, otherwise an error will be reported)  
    maxusage: maximum number of reuses of a single connection  
        (0 or None means unlimited reuse)  
        When this maximum usage number of the connection is reached,  
        the connection is automatically reset (closed and reopened).  
    setsession: optional list of SQL commands that may serve to prepare  
        the session, e.g. ["set datestyle to ...", "set time zone ..."]  
    failures: an optional exception class or a tuple of exception classes  
        for which the connection failover mechanism shall be applied,  
        if the default (OperationalError, InternalError) is not adequate  
    args, kwargs: the parameters that shall be passed to the creator  
        function or the connection constructor of the DB-API 2 module  
View Code

 

DBUtils 僅提供給了鏈接池管理,實際的數據庫操做依然是由符合 DB-API 2 標準的目標數據庫模塊完成的。

 

一個面向對象使用DBUtils的栗子

# coding=utf-8
"""
使用DBUtils數據庫鏈接池中的鏈接,操做數據庫
OperationalError: (2006, ‘MySQL server has gone away’)
"""
import json
import pymysql
import datetime
from DBUtils.PooledDB import PooledDB
import pymysql


class MysqlClient(object):
    __pool = None;

    def __init__(self, mincached=10, maxcached=20, maxshared=10, maxconnections=200, blocking=True,
                 maxusage=100, setsession=None, reset=True,
                 host='127.0.0.1', port=3306, db='test',
                 user='root', passwd='123456', charset='utf8mb4'):
        """

        :param mincached:鏈接池中空閒鏈接的初始數量
        :param maxcached:鏈接池中空閒鏈接的最大數量
        :param maxshared:共享鏈接的最大數量
        :param maxconnections:建立鏈接池的最大數量
        :param blocking:超過最大鏈接數量時候的表現,爲True等待鏈接數量降低,爲false直接報錯處理
        :param maxusage:單個鏈接的最大重複使用次數
        :param setsession:optional list of SQL commands that may serve to prepare
            the session, e.g. ["set datestyle to ...", "set time zone ..."]
        :param reset:how connections should be reset when returned to the pool
            (False or None to rollback transcations started with begin(),
            True to always issue a rollback for safety's sake)
        :param host:數據庫ip地址
        :param port:數據庫端口
        :param db:庫名
        :param user:用戶名
        :param passwd:密碼
        :param charset:字符編碼
        """

        if not self.__pool:
            self.__class__.__pool = PooledDB(pymysql,
                                             mincached, maxcached,
                                             maxshared, maxconnections, blocking,
                                             maxusage, setsession, reset,
                                             host=host, port=port, db=db,
                                             user=user, passwd=passwd,
                                             charset=charset,
                                             cursorclass=pymysql.cursors.DictCursor
                                             )
        self._conn = None
        self._cursor = None
        self.__get_conn()

    def __get_conn(self):
        self._conn = self.__pool.connection();
        self._cursor = self._conn.cursor();

    def close(self):
        try:
            self._cursor.close()
            self._conn.close()
        except Exception as e:
            print e

    def __execute(self, sql, param=()):
        count = self._cursor.execute(sql, param)
        print count
        return count

    @staticmethod
    def __dict_datetime_obj_to_str(result_dict):
        """把字典裏面的datatime對象轉成字符串,使json轉換不出錯"""
        if result_dict:
            result_replace = {k: v.__str__() for k, v in result_dict.items() if isinstance(v, datetime.datetime)}
            result_dict.update(result_replace)
        return result_dict

    def select_one(self, sql, param=()):
        """查詢單個結果"""
        count = self.__execute(sql, param)
        result = self._cursor.fetchone()
        """:type result:dict"""
        result = self.__dict_datetime_obj_to_str(result)
        return count, result

    def select_many(self, sql, param=()):
        """
        查詢多個結果
        :param sql: qsl語句
        :param param: sql參數
        :return: 結果數量和查詢結果集
        """
        count = self.__execute(sql, param)
        result = self._cursor.fetchall()
        """:type result:list"""
        [self.__dict_datetime_obj_to_str(row_dict) for row_dict in result]
        return count, result

    def execute(self, sql, param=()):
        count = self.__execute(sql, param)
        return count

    def begin(self):
        """開啓事務"""
        self._conn.autocommit(0)

    def end(self, option='commit'):
        """結束事務"""
        if option == 'commit':
            self._conn.autocommit()
        else:
            self._conn.rollback()


if __name__ == "__main__":
    mc = MysqlClient()
    sql1 = 'SELECT * FROM shiji  WHERE  id = 1'
    result1 = mc.select_one(sql1)
    print json.dumps(result1[1], ensure_ascii=False)

    sql2 = 'SELECT * FROM shiji  WHERE  id IN (%s,%s,%s)'
    param = (2, 3, 4)
    print json.dumps(mc.select_many(sql2, param)[1], ensure_ascii=False)
View Code
相關文章
相關標籤/搜索