上一篇文章簡單地實現了ORM(對象關係映射),這一篇文章主要實現簡單的MySQL數據庫操作。
想要操作數據庫,首先要建立一個數據庫連接。下面定義一個建立數據庫連接的函數,得到的連接叫做engine。
def create_engine(user, password, database, host='127.0.0.1', port=3306, **kw):
    """Open the module-wide MySQL connection and store it in the global ``engine``.

    Args:
        user, password, database, host, port: passed to
            ``mysql.connector.connect``.
        **kw: extra connection options. Keys matching one of the defaults
            below (``use_unicode``, ``charset``, ``collation``,
            ``autocommit``) override that default; every other key is
            forwarded unchanged.

    Raises:
        DBError: if ``engine`` has already been created.
    """
    # Imported lazily so that the module can be loaded without the driver.
    import mysql.connector
    global engine
    if engine is not None:
        raise DBError('Engine is already initialized.')
    params = dict(user=user, password=password, database=database,
                  host=host, port=port)
    defaults = dict(use_unicode=True, charset='utf8',
                    collation='utf8_general_ci', autocommit=False)
    # items() instead of iteritems() so the loop runs on Python 2 and 3.
    for key, default in defaults.items():
        params[key] = kw.pop(key, default)
    params.update(kw)
    # Always forced on, even if the caller passed buffered=False.
    params['buffered'] = True
    engine = mysql.connector.connect(**params)
有了連接就可以對數據庫進行操作了。下面寫了幾個函數,可以對數據庫進行查詢和插入操作。
def _select(sql, first, *args):
    """Run a query on the global ``engine`` and wrap the rows in ``Dict``.

    Args:
        sql: statement using ``?`` as the parameter placeholder. Every ``?``
            is rewritten to ``%s``, including any that appear inside string
            literals in the statement.
        first: when true, return only the first row (or ``None`` if the
            query produced no row); otherwise return a list of all rows.
        *args: parameter values bound to the placeholders.
    """
    cursor = None
    sql = sql.replace('?', '%s')
    try:
        cursor = engine.cursor()
        cursor.execute(sql, args)
        # Bind ``names`` on every path; previously it was left undefined
        # when the cursor reported no description.
        names = [col[0] for col in cursor.description] if cursor.description else []
        if first:
            values = cursor.fetchone()
            if not values:
                return None
            return Dict(names, values)
        return [Dict(names, row) for row in cursor.fetchall()]
    finally:
        if cursor:
            cursor.close()


def select_one(sql, *args):
    """Return the first row of the query as a ``Dict``, or ``None``."""
    return _select(sql, True, *args)


def select(sql, *args):
    """Return every row of the query as a list of ``Dict`` objects."""
    return _select(sql, False, *args)


def _update(sql, *args):
    """Execute a modifying statement, commit, and return the cursor's rowcount.

    ``?`` placeholders are rewritten to ``%s`` exactly as in ``_select``.
    """
    cursor = None
    sql = sql.replace('?', '%s')
    print(sql)
    try:
        cursor = engine.cursor()
        cursor.execute(sql, args)
        affected = cursor.rowcount
        engine.commit()
        return affected
    finally:
        if cursor:
            cursor.close()


def insert(table, **kw):
    """Insert one row into ``table`` with columns and values taken from ``kw``.

    Values are passed as bound parameters. The table and column names are
    interpolated directly into the SQL text, so they must never come from
    untrusted input.

    Raises:
        ValueError: if no column is given.
    """
    if not kw:
        raise ValueError('insert() requires at least one column=value pair.')
    cols, args = zip(*kw.items())
    sql = 'insert into %s (%s) values(%s)' % (
        table, ','.join(cols), ','.join('?' for _ in cols))
    print('sql %s args %s' % (sql, str(args)))
    return _update(sql, *args)
