Essential_SQLAlchemy2th學習筆記之Core模塊

SQL Expression Language對原生SQL語言進行了簡單的封裝
兩大模塊SQLAlchemy Core and ORM:html

  • Core:提供執行SQL Expression Language的接口python

  • ORMmysql

安裝:SQLAlchemy及相關數據庫驅動
pip install sqlalchemy pymysql sql

鏈接到數據庫

數據庫鏈接字符串格式:請參考這裏數據庫

mysql://username:password@hostname/database
postgresql://username:password@hostname/database
sqlite:////absolute/path/to/database
oracle://scott:tiger@127.0.0.1:1521/orcl

好比SQLite以下:json

from sqlalchemy import create_engine
engine = create_engine('sqlite:///cookies.db')
engine2 = create_engine('sqlite:///:memory:')
engine3 = create_engine('sqlite:////home/cookiemonster/cookies.db')
engine4 = create_engine('sqlite:///c:\\Users\\cookiemonster\\cookies.db')

注意:create_engine函數返回以一個engine實例,可是不會當即獲取數據庫鏈接,直到在engine上進行操做如查詢時纔會去獲取connection安全

關於MySQL空閒鏈接8小時自動關閉的解決方案:傳入 pool_recycle=3600參數cookie

from sqlalchemy import create_engine
engine = create_engine('mysql+pymysql://cookiemonster:chocolatechip@mysql01.monster.internal/cookies', pool_recycle=3600)

create_engine其他的一些參數:oracle

  • echo:是否log打印執行的sql語句及其參數。默認爲False函數

  • encoding:默認utf-8

  • isolation_level:隔離級別

  • pool_recycle:指定鏈接回收間隔,這對於MySQL鏈接的8小時機制特別重要。默認-1

獲取鏈接

from sqlalchemy import create_engine
engine = create_engine('mysql+pymysql://cookiemonster:chocolatechip' \
'@mysql01.monster.internal/cookies', pool_recycle=3600)
connection = engine.connect()

Schema and Types

四種類型集合:
• Generic
• SQL standard
• Vendor specific
• User defined

SQLAlchemy定義了不少generic types以兼容不一樣數據庫。這些類型都定義在sqlalchemy.types模塊中,爲了方便也能夠從sqlalchemy直接導入這些類型。
類型對應表以下:

SQLAlchemy Python SQL
BigInteger int BIGINT
Boolean bool BOOLEAN or SMALLINT
Date datetime.date DATE (SQLite: STRING)
DateTime datetime.datetime DATETIME (SQLite: STRING)
Enum str ENUM or VARCHAR
Float float or Decimal FLOAT or REAL
Integer int INTEGER
Interval datetime.timedelta INTERVAL or DATE from epoch
LargeBinary byte BLOB or BYTEA
Numeric decimal.Decimal NUMERIC or DECIMAL
Unicode unicode UNICODE or VARCHAR
Text str CLOB or TEXT
Time datetime.time DATETIME

若是這些類型不能知足你,好比有些數據庫支持json類型,那麼你須要用到sqlalchemy.dialects模塊中對應數據庫的類型。好比from sqlalchemy.dialects.postgresql import JSON

Metadata & Table & Column

Metadata爲了快速訪問數據庫。能夠看做是不少Table對象的集合,還有一些關於engin,connection的信息。能夠經過MetaData.tables訪問這些表對象字典
定義表對象以前須要先實例化Metadata:

from sqlalchemy import MetaData
metadata = MetaData()

Table對象構建以下:第一個參數爲名稱,第二個參數爲Metadata對象,後續參數爲Column對象. Column對象參數爲,名稱,類型,及其他等

