42 - 數據庫-orm-SQLAlchemy

1 ORM

Object-Relational Mapping,把關係數據庫的表結構映射到對象上。使用面向對象的方式來操做數據庫。python

orm

下面是一個關係模型與Python對象之間的映射關係:mysql

  • table --> class : 表映射爲類
  • row --> object : 行映射爲實例
  • column --> property : 字段映射爲屬性

2 sqlalchemy

SQLAlchemy是一個ORM框架。內部是使用了鏈接池來管理數據庫鏈接。其自己只是作了關係映射,不能鏈接數據庫,也不能執行sql語句,它在底層須要使用pymysql等模塊來鏈接並執行sql語句,要使用sqlalchemy,那麼須要先進行安裝:nginx

pip3 install sqlalchemy

查看版本sql

In [1]: import sqlalchemy
In [2]: print(sqlalchemy.__version__)
1.3.1

3 基本使用

先來總結一下使用sqlalchemy框架操做數據庫的通常流程:數據庫

  1. 建立引擎(不一樣類型數據庫使用不一樣的鏈接方式)
  2. 建立基類(類對象要繼承,由於基類會利用元編程爲咱們的子類綁定關於表的其餘屬性信息)
  3. 建立實體類(用來對應數據庫中的表)
  4. 編寫實體類屬性(用來對應表中的字段/屬性)
  5. 建立表(若是表不存在,則須要執行語句在數據庫中建立出對應的表)
  6. 實例化(具體的一條record記錄)
  7. 建立會話session(用於執行sql語句的鏈接)
  8. 使用會話執行SQL語句
  9. 關閉會話

3.1 建立鏈接

sqlalchemy 使用引擎管理數據庫鏈接(DATABASE URLS),鏈接的通常格式爲:django

dialect+driver://username:password@host:port/database
  • dialect:表示什麼數據庫(好比,mysql,sqlite,oracle等)
  • driver:用於鏈接數據庫的模塊(好比pymysql,mysqldb等)
  • username:鏈接數據庫的用戶名
  • password:鏈接數據庫的密碼
  • host: 數據庫的主機地址
  • port: 數據庫的端口
  • database: 要鏈接的數據庫名稱

pymysql模塊是較長用於鏈接mysql的模塊,使用pymysql的鏈接的語句爲:編程

  • mysql+pymysql://dahl:123456@10.0.0.13:3306/test

建立引擎用於進行數據庫的鏈接:create_engine(urls)安全

import sqlalchemy

db_url = 'mysql+pymysql://dahl:123456@10.0.0.13:3306/test'
engine = sqlalchemy.create_engine(db_url,echo=True)
  • echo:引擎是否打印執行的sql語句,等於True時,表示打印,便於調試
  • max_overflow=5: 超過鏈接池大小外最多建立的鏈接
  • pool_size=1: 鏈接池大小
  • pool_timeout=30: 池中沒有線程最多等待的時間(不然會報錯)
  • pool_recycle=-1: 多久以後對線程池中的線程進行一次鏈接的回收(重置)

特別注意:建立引擎並不會立刻鏈接數據庫,直到讓數據庫執行任務是才鏈接。session

3.1.1 利用鏈接池執行sql

sqlalchemy內部是原生支持鏈接池的,咱們能夠僅僅利用它的鏈接池功能。經過engie的raw_connection方法就能夠獲取到一個鏈接,而後就能夠執行sql語句了(基本上就是Pymysql+DBUtils的實現)

import threading
import time
import sqlalchemy
import pymysql

engine = sqlalchemy.create_engine(
    'mysql+pymysql://dahl:123456@10.0.0.13:3306/test',
    max_overflow=5,
    pool_size=1,
    pool_timeout=30,
    pool_recycle=-1
)

def func():
    conn = engine.raw_connection()
    cursor = conn.cursor(pymysql.cursors.DictCursor)
    time.sleep(2)
    cursor.execute('select * from employees;')
    res = cursor.fetchall()
    print(res)
    cursor.close()
    conn.close()

if __name__ == '__main__':
    for i in range(10):
        threading.Thread(target=func).start()

3.1.2 利用session來執行sql

上面是經過連接池來執行sql的,其實也能夠經過session來執行。

import threading
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

engine = sqlalchemy.create_engine(
    'mysql+pymysql://dahl:123456@10.0.0.13:3306/test',
    max_overflow=5,
    pool_size=1,
    pool_timeout=30,
    pool_recycle=-1
)

DBsession = sessionmaker(bind=engine)

def func():
    session = DBsession()
    cursor = session.execute('select * from employees;')
    res = cursor.fetchall()
    print(res)
    cursor.close()
    session.close()

if __name__ == '__main__':
    for i in range(10):
        threading.Thread(target=func).start()

3.2 建立基類

        使用: sqlalchemy.ext.declarative.declarative_base 來構造聲明性類定義的基類。由於sqlalchemy內部大量使用了元編程,爲實例化的子類注入映射所需的屬性,因此咱們定義的映射要繼承自它(必須繼承)

通常只須要一個這樣的基類

Base = sqlalchemy.ext.declarative.declarative_base()
# 或者
from sqlalchemy.ext import declarative
Base = declarative.declarative_base()

3.3 建立實體類

現數據庫存在以下表

CREATE TABLE `student` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(30) DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8mb4;

