python之SQLAlchemy組件

介紹

SQLAlchemy 是一個 ORM 框架,能夠幫助我們使用面向對象的方式快速實現數據庫操作。

組成部分:

  • Engine,框架的引擎
  • Connection Pooling,數據庫連接池
  • Dialect,選擇連接數據庫的 DB API 種類
  • Schema/Types,架構和類型
  • SQL Expression Language,SQL 表達式語言

SQLAlchemy 本身無法直接操作數據庫,必須依賴 pymysql 等第三方 DB API 驅動。Dialect(方言)用於和數據庫 API 進行交流,根據連接配置的不同調用不同的數據庫 API,從而實現對數據庫的操作,如:

MySQLDB
    mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
    
pymysql
    mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
    
MySQL-Connector
    mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
    
cx_Oracle
    oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
    
更多:http://docs.sqlalchemy.org/en/latest/dialects/index.html

使用

安裝

pip3 install sqlalchemy

建立和刪除表

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index

# Declarative base class; models that inherit from it register their tables on Base.metadata.
Base = declarative_base()


class Users(Base):
    """ORM model mapped to the ``users`` table."""
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)  # integer primary key
    age = Column(Integer, nullable=False)  # NOT NULL
    name = Column(String(32), index=True, nullable=False)  # String(32), indexed, NOT NULL


"""
根據類建立數據庫表
"""
# Engine with a connection pool. The URL selects the pymysql driver, the
# database named "1221" and the utf8 connection charset.
engine = create_engine(
    "mysql+pymysql://root:root@127.0.0.1:3306/1221?charset=utf8",
    max_overflow=0,  # extra connections allowed beyond pool_size (0 = none)
    pool_size=5,  # number of connections kept in the pool
    pool_timeout=30,  # seconds to wait for a free pooled connection before raising an error
    pool_recycle=-1  # seconds after which a pooled connection is recycled (reset); -1 disables recycling
)


def init_db():
    """Create every table registered on ``Base.metadata`` through ``engine``."""
    metadata = Base.metadata
    metadata.create_all(bind=engine)


def drop_db():
    """Drop every table registered on ``Base.metadata`` through ``engine``."""
    metadata = Base.metadata
    metadata.drop_all(bind=engine)


if __name__ == '__main__':
    # Rebuild the schema from scratch: drop the mapped tables, then recreate them.
    for step in (drop_db, init_db):
        step()

單表操作

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index

# Declarative base class; models that inherit from it register their tables on Base.metadata.
Base = declarative_base()


class Users(Base):
    """ORM model mapped to the ``users`` table."""
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    age = Column(Integer, nullable=False)
    name = Column(String(32), index=True, nullable=False)


engine = create_engine("mysql+pymysql://root:root@127.0.0.1:3306/1221", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)

# A session is created for each unit of database work.
session = Session()
# Insert a single row
first_user = Users(name="zhangsan", age=20)
session.add(first_user)
# Insert several rows with one call
more_users = [
    Users(name='lisi', age=12),
    Users(name='wangwu', age=14),
]
session.add_all(more_users)
# Commit the transaction
session.commit()
新增操作
# Query all rows. Printing a Query object renders its SQL; the statement
# selects every mapped column of Users (id, age, name).
user_query = session.query(Users)
print(user_query)  # SELECT users.id AS users_id, users.age AS users_age, users.name AS users_name FROM users
print(user_query.all())  # [<__main__.Users object at 0x0000000003A3E2B0>, <__main__.Users object at 0x0000000003A3E320>, <__main__.Users object at 0x0000000003A3E390>]

# Add a filter condition
user_query = user_query.filter(Users.id > 1)
print(user_query)  # SELECT users.id AS users_id, users.age AS users_age, users.name AS users_name FROM users WHERE users.id > %(id_1)s
print(user_query.all())  # [<__main__.Users object at 0x000000000395E4A8>, <__main__.Users object at 0x000000000395E518>]