from sqlalchemy import Table, Column, Integer, Numeric, String, ForeignKey
cookies = Table('cookies', metadata,
Column('cookie_id', Integer(), primary_key=True),
Column('cookie_name', String(50), index=True),
Column('cookie_recipe_url', String(255)),
Column('cookie_sku', String(55)),
Column('quantity', Integer()),
Column('unit_cost', Numeric(12, 2))
)
from datetime import datetime
from sqlalchemy import DateTime
users = Table('users', metadata,
Column('user_id', Integer(), primary_key=True),
Column('username', String(15), nullable=False, unique=True),
Column('email_address', String(255), nullable=False),
Column('phone', String(20), nullable=False),
Column('password', String(25), nullable=False),
Column('created_on', DateTime(), default=datetime.now),
Column('updated_on', DateTime(), default=datetime.now, onupdate=datetime.now)

注意:這裏default,onupdate屬性是一個callable對象而不是直接值,好比datetime.now(),由於這樣的話,就永遠是這個值,而不是每一個實例實例化、更新時的時間了。
比較有用的就是onupdate,每次更新時都會調用該方法或函數。

鍵和約束(Keys and Constraints)
鍵和約束既能夠像上面那樣經過kwargs定義在Column中,也能夠在以後經過對象添加。相關類定義在基礎的 sqlalchemy模塊中,好比最經常使用的三個:
from sqlalchemy import PrimaryKeyConstraint, UniqueConstraint, CheckConstraint

PrimaryKeyConstraint('user_id', name='user_pk'),它也支持同時定義多個造成聯合主鍵。
UniqueConstraint('username', name='uix_username')
CheckConstraint('unit_cost >= 0.00', name='unit_cost_positive')

索引(Index)

from sqlalchemy import Index
Index('ix_cookies_cookie_name', 'cookie_name')

這個定義須要放置在Table構造器中。也能夠在以後定義,好比
Index('ix_test', mytable.c.cookie_sku, mytable.c.cookie_name))

關聯關係和外鍵約束(Relationships and ForeignKeyConstraints)

from sqlalchemy import ForeignKey
orders = Table('orders', metadata,
Column('order_id', Integer(), primary_key=True),
Column('user_id', ForeignKey('users.user_id')),
Column('shipped', Boolean(), default=False)
)
line_items = Table('line_items', metadata,
Column('line_items_id', Integer(), primary_key=True),
Column('order_id', ForeignKey('orders.order_id')),
Column('cookie_id', ForeignKey('cookies.cookie_id')),
Column('quantity', Integer()),
Column('extended_cost', Numeric(12, 2))
)

注意:這裏ForeignKey用的是字符串參數(這些字符串對應的是數據庫中的表名.列名),而非引用。這樣隔離了模塊間相互依賴
咱們也可使用:
ForeignKeyConstraint(['order_id'], ['orders.order_id'])

建立或持久化表模式(Persisting the Tables)
經過示例代碼咱們知道全部的Table定義,以及額外的模式定義都會與一個metadata對象關聯。咱們能夠經過這個metadata對象來建立表:

metadata.create_all(engine)

注意:默認狀況下create_all不會從新建立已有表,因此它能夠安全地屢次調用,並且也很是友好地與數據庫遷移庫如Ablembic集成而不須要你進行額外手動編碼。

本節代碼完整以下:

from datetime import datetime
from sqlalchemy import (MetaData, Table, Column, Integer, Numeric, String,
DateTime, ForeignKey, create_engine)
metadata = MetaData()
cookies = Table('cookies', metadata,
Column('cookie_id', Integer(), primary_key=True),
Column('cookie_name', String(50), index=True),
Column('cookie_recipe_url', String(255)),
Column('cookie_sku', String(55)),
Column('quantity', Integer()),
Column('unit_cost', Numeric(12, 2))
)
users = Table('users', metadata,
Column('user_id', Integer(), primary_key=True),
Column('customer_number', Integer(), autoincrement=True),
Column('username', String(15), nullable=False, unique=True),
Column('email_address', String(255), nullable=False),
Column('phone', String(20), nullable=False),
Column('password', String(25), nullable=False),
Column('created_on', DateTime(), default=datetime.now),
Column('updated_on', DateTime(), default=datetime.now, onupdate=datetime.now)
)
orders = Table('orders', metadata,
Column('order_id', Integer(), primary_key=True),
Column('user_id', ForeignKey('users.user_id'))
)
line_items = Table('line_items', metadata,
Column('line_items_id', Integer(), primary_key=True),
Column('order_id', ForeignKey('orders.order_id')),
Column('cookie_id', ForeignKey('cookies.cookie_id')),
Column('quantity', Integer()),
Column('extended_cost', Numeric(12, 2))
)
engine = create_engine('sqlite:///:memory:')
metadata.create_all(engine)