建立對應的實體類:

import sqlalchemy
from sqlalchemy.ext import declarative
from sqlalchemy import Column, String, Integer

db_url = 'mysql+pymysql://dahl:123456@10.0.0.13:3306/test'
engine = sqlalchemy.create_engine(db_url, echo=True)

Base = declarative.declarative_base()

class Student(Base):

    __tablename__ = 'student'

    id = Column(Integer, primary_key=True, nullable=False, autoincrement=True)
    name = Column(String, default='Null')
    age = Column(Integer, default='Null')

print(Student.__dict__)

# Table('student', MetaData(bind=None),   這裏沒有綁定engine,因此是None
# Column('id', Integer(), table=<student>, primary_key=True, nullable=False), 
# Column('name', String(), table=<student>, default=ColumnDefault('Null')), 
# Column('age', Integer(), table=<student>, default=ColumnDefault('Null')), schema=None),
# Column('data',DateTime,default=datetime.datetime.now)  # 這裏不能加括號
  • __tablename__: 代表,若是表已存在,則必須指定正確的表名
  • Column: 用於構建一個列對象,它的參數通常都和數據庫中的列屬性是對應的,主要有:
    • name: 數據庫中表示的此列的名稱
    • type:字段類型(好比String、Integer,這裏是sqlalchemy包裝的類型,對應的是數據庫的varchar、int等),來自TypeEngine的子類
    • autoincrement:是否自增
    • default:默認值,能夠是值,可調用對象或者類,當寫入數據該字段沒有指定時,調用。當是可調用對象的時候,建議不要加括號
    • doc:字段說明信息
    • key: 一個可選的字符串標識符,用於標識表上的此Column對象。
    • index: 是否啓用索引
    • nullable: 是否能夠爲空
    • onupdate: 若是在更新語句的SET子句中不存在此列,則將在更新時調用該值
    • primary_key: 主鍵
    • server_default:它的值是 FetchedValue實例、字符串、Unicode 或者 text()實例,用做DDL語句中該列的default值

注意:Column和String、Integer等都來自於sqlalchemy下的方法,要麼直接導入,要麼就使用sqlalchemy.String來引用。

3.3.1 經常使用字段

類型名 python中類型 說明
Integer int 普通整數,通常是32位
SmallInteger int 取值範圍小的整數,通常是16位
BigInteger int或long 不限制精度的整數
Float float 浮點數
Numeric decimal.Decimal 普通整數,通常是32位
String str 變長字符串
Text str 變長字符串,對較長或不限長度的字符串作了優化
Unicode unicode 變長Unicode字符串
UnicodeText unicode 變長Unicode字符串,對較長或不限長度的字符串作了優化
Boolean bool 布爾值
Date datetime.date 時間
Time datetime.datetime 日期和時間
LargeBinary str 二進制文件

3.4 實例化

經過咱們構建的類,來實例化的對象,在未來就是數據庫中的一條條記錄。

student = Student()
student.name = 'daxin'
student.id = 1
student.age = 20
print(student)   # <1 daxin 20>

3.5 建立表

        咱們本身寫的類都是繼承自Base,每繼承一次Base類,在Base類的metadata屬性中就會記錄當前子類,metadata提供了方法用於刪除/建立表。若是數據庫中已經存在對應的表,那麼將不會繼續建立

  • drop_all(bind=None, tables=None, checkfirst=True):刪除metadata中記錄的全部表
  • create_all(bind=None, tables=None, checkfirst=True):建立metadata中記錄的全部表
Base = declarative.declarative_base()  
Base.metadata.create_all(bind=engine)  # 須要經過引擎去執行

# 下面是engine的echo爲true時的輸出信息
# 2019-03-16 16:53:45,922 INFO sqlalchemy.engine.base.Engine 
# CREATE TABLE hello (
#   id INTEGER NOT NULL AUTO_INCREMENT, 
#   name VARCHAR(24), 
#   age INTEGER, 
#   PRIMARY KEY (id)
# )
# 
# 
# 2019-03-16 16:53:45,922 INFO sqlalchemy.engine.base.Engine {}
# 2019-03-16 16:53:45,926 INFO sqlalchemy.engine.base.Engine COMMIT
# 2019-03-16 16:53:45,927 INFO sqlalchemy.engine.base.Engine 
# CREATE TABLE world (
#   id INTEGER NOT NULL AUTO_INCREMENT, 
#   name VARCHAR(24), 
#   age INTEGER, 
#   PRIMARY KEY (id)
# )
# 
# 
# 2019-03-16 16:53:45,927 INFO sqlalchemy.engine.base.Engine {}
# 2019-03-16 16:53:45,928 INFO sqlalchemy.engine.base.Engine COMMIT
  • 生產環境不多這樣建立表,都是系統上線的時候由腳本生成。
  • 生產環境不多刪除表,寧肯廢除都不能刪除。

注意:sqlalchemy 只能建立和刪除表,不能修改表結構。只能手動的在數據庫中修改而後在代碼中添加便可。

3.6 建立會話Session

        在一個會話中操做數據庫,繪畫創建在鏈接上,鏈接被引擎管理,當第一次使用數據庫時,從引擎維護的鏈接池中取出一個鏈接使用。

