SQLAlchemy 簡單筆記

ORM 江湖

曾幾什麼時候,程序員由於害怕SQL而在開發的時候當心翼翼的寫着sql,心中老是少不了恐慌,萬一不當心sql語句出錯,搞壞了數據庫怎麼辦?又或者爲了獲取一些數據,什麼內外左右鏈接,函數存儲過程等等。毫無疑問,不搞懂這些,怎麼都以爲變扭,說不定某天就跳進了坑裏,叫每天不該,喊地地不答。python

ORM 的出現,讓畏懼SQL的開發者,在坑裏看見了爬出去的繩索,彷彿天空並非那麼黑暗,至少再暗,咱們也有了眼睛。顧名思義,ORM 對象關係映射,簡而言之,就是把數據庫的一個個table(表),映射爲編程語言的class(類)。mysql

python中比較著名的ORM框架有不少,大名頂頂的 SQLAlchemy 是python世界裏當仁不讓的ORM框架。江湖中peeweestrompyormSQLObject 各領風騷,但是最終仍是SQLAlchemy 傲視羣雄。程序員

SQLAlchemy 簡介

SQLAlchemy 分爲兩個部分,一共用於 ORM 的對象映射,另一個是核心的 SQL expression 。第一個很好理解,純粹的ORM,後面這個不是 ORM,而是DBAPI的封裝,固然也提供了不少方法,避免了直接寫sql,而是經過一些sql表達式。使用 SQLAlchemy 則能夠分爲三種方式。web

  • 使用 sql expression ,經過 SQLAlchemy 的方法寫sql表達式,簡介的寫sqlsql

  • 使用 raw sql, 直接書寫 sql數據庫

  • 使用 ORM 避開直接書寫 sqlexpress

本文先探討 SQLAlchemy的 sql expresstion 部分的用法。主要仍是跟着官方的 SQL Expression Language Tutorial.介紹編程

爲何要學習 sql expresstion ,而不直接上 ORM?由於後面這個兩個是 orm 的基礎。而且,便是不使用orm,後面這兩個也能很好的完成工做,而且代碼的可讀性更好。純粹把SQLAlchemy當成dbapi使用。首先SQLAlchemy 內建數據庫鏈接池,解決了鏈接操做相關繁瑣的處理。其次,提供方便的強大的log功能,最後,複雜的查詢語句,依靠單純的ORM比較難實現。api

實戰
鏈接數據庫

首先須要導入 sqlalchemy 庫,而後創建數據庫鏈接,這裏使用 mysql。經過create_engine方法進行ruby

from sqlalchemy import create_engine
engine = create_engine("mysql://root:@localhost:3306/webpy?charset=utf8",encoding="utf-8", echo=True)

create_engine 方法進行數據庫鏈接,返回一個 db 對象。裏面的參數表示

數據庫類型://用戶名:密碼(沒有密碼則爲空,不填)@數據庫主機地址/數據庫名?編碼
echo = True 是爲了方便 控制檯 logging 輸出一些sql信息,默認是False

經過這個engine對象能夠直接execute 進行查詢,例如 engine.execute("SELECT * FROM user") 也能夠經過 engine 獲取鏈接在查詢,例如 conn = engine.connect() 經過 conn.execute()方法進行查詢。二者有什麼差異呢?

  • 直接使用engine的execute執行sql的方式, 叫作connnectionless執行,

  • 藉助 engine.connect()獲取conn, 而後經過conn執行sql, 叫作connection執行
    主要差異在因而否使用transaction模式, 若是不涉及transaction, 兩種方法效果是同樣的. 官網推薦使用後者。

定義表

定義數據表,才能進行sql表達式的操做,畢竟sql表達式的表的肯定,是sqlalchemy制定的,若是數據庫已經存在了數據表還須要定義麼?固然,這裏實際上是一個映射關係,若是不指定,查詢表達式就不知道是附加在那個表的操做,固然定義的時候,注意表名和字段名,代碼和數據的必須保持一致。定義好以後,就能建立數據表,一旦建立了,再次運行建立的代碼,數據庫是不會建立的。

# -*- coding: utf-8 -*-
__author__ = 'ghost'from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, ForeignKey
# 鏈接數據庫 
engine = create_engine("mysql://root:@localhost:3306/webpy?charset=utf8",encoding="utf-8", echo=True)
# 獲取元數據
metadata = MetaData()
# 定義表
user = Table('user', metadata,        Column('id', Integer, primary_key=True),        Column('name', String(20)),        Column('fullname', String(40)),
    )

address = Table('address', metadata,        Column('id', Integer, primary_key=True),        Column('user_id', None, ForeignKey('user.id')),        Column('email', String(60), nullable=False)
    )
# 建立數據表,若是數據表存在,則忽視
metadata.create_all(engine)
# 獲取數據庫鏈接
conn = engine.connect()
插入 insert

有了數據表和鏈接對象,對應數據庫操做就簡單了。

