Python鏈接數據庫

pymysql

# -*- coding: utf-8 -*-

"""
@Datetime: 2018/12/26
@Author: Zhang Yafei
"""
import pymysql
from DBUtils.PooledDB import PooledDB

POOL = PooledDB(
    creator=pymysql,  # 使用連接數據庫的模塊
    maxconnections=6,  # 鏈接池容許的最大鏈接數,0和None表示不限制鏈接數
    mincached=2,  # 初始化時,連接池中至少建立的空閒的連接,0表示不建立
    maxcached=5,  # 連接池中最多閒置的連接,0和None不限制
    maxshared=3,
    # 連接池中最多共享的連接數量,0和None表示所有共享。PS: 無用,由於pymysql和MySQLdb等模塊的 threadsafety都爲1,全部值不管設置爲多少,_maxcached永遠爲0,因此永遠是全部連接都共享。
    blocking=True,  # 鏈接池中若是沒有可用鏈接後,是否阻塞等待。True,等待;False,不等待而後報錯
    maxusage=None,  # 一個連接最多被重複使用的次數,None表示無限制
    setsession=[],  # 開始會話前執行的命令列表。如:["set datestyle to ...", "set time zone ..."]
    ping=0,
    # ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
    host='127.0.0.1',
    port=3306,
    user='root',
    password='0000',
    database='flask_code',
    charset='utf8'
)


def connect(type=None):
    conn = POOL.connection()
    cursor = conn.cursor(cursor=type)
    return conn, cursor


def connect_close(conn, cursor):
    cursor.close()
    conn.close()


def fetchone(sql, arg=list()):
    conn, cursor = connect(type)
    cursor.execute(sql, arg)
    data = cursor.fetchone()
    connect_close(conn, cursor)
    return data


def fetchall(sql, arg=list(), type=pymysql.cursors.DictCursor):
    conn, cursor = connect(type)
    cursor.execute(sql, arg)
    data = cursor.fetchall()
    connect_close(conn, cursor)
    return data


def insert(sql, arg=list()):
    conn, cursor = connect()
    row = cursor.execute(sql, arg)
    conn.commit()
    connect_close(conn, cursor)
    return row
mysql_helper

sqlite

# -*- coding: utf-8 -*-

"""
@Datetime: 2019/1/31
@Author: Zhang Yafei
"""
import sqlite3
import settings
import os

BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

DB_DIR = os.path.join(BASE_DIR, 'data.db')

def connect():
    '''鏈接數據庫'''
    conn = sqlite3.connect(settings.DATABASES.get('sqlite3'))  # db不存在時將自動建立db
    cursor = conn.cursor()
    return conn, cursor


def connect_close(conn, cursor):
    """關閉鏈接"""
    cursor.close()
    conn.close()


def execute(sql, params=tuple()):
    conn, cursor = connect()
    cursor.execute(sql, params)  # 執行這個語句
    connect_close(conn, cursor)


def fetchone(sql, params=tuple()):
    conn, cursor = connect()
    result = cursor.execute(sql, params)
    data = result.fetchone()
    connect_close(conn, cursor)
    return data


def fetchall(sql, params=tuple()):
    conn, cursor = connect()
    results = cursor.execute(sql, params)
    data = results.fetchall()
    connect_close(conn, cursor)
    return data
sqlite3_helper
# -*- coding: utf-8 -*-

"""
@Datetime: 2019/1/31
@Author: Zhang Yafei
"""
import sqlite3
import os

BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

DB_DIR = os.path.join(BASE_DIR, 'data.db')


class SqliteDB(object):
    def __init__(self):
        self.conn = sqlite3.connect(DB_DIR)  # db不存在時將自動建立db
        self.cursor = self.conn.cursor()

    def close(self):
        self.cursor.close()
        self.conn.close()

    def execute(self, sql, params=tuple()):
        self.cursor.execute(sql, params)
        self.close()

    def fetchone(self, sql, params=tuple()):
        result = self.cursor.execute(sql, params)
        data = result.fetchone()
        self.close()
        return data

    def fetchall(self, sql, params=tuple()):
        results = self.cursor.execute(sql, params)
        data = results.fetchall()
        self.close()
        return data