from sqlalchemy.orm import sessionmaker
Session = sessionmaker(bind=engine) 
session = Session()  # 實例化一個session對象
  • session對象線程不安全。因此不一樣線程應該使用不一樣的session對象
  • Session類和engine有一個就好了。

scoped_session是sqlalchemy提供的線程安全的session,利用的是ThreadLocal實現的。

3.7 數據操做

3.7.1 增長數據

方法 含義
add() 增長一個對象
add_all() 增長多個對象,類型爲可迭代
import sqlalchemy
from sqlalchemy.ext import declarative
from sqlalchemy import Column, String, Integer
from sqlalchemy.orm import sessionmaker

db_url = 'mysql+pymysql://dahl:123456@10.0.0.13:3306/test'
engine = sqlalchemy.create_engine(db_url, echo=True)

Base = declarative.declarative_base()
DBSession = sessionmaker(bind=engine)
session = DBSession()

class Student(Base):
    __tablename__ = 'student'

    id = Column(Integer, primary_key=True, nullable=False, autoincrement=True)
    name = Column(String(24), default='Null')
    age = Column(Integer, default='Null')

    def __repr__(self):
        return '<{} {} {}>'.format(
            self.id, self.name, self.age
        )

daxin = Student(id=12, name='daxin', age=20)
session.add(daxin)
dachenzi = Student(id=13,name='dachenzi', age=21)
xiaobai = Student(id=14,name='xiaobai', age=22)
session.add_all((dachenzi,xiaobai))   # 新增三條數據
session.commit()

須要注意的是下面的狀況:

daxin = Student(name='daxin')
daxin.age = 40
session.add(daxin)  # 1
session.commit()   
daxin.age = 20  
session.add(daxin)  # 2
session.commit()
daxin.age = 10
session.add_all([daxin, daxin, daxin, daxin]) # 3
session.commit()  
# <30 daxin 10>

結果生成個1條數據,<30 daxin 10>,爲何呢?因爲id屬於自增列,咱們在執行#1時,id是沒有固定下來的。

  1. 執行後,commit,則daxin的id就固定下來了。
  2. 執行時,因爲id沒變,因此並不會新增數據,而是使用update語句更新了age字段
  3. 執行時,因爲都是daxin的,id,name,age都沒有改變,因此只會執行1條語句

    當engine的echo等於true時,看到具體的sql語句,一切就很明白了。

3.7.2 簡單查詢

使用session的query方法進行簡單查詢,格式爲:

  • session.query(student): 等同於select * from student;
  • session.query(student).get(2): 等同於select * from student where id = 2,這裏的get方法只能主鍵查詢
std_list = session.query(Student)
print(std_list)        # SELECT student.id AS student_id, student.name AS student_name, student.age AS student_age FROM student
print(type(std_list))  # <class 'sqlalchemy.orm.query.Query'>

這裏直接打印並不會結果,由於它太懶了,你不迭代它,它就不會真的去數據庫查詢。

std_list = session.query(Student)
for std in std_list:
    print(std)

# 經過get來過濾主鍵,是能夠直接執行返回結果的。
std_list = session.query(Student).get(30)
print(std_list)

3.7.3 修改數據

修改的數據的流程分爲兩步:

  1. 查找匹配的數據
  2. 修改後,提交
std = session.query(Student).get(30)
print(std)  # <30 daxin 200>
std.age = 1000
session.add(std)
session.commit()
std = session.query(Student).get(30)
print(std)  # <30 daxin 1000>

# 或者
std = session.query(Student).filter(Student.id > 10).update({'name':'daxin'})
std.commit()

大部分ORM是都是這樣,必須先查才能改。

在原有數據的基礎上批量修改,好比在全部名稱後面添加特定的後綴

session.query(Users).filter(Users.id > 0).update({Users.name: Users.name + "099"}, synchronize_session=False)  # 必須爲synchronize_session=False
session.query(Users).filter(Users.id > 0).update({"age": Users.age + 1}, synchronize_session="evaluate")  # 必須synchronize_session="evaluate",表示要進行計算

3.7.4 刪除數據(不建議)

使用session.delete來刪除數據

std = session.query(Student).get(30)
session.delete(std)
session.commit()   # 提交刪除操做
std = session.query(Student).get(30)
print(std)  # None

3.7.5 狀態

當咱們建立一條數據,或是從數據庫中獲取一條數據時,數據自己是存在一個狀態屬性的,用來標識當前數據是否持久化,或者其餘狀態,在sqlalchemy中,inspect(obj)能夠用來窺探數據(obj)的狀態

daxin = Student(name='daxin')
daxin.age = 10000
state = sqlalchemy.inspect(daxin)
print(state)  # <sqlalchemy.orm.state.InstanceState object at 0x00000186EEC38EB8>
std = session.query(Student).get(28)
state = sqlalchemy.inspect(std)
print(state)  # <sqlalchemy.orm.state.InstanceState object at 0x00000186EFDCDA20>

咱們看到,inspect返回的是一個InstanceState對象。這個對象有如下幾個屬性:

  • session_id:獲取數據時的session ID,若是是自建數據未持久化ID爲None
  • _attached: 是不是附加的(數據已經加載(已add),就差提交的數據庫了)
  • transient:是不是臨時數據(臨時建立,尚未提交(尚未add))
  • pending: 是不是待定的數據
  • persistent: 是不是持久的
  • deleted: 是否已刪除
  • detached:是否被分離
  • key:key是多少

    InstanceState對象,能夠經過sqlalchemy.orm.state導入