SQLAlchemy-Core模塊

插入數據:

ins = cookies.insert().values(
cookie_name="chocolate chip",
cookie_recipe_url="http://some.aweso.me/cookie/recipe.html",
cookie_sku="CC01",
quantity="12",
unit_cost="0.50"
)
print(str(ins))

固然你也能夠這麼作:

from sqlalchemy import insert
ins = insert(cookies).values(
cookie_name="chocolate chip",
cookie_recipe_url="http://some.aweso.me/cookie/recipe.html",
cookie_sku="CC01",
quantity="12",
unit_cost="0.50"
)

上述編譯成預編譯語句以下:

INSERT INTO cookies
(cookie_name, cookie_recipe_url, cookie_sku, quantity, unit_cost)
VALUES
(:cookie_name, :cookie_recipe_url, :cookie_sku, :quantity, :unit_cost)

實際過程會是以下ins對象內部會調用compile()方法編譯成上述語句,而後將參數存儲到ins.compile().params字典中。
接下來咱們經過前面獲取的connection對象執行statement:

result = connection.execute(ins)

固然你也能夠這麼查詢:

ins = cookies.insert()
result = connection.execute(
ins,
cookie_name='dark chocolate chip',
cookie_recipe_url='http://some.aweso.me/cookie/recipe_dark.html',
cookie_sku='CC02',
quantity='1',
unit_cost='0.75'
)
result.inserted_primary_key

批量插入:

inventory_list = [
{
'cookie_name': 'peanut butter',
'cookie_recipe_url': 'http://some.aweso.me/cookie/peanut.html',
'cookie_sku': 'PB01',
'quantity': '24',
'unit_cost': '0.25'
},
{
'cookie_name': 'oatmeal raisin',
'cookie_recipe_url': 'http://some.okay.me/cookie/raisin.html',
'cookie_sku': 'EWW01',
'quantity': '100',
'unit_cost': '1.00'
}
]
result = connection.execute(ins, inventory_list)

注意:必定要確保全部字典參數擁有相同的keys

查詢

from sqlalchemy.sql import select
s = select([cookies])
rp = connection.execute(s)
results = rp.fetchall()

固然咱們也可使用字符串來代替:

s = select("""SELECT cookies.cookie_id, cookies.cookie_name,
cookies.cookie_recipe_url, cookies.cookie_sku, cookies.quantity,
cookies.unit_cost FROM cookies""")

connection.execute返回的rp變量是一個ResultProxy對象(它是DBAPI中cursor對象的封裝)。

咱們也能夠這樣寫:

from sqlalchemy.sql import select
s = cookies.select()
rp = connection.execute(s)
results = rp.fetchall()

ResultProxy使得查詢結果能夠經過index,name,or Column object訪問列數據。例如:

first_row = results[0]
first_row[1] #遊標列索引從1開始,by index
first_row.cookie_name # by name
first_row[cookies.c.cookie_name] #by Column object.

你也能夠迭代ResultProxy,以下:

rp = connection.execute(s)
for record in rp:
print(record.cookie_name)

ResultProxy其他可用來獲取結果集的方法

  • first()

  • fetchone()

  • fetchall()

  • scalar():Returns a single value if a query results in a single record with one column.

  • keys() 獲取列名

