採用鏈接池的方式來操做DBpython
#-*- coding:utf-8 -*- #!/usr/bin/python3 import pymysql import configUtil from DBUtils.PooledDB import PooledDB class MysqlUtil(object): # 鏈接池對象 __pool = None def __init__(self, config): # 數據庫構造函數,從鏈接池中取出鏈接,並生成操做遊標 self.pool = MysqlUtil.__get_conn(config) @staticmethod def __get_conn(config): """ @summary: 靜態方法,從鏈接池中取出鏈接 @return MySQLdb.connection """ host = configUtil.read_config(config, "datasource_url", "mysqlConfig") username = configUtil.read_config(config, "datasource_username", "mysqlConfig") db_pwd = configUtil.read_config(config, "datasource_password", "mysqlConfig") db_database = configUtil.read_config(config, "datasource_database", "mysqlConfig") if MysqlUtil.__pool is None: __pool = PooledDB(pymysql, mincached=1, maxcached=10, maxconnections=10, host=host, port=3306, user=username, passwd=db_pwd, db=db_database, use_unicode=False, blocking=False, charset="utf8") return __pool def get_all(self, sql): """ @summary: 執行查詢,並取出全部結果集 @param sql:查詢SQL,若是有查詢條件,請只指定條件列表,並將條件值使用參數[param]傳遞進來 @param param: 可選參數,條件列表值(元組/列表) @return: result list(字典對象)/boolean 查詢到的結果集 """ try: con = self.pool.connection() cur = con.cursor() count = cur.execute(sql) if count > 0: result = cur.fetchall() else: result = False return result except Exception as e: print('SQL執行有誤,緣由:', e) finally: cur.close() con.close() def update(self, sql): try: con = self.pool.connection() cur = con.cursor() cur.execute(sql) con.commit() except Exception as e: con.rollback() # 事務回滾 print('SQL執行有誤,緣由:', e) finally: cur.close() con.close()