到這裏,基本的數據庫操作已經完成了。但是,根據廖雪峰的教程,這還遠遠不夠。
針對第三個問題,應該讓每個連接只屬於創建它的線程,其它線程不能訪問,這可以使用threading.local來實現。首先定義一個類,用來保存數據庫的上下文:
class _DbCtx(threading.local):
    """Thread-local holder for the current connection and a transaction counter."""

    def __init__(self):
        self.transactions = 0
        self.connection = None

    def is_init(self):
        """Return True once a connection has been attached by ``init``."""
        return self.connection is not None

    def init(self):
        """Attach the module-level ``engine`` and reset the transaction counter."""
        # Set up the database connection. This stores a reference to the
        # same object that ``engine`` names; no new connection is opened.
        self.transactions = 0
        self.connection = engine

    def cleanup(self):
        """Call ``cleanup()`` on the attached connection, then detach it."""
        conn = self.connection
        conn.cleanup()
        self.connection = None

    def cursor(self):
        """Return a new cursor from the attached connection."""
        conn = self.connection
        return conn.cursor()
上面的代碼有一個錯誤。由於Python的賦值語句只是將一個對象的引用傳給一個變量,就如上面代碼中init函數裏的 self.connection = engine,這表明self.connection和engine都指向同一個數據庫連接對象。如果將self.connection給cleanup了,那麼engine指向的對象也被cleanup了。下圖是一個例子:
a是類foo的一個實例的引用,執行b=a後,再執行b.clean(),本以為只有b的v值會被更改為0,但是查看a.v卻發現v的值也變為0了,因為a和b指向的是同一個對象。
下面是最後的代碼,只是封裝了最底層的數據庫操作,代碼也寫得很粗糙,雖然是模仿廖雪峰的代碼。
# -*- coding: utf-8 -*-
"""Minimal MySQL helper layer.

Provides ``create_engine`` to open one module-wide connection, a
thread-local context that tracks whether that connection is attached to the
current thread, and ``select`` / ``select_one`` / ``select_int`` /
``insert`` helpers that use ``?`` as the parameter placeholder.
"""
import functools
import logging
import threading
import time
import uuid


class Dict(dict):
    """Simple dict that also supports access in ``x.y`` style."""

    def __init__(self, names=(), values=(), **kw):
        super(Dict, self).__init__(**kw)
        for k, v in zip(names, values):
            self[k] = v

    def __getattr__(self, key):
        try:
            return self[key]
        except KeyError:
            raise AttributeError(r"'Dict' object has no attribute '%s'" % key)

    def __setattr__(self, key, value):
        self[key] = value


class DBError(Exception):
    """Raised for engine set-up problems."""
    pass


class MultiColumnsError(Exception):
    """Raised by ``select_int`` when the row has more than one column."""
    pass


# The single module-wide connection; set by create_engine().
engine = None


class _DbCtx(threading.local):
    """Thread-local record of the connection attached to the current thread.

    NOTE: ``init`` stores a reference to the shared module-level ``engine``,
    so all threads end up using the same underlying connection object; only
    the "attached" flag is per-thread.
    """

    def __init__(self):
        self.connection = None
        self.transactions = 0

    def is_init(self):
        """Return True if a connection is attached to this thread."""
        return self.connection is not None

    def init(self):
        """Attach ``engine`` to this thread and reset the transaction counter.

        Raises:
            DBError: if ``create_engine`` has not been called yet.
        """
        if engine is None:
            raise DBError('Engine is not initialized.')
        self.connection = engine
        self.transactions = 0

    def cleanup(self):
        """Detach the connection without closing it.

        Only the reference is dropped: the object is the shared ``engine``,
        and closing it here would break every later query.
        """
        self.connection = None

    def cursor(self):
        """Return a new cursor from the attached connection."""
        return self.connection.cursor()


def create_engine(user, password, database, host='127.0.0.1', port=3306, **kw):
    """Open the module-wide MySQL connection and store it in ``engine``.

    Args:
        user, password, database, host, port: passed to
            ``mysql.connector.connect``.
        **kw: extra connection options. Keys matching one of the defaults
            (``use_unicode``, ``charset``, ``collation``, ``autocommit``)
            override that default; all other keys are forwarded unchanged.

    Raises:
        DBError: if the engine has already been created.
    """
    # Imported lazily so the module can be loaded without the driver.
    import mysql.connector
    global engine
    if engine is not None:
        raise DBError('Engine is already initialized.')
    params = dict(user=user, password=password, database=database,
                  host=host, port=port)
    defaults = dict(use_unicode=True, charset='utf8',
                    collation='utf8_general_ci', autocommit=False)
    for k, v in defaults.items():
        params[k] = kw.pop(k, v)
    params.update(kw)
    # Always forced on, even if the caller passed buffered=False.
    params['buffered'] = True
    engine = mysql.connector.connect(**params)
    logging.debug('engine created: %s', type(engine))