關於選擇ResultProxy上述的方法的建議:
一、使用first()而不是fetchone()來獲取單條記錄,由於fetchone()調用以後仍然保留着打開的connections共後續使用,若是不當心的話很容易引發問題。
二、使用迭代方式獲取全部結果,而不是fetchall(),更加省內存。
三、使用scalar()獲取單行單列結果時須要注意,若是返回多於一行,它會拋出異常。

控制返回列的數目

s = select([cookies.c.cookie_name, cookies.c.quantity])
rp = connection.execute(s)
print(rp.keys())
result = rp.first()

排序

s = select([cookies.c.cookie_name, cookies.c.quantity])
s = s.order_by(cookies.c.quantity)
rp = connection.execute(s)
for cookie in rp:
print('{} - {}'.format(cookie.quantity, cookie.cookie_name))

#倒序desc
from sqlalchemy import desc
s = select([cookies.c.cookie_name, cookies.c.quantity])
s = s.order_by(desc(cookies.c.quantity))

限制返回結果集的條數

s = select([cookies.c.cookie_name, cookies.c.quantity])
s = s.order_by(cookies.c.quantity)
s = s.limit(2)
rp = connection.execute(s)
print([result.cookie_name for result in rp])

內置SQL函數

在sqlalchemy.sql.func模塊中

#sum
from sqlalchemy.sql import func
s = select([func.sum(cookies.c.quantity)])
rp = connection.execute(s)
print(rp.scalar())

#count
s = select([func.count(cookies.c.cookie_name)])
rp = connection.execute(s)
record = rp.first()
print(record.keys())
print(record.count_1) #字段名是自動生成的,<func_name>_<position>,能夠設置別名的,看下面

#設置別名
s = select([func.count(cookies.c.cookie_name).label('inventory_count')])
rp = connection.execute(s)
record = rp.first()
print(record.keys())
print(record.inventory_count)

過濾

#where
s = select([cookies]).where(cookies.c.cookie_name == 'chocolate chip')
rp = connection.execute(s)
record = rp.first()
print(record.items()) #調用row對象的items()方法。

#like
s = select([cookies]).where(cookies.c.cookie_name.like('%chocolate%'))
rp = connection.execute(s)
for record in rp.fetchall():
    print(record.cookie_name)

能夠在where中使用的子句元素

  • between(cleft, cright)

  • concat(column_two) Concatenate column with column_two

  • distinct()

  • in_([list])

  • is_(None) Find where the column is None (commonly used for Null checks with None)

  • contains(string) Find where the column has string in it (case-sensitive)

  • endswith(string) Find where the column ends with string (case-sensitive)

  • like(string) Find where the column is like string (case-sensitive)

  • startswith(string) Find where the column begins with string (case-sensitive)

  • ilike(string) Find where the column is like string (this is not case-sensitive)

固然還包括一系列的notxxx方法,好比notin_(),惟一的例外是isnot()

操做符

  • +,-,*,/,%

  • ==,!=,<,>,<=,>=

  • AND,OR,NOT,因爲python關鍵字的緣由,使用and_(),or_(),not_()來代替

+號還能夠用於字符串拼接:

s = select([cookies.c.cookie_name, 'SKU-' + cookies.c.cookie_sku])
for row in connection.execute(s):
print(row)
from sqlalchemy import cast
s = select([cookies.c.cookie_name,
    cast((cookies.c.quantity * cookies.c.unit_cost),
        Numeric(12,2)).label('inv_cost')])
for row in connection.execute(s):
    print('{} - {}'.format(row.cookie_name, row.inv_cost))

注意:cast是另一個函數,容許咱們進行類型轉換,上述轉換是將數字轉換爲貨幣形式,和
print('{} - {:.2f}'.format(row.cookie_name, row.inv_cost)).這個行爲一致。

from sqlalchemy import and_, or_, not_
s = select([cookies]).where(
    and_(
        cookies.c.quantity > 23,
        cookies.c.unit_cost < 0.40
    )
)
for row in connection.execute(s):
    print(row.cookie_name)


