(轉)SQLAlchemy入門和進階

URL:https://zhuanlan.zhihu.com/p/27400862html

http://www.javashuo.com/article/p-dcqfqbpa-k.html---SQLAlchemy 教程 —— 基礎入門篇python

目錄:mysql

  1. SQLAlchemy 簡介
  2. 橫向對比
  3. 核心概念與入門
    1. 模型定義
    2. 複雜查詢
    3. 基礎性能
  4. 擴展與進階
    1. 事件
    2. 反射
    3. Python3.x asyncio擴展
    4. 分片Session
    5. 自定義的列類型
    6. 混合(hybrid)屬性
    7. 序列化Query
    8. Baked Query
    9. 多態與關係


(知乎沒有自動目錄和側邊欄懸浮呢。。惆悵)sql

在新團隊裏作的技術分享,過了一段時間,整理下來而且有了新的想法。彷佛入門級的教程在知乎更受歡迎?數據庫

SQLAlchemy 簡介

SQLAlchemy 是一個功能強大的Python ORM 工具包,口碑不錯,社區活躍也較爲開放
提供 全功能的SQL和ORM操做 本次附贈的文件(這裏放不上來,也懶得放gayhub了,總之很簡單的,單元測試多一些,一下午搞定):
connect.py :底層的數據庫鏈接
orm.py :模型定義的樣例
example_test.py :單元測試,實質上能夠對應業務的具體使用
python3_test.py :展現Python3 asyncio下的SQLAlchemyexpress

分別創建python2/3的虛擬環境,而後安裝對應的requirements.txt便可後端

不管什麼語言,不管什麼庫,作一個ORM實現,至少應當實現徹底語義化的數據庫操做,使得操做數據庫表就像在操做對象。
完整的ORM應當能夠徹底避免SQL拼接api

爲何須要ORM

當時分享完畢以後,也確實不少同事表示仍是喜歡裸SQL,我後來也又在工做中看到了不少遺留代碼的問題。我也正好趁浴室迷思 想了一下,爲何我須要ORM呢?緩存

第一條來自一個定理:安全

一切由人直接來保證安全性的系統,就必定會出錯

拼接SQL、把SQL作成模板、開始使用ORM、封裝出DAO層,幾乎是每一個項目的共識吧?
過往的項目中,由我第一手寫的,都會第一時間加入ORM,畢竟也只是兩三個小文件,一百行之內的事情(後續因爲封裝的增多,可能會到達數百行)

這段時間在寫舊系統的小規模重構(定理2:一個好程序猿應當友好地幫前人擦好屁股,而不是靠從新制造一個新屁股實現),拼接字符串並無帶來任何優勢,反而引入了很是簡單的注入漏洞,簡單的設想這樣一個列表API的場景:

  1. 根據請求參數控制對應的:過濾條件、排序方法、翻頁
  2. 根據須要預取關聯的表,JOIN並把對一對多的關係化爲一個list

第一條,剛一上手,就發現滿地的string format,翻頁用了:

order_sql = "ORDER BY {} {}".format(order_by,direction)

毫無疑問的order_by=id%3Bselect+1%3B-- 就直接注入了

要解決這些在SQL拼接的問題,除了表單驗證,毫無疑問須要作一個SQL字符轉義,另外在能用SQL參數的地方,須要用參數(而後也得注意拼接時候參數的個數,是的,這裏咱們的接口有另外一個BUG,參數數量沒數對)

第二個功能點,想象一下在須要的地方額外加一句LEFT JOIN,而後對結果再作額外的解析

還有一些附屬功能:單元測試如何建表?代碼裏遍地的硬編碼表名如何解決?


本身不是不能實現,但本身來實現這些,就走上了發明ORM的老路,用一個成熟的、文檔豐富的ORM,豈不美哉?

橫向對比

簡單的挑了三個

(知乎的表格彷佛智障,不插入表格了)

SQLAlchemy、Peewee、Django ORM

Django ORM一直就不是一個全功能的ORM,會發現你想寫的SQL幾乎沒法經過ORM寫出來,固然raw屬於tan90,使用裸SQL不在咱們的考慮範圍。Django 1.12後提供了一些subquery等各種豐富SQL操做,但這麼新,估計還極少項目在這麼新的版本