# Fetch only the first matching row
print(user_query.first())  # <__main__.Users object at 0x000000000392DC88>
基本查詢
# Bulk-delete every row with id > 1; delete() returns the number of rows removed
stale_users = session.query(Users).filter(Users.id > 1)
deleted_count = stale_users.delete()
print(deleted_count)  # 2
session.commit()
刪除操作
# Approach 1: load the object, change an attribute, commit
user_obj = session.query(Users).filter(Users.id == 1).first()
user_obj.name = '張大三'
session.commit()

# Approach 2: bulk UPDATE issued through the query
session.query(Users).filter(Users.id == 1).update({Users.name: '張小三'})
session.commit()

# Update relative to the column's current value
# synchronize_session=False: objects already loaded in the session are not refreshed
session.query(Users).filter(Users.id == 1).update({Users.name: Users.name + '1'}, synchronize_session=False)
# synchronize_session='evaluate': the criteria are evaluated in Python against in-session objects
session.query(Users).filter(Users.id == 1).update({'age': Users.age + 2}, synchronize_session='evaluate')
session.commit()
修改操作
# Filter by keyword arguments (column name = value)
session.query(Users).filter_by(name='zhangsan').all()
# Several criteria passed to filter() are combined with AND
session.query(Users).filter(Users.id > 1, Users.name == 'lisi').all()
# BETWEEN
session.query(Users).filter(Users.id.between(1,2)).all()
# IN
session.query(Users).filter(Users.id.in_([1, 3])).all()
# NOT IN
session.query(Users).filter(~Users.id.in_([1, 3])).all()
# Subquery
session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='zhangsan'))).all()
# Explicit AND / OR
from sqlalchemy import and_, or_
session.query(Users).filter(and_(Users.id > 3, Users.name == 'lisi')).all()
session.query(Users).filter(or_(Users.id < 2, Users.name == 'zhangsan')).all()
session.query(Users).filter(
    or_(
        Users.id < 2,
        and_(Users.name == 'zhangsan', Users.id > 3),
    )).all()
# LIKE
session.query(Users).filter(Users.name.like('z%')).all()
# NOT LIKE
session.query(Users).filter(~Users.name.like('z%')).all()
# Slicing: start index inclusive, end index exclusive
session.query(Users)[0:1]
# ORDER BY
session.query(Users).order_by(Users.name).all()
# ORDER BY ... DESC, then ASC on a second column
session.query(Users).order_by(Users.name.desc(),Users.id.asc()).all()
# GROUP BY & HAVING & MAX & SUM & MIN & AVG
from sqlalchemy.sql import func
session.query(func.max(Users.age), func.sum(Users.age), func.min(Users.age), func.avg(Users.age)).group_by(Users.age).having(func.min(Users.age) > 14).all()  # [(26, Decimal('26'), 26, Decimal('26.0000'))]
# Select specific columns & alias one with label() (SQL "AS")
ret = session.query(Users.name,Users.age,Users.name.label('nick_name')).all()
# Rows can be read by position, by column name or by label.
# A plain loop is used because the iteration exists only for its print side effect.
for user_item in ret:
    print(user_item[0], user_item.age, user_item.nick_name)
# Joining tables
# NOTE(review): Favor and Person are not defined anywhere in this snippet; they are
# presumably additional mapped models (Favor exposing `nid` and `caption`) — define
# them, with a foreign key linking Person and Favor, before running these lines.
# Implicit join through the WHERE clause: where xx.id = xx.id
session.query(Users, Favor).filter(Users.id == Favor.nid).all()
# INNER JOIN
session.query(Person).join(Favor).all()
# LEFT OUTER JOIN
session.query(Person).join(Favor, isouter=True).all()
# Combining two queries
q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
# UNION: merge and remove duplicates
ret = q1.union(q2).all()
# UNION ALL: merge and keep duplicates
ret = q1.union_all(q2).all()
其它操作

一對多操作

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index

# Declarative base class; models that inherit from it register their tables on Base.metadata.
Base = declarative_base()


class Depart(Base):
    """Department; the "one" side of the one-to-many relation."""
    __tablename__ = 'depart'
    id = Column(Integer, primary_key=True)
    title = Column(String(32), nullable=True)  # department name, may be NULL


class Users(Base):
    """User; the "many" side, pointing at its department through ``dept_id``."""
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=False)
    dept_id = Column(Integer, ForeignKey('depart.id'))  # foreign key to depart.id


