python 簡單mysql工具類,使用鏈接池PooledDB案例

 # 環境 python3.7python

# 第三方庫安裝:mysql

pip install pymysqlsql

pip install DBUtils工具

# 功能實現:創建mysql鏈接池,實現增刪查改fetch

import pymysql
import time
from DBUtils.PooledDB import PooledDB


class MysqldbHelper(object):  # 繼承object類全部方法
    def __init__(self, config):
        self.host = config['host']
        self.username = config['user']
        self.password = config['passwd']
        self.db = config['db']
        self.port = config['port']
        while True:
            try:
                self.pool = PooledDB(pymysql, 5, host=self.host, user=self.username, passwd=self.password, db=self.db,
                                     port=self.port,
                                     cursorclass=pymysql.cursors.DictCursor)
            except BaseException as e:
                print(e)
                self.pool = None
            if self.pool:
                print("mysql鏈接成功")
                break
            print("鏈接失敗,5秒後重試")
            time.sleep(5)

    # # 從鏈接池獲取一條鏈接
    def get_conn(self):
        conn = self.pool.connection()
        cur = conn.cursor()  # 光標
        return conn, cur

    # 關閉鏈接
    def close_conn(self, conn, cur):
        cur.close()
        conn.close()

    def deal_with_sql(self, cur, conn, SQL, types='SELECT'):
        if types == "SELECT":
            cur.execute(SQL)
            result = cur.fetchall()
            return result
        elif types == "INSERT":
            try:
                cur.execute(SQL)
                conn.commit()  # 提交事務
                return True
            except BaseException as e:
                print("插入失敗:", e)
                return False
        elif types == "UPDATE":
            try:
                cur.execute(SQL)
                conn.commit()
                return True
            except BaseException as e:
                print("更新失敗:", e)
                return False
        elif types == "DELETE":
            try:
                cur.execute(SQL)
                conn.commit()
                return True
            except BaseException as e:
                print("刪除失敗:", e)
                return False
        else:
            return None


if __name__ == '__main__':
    config = {
        'host': '47.103.1.124',
        'port': 3306,
        'user': 'root',
        'passwd': '123456',
        'db': '37',
    }
    # 初始化工具類
    mydb = MysqldbHelper(config)
    # 從鏈接池獲取鏈接
    conn, cur = mydb.get_conn()

    # 執行查詢sql
    SQL = """select * from ceshi2"""
    result = mydb.deal_with_sql(cur, conn, SQL)
    print(result)

    # 執行插入sql
    SQL2 = """INSERT INTO ceshi2  (a,b,c) VALUES (12,11,34)"""
    result2 = mydb.deal_with_sql(cur, conn, SQL2, "INSERT")
    print(result2)
    # 關閉鏈接
    mydb.close_conn(conn, cur)
    print("鏈接已關閉")
相關文章
相關標籤/搜索