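# Environment: Python 3.7
# Third-party libraries to install (MySQL driver and connection pooling):
#   pip install pymysql
#   pip install DBUtils
# Purpose: create a MySQL connection pool and perform insert, delete, select and update operations.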
import pymysql
import time

try:
    from DBUtils.PooledDB import PooledDB
except ImportError:
    # DBUtils >= 2.0 renamed its modules to lowercase.
    from dbutils.pooled_db import PooledDB


class MysqldbHelper(object):
    """Small helper around a pooled set of MySQL connections.

    The constructor blocks until the pool has been created, retrying every
    five seconds. Connections and cursors are handed out by ``get_conn`` and
    must be given back with ``close_conn``.
    """

    # Maps each supported write statement type to the message printed when
    # that statement fails.
    _WRITE_FAILURE_MESSAGES = {
        "INSERT": "插入失敗:",
        "UPDATE": "更新失敗:",
        "DELETE": "刪除失敗:",
    }

    def __init__(self, config):
        """Create the connection pool, retrying until it succeeds.

        Args:
            config: Mapping with the keys ``host``, ``user``, ``passwd``,
                ``db`` and ``port``.

        Raises:
            KeyError: If a required key is missing from ``config``.
        """
        self.host = config['host']
        self.username = config['user']
        self.password = config['passwd']
        self.db = config['db']
        self.port = config['port']
        self.pool = None
        while True:
            try:
                self.pool = PooledDB(
                    pymysql,
                    5,
                    host=self.host,
                    user=self.username,
                    passwd=self.password,
                    db=self.db,
                    port=self.port,
                    cursorclass=pymysql.cursors.DictCursor,
                )
            except Exception as e:
                # Catch Exception rather than BaseException so that Ctrl-C
                # (KeyboardInterrupt) and SystemExit can still break out of
                # this otherwise endless retry loop.
                print(e)
                self.pool = None
            if self.pool is not None:
                print("mysql鏈接成功")
                break
            print("鏈接失敗,5秒後重試")
            time.sleep(5)

    def get_conn(self):
        """Take a connection from the pool and open a cursor on it.

        Returns:
            A ``(conn, cur)`` tuple.
        """
        conn = self.pool.connection()
        cur = conn.cursor()
        return conn, cur

    def close_conn(self, conn, cur):
        """Close the cursor, then close the pooled connection.

        ``conn.close()`` runs even if closing the cursor raises, so the
        connection is not left checked out.
        """
        try:
            cur.close()
        finally:
            conn.close()

    def deal_with_sql(self, cur, conn, SQL, types='SELECT', params=None):
        """Execute one SQL statement.

        Args:
            cur: Cursor obtained from ``get_conn``.
            conn: Connection the cursor belongs to.
            SQL: Statement text. Prefer ``%s`` placeholders together with
                ``params`` over building the string from untrusted values.
            types: One of ``"SELECT"``, ``"INSERT"``, ``"UPDATE"`` or
                ``"DELETE"`` (case-sensitive).
            params: Optional arguments passed to ``cur.execute`` for
                placeholder substitution.

        Returns:
            The fetched rows for ``SELECT``; ``True``/``False`` for a
            successful/failed write; ``None`` for an unrecognized ``types``.
        """
        if types == "SELECT":
            cur.execute(SQL, params)
            return cur.fetchall()

        failure_message = self._WRITE_FAILURE_MESSAGES.get(types)
        if failure_message is None:
            return None

        try:
            cur.execute(SQL, params)
            conn.commit()
            return True
        except Exception as e:
            # Undo any partial work so the connection is not left inside a
            # failed transaction.
            try:
                conn.rollback()
            except Exception as rollback_error:
                print("回滾失敗:", rollback_error)
            print(failure_message, e)
            return False


if __name__ == '__main__':
    # NOTE(review): hard-coded root credentials for a remote host; these
    # should come from the environment or a config file outside version
    # control.
    config = {
        'host': '47.103.1.124',
        'port': 3306,
        'user': 'root',
        'passwd': '123456',
        'db': '37',
    }
    # Initialize the helper (blocks until the pool is created).
    mydb = MysqldbHelper(config)
    # Take a connection and cursor from the pool.
    conn, cur = mydb.get_conn()
    try:
        # Run a query.
        SQL = """select * from ceshi2"""
        result = mydb.deal_with_sql(cur, conn, SQL)
        print(result)
        # Run an insert.
        SQL2 = """INSERT INTO ceshi2 (a,b,c) VALUES (12,11,34)"""
        result2 = mydb.deal_with_sql(cur, conn, SQL2, "INSERT")
        print(result2)
    finally:
        # Always give the connection back, even if a statement raised.
        mydb.close_conn(conn, cur)
        print("鏈接已關閉")