"""
根據類建立數據庫表
"""
# Engine with a connection pool. The URL selects the pymysql driver, the
# database named "1221" and the utf8 connection charset.
engine = create_engine(
    "mysql+pymysql://root:root@127.0.0.1:3306/1221?charset=utf8",
    max_overflow=0,  # extra connections allowed beyond pool_size (0 = none)
    pool_size=5,  # number of connections kept in the pool
    pool_timeout=30,  # seconds to wait for a free pooled connection before raising an error
    pool_recycle=-1  # seconds after which a pooled connection is recycled (reset); -1 disables recycling
)


def init_db():
    """Create every table registered on ``Base.metadata`` through ``engine``."""
    metadata = Base.metadata
    metadata.create_all(bind=engine)


def drop_db():
    """Drop every table registered on ``Base.metadata`` through ``engine``."""
    metadata = Base.metadata
    metadata.drop_all(bind=engine)


if __name__ == '__main__':
    # Rebuild the schema from scratch: drop the mapped tables, then recreate them.
    for step in (drop_db, init_db):
        step()
建立外鍵關係
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index

# Imported here because this script's import block only pulls in sessionmaker.
from sqlalchemy.orm import relationship

# Declarative base class; models that inherit from it register their tables on Base.metadata.
Base = declarative_base()


class Depart(Base):
    """Department; the "one" side of the one-to-many relation."""
    __tablename__ = 'depart'
    id = Column(Integer, primary_key=True)
    title = Column(String(32), nullable=True)


class Users(Base):
    """User; the "many" side, pointing at its department through ``dept_id``."""
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=False)
    dept_id = Column(Integer, ForeignKey('depart.id'))
    # ORM-level link only (adds no column): ``user.depart`` gives the Depart object,
    # and the backref exposes ``depart.users``. The relationship examples further
    # down in this script depend on both attributes.
    depart = relationship('Depart', backref='users')


# Pooled engine (5 connections, no overflow) and a session factory bound to it.
engine = create_engine("mysql+pymysql://root:root@127.0.0.1:3306/1221", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)

# A session is created for each unit of database work.
session = Session()
# Query every user together with the department it belongs to
user_list = session.query(Users, Depart).join(Depart, Users.dept_id == Depart.id).all()
print(user_list)
'''
[(<__main__.Users object at 0x000000000395C8D0>, <__main__.Depart object at 0x000000000395C940>),
(<__main__.Users object at 0x000000000395C9B0>, <__main__.Depart object at 0x000000000395C940>),
(<__main__.Users object at 0x000000000395CA20>, <__main__.Depart object at 0x000000000395CA90>)]
'''
# Each row is a (Users, Depart) pair; a plain loop is used because the
# iteration exists only for its print side effect.
for user, depart in user_list:
    print(user.name, depart.title)
'''
張三 後勤部
李四 後勤部
王五 財務部
'''
#  SELECT users.name AS users_name, depart.title AS depart_title FROM users INNER JOIN depart ON users.dept_id = depart.id
user_list = session.query(Users.name, Depart.title).join(Depart, Users.dept_id == Depart.id).all()
print(user_list, type(user_list))  # [('張三', '後勤部'), ('李四', '後勤部'), ('王五', '財務部')]
# Rows of a column query expose the selected columns as attributes.
for user in user_list:
    print(user.name, user.title)
'''
張三 後勤部
李四 後勤部
王五 財務部
'''
#  SELECT users.name AS users_name, depart.title AS depart_title FROM users LEFT OUTER JOIN depart ON users.dept_id = depart.id
# No .all() here: user_list is the Query itself, so print() renders the SQL above.
user_list = session.query(Users.name, Depart.title).join(Depart, Users.dept_id == Depart.id, isouter=True)
print(user_list)
連表查詢
# These examples require Users to declare a relationship to Depart with a
# backref named `users`, e.g. `depart = relationship('Depart', backref='users')`;
# `user.depart`, `depart_obj.users`, `Depart(users=...)` and `Users(depart=...)`
# all rely on it.

# Forward lookup: print the department of every user
user_list = session.query(Users).all()
for user in user_list:
    print(user.name, user.depart, user.depart.title)