>>> i = user.insert()   # 使用查詢>>> i  
<sqlalchemy.sql.dml.Insert object at 0x0000000002637748>>>> print i  # 內部構件的sql語句INSERT INTO "user" (id, name, fullname) VALUES (:id, :name, :fullname)>>> u = dict(name='jack', fullname='jack Jone')>>> r = conn.execute(i, **u)  # 執行查詢,第一個爲查詢對象,第二個參數爲一個插入數據字典,若是插入的是多個對象,就把對象字典放在列表裏面>>> r
<sqlalchemy.engine.result.ResultProxy object at 0x0000000002EF9390>>>> r.inserted_primary_key  # 返回插入行 主鍵 id[4L]>>> addresses
[{'user_id': 1, 'email': 'jack@yahoo.com'}, {'user_id': 1, 'email': 'jack@msn.com'}, {'user_id': 2, 'email': 'www@www.org'}, {'user_id': 2, 'email': 'wendy@aol.com'}]>>> i = address.insert()>>> r = conn.execute(i, addresses)   # 插入多條記錄>>> r
<sqlalchemy.engine.result.ResultProxy object at 0x0000000002EB5080>>>> r.rowcount   #返回影響的行數4L>>> i = user.insert().values(name='tom', fullname='tom Jim')>>> i.compile()
<sqlalchemy.sql.compiler.SQLCompiler object at 0x0000000002F6F390>>>> print i.compile()INSERT INTO "user" (name, fullname) VALUES (:name, :fullname)>>> print i.compile().params
{'fullname': 'tom Jim', 'name': 'tom'}>>> r = conn.execute(i)>>> r.rowcount1L
查詢 select

查詢方式很靈活,多數時候使用 sqlalchemy.sql 下面的 select方法

>>> s = select([user])  # 查詢 user表>>> s
<sqlalchemy.sql.selectable.Select at 0x25a7748; Select object>>>> print sSELECT "user".id, "user".name, "user".fullname 
FROM "user"

若是須要查詢自定義的字段,但是使用 user 的cloumn 對象,例如

>>> user.c  # 表 user 的字段column對象<sqlalchemy.sql.base.ImmutableColumnCollection object at 0x0000000002E804A8>>>> print user.c
['user.id', 'user.name', 'user.fullname']>>> s = select([user.c.name,user.c.fullname])>>> r = conn.execute(s)>>> r
<sqlalchemy.engine.result.ResultProxy object at 0x00000000025A7748>>>> r.rowcount  # 影響的行數5L>>> ru = r.fetchall()  
>>> ru
[(u'hello', u'hello world'), (u'Jack', u'Jack Jone'), (u'Jack', u'Jack Jone'), (u'jack', u'jack Jone'), (u'tom', u'tom Jim')]>>> r  
<sqlalchemy.engine.result.ResultProxy object at 0x00000000025A7748>>>> r.closed  # 只要 r.fetchall() 以後,就會自動關閉 ResultProxy 對象True

同時查詢兩個表

>>> s = select([user.c.name, address.c.user_id]).where(user.c.id==address.c.user_id)   # 使用了字段和字段比較的條件>>> s
<sqlalchemy.sql.selectable.Select at 0x2f03390; Select object>>>> print sSELECT "user".name, address.user_id 
FROM "user", address 
WHERE "user".id = address.user_id
操做符
>>> print user.c.id == address.c.user_id  # 返回一個編譯的字符串"user".id = address.user_id>>> print user.c.id == 7"user".id = :id_1   # 編譯成爲帶參數的sql 語句片斷字符串>>> print user.c.id != 7"user".id != :id_1>>> print user.c.id > 7"user".id > :id_1>>> print user.c.id == None"user".id IS NULL>>> print user.c.id + address.c.id   # 使用兩個整形的變成 +"user".id + address.id>>> print user.c.name + address.c.email  # 使用兩個字符串 變成 ||"user".name || address.email
操做鏈接

這裏的鏈接指條件查詢的時候,邏輯運算符的鏈接,即 and ornot

>>> print and_(
        user.c.name.like('j%'),
        user.c.id == address.c.user_id,
        or_(
            address.c.email == 'wendy@aol.com',
            address.c.email == 'jack@yahoo.com'
        ),        not_(user.c.id>5))"user".name LIKE :name_1 AND "user".id = address.user_id AND (address.email = :email_1 OR address.email = :email_2) AND "user".id <= :id_1>>>

獲得的結果爲 編譯的sql語句片斷,下面看一個完整的例子

>>> se_sql = [(user.c.fullname +", " + address.c.email).label('title')]>>> wh_sql = and_(
              user.c.id == address.c.user_id,
              user.c.name.between('m', 'z'),
              or_(
                  address.c.email.like('%@aol.com'),
                  address.c.email.like('%@msn.com')
              )
         )>>> print wh_sql"user".id = address.user_id AND "user".name BETWEEN :name_1 AND :name_2 AND (address.email LIKE :email_1 OR address.email LIKE :email_2)>>> s = select(se_sql).where(wh_sql)>>> print sSELECT "user".fullname || :fullname_1 || address.email AS title 
FROM "user", address 
WHERE "user".id = address.user_id AND "user".name BETWEEN :name_1 AND :name_2 AND (address.email LIKE :email_1 OR address.email LIKE :email_2)>>> r = conn.execute(s)>>> r.fetchall()