具體的狀態信息與含義以下:

狀態 說明
transient 實體類還沒有加入到session中,同時並無保存到數據庫中
pending transient的實體被加入到session中,狀態切換到pending,但它尚未被flsh到數據庫中
persistent session中的實體對象對應着數據庫中的真實記錄。pending狀態在提交成功後能夠變成persistent狀態,或者查詢成功返回的實體也是persistent狀態
deleted 實體被刪除且已經flush但未commit完成。事物提交成功了,實體變成detached,事物失敗返回persistent狀態
detached 刪除成功的實體進入這個狀態

因此數據的狀態變化以下:

  1. 新建一個實體,狀態是transient臨時的
  2. add()之後,狀態變爲pending
  3. commit()之後,狀態變爲persistent
  4. 查詢成功返回的實體狀態也是persistent狀態
  5. delete()並flush()之後,狀態變爲deleted
  6. commit()之後,變爲detached,提交失敗,回退到persistent狀態

    flush()方法,主動把改變應用到數據庫中去

        刪除、修改操做,須要對應一個真實存在的數據,也就是說數據的狀態是persistent才行。當使用add語句新增信息時,若是這個對象已經添加過數據庫了,那麼它的狀態會變爲persistent,若是對persistent的數據進行修改繼續提交的話,那麼使用的將會是update語句而非insert。這也是前面爲啥屢次對一個數據進行add,提交了屢次只會插入1次的緣由。

import sqlalchemy
from sqlalchemy.ext import declarative
from sqlalchemy import Column, String, Integer
from sqlalchemy.orm import sessionmaker

db_url = 'mysql+pymysql://dahl:123456@10.0.0.13:3306/test'
engine = sqlalchemy.create_engine(db_url, echo=True)

Base = declarative.declarative_base()
DBSession = sessionmaker(bind=engine)
session = DBSession()

class Student(Base):
    __tablename__ = 'student'

    id = Column(Integer, primary_key=True, nullable=False, autoincrement=True)
    name = Column(String(24), default='Null')
    age = Column(Integer, default='Null')

    def __repr__(self):
        return '<{} {} {}>'.format(
            self.id, self.name, self.age
        )

def getstate(state: InstanceState):
    print("""
    session_id={} transient={} _attached={} pending={} persistent={} deleted={} detached={}
    """.format(state.session_id, state.transient, state._attached, state.pending, state.persistent, state.deleted,
               state.detached))

daxin = Student(name='daxin')
daxin.age = 10000
state = sqlalchemy.inspect(daxin)
getstate(state)  # session_id=None transient=True _attached=False pending=False persistent=False deleted=False detached=False

session.add(daxin)
getstate(state)  # session_id=1 transient=False _attached=True pending=True persistent=False deleted=False detached=False

session.commit()
getstate(state)  # session_id=1 transient=False _attached=True pending=False persistent=True deleted=False detached=False

std = session.query(Student).get(28)
state = sqlalchemy.inspect(std)
getstate(state)  # session_id=1 transient=False _attached=True pending=False persistent=True deleted=False detached=False

std = session.query(Student).get(31)
session.delete(std)
state = sqlalchemy.inspect(std)
getstate(state)  # session_id=1 transient=False _attached=True pending=False persistent=True deleted=False detached=False

session.flush()
getstate(state)  # session_id=1 transient=False _attached=True pending=False persistent=False deleted=True detached=False

session.commit()
getstate(state)  # session_id=None transient=False _attached=False pending=False persistent=False deleted=False detached=True

3.7.6 枚舉字段

        在數據庫的字段類型中,好比性別字段,咱們可能會限制數據來源爲M(male),F(Female),這個時候字段類型能夠是枚舉的,可是在sqlalchemy中,原生的字段類型沒有枚舉類型,那麼就須要藉助enum類了。

import sqlalchemy
from sqlalchemy.ext import declarative
from sqlalchemy import Column, String, Integer
from sqlalchemy.orm import sessionmaker
import enum

db_url = 'mysql+pymysql://dahl:123456@10.0.0.13:3306/test'
engine = sqlalchemy.create_engine(db_url, echo=True)

Base = declarative.declarative_base()
DBSession = sessionmaker(bind=engine)
session = DBSession()

class GenderEnum(enum.Enum):
    M = 'M'
    F = 'F'

class Student(Base):
    __tablename__ = 'student'

    id = Column(Integer, primary_key=True, nullable=False, autoincrement=True)
    name = Column(String(24), default='Null')
    age = Column(Integer, default='Null')
    gender = Column(sqlalchemy.Enum(GenderEnum),nullable=False)

    def __repr__(self):
        return '<{} {} {}>'.format(
            self.id, self.name, self.age
        )

用起來很麻煩,因此建議性別使用1或者0來存儲,顯示的時候作對於轉換便可

3.7.7 複雜查詢

3.7.7.1 where條件查詢

使用filter方法進行條件過濾查詢:

  • session.query(student).filter(student.id > 10):至關於select * from student where student.id > 10

    同時還存在一個filter_by,它的不一樣之處在於括號中的不是表達式,而是參數。好比:filter(user.id == 10) -> filter_by(user.id = 10)