Peewee若是有興趣能夠後續繼續使用來感覺一下,Peewee也是一個功能全面的ORM,star不少但開發沒有SQLAlchemy活躍

 

核心概念與入門

官方文檔
我老是在想爲何團隊裏不少人會以爲SQLAlchemy入門門檻高,我曾經也被困擾過,但回頭一看會發現的概念實質比較簡單。
官方文檔的脈絡不太清晰,要掃過一遍而且學以至用才能感覺獲得。example很友好的!


回過頭來看它的從教程到API的文檔,會發現它的文檔很是詳細,學會它,除了學會了Python操做SQL的一個庫,一樣也能夠學到從代碼組織、各種Pythonic技巧到思想的不少東西


總的感覺是:上手還算容易,精通要花不少功夫,但確實還挺有趣的

先放一個表,待會咱們會繼續講

(再次損失一個表)

 

概念不多,而且很清晰,理解這些概念以後的後續使用時,基本能夠感覺到:你能直覺想到的操做,還確實都有(好比subquery、複雜查詢的構造)

 

模型定義

咱們來看看他如何完成模型定義:

# coding=utf-8 from __future__ import unicode_literals, absolute_import from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, DateTime ModelBase = declarative_base() #<-元類 class User(ModelBase): __tablename__ = "auth_user" id = Column(Integer, primary_key=True) date_joined = Column(DateTime) username = Column(String(length=30)) password = Column(String(length=128)) 

從這裏能夠看到,模型定義甚至與數據庫是無關的,因此容許不一樣的數據庫後端,不一樣類型擁有不一樣的表現形式和建表語句

這裏咱們能夠看到它實現了 ORM與數據庫鏈接的解耦,一些數據庫後端不支持的數據類型,例如Numeric類型,在sqlite中不支持,不過SQLAlchemy也能作一些兼容使用普通浮點

Model 等同於數據庫的一張表
Column 顯然就是這張表的一列

PS: SQLAlchemy 1.2以後才支持comment註釋,以在ddl產生建表SQL時寫上comment屬性,1.2還在beta版裏,因此還不能用。。。我倒很好奇爲毛這個feature這麼不重要

with get_session() as session:
    session.add(User(username="asd", password="asd"))
    session.add(User(username="qwe", password="qwe"))
    session.commit()

session(會話)的概念,能夠當作一個管理數據庫持久鏈接的對象,在此下面是徹底透明的鏈接池和事務等東西

get_session底下configure能夠控制auto_commit參數,= False時寫操做默認都不放在事務裏,SQLAlchemy默認爲True

session.add函數將會把Model加入當前的持久空間(能夠從session.dirty看到),直到commit時更新

with get_session() as session:
    # <class 'sqlalchemy.orm.query.Query'>
    session.query(User)

最簡單的這個查詢返回了一個Query對象
須要注意的是,這裏只構造Query,事實上並無發送至數據庫進行查詢,只會在Query.get()、Query.all()、Query.one()以及Query.__iter__等具備「執行」語義的函數,纔會真的去獲取

Query :本質上是數據表的若干行

  1. 在查詢狀況的下,等同於SQL 中的 SELECT Syntax
  2. 在update函數的操做時,能夠根據參數選擇等同於直接UPDATE users SET xxx WHERE name=xxx或者先用SELECT 選出ID,再循環用UPDATE xxx WHERE id=xxx
  3. delete同上

以SQLAlchemy爲表明的ORM基本都支持鏈式操做。
形如:

with get_session() as session:
    # <class 'sqlalchemy.orm.query.Query'>
    query = (session
             .query(User)
             .filter(User.username == "asd")
             .filter_by(username="asd")
             #上面兩個都是添加where
             .join(Addreess)#使用ForeignKey
             .join(Addreess,Addreess.user_id==User.id)#使用顯式聲明
             .limit(10)
             .offset(0)
             )

全部Query支持的詳情見Query API文檔

上面也涉及到一個特別有意思的filter函數:User.username == "asd" ,其實是SQLAlchemy重載了Column上的各類運算符 __eq__、__ge__,返回了一個BinaryExpression對象,看起來就更加符合直覺上的語義

