包括python鏈接數據庫,以及django下配置鏈接數據庫python
# -*- coding:utf-8 -*- import psycopg2 import pymysql import pymssql import cx_Oracle import time from functools import wraps from contextlib import contextmanager # 測試一個函數的運行時間,使用方式:在待測函數直接添加此修飾器 def timethis(func): @wraps(func) def wrapper(*args, **kwargs): start = time.perf_counter() r = func(*args, **kwargs) end = time.perf_counter() print('\n============================================================') print('{}.{} : {}'.format(func.__module__, func.__name__, end - start)) print('============================================================\n') return r return wrapper # 測試一段代碼運行的時間,使用方式:上下文管理器with # with timeblock('block_name'): # your_code_block... @contextmanager def timeblock(label='Code'): start = time.perf_counter() try: yield finally: end = time.perf_counter() print('==============================================================') print('{} run time: {}'.format(label, end - start)) print('==============================================================') class SqlConn(): ''' 鏈接數據庫,以及進行一些操做的封裝 ''' sql_name = '' database = '' user = '' password = '' port = 0 host = '' # 建立鏈接、遊標 def __init__(self, *args, **kwargs): if kwargs.get("sql_name"): self.sql_name = kwargs.get("sql_name") if kwargs.get("database"): self.database = kwargs.get("database") if kwargs.get("user"): self.user = kwargs.get("user") if kwargs.get("password"): self.password = kwargs.get("password") if kwargs.get("port"): self.port = kwargs.get("port") if kwargs.get("host"): self.host = kwargs.get("host") if not (self.host and self.port and self.user and self.password and self.database): raise Warning("conn_error, missing some params!") sql_conn = {'mysql': pymysql, 'postgresql': psycopg2, 'sqlserver': pymssql, 'orcle': cx_Oracle } self.conn = sql_conn[self.sql_name].connect(host=self.host, port=self.port, user=self.user, password=self.password, database=self.database, ) self.cursor = self.conn.cursor() if not self.cursor: raise Warning("conn_error!") # 測試鏈接 def test_conn(self): if self.cursor: print("conn success!") else: print('conn error!') # 單條語句的並提交 def execute(self, sql_code): self.cursor.execute(sql_code) self.conn.commit() # 單條語句的不提交 def execute_no_conmmit(self, sql_code): self.cursor.execute(sql_code) # 構造多條語句,使用%s參數化,對於每一個list都進行替代構造 def excute_many(self, sql_base, param_list): self.cursor.executemany(sql_base, param_list) # 批量執行(待完善) def batch_execute(self, sql_code): pass # 獲取數據 def get_data(self, sql_code, count=0): self.cursor.execute(sql_code) if int(count): return self.cursor.fetchmany(count) else: return self.cursor.fetchall() # 更新數據 def updata_data(self, sql_code): self.cursor(sql_code) # 插入數據 def insert_data(self, sql_code): self.cursor(sql_code) # 滾動遊標 def cursor_scroll(self, count, mode='relative'): self.cursor.scroll(count, mode=mode) # 提交 def commit(self): self.conn.commit() # 回滾 def rollback(self): self.conn.rollback() # 關閉鏈接 def close_conn(self): self.cursor.close() self.conn.close()
import psycopg2 import pymysql import pymssql import cx_Oracle from mysite.settings import DATABASES class SqlConn(): ''' 鏈接數據庫,以及進行一些操做的封裝 ''' # 建立鏈接、遊標 def __init__(self): print(DATABASES['default']['ENGINE']) if DATABASES['default']['ENGINE'] == 'django.db.backends.mysql': self.sql_name = 'mysql' elif DATABASES['default']['ENGINE'] == 'sql_server.pyodbc': self.sql_name = 'sqlserver' print(self.sql_name) else: self.sql_name = '' raise Warning("conn_error!") self.host = DATABASES['default']['HOST'] self.port = DATABASES['default']['PORT'] self.user = DATABASES['default']['USER'] self.password = DATABASES['default']['PASSWORD'] self.database = DATABASES['default']['NAME'] sql_conn = {'mysql': pymysql, 'postgresql': psycopg2, 'sqlserver': pymssql, 'orcle': cx_Oracle } self.conn = sql_conn[self.sql_name].connect(host=self.host, port=self.port, user=self.user, password=self.password, database=self.database, # charset='utf8', ) self.cursor = self.conn.cursor() if not self.cursor: raise Warning("conn_error!") # 測試鏈接 def test_conn(self): if self.cursor: print("conn success!") else: print('conn error!') # 單條語句的並提交 def execute(self, sql_code): self.cursor.execute(sql_code) self.conn.commit() # 單條語句的不提交 def execute_no_conmmit(self, sql_code): self.cursor.execute(sql_code) # 構造多條語句,使用%s參數化,對於每一個list都進行替代構造 def excute_many(self, sql_base, param_list): self.cursor.executemany(sql_base, param_list) # 批量執行(待完善) def batch_execute(self, sql_code): pass def get_headers(self, table_name): sql_code = "select COLUMN_NAME from information_schema.COLUMNS \ where table_name = '%s' and table_schema = '%s';" % ( table_name, self.database) self.execute(sql_code) return self.cursor.fetchall() # 獲取數據 def get_data(self, sql_code, count=0): print(sql_code) # sql_code = 'select employee.pin,employee.emp_name,iclock.sn,area.area_name from transaction, employee, iclock, area where transaction.employee_id=employee.id and transaction.iclock_id=iclock.id and iclock.area_id=area.id;' self.cursor.execute(sql_code) if int(count): return self.cursor.fetchmany(count) else: return self.cursor.fetchall() def get_headers_datas(self, sql_code, count=0): self.cursor.execute(sql_code) headers = [] for each in self.cursor.description: headers.append(each[0]) if int(count): return headers, self.cursor.fetchmany(count) else: return headers, self.cursor.fetchall() # 更新數據 def updata_data(self, sql_code): self.cursor(sql_code) # 插入數據 def insert_data(self, sql_code): self.cursor(sql_code) # 滾動遊標 def cursor_scroll(self, count, mode='relative'): self.cursor.scroll(count, mode=mode) # 提交 def commit(self): self.conn.commit() # 回滾 def rollback(self): self.conn.rollback() # 關閉鏈接 def close_conn(self): self.cursor.close() self.conn.close()