_db_ctx = _DbCtx()


class _ConnectionCtx(object):
    """Context manager that attaches a connection for the ``with`` body.

    Only the outermost context attaches and detaches; nested uses find the
    connection already attached and leave it alone.
    """

    def __enter__(self):
        # Must be assigned on every path: __exit__ reads it unconditionally.
        self.should_cleanup = False
        if not _db_ctx.is_init():
            _db_ctx.init()
            self.should_cleanup = True
        return self

    def __exit__(self, exctype, excvalue, traceback):
        if self.should_cleanup:
            _db_ctx.cleanup()


def with_connection(func):
    """Decorator that runs ``func`` inside a ``_ConnectionCtx``."""
    @functools.wraps(func)
    def _wrapper(*args, **kw):
        with _ConnectionCtx():
            return func(*args, **kw)
    return _wrapper


def _select(sql, first, *args):
    """Run a query on the attached connection and wrap the rows in ``Dict``.

    Args:
        sql: statement using ``?`` as placeholder. Every ``?`` is rewritten
            to ``%s``, including any inside string literals in the statement.
        first: when true, return only the first row (or ``None`` if there is
            none); otherwise return a list of all rows.
        *args: parameter values bound to the placeholders.
    """
    cursor = None
    sql = sql.replace('?', '%s')
    try:
        cursor = _db_ctx.cursor()
        cursor.execute(sql, args)
        # Bind ``names`` on every path; previously it was left undefined
        # when the cursor reported no description.
        names = [x[0] for x in cursor.description] if cursor.description else []
        if first:
            values = cursor.fetchone()
            if not values:
                return None
            return Dict(names, values)
        return [Dict(names, x) for x in cursor.fetchall()]
    finally:
        if cursor:
            cursor.close()


@with_connection
def select_one(sql, *args):
    """Return the first row of the query as a ``Dict``, or ``None``."""
    return _select(sql, True, *args)


@with_connection
def select_int(sql, *args):
    """Return the single value of a one-column query.

    Returns ``None`` when the query yields no row, mirroring ``select_one``.

    Raises:
        MultiColumnsError: if the row does not have exactly one column.
    """
    d = _select(sql, True, *args)
    if d is None:
        return None
    if len(d) != 1:
        raise MultiColumnsError('Expect only one column.')
    # list() because dict.values() is not indexable on Python 3.
    return list(d.values())[0]


@with_connection
def select(sql, *args):
    """Return every row of the query as a list of ``Dict`` objects."""
    return _select(sql, False, *args)


@with_connection
def _update(sql, *args):
    """Execute a modifying statement, commit, and return the cursor's rowcount.

    ``?`` placeholders are rewritten to ``%s`` exactly as in ``_select``.
    """
    cursor = None
    sql = sql.replace('?', '%s')
    logging.debug(sql)
    try:
        cursor = _db_ctx.cursor()
        cursor.execute(sql, args)
        r = cursor.rowcount
        # Commit on the connection the cursor came from.
        _db_ctx.connection.commit()
        return r
    finally:
        if cursor:
            cursor.close()


def insert(table, **kw):
    """Insert one row into ``table`` with columns and values taken from ``kw``.

    Values are passed as bound parameters. The table and column names are
    interpolated directly into the SQL text, so they must never come from
    untrusted input.

    Raises:
        ValueError: if no column is given.
    """
    if not kw:
        raise ValueError('insert() requires at least one column=value pair.')
    cols, args = zip(*kw.items())
    sql = 'insert into %s (%s) values(%s)' % (
        table, ','.join(cols), ','.join('?' for _ in cols))
    logging.debug('sql %s args %s', sql, args)
    return _update(sql, *args)


if __name__ == '__main__':
    # Demo run; kept out of import time so importing this module does not
    # open a database connection.
    create_engine(user='root', password='z5201314', database='test')
    u1 = select_one('select * from user where id=?', 1)
    print('u1')
    print(u1)
    print('start selet()...')
    u2 = select('select * from user')
    for item in u2:
        print('%s %s' % (item.name, item.id))
    print('name:%s id: %s' % (u1.name, u1.id))