where條件中的關係:

  • AND(與) 對應 and_
  • OR(或) 對應 or_
  • not(非) 對應 not_
  • in 對應字段的 in_
  • not in 對應字段的 notin_
  • like 對應 字段的like方法
  • not like 對應 字段的notlike方法

想要使用與或非,須要先行導入

import sqlalchemy
from sqlalchemy.ext import declarative
from sqlalchemy import Column, String, Integer
from sqlalchemy.orm import sessionmaker
from sqlalchemy import and_, or_, not_

db_url = 'mysql+pymysql://dahl:123456@10.0.0.13:3306/test'
engine = sqlalchemy.create_engine(db_url, echo=True)

Base = declarative.declarative_base()
DBSession = sessionmaker(bind=engine)
session = DBSession()

class Student(Base):
    __tablename__ = 'student'

    id = Column(Integer, primary_key=True, nullable=False, autoincrement=True)
    name = Column(String(24), default='Null')
    age = Column(Integer, default='Null')

    def __repr__(self):
        return '<{} {} {}>'.format(
            self.id, self.name, self.age
        )

def getresulte(stds):
    for std in stds:
        print(std)

條件判斷之AND(&,and_)

# and
std_list = session.query(Student).filter(and_(Student.id < 30, Student.id > 27))
getresulte(std_list)

std_list = session.query(Student).filter(Student.id < 30).filter(Student.id > 27)   # filter返回的仍是一個結果集,因此還能夠繼續使用filter進行過濾
getresulte(std_list)

std_list = session.query(Student).filter(Student.id < 30, Student.age > 100)  # 多個條件一塊兒寫,也是and的關係
getresulte(std_list)

std_list = session.query(Student).filter((Student.name == 'daxin') & (Student.age > 28))
getresulte(std_list)

條件判斷之OR(or_,|)

# or
std_list = session.query(Student).filter(or_(Student.id > 27, Student.age < 50))
getresulte(std_list)

std_list = session.query(Student).filter((Student.id > 27) | (Student.age < 50 ))
getresulte(std_list)

條件判斷之NOT(not_,~)

# not
std_list = session.query(Student).filter(not_(Student.id == 32))
getresulte(std_list)

std_list = session.query(Student).filter(~(Student.id == 32))
getresulte(std_list)

likeinnot in

# like
std_list = session.query(Student).filter(Student.name.like('da%'))
getresulte(std_list)

# not like
std_list = session.query(Student).filter(Student.name.notlike('da%'))
getresulte(std_list)

# in
std_list = session.query(Student).filter(Student.age.in_([10,30,50]))
getresulte(std_list)

# not in
std_list = session.query(Student).filter(Student.age.notin_([10,30,50]))
getresulte(std_list)

補充:

r6 = session.query(Users).filter(text("id<:value and name=:name")).params(value=224, name='fred').order_by(Users.id).all()  # 傳參的方式查詢
r7 = session.query(Users).from_statement(text("SELECT * FROM users where name=:name")).params(name='ed').all()  # 用的很少

更多

3.7.7.2 排序

  • order_by:排序
  • asc: 升序(默認)
  • desc: 降序
# 升序
std_list = session.query(Student).filter(Student.name.like('da%')).order_by(Student.age)
getresulte(std_list)

# 降序
std_list = session.query(Student).filter(Student.name.like('da%')).order_by(Student.age.desc())
getresulte(std_list)

3.7.7.3 分頁(偏移量)

  • limit: 顯示結果的條目數
  • offset:偏移量
# limit
std_list = session.query(Student).filter(Student.name.like('da%')).order_by(Student.age.desc()).limit(3).offset(2)
getresulte(std_list)
# select * from Student where name like 'da%' order_by age desc limit 3 offset 2

3.7.7.4 消費方法

  • count(): 獲取總條數(僅僅針對結果集,不能all之後再count)
  • all(): 取全部行(默認)(列表)
  • first():取首行
  • one():有且只有一行,多行則拋出異常
  • delete():對查詢出來的數據直接進行刪除
  • scalar(): 第一條記錄的第一個元素
# count
std_list = session.query(Student)
print(std_list.count())

# first
std_list = session.query(Student).first()
print(std_list)

first本質上就是limit。

3.7.7.5 分組及聚合方法

sqlalchemy一樣提供了聚合方法,使用sqlalchemy.func來調用

  • group_by:分組顯示

func提供的方法有

  • max:求最大值
  • min:求最小值
  • avg:求平均值
  • count:聚合(通常和分組連用)
# max
std_list = session.query(Student.name,sqlalchemy.func.max(Student.age))  # select name,max(age) from Student;
getresulte(std_list)

# count
std_list = session.query(Student.name,sqlalchemy.func.count(Student.id)).group_by(Student.name)  # SELECT name, count(id)  FROM student GROUP BY name 
getresulte(std_list)

3.7.7.6 關聯查詢

sqlalchemy提供ForeignKey用來進行外鍵關聯,它的格式爲:

sqlalchemy.ForeignKey(表名.字段名,ondelete='更新規則')

# 若是填寫映射後的class,那麼能夠直接寫:類.字段
# 若是填寫數據庫中的表,那麼須要使用引號:'數據庫表名.字段名'