複雜查詢

基於Query的subquery

with get_session() as session:
    # <class 'sqlalchemy.orm.query.Query'>
    query = (session
            .query(User.id)
            .filter(User.username == "asd")
            .filter_by(username="asd")
            .limit(10)
            )
    subquery = query.subquery()
    query2 = session.query(User).filter(
        User.id.in_(subquery)
    )
    print query2#<-打印展開成的SQL,此處沒有SQL查詢

理解了Query、Column的概念,也很容易自行構造出這樣的SQL

全部在Column級別上的使用 詳見Column API文檔

上面咱們提到了直接對Query進行的刪除:

with get_session() as session:
    query = (session
             .query(User)
             .filter(User.username == "asd")
             .filter_by(username="asd")
             .join(Addreess)
             .join(Addreess,Addreess.user_id=User.id)
             .limit(10)
             .delete()#<-這裏
             )

另外,由於Model也能夠被放進session裏,而後刪除的,和插入是個反向操做:

with get_session() as session:
    instance = session.query(User).get(1)
    session.delete(instance)
    #下一句執行:DELETE FROM auth_user WHERE auth_user.id = ?
    session.commit()

改首先是上述Query中所說的update方法:

with get_session() as session:
            # get by id
    query = (session
             .query(User)
             .filter_by(id=1)
             .update({"username": 
                     User.username + "a"},
                     synchronize_session=False)
                     )

而後是在Model級別的方法:

with get_session() as session:
            # get by id
    user = (session
            .query(User)
            .get(1)
            )
    user.password = "zxcv"
    # UPDATE auth_user SET password=?
    # WHERE auth_user.id = ?
    session.commit()

在對Model的屬性進行修改的時候,session會獲得修改對應的內容,下次commit即會提交SQL
這裏留個思考題:若是對一、同一對象的同一屬性進行修改,二、同一對象的不一樣屬性進行修改 ,最終會有幾個SQL被髮出? 若是你來實現這樣的功能,你會從哪裏下手?

基礎性能

SQLAlchemy性能

比較了十萬條記錄插入的性能

 

另外不要以爲比sqlite 裸SQL慢三倍很慢,注意這個量級,實際項目中會發現慢查詢、不規範操做(例如for循環裏放查詢)的危害比引入ORM的這點開銷打多了

 

總結

到這再貼上面那個概念表,應該就能比較好的理解了

在用裸SQL能夠解決的場景下,上述的SQLAlchemy入門部分就足以掌控場景,完成全部的增刪查改API需求(甚至自動生成代碼的需求),自動生成真是偷懶無止境。。不過發明新的DSL嘛,能不作就不作。。

 

擴展與進階

從過往的經驗來看,SQLAlchemy以優雅的直覺實現了諸多接口,並保留了良好的可擴展性,這裏拋磚引玉一些有趣的特性

事件

應用層的觸發器(trigger),支持:

  1. ConnectionEvents 包括Connection和Engine(鏈接後進行一些自檢操做)
  2. DDLEvents 模型增刪查改事件
  3. DialectEvents 不一樣種類的數據庫的事件
  4. PoolEvents 鏈接池事件,鏈接的檢出和回收等

上面的性能測試裏就使用了兩種事件

from sqlalchemy import event
from sqlalchemy.engine import Engine
import time
import logging

logging.basicConfig()
logger = logging.getLogger("myapp.sqltime")
logger.setLevel(logging.DEBUG)

@event.listens_for(Engine, "before_cursor_execute")
def before_cursor_execute(conn, cursor, statement,
                        parameters, context, executemany):
    conn.info.setdefault('query_start_time', []).append(time.time())
    logger.debug("Start Query: %s", statement)

@event.listens_for(Engine, "after_cursor_execute")
def after_cursor_execute(conn, cursor, statement,
                        parameters, context, executemany):
    total = time.time() - conn.info['query_start_time'].pop(-1)
    logger.debug("Query Complete!")
    logger.debug("Total Time: %f", total)

反射

現有項目或者別人的代碼裏若是已經用其餘的方式寫好了表定義,不想再定義Model了,想用SQLAlchemy直接使用對應的數據庫表
查文檔關鍵字:Automap

