Python學習 - 編寫本身的ORM(2)

上一篇文章簡單的實現了ORM(對象關係模型),這一篇文章主要實現簡單的MySQL數據庫操做。python

想要操做數據庫,首先要創建一個數據庫鏈接。下面定義一個建立數據庫鏈接的函數,獲得一個鏈接叫作engine。mysql

def create_engine(user,password,database,host='127.0.0.1',port=3306,**kw):
    import mysql.connector
    global engine
    if engine is not None:
        raise DBError('Engine is already initialized.')
    params = dict(user=user,password=password,database=database,host=host,port=port)
    defaults = dict(use_unicode=True,charset='utf8',collation='utf8_general_ci',autocommit=False)
    #print ('%s %s %s %s %s') % (user,password,database,host,port)
    for k,v in defaults.iteritems():
        params[k] = kw.pop(k,v)
    params.update(kw)
    params['buffered'] = True
    engine = mysql.connector.connect(**params)
    cursor = engine.cursor()

 有了鏈接就能夠對數據庫進行操做了。下面寫了幾個函數,能夠對數據庫進行查詢和插入操做。sql

def _select(sql,first,*args):
    cursor = None
    sql = sql.replace('?','%s')
    global engine
    try:
        cursor = engine.cursor()
        cursor.execute(sql,args)
        if cursor.description:
            names = [x[0] for x in cursor.description]
        if first:
            values = cursor.fetchone()
            if not values:
                return None
            return Dict(names,values)
        return [Dict(names,x) for x in cursor.fetchall()]
    finally:
        if cursor:
            cursor.close()

def select_one(sql,*args):
    return _select(sql,True,*args)

def select(sql,*args):
    return _select(sql,False,*args)

def _update(sql,*args):
    cursor = None
    global engine
    sql = sql.replace('?','%s')
    print sql
    try:
        cursor = engine.cursor()
        cursor.execute(sql,args)
        r = cursor.rowcount
        engine.commit()
        return r
    finally:
        if cursor:
            cursor.close()

def insert(table,**kw):
    cols, args = zip(*kw.iteritems())
    sql = 'insert into %s (%s) values(%s)' % (table,','.join(['%s' % col for col in cols]),','.join(['?' for i in range(len(cols))]))
    print ('sql %s args %s' % (sql, str(args)))
    return _update(sql,*args)

到這裏,基本的數據庫操做已經完成了。可是,根據廖雪峯的教程,這還遠遠不夠。數據庫

  • 若是要在一個數據庫鏈接中實現多個操做,上面的代碼效率很低,沒次執行玩一條語句,就須要從新分配一個鏈接。
  • 在一次事務中執行多條操做也是同樣效率低下。
  • 若是服務器爲不一樣用戶數據庫請求都分配一個線程來創建鏈接,可是在進程中,鏈接是可供享使用的。這樣問題就來了,致使數據庫操做可能異常。

針對第三個問題,應該使每一個鏈接是每一個線程擁有的,其它線程不能訪問,使用threading.local。首先定義一個類,來保存數據庫的上下文:服務器

class _DbCtx(threading.local):

    def __init__(self):
        self.connection = None
        self.transactions = 0

    def is_init(self):
        return not self.connection is None

    def init(self):
        self.connection = engine # 建立數據庫鏈接
        self.transactions = 0

    def cleanup(self):
        self.connection.cleanup()
        self.connection = None

    def cursor(self):
        return self.connection.cursor()

 上面的代碼有一個錯誤。由於Python的賦值語句只是將一個對象的引用傳給一個變量,就如上面代碼中 init函數中 self.connection = engine。代表self.connection和engine都指向一個數據庫鏈接的對象。若是將self.connection給cleanup了,那麼engine指向的對象也被cleanup了。下圖是一個例子:app

a是類foo實例的一個引用,執行b=a後,在執行b.clean(),此時應該只是b的v值被更改成0,可是執行a.v卻發現v的值也變爲0了。函數

下面是最後的代碼,只是封裝了最底層的數據庫操做,代碼也寫的很漲,雖然是模仿廖雪峯的代碼。fetch