if __name__ == '__main__':
    sqlite = SqliteDB()
    # 1. 建表
    sql = '''create table happy(
             username text,
             password text,
             id int)'''
    sqlite.execute(sql)

    # 2. 插入數據
    sqlite.execute("INSERT INTO COMPANY (ID,NAME,AGE,ADDRESS,SALARY) \
          VALUES (4, 'Mark', 25, 'Rich-Mond ', 65000.00 )")

    # 3. 更改數據
    sqlite.execute("UPDATE COMPANY SET  ID=99  WHERE ID=2")

    # 4. 刪除表裏面的數據
    c.execute("DELETE FROM COMPANY WHERE ID=4")
    c.execute("DELETE FROM COMPANY WHERE ID=3")

    # 5. 查詢
    data = sqlite.fetchall('select * from label limit 1')
    print(data)
    # 輸出
    '''
    [('盤龍雲海(排毒養顏膠囊)', 509881, '廣東深圳龍崗區/女', '昨天吃的,今天就拉肚子了。感受肚子有點漲痛!不知道效果怎麼樣~~~~~',
      '昨天/吃/的/,/今天/就/拉肚子/SB了/。/感受/肚子/PB有點/漲痛/SB!/不/知道/效果/怎麼樣/~/~/~/~/~', '2011-09-30 15:26:00',
      'http://ypk.39.net/509881/comment/k0_p...', '昨天/吃/的/,/今天/就/拉肚子/SB了/。/感受/肚子/PB有點/漲痛/SB!/不/知道/效果/怎麼樣/~/~/~/~/~',
      '昨天/吃/的/,/今天/就/拉肚子/SB了/。/感受/肚子/PB有點/漲痛/SB!/不/知道/效果/怎麼樣/~/~/~/~/~')]
    '''
sqlite3_helper2

mongodb

# -*- coding: utf-8 -*-

"""
@Datetime: 2019/1/31
@Author: Zhang Yafei
"""
import json
import pymongo
import pandas as pd


class MongoPipeline(object):
    """
    mongodb:
        save(self, data, collection):                    將數據保存到數據庫
        read(self, data):                                讀取數據庫中指定表格
        insert(self, table, dict_data):                 插入數據
        delete(self, table, condition):                 刪除指定數據
        update(self, table, condition, new_dict_data):  更新指定數據
        dbFind(self, table, condition=None):             按條件查找
        findAll(self, table):                           查找所有
        close(self):                                    關閉鏈接
    """

    def __init__(self, mongo_db, mongo_uri='localhost'):
        self.mongo_uri = mongo_uri
        self.mongo_db = mongo_db
        self.client = pymongo.MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]

    def close(self):
        """
        關閉鏈接
        :return:
        """
        self.client.close()

    def save(self, data, collection):
        """
        將數據保存到數據庫表
        :param data:
        :param collection:
        :return: None
        """
        self.collection = self.db[collection]
        try:
            if self.collection.insert(json.loads(data.T.to_json()).values()):
                print('mongodb insert {} sucess.'.format(collection))
                return
        except Exception as e:
            print('insert error:', e)
            import traceback
            traceback.print_exc(e)

    def read(self, table):
        """
        讀取數據庫中的數據
        :param table:
        :return: dataframe
        """
        try:
            # 鏈接數據庫
            table = self.db[table]
            # 讀取數據
            data = pd.DataFrame(list(table.find()))
            return data
        except Exception as e:
            import traceback
            traceback.print_exc(e)

    def insert(self, table, dict_data):
        """
        插入
        :param table:
        :param dict_data:
        :return: None
        """
        try:
            self.db[table].insert(dict_data)
            print("插入成功")
        except Exception as e:
            print(e)

    def update(self,table, condition, new_dict_data):
        """
        更新
        :param table:
        :param dict_data:
        :param new_dict_data:
        :return: None
        """
        try:
            self.db[table].update(condition, new_dict_data)
            print("更新成功")
        except Exception as e:
            print(e)

    def delete(self,table, condition):
        """
        刪除
        :param table:
        :param dict_data:
        :return: None
        """
        try:
            self.db[table].remove(condition)
            print("刪除成功")
        except Exception as e:
            print(e)

    def dbFind(self, table, condition=None):
        """
        按條件查找
        :param table:
        :param dict_data:
        :return: generator dict
        """
        data = self.db[table].find(condition)
        for item in data:
            yield item

    def findAll(self, table):
        """
        查找所有
        :param table:
        :return: generator dict
        """
        for item in self.db[table].find():
            yield item


if __name__ == '__main__':
    mongo = MongoPipeline('flask')
    # data = mongo.read('label')
    # print(data.head())
    condition = {"藥品ID": 509881}
    data = mongo.dbFind('label', condition)
    print(data)
    for i in data:
        print(i)
    # mongo.findAll()
mongo_helper
相關文章
相關標籤/搜索