'''
張三 <__main__.Depart object at 0x00000000039697F0> 後勤部
李四 <__main__.Depart object at 0x00000000039697F0> 後勤部
王五 <__main__.Depart object at 0x0000000003969898> 財務部
'''
# Reverse lookup: print every user of the department with id 1
depart_obj = session.query(Depart).filter_by(id=1).first()
for user in depart_obj.users:
    print(user.name, user)
'''
張三 <__main__.Users object at 0x000000000395FA20>
李四 <__main__.Users object at 0x000000000395F978>
'''

# Related insert: create the department '醬油部' and add the employee '李德剛' to it.
# The two variants are alternatives; running both inserts the department and the user twice.
# Variant 1: attach the user through the department's `users` collection
user_obj = Users(name='李德剛')
depart_obj = Depart(title='醬油部', users=[user_obj])
session.add(depart_obj)
session.commit()
# Variant 2: attach the department through the user's `depart` attribute
depart_obj = Depart(title='醬油部')
user_obj = Users(name='李德剛', depart=depart_obj)
session.add(user_obj)
session.commit()
relationship使用

多對多操作

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index

# Declarative base class; models that inherit from it register their tables on Base.metadata.
Base = declarative_base()


class Permission(Base):
    '''Permission.'''
    __tablename__ = 'permission'
    id = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=False)


class User(Base):
    '''User.'''
    __tablename__ = 'user'
    id = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=False)


class Permission2User(Base):
    """Association table linking users to permissions (many-to-many)."""
    __tablename__ = 'permission2user'
    id = Column(Integer, primary_key=True, autoincrement=True)
    user_id = Column(Integer, ForeignKey('user.id'))  # foreign key to user.id
    permission_id = Column(Integer, ForeignKey('permission.id'))  # foreign key to permission.id

    __table_args__ = (
        # Composite unique constraint: each (user_id, permission_id) pair may appear only once
        UniqueConstraint('user_id', 'permission_id', name='uc_user_permission'),
    )


"""
根據類建立數據庫表
"""
# Engine with a connection pool. The URL selects the pymysql driver, the
# database named "test" and the utf8 connection charset.
engine = create_engine(
    "mysql+pymysql://root:root@127.0.0.1:3306/test?charset=utf8",
    max_overflow=0,  # extra connections allowed beyond pool_size (0 = none)
    pool_size=5,  # number of connections kept in the pool
    pool_timeout=30,  # seconds to wait for a free pooled connection before raising an error
    pool_recycle=-1  # seconds after which a pooled connection is recycled (reset); -1 disables recycling
)


def init_db():
    """Create every table registered on ``Base.metadata`` through ``engine``."""
    metadata = Base.metadata
    metadata.create_all(bind=engine)


def drop_db():
    """Drop every table registered on ``Base.metadata`` through ``engine``."""
    metadata = Base.metadata
    metadata.drop_all(bind=engine)


if __name__ == '__main__':
    # Rebuild the schema from scratch: drop the mapped tables, then recreate them.
    for step in (drop_db, init_db):
        step()
建立多對多關係
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship

# Declarative base class; models that inherit from it register their tables on Base.metadata.
Base = declarative_base()


class Permission(Base):
    '''Permission.'''
    __tablename__ = 'permission'
    id = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=False)
    # First argument: the related model class; secondary: the association table that
    # carries the many-to-many link; backref: attribute added to User for the reverse lookup.
    user_list = relationship('User', secondary='permission2user', backref='permission_list')


class User(Base):
    '''User.'''
    __tablename__ = 'user'
    id = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=False)


class Permission2User(Base):
    """Association table linking users to permissions (many-to-many)."""
    __tablename__ = 'permission2user'
    id = Column(Integer, primary_key=True, autoincrement=True)
    user_id = Column(Integer, ForeignKey('user.id'))  # foreign key to user.id
    permission_id = Column(Integer, ForeignKey('permission.id'))  # foreign key to permission.id
    __table_args__ = (
        # Composite unique constraint: each (user_id, permission_id) pair may appear only once
        UniqueConstraint('user_id', 'permission_id', name='uc_user_permission'),
    )


