前言
- 基於原生模塊:pymysql
- 推薦教程
- 小工具結構
- DatabaseUtil([databaseName])
- getConnection()
- query(sql)
- save(sql)
- delete(sql)
- closeResources(cursor,connection)
源碼
import pymysql # 導入mysql模塊
## 數據庫操縱工具
class DatabaseUtil:
    """Small convenience wrapper around pymysql for one-shot SQL statements.

    Every call to ``query``, ``save`` or ``delete`` opens a fresh connection
    through ``getConnection`` and closes it again before returning.

    Note on ``pymysql.connect(...)``: the ``unix_socket`` argument is only
    meant for socket-file connections. This helper connects by host/IP, so
    ``port=3306`` is passed instead of ``unix_socket='/tmp/mysql.sock'``
    (the original author hit "module 'socket' has no attribute 'AF_UNIX'"
    when using ``unix_socket``).
    """

    # Prefix used in every log line printed by this class.
    className = "DatabaseUtil"

    def __init__(self, database='default_db_name', *, host='xxxx.com.cn',
                 port=3306, user='root', password='root', charset='utf8mb4'):
        """Store the connection settings; no connection is opened here.

        Args:
            database: Name of the schema to connect to.
            host: Server host name. The default is a placeholder.
            port: Server TCP port.
            user: Login name. The default is a placeholder credential.
            password: Login password. The default is a placeholder credential.
            charset: Connection character set.
        """
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.database = database
        self.charset = charset
        # DictCursor makes every fetched row a dict keyed by column name.
        self.cursorclass = pymysql.cursors.DictCursor

    def getConnection(self):
        """Open and return a new pymysql connection built from the stored settings.

        Errors raised by ``pymysql.connect`` propagate to the caller.
        """
        connection = pymysql.connect(host=self.host,
                                     port=self.port,
                                     user=self.user,
                                     password=self.password,
                                     database=self.database,
                                     charset=self.charset,
                                     cursorclass=self.cursorclass)
        if connection is None:
            print("[", self.className, ".getConnection] Fail to load and get database Connection!")
        return connection

    def query(self, sql, params=None):
        """Run a SELECT statement and return all rows.

        Args:
            sql: The SQL text. Use ``%s`` / ``%(name)s`` placeholders together
                with ``params`` instead of formatting values into the string.
            params: Optional tuple, list or dict of values bound to the
                placeholders by ``cursor.execute``.

        Returns:
            The value of ``cursor.fetchall()`` (a list of dict rows with the
            default DictCursor), or ``None`` when no row matched or the
            statement failed.
        """
        print("[", self.className, ".query] SQL:", sql)
        connection = self.getConnection()
        cursor = None
        results = None  # stays None on "no data" and on error
        try:
            cursor = connection.cursor()
            cursor.execute(sql, params)
            if cursor.rowcount < 1:
                print("[", self.className, ".query] Not found any data from database!")
            else:
                results = cursor.fetchall()
        except Exception as e:
            print("[", self.className, ".query] ", e)
            print("[", self.className, ".query] Error: unable to fetch data!")
        finally:
            # Always release the cursor and connection, including on the
            # "no data" path and on errors.
            self.closeResources(cursor, connection)
        return results

    def save(self, sql, params=None):
        """Run an INSERT or UPDATE statement and commit it.

        Args:
            sql: The SQL text (placeholders allowed, see ``query``).
            params: Optional values bound to the placeholders.

        Returns:
            The affected-row count returned by ``cursor.execute``, or ``-100``
            when the statement or the commit failed and was rolled back.
        """
        return self._executeWrite(sql, params, "save")

    def delete(self, sql, params=None):
        """Run a DELETE statement and commit it.

        Args:
            sql: The SQL text (placeholders allowed, see ``query``).
            params: Optional values bound to the placeholders.

        Returns:
            The affected-row count returned by ``cursor.execute``, or ``-100``
            when the statement or the commit failed and was rolled back.
        """
        return self._executeWrite(sql, params, "delete")

    def _executeWrite(self, sql, params, action):
        """Execute one data-modifying statement, commit it, roll back on error.

        ``action`` is only used to label the printed log lines.
        """
        print("[", self.className, "." + action + "] SQL:", sql)
        connection = self.getConnection()
        cursor = None
        result = -100  # sentinel meaning "statement did not complete"
        try:
            cursor = connection.cursor()
            result = cursor.execute(sql, params)
            connection.commit()
        except Exception as e:
            # Reset the sentinel: execute may have succeeded before commit failed.
            result = -100
            print("[", self.className, "." + action + "] ", e)
            try:
                connection.rollback()
            except Exception as rollbackError:
                print("[", self.className, "." + action + "] ", rollbackError)
        finally:
            self.closeResources(cursor, connection)
        # Any positive row count is a success, not only exactly one row.
        if result >= 1:
            print("[", self.className, "." + action + "] Done!")
        else:
            print("[", self.className, "." + action + "] Failed!")
        return result

    def closeResources(self, cursor, connection):
        """Close the cursor and then the connection, best effort.

        Each resource is closed independently so that a failure while closing
        the cursor does not leave the connection open. ``None`` is skipped.
        Errors are printed, not raised.
        """
        for resource in (cursor, connection):
            if resource is None:
                continue
            try:
                resource.close()
            except Exception as e:
                print(e)
Demo
# Create the helper used by all examples below; with no argument the class's
# default database name is used.
dbUtil = DatabaseUtil()

# Insert / update
# sql = "insert into tb_url values('5','WAITING_RESOLVE','http://test.for.python.mysql.cn','article/author','resolver_class_bean:Unknown')"
# print(dbUtil.save(sql))

# Delete
# for i in range(2, 5):  # half-open range [2, 5): i is 2, 3, 4
#     sql = "delete from tb_url where pk_url_id='%d'" % (i)
#     dbUtil.delete(sql)

# Query
sql = "select * from tb_url where pk_url_id='%s'" % ('5')
print(dbUtil.query(sql))
//output
[ DatabaseUtil .query] SQL: select * from tb_url where pk_url_id='5'
[{'pk_url_id': 5, 'state': 'WAITING_RESOLVE', 'url': 'http://test.for.python.mysql.cn', 'resolve_type': 'article/author', 'resolver_class_bean': 'resolver_class_bean:Unknown'}]