from sqlalchemy import and_, or_, not_
s = select([cookies]).where(
    or_(
        cookies.c.quantity.between(10, 50),
        cookies.c.cookie_name.contains('chip')
    )
)
for row in connection.execute(s):
    print(row.cookie_name)

update

from sqlalchemy import update
u = update(cookies).where(cookies.c.cookie_name == "chocolate chip")
u = u.values(quantity=(cookies.c.quantity + 120))
result = connection.execute(u)
print(result.rowcount)
s = select([cookies]).where(cookies.c.cookie_name == "chocolate chip")
result = connection.execute(s).first()
for key in result.keys():
    print('{:>20}: {}'.format(key, result[key]))

delete

from sqlalchemy import delete
u = delete(cookies).where(cookies.c.cookie_name == "dark chocolate chip")
result = connection.execute(u)
print(result.rowcount)

s = select([cookies]).where(cookies.c.cookie_name == "dark chocolate chip")
result = connection.execute(s).fetchall()
print(len(result))

joins

join(),outerjoin()函數,select_from()函數

columns = [orders.c.order_id, users.c.username, users.c.phone,
           cookies.c.cookie_name, line_items.c.quantity,
           line_items.c.extended_cost]
cookiemon_orders = select(columns)
cookiemon_orders = cookiemon_orders.select_from(orders.join(users).join(
    line_items).join(cookies)).where(users.c.username ==
                                     'cookiemon')
result = connection.execute(cookiemon_orders).fetchall()
for row in result:
    print(row)

最終產生的SQL語句以下:

SELECT orders.order_id, users.username, users.phone, cookies.cookie_name,
line_items.quantity, line_items.extended_cost FROM users JOIN orders ON
users.user_id = orders.user_id JOIN line_items ON orders.order_id =
line_items.order_id JOIN cookies ON cookies.cookie_id = line_items.cookie_id
WHERE users.username = :username_1

outerjoin

columns = [users.c.username, func.count(orders.c.order_id)]
all_orders = select(columns)
all_orders = all_orders.select_from(users.outerjoin(orders))
all_orders = all_orders.group_by(users.c.username)
result = connection.execute(all_orders).fetchall()
for row in result:
    print(row)

表別名函數alias()

>>> manager = employee_table.alias('mgr')
>>> stmt = select([employee_table.c.name],
            ... and_(employee_table.c.manager_id==manager.c.id,
            ... manager.c.name=='Fred'))
>>> print(stmt)
SELECT employee.name
FROM employee, employee AS mgr
WHERE employee.manager_id = mgr.id AND mgr.name = ?

分組

columns = [users.c.username, func.count(orders.c.order_id)]
all_orders = select(columns)
all_orders = all_orders.select_from(users.outerjoin(orders))
all_orders = all_orders.group_by(users.c.username)
result = connection.execute(all_orders).fetchall()
for row in result:
    print(row)

chaining

def get_orders_by_customer(cust_name, shipped=None, details=False):
    columns = [orders.c.order_id, users.c.username, users.c.phone]
    joins = users.join(orders)
    if details:
        columns.extend([cookies.c.cookie_name, line_items.c.quantity,
            line_items.c.extended_cost])
        joins = joins.join(line_items).join(cookies)
    cust_orders = select(columns)
    cust_orders = cust_orders.select_from(joins)

    cust_orders = cust_orders.where(users.c.username == cust_name)
    if shipped is not None:
        cust_orders = cust_orders.where(orders.c.shipped == shipped)
    result = connection.execute(cust_orders).fetchall()
    return result

執行原生SQL

返回的仍是ResultProxy對象
一、徹底採用原始SQL

result = connection.execute("select * from orders").fetchall()
print(result)

二、部分採用原始SQL,text()函數

from sqlalchemy import text
stmt = select([users]).where(text("username='cookiemon'"))
print(connection.execute(stmt).fetchall())

異常

