Python之數據庫操縱工具

前言

源碼

import pymysql # 導入mysql模塊

## 數據庫操縱工具
class DatabaseUtil:
    """
    database util / 數據庫操縱工具
    ------------------------------
    [特別注意] 
        + pymysql.connect(...)函數
            + unix_socket
                + 本字段是使用 socket 鏈接纔用到的,而咱們使用的是IP鏈接
                + 故:unix_socket='/tmp/mysql.sock' 改爲 port=3306   
    """
    className = "DatabaseUtil";
    def __init__(self,database='default_db_name'):
        self.host='xxxx.com.cn';
        self.port = 3306;
        # unix_socket= "/tmp/mysql.sock",//module 'socket' has no attribute 'AF_UNIX'
        self.user='root';
        self.password='root';
        self.database=database;
        self.charset='utf8mb4';
        self.cursorclass=pymysql.cursors.DictCursor;
        pass;
    
    def getConnection(self):
        connection = None;
        connection = pymysql.connect(host=self.host,
                                 port=self.port,
                                 #  unix_socket= "/tmp/mysql.sock",//module 'socket' has no attribute 'AF_UNIX'
                                 user=self.user,
                                 password=self.password,
                                 database=self.database,
                                 charset=self.charset,
                                 cursorclass=self.cursorclass);
        # 切換數據庫子模式
        # cursor.execute("USE "+str(self.database)); # 形如: USE db_demoDatabase 
           # 使用cursor()方法建立一個遊標對象
        # cursor = connection.cursor();
        if connection == None:
            print("[",self.className,".getConnection] Fail to load and get database Connection!");
        return connection;
        pass;
    
    def query(self,sql):## 查 select
        # sql = "select * from tb_url limit 0,1000";  # just for test
        print("[",self.className,".query] SQL:",sql);
        connection = self.getConnection();
        cursor = connection.cursor();
        try:
            cursor.execute(sql) # 執行SQL語句
            # sql = "select * from userinfo where username=%(u)s and password=%(p)s"
            # cursor.execute(sql,user,pwd)  #直接傳值
            # cursor.execute(sql,[user,pwd]) #列表形式
            # cursor.execute(sql,{'u':user,'p':pwd}) #字典格式
            # print("row number:",(cursor.rownumber))
            # print("rowcount:",cursor.rowcount); # 只讀屬性,並返回執行execute()方法後影響的行數
            # 獲取全部記錄列表
            #results = print("row [1]: ", cursor.fetchone()); # 獲取下一個查詢結果集。結果集是一個對象
            if cursor.rowcount < 1:
                print("[",self.className,".query] Not found any data from database!")
                return;
            results = None; # 初始化
            # + cursor.fetchone() 獲取下一行數據,第一次爲首行
            # + cursor.fetchall() 獲取全部行數據源
            # + cursor.fetchmany(n) 獲取下N行數據
            results = cursor.fetchall(); # 接收所有的返回結果行.
            # print("results:",results);
            # print("results[0]:",results[0]);
            i = 0; #記錄計數
            for row in results: # row :字典類型
                object = [];#存儲各字段信息
                for key in row:
                    object.append(row[key]);
                    # print("[", i , "] " , key , " : " , row[key]);
                    pass;
                i+=1;
                # 打印結果
                # print("Url {\n\tpk_url_id:%s,\n\tstate:%s,\n\turl:%s,\n\tresolve_type:%s,\n\tresolver_class_bean:%s\n}" % (object[0], object[1], object[2], object[3], object[4]))
        except e:
            print("[",self.className,".query] ",e.message);
            print("[",self.className,".query] Error: unable to fetch data!")
            pass;
        self.closeResources(cursor,connection);
        return results; # 以結果行爲字典類型的數組 [{ key1:value1, key2:value2 }]
        pass;

    def save(self,sql): ## 插 insert / 更新 update
        # sql = "UPDATE EMPLOYEE SET AGE = AGE + 1 WHERE SEX = '%c'" % ('M')
        # sql = "insert into tb_url values('2','WAITING_RESOLVE','http://test.for.python.mysql.cn','article/author','resolver_class_bean:Unknown')";
        print("[",self.className,".save] SQL:",sql);
        connection = self.getConnection();
        cursor = connection.cursor();
        result = -100; # 初始化一個異常值
        try:
            # 執行sql語句
            result = cursor.execute(sql);
            # print("[",className,".insert] the last rowId is ",cursor.lastrowid) # 當表中有自增的主鍵的時候,能夠使用lastrowid來獲取最後一次自增的ID
            # 提交到數據庫執行
            connection.commit();
        except:
            # 若是發生錯誤則回滾
            connection.rollback();
            pass;
        self.closeResources(cursor,connection);
        if result == 1:
            print("[",self.className,".save] Done!");
        else:
            print("[",self.className,".save] Failed!")
            pass;
        # print("[",self.className,".insert] result:",result)
        return result;
        pass;
    
    def delete(self,sql): # 刪除 delete
        # sql = "DELETE FROM EMPLOYEE WHERE AGE > %s" % (20)
        print("[",self.className,".delete] SQL:",sql);
        connection = self.getConnection();
        cursor = connection.cursor();
        result = -100; # 初始化一個異常值
        try:
            # 執行sql語句
            result = cursor.execute(sql);
            # print("[",className,".insert] the last rowId is ",cursor.lastrowid) # 當表中有自增的主鍵的時候,能夠使用lastrowid來獲取最後一次自增的ID
            # 提交到數據庫執行
            connection.commit();
        except:
            # 若是發生錯誤則回滾
            connection.rollback();
            pass;
        self.closeResources(cursor,connection);
        if result == 1:
            print("[",self.className,".delete] Done!");
        else:
            print("[",self.className,".delete] Failed!")
            pass;
        # print("[",self.className,".insert] result:",result)
        return result;
        pass;
    
    def closeResources(self,cursor,connection):
        try:
            cursor.close() # 關閉數據庫遊標對象
            connection.close() # 關閉數據庫鏈接
            pass;
        except e:
            print (e.message);
            pass;
        pass;
    pass; # class end

Demo

# 插 / 更新
    # sql = "insert into tb_url values('5','WAITING_RESOLVE','http://test.for.python.mysql.cn','article/author','resolver_class_bean:Unknown')";
    # print(dbUtil.save(sql));

    # 刪
    # for i in range(2,5):[2,5)
    #     sql = "delete from tb_url where pk_url_id='%d'" % (i);
    #     dbUtil.delete(sql);
    #     pass;

    # 查
sql = "select * from tb_url where pk_url_id='%s'" % ('5'); 
print(dbUtil.query(sql));

//output
[ DatabaseUtil .query] SQL: select * from tb_url where pk_url_id='5'
[{'pk_url_id': 5, 'state': 'WAITING_RESOLVE', 'url': 'http://test.for.python.mysql.cn', 'resolve_type': 'article/author', 'resolver_class_bean': 'resolver_class_bean:Unknown'}]
相關文章
相關標籤/搜索