web.py中實現相似Django中的ORM的查詢效果

Django中的對象查詢

Django框架自帶了ORM,實現了一些比較強大並且方便的查詢功能,這些功能和表無關。好比下面這個例子:web

class Question(models.Model):
    question_text = models.CharField(max_length=200)
    pub_date = models.DateTimeField('date published')

Question.objects.all()
Question.objects.get(pk=1)sql

從例子能夠看出,objects.allobjects.get這些功能都不是在class Question中定義的,可能在其父類models.Model中定義,也可能不是。那麼咱們在web.py中如何實現這樣的功能呢?(若是你選擇使用SQLAlchemy就不須要本身實現了)。數據庫

實現

思路

咱們注意到Question.objects.all()這樣的調用是直接訪問了類屬性objects,並調用了objects屬性的方法all()。這裏objects多是一個實例,也多是一個類。我我的認爲(我沒看過Django的實現)這應該是一個實例,由於實例化的過程能夠傳遞一些表的信息,使得相似all()這樣的函數能夠工做。通過分析以後,咱們能夠列出咱們須要解決的問題:框架

  1. 須要實現一個模型的父類Model,實際的表能夠從這個父類繼承以得到本身沒有定義的功能。函數

  2. 實際的模型類(好比Question類)定義後,不實例話的狀況下就要具有objects.all()這樣的查詢效果。code

從上面的需求能夠看出,咱們須要在類定義的時候就實現這些功能,而不是等到類實例化的時候再實現這些功能。類定義的時候實現功能?這不就是metaclass(元類)作的事情嘛。所以實現過程大概是下面這樣的:sqlite

  1. 實現一個Model類,其綁定方法和表的增、刪、改有關。對象

  2. 修改Model類的元類爲ModelMetaClass,該元類定義的過程當中爲類增長一個objects對象,該對象是一個ModelDefaultManager類的實例,實現了表的查詢功能。繼承

代碼

都說不給代碼就是耍流氓,我仍是給吧。說明下:使用的數據庫操做都是web.py的db庫中的接口。接口

# -*- coding: utf-8 -*-
    
    import web
    
    import config  # 自定義的配置類,能夠忽略
    
    
    def _connect_to_db():
        return web.database(dbn="sqlite", db=config.dbname)
    
    
    def init_db():
        db = _connect_to_db()
        for statement in config.sql_statements:
            db.query(statement)
    
    
    class ModelError(Exception):
        """Exception raised by all models.
    
        Attributes:
            msg: Error message.
        """
    
        def __init__(self, msg=""):
            self.msg = msg
    
        def __str__(self):
            return "ModelError: %s" % self.msg
    
    
    class ModelDefaultManager(object):
        """ModelManager implements query functions against a model.
    
        Attributes:
            cls: The class to be managed.
        """
    
        def __init__(self, cls):
            self.cls = cls
            self._table_name = cls.__name__.lower()
    
        def all(self):
            db = _connect_to_db()
            results = db.select(self._table_name)
            return [self.cls(x) for x in results]
    
        def get(self, query_vars, where):
            results = self.filter(query_vars, where, limit=1)
            if len(results) > 0:
                return results[0]
            else:
                return None
    
        def filter(self, query_vars, where, limit=None):
            db = _connect_to_db()
            try:
                results = db.select(self._table_name, vars=query_vars, where=where,
                                    limit=limit)
            except (Exception) as e:
                raise ModelError(str(e))
    
            return [self.cls(x) for x in results]
    
    
    class ModelMetaClass(type):
    
        def __new__(cls, classname, bases, attrs):
            new_class = super(ModelMetaClass, cls).__new__(cls, classname,
                                                           bases, attrs)
            objects = ModelDefaultManager(new_class)
            setattr(new_class, "objects", objects)
    
            return new_class
    
    
    class Model(object):
        """Parent class of all models.
        """
    
        __metaclass__ = ModelMetaClass
    
        def __init__(self):
            pass
    
        def _table_name(self):
            return self.__class__.__name__.lower()
    
        def insert(self, **kargs):
            db = _connect_to_db()
            try:
                with db.transaction():
                    db.insert(self._table_name(), **kargs)
            except (Exception) as e:
                raise ModelError(str(e))
    
        def delete(self, where, using=None, vars=None):
            db = _connect_to_db()
            try:
                with db.transaction():
                    db.delete(self._table_name(), where, vars=vars)
            except (Exception) as e:
                raise ModelError(str(e))
    
        def save(self, where, vars=None, **kargs):
            db = _connect_to_db()
            try:
                with db.transaction():
                    db.update(self._table_name(), where, vars, **kargs)
            except (Exception) as e:
                raise ModelError(str(e))

使用

首先定義表對應的類:

class Users(Model):
    ...

使用就和Django的方式同樣:

user_list = Users.objects.all()

相關文章
相關標籤/搜索