SQLALchemy定義了不少異常。咱們經過關心:AttributeErrors,IntegrityErrors.等
爲了進行相關試驗與說明,請先執行下面這些語句

from datetime import datetime
from sqlalchemy import (MetaData, Table, Column, Integer, Numeric, String,
                        DateTime, ForeignKey, Boolean, create_engine,
                        CheckConstraint)
metadata = MetaData()
cookies = Table('cookies', metadata,
                Column('cookie_id', Integer(), primary_key=True),
                37
                Column('cookie_name', String(50), index=True),
                Column('cookie_recipe_url', String(255)),
                Column('cookie_sku', String(55)),
                Column('quantity', Integer()),
                Column('unit_cost', Numeric(12, 2)),
                CheckConstraint('quantity > 0', name='quantity_positive')
                )
users = Table('users', metadata,
              Column('user_id', Integer(), primary_key=True),
              Column('username', String(15), nullable=False, unique=True),
              Column('email_address', String(255), nullable=False),
              Column('phone', String(20), nullable=False),
              Column('password', String(25), nullable=False),
              Column('created_on', DateTime(), default=datetime.now),
              Column('updated_on', DateTime(),
                     default=datetime.now, onupdate=datetime.now)
              )
orders = Table('orders', metadata,
               Column('order_id', Integer()),
               Column('user_id', ForeignKey('users.user_id')),
               Column('shipped', Boolean(), default=False)
               )
line_items = Table('line_items', metadata,
                   Column('line_items_id', Integer(), primary_key=True),
                   Column('order_id', ForeignKey('orders.order_id')),
                   Column('cookie_id', ForeignKey('cookies.cookie_id')),
                   Column('quantity', Integer()),
                   Column('extended_cost', Numeric(12, 2))
                   )
engine = create_engine('sqlite:///:memory:')
metadata.create_all(engine)
connection = engine.connect()
from sqlalchemy import select, insert
ins = insert(users).values(
username="cookiemon",
email_address="mon@cookie.com",
phone="111-111-1111",
password="password"
)
result = connection.execute(ins)
s = select([users.c.username])
results = connection.execute(s)
for result in results:
print(result.username)
print(result.password) #此處包AttributeError異常

在違反約束的狀況下會出現IntegrityError異常。好比違反惟一性約束等。

s = select([users.c.username])
connection.execute(s).fetchall()
[(u'cookiemon',)]
ins = insert(users).values(
    username="cookiemon",
    email_address="damon@cookie.com",
    phone="111-111-1111",
    password="password"
)
result = connection.execute(ins) #此處報IntegrityError, UNIQUE constraint failed: users.username
#異常處理
try:
    result = connection.execute(ins)
except IntegrityError as error:
    print(error.orig.message, error.params)

全部的SQLAlchemy異常處理方式都是上面那種思路,經過[SQLAlchemyError](http://docs.sqlal
chemy.org/en/latest/core/exceptions.html)能夠獲取到的信息由以下:

  • orig :The DBAPI exception object.

  • params:The parameter list being used when this exception occurred.

  • statement :The string SQL statement being invoked when this exception occurred.

事務Transactions

from sqlalchemy.exc import IntegrityError


def ship_it(order_id):
    s = select([line_items.c.cookie_id, line_items.c.quantity])
    s = s.where(line_items.c.order_id == order_id)
    transaction = connection.begin() #開啓事務
    cookies_to_ship = connection.execute(s).fetchall()
    try:
        for cookie in cookies_to_ship:
            u = update(cookies).where(cookies.c.cookie_id == cookie.cookie_id)
            u = u.values(quantity=cookies.c.quantity - cookie.quantity)
            result = connection.execute(u)
        u = update(orders).where(orders.c.order_id == order_id)
        u = u.values(shipped=True)
        result = connection.execute(u)
        print("Shipped order ID: {}".format(order_id))
        transaction.commit() #提交事務
    except IntegrityError as error:
        transaction.rollback()   #事務回滾
        print(error)
相關文章
相關標籤/搜索