# Pooled engine (5 connections, no overflow) and a session factory bound to it.
engine = create_engine("mysql+pymysql://root:root@127.0.0.1:3306/test", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)

# Session used by the statements that follow.
session = Session()
# Seed data: two permissions and two users
session.add_all(
    [
        Permission(name='新增'),
        Permission(name='查詢'),
        User(name='張三'),
        User(name='吳剛')
    ]
)
session.commit()

# Link users to permissions through the association table.
# NOTE(review): the literal ids assume the rows above received ids 1 and 2,
# i.e. the tables were empty before seeding — confirm before re-running.
session.add_all([
    Permission2User(user_id=1,permission_id=1),
    Permission2User(user_id=1,permission_id=2),
    Permission2User(user_id=2,permission_id=2),
])
session.commit()
初始化數據
# Join the association table with both sides.
# The result is named `record_list` so the builtin `list` is not shadowed.
record_list = session.query(Permission2User.id, User.name, Permission.name) \
    .join(Permission, Permission2User.permission_id == Permission.id) \
    .join(User, Permission2User.user_id == User.id).all()
print(record_list)  # [(3, '吳剛', '查詢'), (1, '張三', '新增'), (2, '張三', '查詢')]
# Users that hold the '新增' permission
permission_obj = session.query(Permission).filter_by(name='新增').first()
for granted_user in permission_obj.user_list:
    print(granted_user.name)  # 張三
# Permissions held by the user '吳剛'
user_obj = session.query(User).filter_by(name='吳剛').first()
for granted_permission in user_obj.permission_list:
    print(granted_permission.name)  # 查詢

# Create one permission and two new users that both hold it;
# adding the permission to the session also saves the attached users.
permission_obj = Permission(name='修改')
permission_obj.user_list = [User(name='王五'), User(name='趙柳')]
session.add(permission_obj)
session.commit()
常用操作示例

兩種連接方式

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship, scoped_session

# Declarative base class; models that inherit from it register their tables on Base.metadata.
Base = declarative_base()


class User(Base):
    '''User.'''
    __tablename__ = 'user'
    id = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=False)


# Pooled engine: 5 connections and no overflow, so at most 5 connections are open at once.
engine = create_engine("mysql+pymysql://root:root@127.0.0.1:3306/test", max_overflow=0, pool_size=5)
# Factory that produces Session objects bound to the engine.
SessionFactory = sessionmaker(bind=engine)


def task1():
    """Run one query on a dedicated session and always release its connection.

    Approach 1: each call creates its own Session from ``SessionFactory``.
    """
    # Creating the session does not touch the pool yet; a connection is
    # checked out when the first statement is executed.
    session = SessionFactory()
    try:
        session.query(User).all()
    finally:
        # Return the connection to the pool even if the query raised.
        session.close()


# Approach 2: a thread-local session registry. Every thread that uses
# `session` transparently gets its own Session instance.
session = scoped_session(session_factory=SessionFactory)


def task2():
    """Run one query on the calling thread's scoped session, then discard it."""
    try:
        # The thread's Session is created on first use, which keeps sessions
        # isolated between threads.
        session.query(User).all()
    finally:
        # Close the thread's Session and return its connection to the pool,
        # even if the query raised.
        session.remove()


from threading import Thread

# Start 20 threads for each task (40 in total); the threads are not joined explicitly.
for i in range(20):
    t1 = Thread(target=task1)
    t1.start()
    t2 = Thread(target=task2)
    t2.start()

執行原生Sql

# Raw SQL is wrapped in text(): passing a plain string to Session.execute() is
# deprecated in SQLAlchemy 1.4 and rejected in 2.0. text() also defines the
# `:name` bind-parameter syntax used below.
from sqlalchemy import text

# Insert
cursor = session.execute(text('insert into user(name) values(:name)'), params={"name": '劉能'})
session.commit()
print(cursor.lastrowid)  # 5

# Query
cursor = session.execute(text('select * from user'))
result = cursor.fetchall()
print(result)  # [(5, '劉能'), (2, '吳剛'), (1, '張三'), (3, '王五'), (4, '趙柳')]
相關文章
相關標籤/搜索