# -*- coding: utf-8 -*- """ @Datetime: 2018/12/26 @Author: Zhang Yafei """ import pymysql from DBUtils.PooledDB import PooledDB POOL = PooledDB( creator=pymysql, # 使用連接數據庫的模塊 maxconnections=6, # 鏈接池容許的最大鏈接數,0和None表示不限制鏈接數 mincached=2, # 初始化時,連接池中至少建立的空閒的連接,0表示不建立 maxcached=5, # 連接池中最多閒置的連接,0和None不限制 maxshared=3, # 連接池中最多共享的連接數量,0和None表示所有共享。PS: 無用,由於pymysql和MySQLdb等模塊的 threadsafety都爲1,全部值不管設置爲多少,_maxcached永遠爲0,因此永遠是全部連接都共享。 blocking=True, # 鏈接池中若是沒有可用鏈接後,是否阻塞等待。True,等待;False,不等待而後報錯 maxusage=None, # 一個連接最多被重複使用的次數,None表示無限制 setsession=[], # 開始會話前執行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host='127.0.0.1', port=3306, user='root', password='0000', database='flask_code', charset='utf8' ) def connect(type=None): conn = POOL.connection() cursor = conn.cursor(cursor=type) return conn, cursor def connect_close(conn, cursor): cursor.close() conn.close() def fetchone(sql, arg=list()): conn, cursor = connect(type) cursor.execute(sql, arg) data = cursor.fetchone() connect_close(conn, cursor) return data def fetchall(sql, arg=list(), type=pymysql.cursors.DictCursor): conn, cursor = connect(type) cursor.execute(sql, arg) data = cursor.fetchall() connect_close(conn, cursor) return data def insert(sql, arg=list()): conn, cursor = connect() row = cursor.execute(sql, arg) conn.commit() connect_close(conn, cursor) return row
# -*- coding: utf-8 -*- """ @Datetime: 2019/1/31 @Author: Zhang Yafei """ import sqlite3 import settings import os BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) DB_DIR = os.path.join(BASE_DIR, 'data.db') def connect(): '''鏈接數據庫''' conn = sqlite3.connect(settings.DATABASES.get('sqlite3')) # db不存在時將自動建立db cursor = conn.cursor() return conn, cursor def connect_close(conn, cursor): """關閉鏈接""" cursor.close() conn.close() def execute(sql, params=tuple()): conn, cursor = connect() cursor.execute(sql, params) # 執行這個語句 connect_close(conn, cursor) def fetchone(sql, params=tuple()): conn, cursor = connect() result = cursor.execute(sql, params) data = result.fetchone() connect_close(conn, cursor) return data def fetchall(sql, params=tuple()): conn, cursor = connect() results = cursor.execute(sql, params) data = results.fetchall() connect_close(conn, cursor) return data
# -*- coding: utf-8 -*- """ @Datetime: 2019/1/31 @Author: Zhang Yafei """ import sqlite3 import os BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) DB_DIR = os.path.join(BASE_DIR, 'data.db') class SqliteDB(object): def __init__(self): self.conn = sqlite3.connect(DB_DIR) # db不存在時將自動建立db self.cursor = self.conn.cursor() def close(self): self.cursor.close() self.conn.close() def execute(self, sql, params=tuple()): self.cursor.execute(sql, params) self.close() def fetchone(self, sql, params=tuple()): result = self.cursor.execute(sql, params) data = result.fetchone() self.close() return data def fetchall(self, sql, params=tuple()): results = self.cursor.execute(sql, params) data = results.fetchall() self.close() return data if __name__ == '__main__': sqlite = SqliteDB() # 1. 建表 sql = '''create table happy( username text, password text, id int)''' sqlite.execute(sql) # 2. 插入數據 sqlite.execute("INSERT INTO COMPANY (ID,NAME,AGE,ADDRESS,SALARY) \ VALUES (4, 'Mark', 25, 'Rich-Mond ', 65000.00 )") # 3. 更改數據 sqlite.execute("UPDATE COMPANY SET ID=99 WHERE ID=2") # 4. 刪除表裏面的數據 c.execute("DELETE FROM COMPANY WHERE ID=4") c.execute("DELETE FROM COMPANY WHERE ID=3") # 5. 查詢 data = sqlite.fetchall('select * from label limit 1') print(data) # 輸出 ''' [('盤龍雲海(排毒養顏膠囊)', 509881, '廣東深圳龍崗區/女', '昨天吃的,今天就拉肚子了。感受肚子有點漲痛!不知道效果怎麼樣~~~~~', '昨天/吃/的/,/今天/就/拉肚子/SB了/。/感受/肚子/PB有點/漲痛/SB!/不/知道/效果/怎麼樣/~/~/~/~/~', '2011-09-30 15:26:00', 'http://ypk.39.net/509881/comment/k0_p...', '昨天/吃/的/,/今天/就/拉肚子/SB了/。/感受/肚子/PB有點/漲痛/SB!/不/知道/效果/怎麼樣/~/~/~/~/~', '昨天/吃/的/,/今天/就/拉肚子/SB了/。/感受/肚子/PB有點/漲痛/SB!/不/知道/效果/怎麼樣/~/~/~/~/~')] '''
# -*- coding: utf-8 -*- """ @Datetime: 2019/1/31 @Author: Zhang Yafei """ import json import pymongo import pandas as pd class MongoPipeline(object): """ mongodb: save(self, data, collection): 將數據保存到數據庫 read(self, data): 讀取數據庫中指定表格 insert(self, table, dict_data): 插入數據 delete(self, table, condition): 刪除指定數據 update(self, table, condition, new_dict_data): 更新指定數據 dbFind(self, table, condition=None): 按條件查找 findAll(self, table): 查找所有 close(self): 關閉鏈接 """ def __init__(self, mongo_db, mongo_uri='localhost'): self.mongo_uri = mongo_uri self.mongo_db = mongo_db self.client = pymongo.MongoClient(self.mongo_uri) self.db = self.client[self.mongo_db] def close(self): """ 關閉鏈接 :return: """ self.client.close() def save(self, data, collection): """ 將數據保存到數據庫表 :param data: :param collection: :return: None """ self.collection = self.db[collection] try: if self.collection.insert(json.loads(data.T.to_json()).values()): print('mongodb insert {} sucess.'.format(collection)) return except Exception as e: print('insert error:', e) import traceback traceback.print_exc(e) def read(self, table): """ 讀取數據庫中的數據 :param table: :return: dataframe """ try: # 鏈接數據庫 table = self.db[table] # 讀取數據 data = pd.DataFrame(list(table.find())) return data except Exception as e: import traceback traceback.print_exc(e) def insert(self, table, dict_data): """ 插入 :param table: :param dict_data: :return: None """ try: self.db[table].insert(dict_data) print("插入成功") except Exception as e: print(e) def update(self,table, condition, new_dict_data): """ 更新 :param table: :param dict_data: :param new_dict_data: :return: None """ try: self.db[table].update(condition, new_dict_data) print("更新成功") except Exception as e: print(e) def delete(self,table, condition): """ 刪除 :param table: :param dict_data: :return: None """ try: self.db[table].remove(condition) print("刪除成功") except Exception as e: print(e) def dbFind(self, table, condition=None): """ 按條件查找 :param table: :param dict_data: :return: generator dict """ data = self.db[table].find(condition) for item in data: yield item def findAll(self, table): """ 查找所有 :param table: :return: generator dict """ for item in self.db[table].find(): yield item if __name__ == '__main__': mongo = MongoPipeline('flask') # data = mongo.read('label') # print(data.head()) condition = {"藥品ID": 509881} data = mongo.dbFind('label', condition) print(data) for i in data: print(i) # mongo.findAll()