使用 raw sql 方式

遇到負責的sql語句的時候,可使用 sqlalchemy.sql 下面的 text 函數。將字符串的sql語句包裝編譯成爲 execute執行須要的sql對象。例如:、

>>> text_sql = "SELECT id, name, fullname FROM user WHERE id=:id"  # 原始sql語句,參數用( :value)表示>>> s = text(text_sql)>>> print s
SELECT id, name, fullname FROM user WHERE id=:id>>> s
<sqlalchemy.sql.elements.TextClause object at 0x0000000002587668>>>> conn.execute(s, id=3).fetchall()   # id=3 傳遞:id參數[(3L, u'Jack', u'Jack Jone')]
鏈接 join

鏈接有joinoutejoin 兩個方法,join 有兩個參數,第一個是join 的表,第二個是on 的條件,joing以後必需要配合select_from 方法

>>> print user.join(address)"user" JOIN address ON "user".id = address.user_id   # 由於開啓了外鍵 ,因此join 能只能識別 on 條件>>> print user.join(address, address.c.user_id==user.c.id)  # 手動指定 on 條件"user" JOIN address ON address.user_id = "user".id>>> s = select([user.c.name, address.c.email]).select_from(user.join(address, user.c.id==address.c.user_id))  # 被jion的sql語句須要用 select_from方法配合>>> s
<sqlalchemy.sql.selectable.Select at 0x2eb63c8; Select object>>>> print s
SELECT "user".name, address.email 
FROM "user" JOIN address ON "user".id = address.user_id>>> conn.execute(s).fetchall()
[(u'hello', u'jack@yahoo.com'), (u'hello', u'jack@msn.com'), (u'hello', u'jack@yahoo.com'), (u'hello', u'jack@msn.com'), (u'Jack', u'www@www.org'), (u'Jack', u'wendy@aol.com'), (u'Jack', u'www@www.org'), (u'Jack', u'wendy@aol.com')]

更復雜的鏈接參考 官方的文檔了。

排序 分組 分頁

排序使用 order_by 方法,分組是 group_by ,分頁天然就是limit 和 offset兩個方法配合

>>> s = select([user.c.name]).order_by(user.c.name)  # order_by>>> print s
SELECT "user".name 
FROM "user" ORDER BY "user".name>>> s = select([user]).order_by(user.c.name.desc())>>> print s
SELECT "user".id, "user".name, "user".fullname 
FROM "user" ORDER BY "user".name DESC>>> s = select([user]).group_by(user.c.name)       # group_by>>> print s
SELECT "user".id, "user".name, "user".fullname 
FROM "user" GROUP BY "user".name>>> s = select([user]).order_by(user.c.name.desc()).limit(1).offset(3)  # limit(1).offset(3)>>> print s
SELECT "user".id, "user".name, "user".fullname 
FROM "user" ORDER BY "user".name DESC
 LIMIT :param_1 OFFSET :param_2
[(4L, u'jack', u'jack Jone')]
更新 update

前面都是一些查詢,更新和插入的方法很像,都是 表下面的方法,不一樣的是,update 多了一個 where 方法 用來選擇過濾

>>> s = user.update()>>> print sUPDATE "user" SET id=:id, name=:name, fullname=:fullname>>> s = user.update().values(fullname=user.c.name)           # values 指定了更新的字段>>> print sUPDATE "user" SET fullname="user".name>>> s = user.update().where(user.c.name == 'jack').values(name='ed')  # where 進行選擇過濾>>> print s 
UPDATE "user" SET name=:name WHERE "user".name = :name_1>>> r = conn.execute(s)>>> print r.rowcount         # 影響行數3

還有一個高級用法,就是一次命令執行多個記錄的更新,須要用到 bindparam 方法

>>> s = user.update().where(user.c.name==bindparam('oldname')).values(name=bindparam('newname'))   # oldname 與下面的傳入的從拿書進行綁定,newname也同樣>>> print sUPDATE "user" SET name=:newname WHERE "user".name = :oldname>>> u = [{'oldname':'hello', 'newname':'edd'},
{'oldname':'ed', 'newname':'mary'},
{'oldname':'tom', 'newname':'jake'}]>>> r = conn.execute(s, u)>>> r.rowcount5L
刪除 delete

刪除比較容易,調用 delete方法便可,不加 where 過濾,則刪除全部數據,可是不會drop掉表,等於清空了數據表

>>> r = conn.execute(address.delete()) # 清空表>>> print r
<sqlalchemy.engine.result.ResultProxy object at 0x0000000002EAF550>>>> r.rowcount8L>>> r = conn.execute(users.delete().where(users.c.name > 'm')) # 刪除記錄>>> r.rowcount3L

至此,sqlalchemy sql表達式的基本用法介紹完畢,更深刻的閱讀能夠查看官方的api SQL Statements and Expressions API



文/人世間(簡書做者)原文連接:http://www.jianshu.com/p/e6bba189fcbd著做權歸做者全部,轉載請聯繫做者得到受權,並標註「簡書做者」。

相關文章
相關標籤/搜索