更新規則和刪除規則,可選項以下:

  • CASCADE:級聯刪除,刪除被關聯數據時,從表關聯的數據所有刪除。
  • SET NULL:從父表刪除或更新行,會設置子表中的外鍵列爲NULL,但必須保證子表沒有指定 NOT NULL,也就是說子表的字段能夠爲NULL才行。
  • RESTRICT:若是從父表刪除主鍵,若是子表引用了,則拒絕對父表的刪除或更新操做。(保護數據)
  • NO ACTION:表中SQL的關鍵字,在MySQL中與RESTRICT相同。拒絕對父表的刪除或更新操做。

現有以下關係表

CREATE TABLE `departments` (
  `dept_no` char(4) NOT NULL,
  `dept_name` varchar(40) NOT NULL,
  PRIMARY KEY (`dept_no`),
  UNIQUE KEY `dept_name` (`dept_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE `employees` (
  `emp_no` int(11) NOT NULL,
  `birth_date` date NOT NULL,
  `first_name` varchar(14) NOT NULL,
  `last_name` varchar(16) NOT NULL,
  `gender` enum('M','F') NOT NULL,
  `hire_date` date NOT NULL,
  PRIMARY KEY (`emp_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE `dept_emp` (
  `emp_no` int(11) NOT NULL,
  `dept_no` char(4) NOT NULL,
  `from_date` date NOT NULL,
  `to_date` date NOT NULL,
  PRIMARY KEY (`emp_no`,`dept_no`),
  KEY `dept_no` (`dept_no`),
  CONSTRAINT `dept_emp_ibfk_1` FOREIGN KEY (`emp_no`) REFERENCES `employees` (`emp_no`) ON DELETE CASCADE,
  CONSTRAINT `dept_emp_ibfk_2` FOREIGN KEY (`dept_no`) REFERENCES `departments` (`dept_no`) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

建立對應的映射實體類

import sqlalchemy
from sqlalchemy.ext import declarative
from sqlalchemy import Column, String, Integer, Date
from sqlalchemy.orm import sessionmaker
import enum

db_url = 'mysql+pymysql://dahl:123456@10.0.0.13:3306/test'
engine = sqlalchemy.create_engine(db_url, echo=True)

Base = declarative.declarative_base()
DBSession = sessionmaker(bind=engine)
session = DBSession()

class Departments(Base):
    __tablename__ = 'departments'
    dept_no = Column(String(4), nullable=False, primary_key=True)
    dept_name = Column(String(40), nullable=False, unique=True)

    def __repr__(self):
        return '<{} {} {}>'.format(self.__class__.__name__, self.dept_no, self.dept_name)

class GenderEnum(enum.Enum):
    M = 'M'
    F = 'F'

class Employees(Base):
    __tablename__ = 'employees'
    emp_no = Column(Integer, nullable=False, primary_key=True)
    birth_date = Column(Date, nullable=False)
    first_name = Column(String(14), nullable=False)
    last_name = Column(String(16), nullable=False)
    gender = Column(sqlalchemy.Enum(GenderEnum), nullable=False)
    hire_date = Column(Date, nullable=False)

    def __repr__(self):
        return '<{} {} {} {} {} {}>'.format(
            self.__class__.__name__, self.emp_no, self.birth_date, self.first_name, self.last_name, self.gender,
            self.hire_date
        )

class Dept_emp(Base):
    __tablename__ = 'dept_emp'
    emp_no = Column(Integer, sqlalchemy.ForeignKey(Employees.emp_no, ondelete='CASCADE'), primary_key=True, )
    dept_no = Column(String(4), sqlalchemy.ForeignKey(Departments.dept_no, ondelete='CASCADE'), nullable=False)
    from_date = Column(Date, nullable=False)
    to_date = Column(Date, nullable=False)

    def __repr__(self):
        return '<{} emp_no={} dept_no={}>'.format(
            self.__class__.__name__, self.emp_no, self.dept_no
        )

def getres(emps):
    for emp in emps:
        print(emp)

查詢10010員工所在的部門編號及員工信息

隱式鏈接
emps = session.query(Employees, Dept_emp).filter(and_(Employees.emp_no == Dept_emp.emp_no, Employees.emp_no == '10010')).all()
getres(emps)

emps = session.query(Employees, Dept_emp).filter(Employees.emp_no == Dept_emp.emp_no).filter(Employees.emp_no == '10010').all()
getres(emps)


# 至關於:select * from employees,dept_emp where employees.emp_no = dept_emp.emp_no and employees.emp_no = '10010';
join鏈接

使用join()關鍵字來進行連表查詢,其isouter參數用於指定join的類型,默認狀況下使用的是inner join,當isouter=True時,就表示是left join(right join沒有實現,須要本身交換前面表的順序)

# join鏈接
std_list = session.query(Employees).join(Dept_emp,Employees.emp_no == Dept_emp.emp_no,isouter=True).filter(Employees.emp_no == '10010')
getres(std_list)

std_list = session.query(Employees).join(Dept_emp).filter((Employees.emp_no == Dept_emp.emp_no) & (Employees.emp_no == '10010'))
getres(std_list)

# 若是query中,僅列出一個代表,至關於
# select employees.* from employees inner join dept_emp on employees.emp_no = dept_emp.emp_no where employees.emp_no = '10010';

4 一對多關係

一對可能是一種表與表的關係,在orm中建立和使用方式有一寫特色,這裏單獨描述

4.1 建立關係表

經過ForeignKey來建立一對多關係,須要注意的是它內部須要填寫對應的真實的表名和字段(非映射的類對象)

class Deptment(base):

    __tablename__ = 'deptment'

    id = Column(Integer, autoincrement=True, primary_key=True)
    dep_name = Column(String(32), nullable=False)

class User(base):

    __tablename__ = 'user'

    id = Column(Integer, autoincrement=True, primary_key=True)
    name = Column(String(32), nullable=False)
    dept_id = Column(Integer, ForeignKey('deptment.id'))

# CREATE TABLE `user` (
#   `id` int(11) NOT NULL AUTO_INCREMENT,
#   `name` varchar(32) NOT NULL,
#   `dept_id` int(11) DEFAULT NULL,
#   PRIMARY KEY (`id`),
#   KEY `dept_id` (`dept_id`),
#   CONSTRAINT `user_ibfk_1` FOREIGN KEY (`dept_id`) REFERENCES `deptment` (`id`)
# ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

# CREATE TABLE `deptment` (
#   `id` int(11) NOT NULL AUTO_INCREMENT,
#   `dep_name` varchar(32) NOT NULL,
#   PRIMARY KEY (`id`)
# ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

4.2 添加數據

添加部門數據時,只能使用id來插入,當插入的dept_id在deptment表中不存在時,會直接爆出異常

# 批量添加部門數據
session.add_all([
    Deptment(dep_name='運維部'),
    Deptment(dep_name='開發部'),
    Deptment(dep_name='產品部'),
    Deptment(dep_name='測試部')]
)

# 添加用戶數據
session.add_all([
    User(name='daxin', dept_id=1),
    User(name='dachenzi', dept_id=2),
    User(name='dahl', dept_id=3)]
)
session.commit()

4.3 relationship

        在表中使用relationship來建立一個關聯的對象,便於查詢(和django的一對多隱含的對象相同,但在sqlalchemy中必須經過relationship才能夠生成),並不會生成新的字段,僅僅是產生一個關聯關係。

注意relationship對象不能做爲條件直接進行表查詢:session.query(User.name,User.deptment.dept_name) 這樣是不行的。

relationship(obj,backref='')
  • obj:表示要關聯的ORM對象
  • backref:表示在對方插入一個關鍵字,用於反向關聯relationship所在的表對象的自己。

下面是一個例子:

class Deptment(base):
    __tablename__ = 'deptment'

    id = Column(Integer, autoincrement=True, primary_key=True)
    dep_name = Column(String(32), nullable=False)

class User(base):
    __tablename__ = 'user'

    id = Column(Integer, autoincrement=True, primary_key=True)
    name = Column(String(32), nullable=False)
    dept_id = Column(Integer, ForeignKey('deptment.id'))
 
    deptment = relationship(Deptment, backref='user')   # 建立relationship關係

# 正向查(經過User查deptment)
std = session.query(User).filter(User.id == 1).first()
print(std.name)
print(std.deptment.dep_name)  # 直接經過deptment對象,讀取Deptment表中對於的信息

# 反向查(經過depement查User)
dep = session.query(Deptment).filter(Deptment.id == 1).first()
print(dep.user[0].name)   # 反向查,經過user獲取到的數據是一個列表

relationship,就是經過在一個映射類中增長一個屬性,該屬性用於表示鏈接關係,能夠在結果中,訪問該屬性來訪問關聯的表信息

4.4 經過relationship添加數據

存在relationship的映射關係時,咱們添加數據時,就能夠經過relationship使用對象來添加關聯數據了

dep = session.query(Deptment).filter(Deptment.id == 3).first()
session.add_all([
    User(name='hello', deptment=dep),
    User(name='world', deptment=dep)]
)
session.commit()

直接建立新的部門對象也是能夠的

user = User(name='dahlhin',deptment=Deptment(dep_name='管理員'))
session.add(user)
session.commit()

5 多對多關係

和django不一樣sqlalchemy的第三張表須要手動建立。

5.1 建立多對多關係

模擬主機與業務線的歸屬問題。

  • 主機能夠屬於多個業務線
  • 一個業務線能夠包含多個主機

這是一個典型的多對多關係,須要額外一張關係表來記錄。

class Service2host(base):
    __tablename__ = 'service_to_host'
    id = Column(Integer, primary_key=True, nullable=False, unique=True, autoincrement=True)
    s_id = Column(Integer, ForeignKey('service.id'))
    h_id = Column(Integer, ForeignKey('host.id'))

    __table_args__ = (
        # 聯合惟一索引
        UniqueConstraint('s_id', 'h_id', name='serivce_to_host'),
    )  # 業務id和主機id不能重複,這裏建立惟一索引來約束


class Service(base):
    __tablename__ = 'service'
    id = Column(Integer, primary_key=True, nullable=False, unique=True, autoincrement=True)
    ser_name = Column(String(16), nullable=False)


class Host(base):
    __tablename__ = 'host'

    id = Column(Integer, primary_key=True, nullable=False, unique=True, autoincrement=True)
    hostname = Column(String(16), nullable=False)
    datetime = Column(DateTime, default=datetime.datetime.now, nullable=False)

# CREATE TABLE `host` (
#   `id` int(11) NOT NULL AUTO_INCREMENT,
#   `hostname` varchar(16) NOT NULL,
#   `datetime` date NOT NULL,
#   PRIMARY KEY (`id`),
#   UNIQUE KEY `id` (`id`)
# ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;


# CREATE TABLE `service` (
#   `id` int(11) NOT NULL AUTO_INCREMENT,
#   `ser_name` varchar(16) NOT NULL,
#   PRIMARY KEY (`id`),
#   UNIQUE KEY `id` (`id`)
# ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;


# CREATE TABLE `service_to_host` (
#   `id` int(11) NOT NULL AUTO_INCREMENT,
#   `s_id` int(11) DEFAULT NULL,
#   `h_id` int(11) DEFAULT NULL,
#   PRIMARY KEY (`id`),
#   UNIQUE KEY `id` (`id`),
#   UNIQUE KEY `serivce_to_host` (`s_id`,`h_id`),
#   KEY `h_id` (`h_id`),
#   CONSTRAINT `service_to_host_ibfk_1` FOREIGN KEY (`s_id`) REFERENCES `service` (`id`),
#   CONSTRAINT `service_to_host_ibfk_2` FOREIGN KEY (`h_id`) REFERENCES `host` (`id`)
# ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

5.2 經過relationship操做數據

若是沒有relationship關聯對象,那麼咱們將須要分別操做三張表,使用relationship將會大大簡化這個過程,那麼在Service中建立關係:

hosts = relationship('host', secondary='service_to_host', backref='services')

建立後:

  • 在Service中存在一個hosts屬性,對應host表
  • 在Host中被動注入一個services屬性,對應service表
  • secondary='service_to_host' 表示經過第三張表來關聯關係
# 添加一個業務,並建立幾臺主機
session.add(
    Service(ser_name='運營', hosts=[
        Host(hostname='openstack'),
        Host(hostname='nginx')
    ])
)

# 添加一個主機,並關聯幾個業務線
service = session.query(Service).filter(or_(Service.ser_name == '運營', Service.ser_name == '運維')).all()
session.add(
    Host(hostname='Tomcat', services=service)
)

# 查詢一個業務線下都有哪些主機
service = session.query(Service).filter(Service.ser_name == '運營').first()
for host in service.hosts:
    print(host.id, host.hostname)

# 查詢一臺主機都歸屬哪些業務線
host = session.query(Host).filter(Host.hostname == 'openstack').first()
for servcie in host.services:
    print(service.id, service.ser_name)

6 scoped_session(推薦)

當咱們啓動多線程來執行數據庫操做時,每一個線程都會用到session來執行sql語句,通常狀況下,咱們會在線程內不建立新的session來執行sql語句,下面是一個利用session完成原生sql的執行。

import threading
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

engine = create_engine(
    'mysql+pymysql://dahl:123456@10.0.0.10:3306/test',
    max_overflow=5,
    pool_size=1,
    pool_timeout=30,
    pool_recycle=-1
)

DBsession = sessionmaker(bind=engine)

def func():
    session = DBsession()
    cursor = session.execute('show tables')
    res = cursor.fetchall()
    print(res)
    cursor.close()
    session.close()

if __name__ == '__main__':
    for i in range(10):
        threading.Thread(target=func).start()

這裏會產生一個問題,每一個線程啓動時都會建立一個session,用完又會關閉,這樣太麻煩了,這裏可使用scoped_session對象來完成,它相似與threading.Local的實現方式。做用是,當線程調用scoped_session對象的功能時,好比各類sql查詢,在其內部會爲每一個線程建立一個新的session對象,讓它來使用。

# 1 引入scoped_session
from sqlalchemy.orm import scoped_session

# 2 全局建立session對象
session = scoped_session(DBsession)

# 3 多進程內直接使用
def func():
    cursor = session.execute('show tables')
    res = cursor.fetchall()
    print(res)
    cursor.close()
    session.remove()   # 使用完畢須要remove

不須要關閉,直接使用scoped_session對象的remove方法便可。

實現過程:

  1. scoped_session是一個類
  2. 它內部並無實現全部的Session類的方法
  3. 它只是在內部,把傳入的session對象的屬性,進行反射獲取,並綁定在本身身上。
  4. self.registry() 就是爲每一個線程建立的Session對象,其內部使用的就是threading.local實現的。

源碼以下:

# scoping
def instrument(name):
    def do(self, *args, **kwargs):
        return getattr(self.registry(), name)(*args, **kwargs)
    return do

for meth in Session.public_methods:
    setattr(scoped_session, meth, instrument(meth))

# registry是ThreadLocalRegistry對象
self.registry = ThreadLocalRegistry(session_factory)


# ThreadLocalRegistry對象內部經過threading.local實現的
    def __init__(self, createfunc):
        self.createfunc = createfunc  # Session對象
        self.registry = threading.local()

    
    # registry()其實調用的就是__call__方法
    def __call__(self):
        try:
            return self.registry.value
        except AttributeError:
            val = self.registry.value = self.createfunc()   # 第一次執行,就會實例化一個Session對象
            return val

7 別名

當使用join語句連表查詢時,不免會碰到兩個表重名的字段,這裏就可使用label來對字段進行別名顯示。

stds = session.query(User.name.label('username'), User.id, Deptment.dep_name).join(Deptment).all()
for std in stds:
    print(std.username, std.id, std.dep_name)
相關文章
相關標籤/搜索