# -*- coding: utf-8 -*-
import time, uuid, functools, threading, logging

class Dict(dict):
    '''
    Simple dict but support access as x.y style.

    '''
    def __init__(self, names=(), values=(), **kw):
        super(Dict, self).__init__(**kw)
        for k, v in zip(names, values):
            self[k] = v

    def __getattr__(self, key):
        try:
            return self[key]
        except KeyError:
            raise AttributeError(r"'Dict' object has no attribute '%s'" % key)

    def __setattr__(self, key, value):
        self[key] = value
class DBError(Exception):
    pass
class MultiColumnsError(Exception):
    pass
engine = None
class _DbCtx(threading.local):

    def __init__(self):
        self.connection = None
        self.transactions = 0

    def is_init(self):
        return not self.connection is None

    def init(self):
        self.connection = engine
        self.transactions = 0

    def cleanup(self):
        self.connection = None
    
    def cursor(self):
        return self.connection.cursor()

def create_engine(user,password,database,host='127.0.0.1',port=3306,**kw):
    import mysql.connector
    global engine
    if engine is not None:
        raise DBError('Engine is already initialized.')
    params = dict(user=user,password=password,database=database,host=host,port=port)
    defaults = dict(use_unicode=True,charset='utf8',collation='utf8_general_ci',autocommit=False)
    #print ('%s %s %s %s %s') % (user,password,database,host,port)
    for k,v in defaults.iteritems():
        params[k] = kw.pop(k,v)
    params.update(kw)
    params['buffered'] = True
    engine = mysql.connector.connect(**params)
    print type(engine)

_db_ctx = _DbCtx()
class _ConnectionCtx(object):

    def __enter__(self):
        self.should_cleanuo = False
        if not _db_ctx.is_init():
            cursor = engine.cursor()
            _db_ctx.init()
            self.should_cleanup = True
        return self

    def __exit__(self,exctype,excvalue,traceback):
        if self.should_cleanup:
            _db_ctx.cleanup()

def with_connection(func):
    @functools.wraps(func)
    def _wrapper(*args,**kw):
        with _ConnectionCtx():
            return func(*args, **kw)
    return _wrapper

def _select(sql,first,*args):
    cursor = None
    sql = sql.replace('?','%s')
    global _db_ctx
    try:
        cursor = _db_ctx.cursor()
        cursor.execute(sql,args)
        if cursor.description:
            names = [x[0] for x in cursor.description]
        if first:
            values = cursor.fetchone()
            if not values:
                return None
            return Dict(names,values)
        return [Dict(names,x) for x in cursor.fetchall()]
    finally:
        if cursor:
            cursor.close()
@with_connection
def select_one(sql,*args):
    return _select(sql,True,*args)
@with_connection
def select_int(sql,*args):
    d = _select(sql,True,*args)
    if len(d) != 1:
        raise MultoColumnsError('Except only one column.')
    return d.values()[0]
@with_connection
def select(sql,*args):
    global engine
    print type(engine)
    return _select(sql,False,*args)
@with_connection
def _update(sql,*args):
    cursor = None
    global _db_ctx 
    sql = sql.replace('?','%s')
    print sql
    try:
        cursor = _db_ctx.cursor()
        cursor.execute(sql,args)
        r = cursor.rowcount
        engine.commit()
        return r
    finally:
        if cursor:
            cursor.close()

def insert(table,**kw):
    cols, args = zip(*kw.iteritems())
    sql = 'insert into %s (%s) values(%s)' % (table,','.join(['%s' % col for col in cols]),','.join(['?' for i in range(len(cols))]))
    print ('sql %s args %s' % (sql, str(args)))
    return _update(sql,*args)

create_engine(user='root',password='z5201314',database='test')
u1 = select_one('select * from user where id=?',1)
print 'u1'
print u1
print 'start selet()...'
u2 = select('select * from user')
for item in u2:
    print ('%s %s' % (item.name,item.id))
print 'name:%s id: %s' % (u1.name,u1.id)
相關文章
相關標籤/搜索