from sqlalchemy.ext.automap import automap_base
from sqlalchemy.orm import Session
from sqlalchemy import create_engine

Base = automap_base()

# engine, suppose it has two tables 'user' and 'address' set up
engine = create_engine("sqlite:///mydatabase.db")

# reflect the tables
Base.prepare(engine, reflect=True)
tables = Base.classes#<-load tables

User = Base.classes.user
Address = Base.classes.address
# rudimentary relationships are produced
session.add(Address(email_address="foo@bar.com", user=User(name="foo")))
session.commit()

# collection-based relationships are by default named
# "<classname>_collection"
print (u1.address_collection)

擴展閱讀:DeferredReflection

我以前在一些OLAP應用 用來作數據分析時用到過。。

Python3.x asyncio擴展

16年12月 Python3.6進入穩按期,同時也標誌着Python3.4和3.5中的asyncio模塊進入穩按期

SQLAlchemy對asyncio的支持在於,它實質上能夠在engine層進行擴展,同時擴展Engine、Connection、Transaction、Context 代碼量約400行

Strategies for creating new instances of Engine types. These are semi-private implementation classes which provide the underlying behavior for the "strategy" keyword argument available on :func:~sqlalchemy.engine.create_engine. Current available options are plain, threadlocal, and mock. New strategies can be added via new EngineStrategy classes. """

形如:

from sqlalchemy.engine.strategies import DefaultEngineStrategy
from .engine import AsyncioEngine
ASYNCIO_STRATEGY = '_asyncio'

class AsyncioEngineStrategy(DefaultEngineStrategy):
    name = ASYNCIO_STRATEGY
    engine_cls = AsyncioEngine
AsyncioEngineStrategy()  


async def main():
    engine = create_engine(
        # In-memory sqlite database cannot be accessed from different
        # threads, use file.
        'sqlite:///test.db', strategy=ASYNCIO_STRATEGY
    )

    metadata = MetaData()
    users = Table(
        'users', metadata,
        Column('id', Integer, primary_key=True),
        Column('name', Text),
    )

    # Create the table
    await engine.execute(CreateTable(users))

    conn = await engine.connect()

另外提一嘴的是:asyncio不是銀彈,會致使應用層壓力直接傳給DB,會掩蓋應用的SQL寫的爛的問題

分片Session

讀寫分離是當數據庫壓力到達必定階段時,由應用層進行的拆分數據庫壓力的措施
實現一種主從分離的Session:

  1. 最簡單的方案是直接擴展Session類get_bind方法

get_bind(mapper=None, clause=None)
Return a 「bind」 to which this Session is bound. Note that the 「mapper」 argument is usually present when Session.get_bind() is called via an ORM operation such as a Session.query(), each individual INSERT/UPDATE/DELETE operation within a Session.flush(), call, etc.

  1. 也可使用sqlalchemy.ext.horizontal_shard模塊中已經實現好的ShardedSession

Parameters:

  • shard_chooser – A callable which, passed a Mapper, a mapped instance, and possibly a SQL clause, returns a shard ID. This id may be based off of the attributes present within the object, or on some round-robin scheme.
  • id_chooser – A callable, passed a query and a tuple of identity values, which should return a list of shard ids where the ID might reside.
  • query_chooser – For a given Query, returns the list of shard_ids where the query should be issued.
  • shards – A dictionary of string shard names to Engine objects.

容許根據model或者SQL條件、ID選擇具體的數據庫鏈接。一個未經驗證的腦洞:由於shards是Engine的dict,那麼是否容許在異構數據庫之間使用Shard?這樣會帶來什麼樣的優缺點?

自定義的列類型

好久好久之前作的功能了,想象一個這樣的場景:

  • Postgresql支持IP/CIDR的存儲,本質上是使用4*8bit=32bit的int存儲
  • Mysql此時並無這樣簡單的IP存儲 如何對其進行擴展?

自定義實現的列類型實質上須要:

  1. 指定在某種數據庫方言下的存儲類型,例如Mysql下使用int
  2. 實現兩個方法:從數據庫中取出來一個python對象和把Python對象放入數據庫
  3. 按需須要實現:支持一些操做符(例如==,in_)
from sqlalchemy import types
class MyIPType(types.TypeDecorator):
    impl = types.Integer


    def process_bind_param(self, value, dialect):
        #from python to database
        if dialect=="mysql":
            pass
        return #....

    def process_result_value(self, value, dialect):
        #from database to python object
        return #...

咱們也能夠在awesome-sqlalchemy中找到一些有趣的類型擴展

混合(hybrid)屬性

咱們常見使用Python的property修飾器來構造一個複雜屬性,SQLAlchemy中,這個混合屬性的做用也相似,不只能夠用於得到對應的值,也能夠用於Query時的鏈式操做

定義一個Model後,能夠在各種增刪查改中用到這個混合屬性。混合屬性 混合在:既是一個Python屬性,也是一個能夠放入數據庫查詢的屬性

class Interval(Base): __tablename__ = 'interval' id = Column(Integer, primary_key=True) start = Column(Integer, nullable=False) end = Column(Integer, nullable=False) def __init__(self, start, end): self.start = start self.end = end @hybrid_property def length(self): return self.end - self.start #下面這個寫着玩的。。 @length.setter def length(self, value): self._value = value >>> i1 = Interval(5, 10) >>> i1.length 5 >>> print Session().query(Interval).filter_by(length=5) SELECT interval.id AS interval_id, interval.start AS interval_start, interval."end" AS interval_end FROM interval WHERE interval."end" - interval.start = :param_1 

上述還有一個寫着玩兒的setter,hybrid_property支持:

  1. comparator 擴展Interval.length在各類比較符(><=)的行爲
  2. deleter/setter 顧名思義
  3. expression 能夠擴展最後展開的SQL表達式,例如展開成SUM(xxx):
from sqlalchemy.orm import func
    #下面這個寫着玩的。。
    @length.expression
    def length(self, expr):
        return func.sum(self.end, expr)

序列化Query

提供一個接口,以序列化和反序列化Query,用於跨系統、微服務的場景

from sqlalchemy.ext.serializer import loads, dumps
metadata = MetaData(bind=some_engine)
Session = scoped_session(sessionmaker())

# ... define mappers

query = Session.query(User).
    filter(User.somedata=='foo').order_by(User.sortkey)

# pickle the query
serialized = dumps(query)

# unpickle.  Pass in metadata + scoped_session 
# 上面提到過的 query和Session其實是密不可分的
query2 = loads(serialized, metadata, Session)

print query2.all()

這個作起來其實就很是帶感了,微服務之間的必要條件就是各類dump,結合一下celery,實現一個去中心的HTTP服務也是不在話下

Baked Query

緩存從Query生成的SQL,以減小生成時間,其實是個應用層面的存儲過程、View

from sqlalchemy.ext import baked bakery = baked.bakery()#<-建立了一個LRU from sqlalchemy import bindparam def search_for_user(session, username, email=None): baked_query = bakery(lambda session: session.query(User)) baked_query += lambda q: q.filter(User.name == bindparam('username')) baked_query += lambda q: q.order_by(User.id) if email: baked_query += lambda q: q.filter(User.email == bindparam('email')) result = baked_query(session).params(username=username, email=email).all() return result

上面說到了SQLAlchemy展開成SQL的性能問題,真的特別擔心的話,再來一個緩存綁定參數如何?

多態和關係

使用多個模型,但實際上只是操做一張數據庫表 此處基本略,以前寫過一篇文章了:這兒

class Employee(Base):  
    __tablename__ = 'employee'
    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    type = Column(String(50))

    __mapper_args__ = {
        'polymorphic_identity':'employee',
        'polymorphic_on':type
    }

這裏定義了僱員Employee 模型,指定type字段爲多態所在字段,而且對於這個模型,當type字段爲'employee'時,即爲一個僱員

一對1、一對多、多對多的關係和自動收集成collection,這裏不會細說,relationship函數的各類參數留待你們遊玩。

關係間的收集有多種lazy方式,能夠選擇在父類讀取時直接JOIN或者Subquery,也能夠在須要的時候使用Query.option設置。提及來的篇幅會更長,我投個懶,你們去讀文檔吧~hfgl

相關文章
相關標籤/搜索