ORM之SQLAlchemy

SQLAlchemy,ORM框架mysql

  • 類 --> 表
  • 對象 --> 一行數據
  • 類的字段 --> 數據庫表的一列

做用:幫助咱們使用類和對象快速實現數據庫操做sql

   單表操做                                                                  數據庫

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column
from sqlalchemy import Integer,String,Text,Date,DateTime
from sqlalchemy import create_engine


Base = declarative_base()


class Users(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)      # 主鍵,默認自增
    name = Column(String(32), index=True, nullable=False)   # 設置索引, 非空
    depart_id = Column(Integer)


def create_all():
    engine = create_engine(
        "mysql+pymysql://root:0000@127.0.0.1:3306/sqlalchemy?charset=utf8",
        max_overflow=0,  # 超過鏈接池大小外最多建立的鏈接
        pool_size=5,  # 鏈接池大小
        pool_timeout=30,  # 池中沒有線程最多等待的時間,不然報錯
        pool_recycle=-1  # 多久以後對線程池中的線程進行一次鏈接的回收(重置)
    )

    Base.metadata.create_all(engine)


def drop_all():
    engine = create_engine(
        "mysql+pymysql://root:0000@127.0.0.1:3306/sqlalchemy?charset=utf8",
        max_overflow=0,  # 超過鏈接池大小外最多建立的鏈接
        pool_size=5,  # 鏈接池大小
        pool_timeout=30,  # 池中沒有線程最多等待的時間,不然報錯
        pool_recycle=-1  # 多久以後對線程池中的線程進行一次鏈接的回收(重置)
    )
    Base.metadata.drop_all(engine)


if __name__ == '__main__':
    # drop_all()
    create_all()
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Users

engine = create_engine(
        "mysql+pymysql://root:0000@127.0.0.1:3306/sqlalchemy?charset=utf8",
        max_overflow=0,  # 超過鏈接池大小外最多建立的鏈接
        pool_size=5,  # 鏈接池大小
        pool_timeout=30,  # 池中沒有線程最多等待的時間,不然報錯
        pool_recycle=-1  # 多久以後對線程池中的線程進行一次鏈接的回收(重置)
    )
SessionFactory = sessionmaker(bind=engine)

# 根據Users類對users表進行增刪改查
session = SessionFactory()

# 1. 增長
# obj = Users(name='zhangyafeo')
# session.add(obj)
# session.commit()

# session.add_all([
#         Users(name='姚明'),
#         Users(name='科比')
# ])
# session.commit()

# 2. 查
# result = session.query(Users).all()
# for row in result:
#         print(row.id,row.name)

# result = session.query(Users).filter(Users.id >= 2)
# for row in result:
#         print(row.id,row.name)

# result = session.query(Users).filter(Users.id >= 2).first()
# print(result)

# 3.刪
# session.query(Users).filter(Users.id >= 2).delete()
# session.commit()

# 4.改
# session.query(Users).filter(Users.id == 1).update({Users.depart_id: 1})
# session.query(Users).filter(Users.id == 2).update({'name':'姚明'})
# session.query(Users).filter(Users.id == 3).update({'name':Users.name+"NB"},synchronize_session=False)  # 字符串的增長
# session.commit()



session.close()
行操做示例
# 添加
session.add(對象)
session.add_all([
    對象1,
    對象2
])
session.commit()

# 查詢
session.query(Users).all()
session.query(Users).filter(Users.id>4)

# 刪除
session.query(Users).filter(Users.id>4).delete()

# 修改
session.query(Users).filter(Users.id>4).update({Users.age:19})
基本的增刪改查
# 1. 指定列
# select id,name as cname from users;
# result = session.query(Users.id,Users.name.label('cname')).all()
# for item in result:
#         print(item[0],item.id,item.cname)
# 查看sql語句
# query = session.query(Users.id,Users.name.label('cname'))
# print(query)
# 2. 默認條件and
# session.query(Users).filter(Users.id > 1, Users.name == 'eric').all()
# 3. between
# session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all()
# 4. in
# session.query(Users).filter(Users.id.in_([1,3,4])).all()
# session.query(Users).filter(~Users.id.in_([1,3,4])).all()
# 5. 子查詢
# session.query(Users).filter(Users.id.in_(session.query(Users.id).filter(Users.name=='eric'))).all()
# 6. and 和 or
# from sqlalchemy import and_, or_
# session.query(Users).filter(Users.id > 3, Users.name == 'eric').all()
# session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all()
# session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all()
# session.query(Users).filter(
#     or_(
#         Users.id < 2,
#         and_(Users.name == 'eric', Users.id > 3),
#         Users.extra != ""
#     )).all()

# 7. filter_by
# session.query(Users).filter_by(name='alex').all()

# 8. 通配符
# ret = session.query(Users).filter(Users.name.like('e%')).all()
# ret = session.query(Users).filter(~Users.name.like('e%')).all()

# 9. 切片
# result = session.query(Users)[1:2]

# 10.排序
# ret = session.query(Users).order_by(Users.name.desc()).all()
# ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()

# 11. group by
from sqlalchemy.sql import func

# ret = session.query(
#         Users.depart_id,
#         func.count(Users.id),
# ).group_by(Users.depart_id).all()
# for item in ret:
#         print(item)
#
# from sqlalchemy.sql import func
#
# ret = session.query(
#         Users.depart_id,
#         func.count(Users.id),
# ).group_by(Users.depart_id).having(func.count(Users.id) >= 2).all()
# for item in ret:
#         print(item)

# 12.union 和 union all
"""
select id,name from users
UNION
select id,name from users;
"""
# q1 = session.query(Users.name).filter(Users.id > 2)
# q2 = session.query(Favor.caption).filter(Favor.nid < 2)
# ret = q1.union(q2).all()
#
# q1 = session.query(Users.name).filter(Users.id > 2)
# q2 = session.query(Favor.caption).filter(Favor.nid < 2)
# ret = q1.union_all(q2).all()
經常使用操做

limit分頁session

1.用offset()設置索引偏移量,limit()限制取出量
db.session.query(User.name).filter(User.email.like('%'+email+'%')).limit(page_size).offset((page_index-1)*page_size)
#filter語句後面能夠跟order_by語句

2.用slice(偏移量,取出量)函數
db.session.query(User.name).filter(User.email.like('%'+email+'%')).slice((page_index - 1) * page_size, page_index * page_size)
#filter語句後面能夠跟order_by語句
註釋:此方法和第一種相同的效果。

由於:由一下內部方法可知,slice()函數第一個屬性就是offset()函數值,第二個屬性就是limit()函數值

3.用paginate(偏移量,取出量)函數,用於BaseQuery

user_obj=User.query.filter(User.email.like('%'+email+'%')).paginate(int(page_index), int(page_size),False)
#遍歷時要加上items 
object_list =user_obj.items
 

4.filter中使用limit

db.session.query(User.name).filter(User.email.like('%'+email+'%') and limit (page_index - 1) * page_size, page_size)
#此處不能再跟order_by語句,不然報錯
4種方式

  一對多操做                                                               多線程

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Users, Depart

engine = create_engine(
        "mysql+pymysql://root:0000@127.0.0.1:3306/sqlalchemy?charset=utf8",
        max_overflow=0,  # 超過鏈接池大小外最多建立的鏈接
        pool_size=5,  # 鏈接池大小
        pool_timeout=30,  # 池中沒有線程最多等待的時間,不然報錯
        pool_recycle=-1  # 多久以後對線程池中的線程進行一次鏈接的回收(重置)
    )
SessionFactory = sessionmaker(bind=engine)

session = SessionFactory()

############################一對多操做####################################

# 1.查詢全部用戶
ret = session.query(Users).all()
for row in ret:
    print(row.id, row.name, row.depart_id)

# 2.查詢全部用戶及所屬部門名稱
# ret1 = session.query(Users, Depart).join(Depart).all()
ret1 = session.query(Users.id, Users.name, Depart.title).join(Depart,Users.depart_id == Depart.id).all()
for row in ret1:
    # print(row)
    # print(row[0].id, row[0].name, row[0].depart_id, row[1].title)
    print(row.id, row.name, row.title)

# 3.relation字段:查詢全部用戶+所屬部門名稱
ret2 = session.query(Users).all()
for row in ret2:
    print(row.id, row.name, row.depart_id, row.dp.title)

# 4.relation:查詢全部技術部的成員
ret3 = session.query(Depart).filter(Depart.title == '技術部').first()
for row in ret3.pers:
    print(row.id, row.name, ret3.title)

# 5.建立一個名稱叫 IT部門,並在該部門中添加一個員工:浩南
# 方式一
# d1 = Depart(title='IT')
# session.add(d1)
# session.commit()
# print(d1.title, d1.id)
# u = Users(name='浩南',depart_id=d1.id)
# session.add(u)
# session.commit()

# 方式二
# u = Users(name='山雞',dp=Depart(title='IT'))
# session.add(u)
# session.commit()
#


# 6.建立一個名稱叫王者榮耀的部門,並在該部門中加入詹姆斯,科比,梅西
# d2 = Depart(title='王者榮耀')
# d2.pers = [Users(name='詹姆斯'), Users(name='科比'), Users(name='梅西')]
# session.add(d2)
# session.commit()

for row in session.query(Users).all():
    print(row.id, row.name, row.dp.title)

for row in session.query(Depart).all():
    print(row.title)
    for ret2 in row.pers:
        print('\t',ret2.name)

session.close()
FK操做

  多對多操做                                                              框架

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Users, Depart, Student, Course, Student2Course

engine = create_engine(
        "mysql+pymysql://root:0000@127.0.0.1:3306/sqlalchemy?charset=utf8",
        max_overflow=0,  # 超過鏈接池大小外最多建立的鏈接
        pool_size=5,  # 鏈接池大小
        pool_timeout=30,  # 池中沒有線程最多等待的時間,不然報錯
        pool_recycle=-1  # 多久以後對線程池中的線程進行一次鏈接的回收(重置)
    )
SessionFactory = sessionmaker(bind=engine)

session = SessionFactory()

############################多對多操做####################################

# 1.錄入數據, 添加兩個學生和班級
# session.add_all([
#     Student(name='大寶貝'),
#     Student(name='小哥哥'),
#     Course(title='生物'),
#     Course(title='體育'),
# ])
#
# session.commit()

# 2.爲學生選課
# session.add_all([
#     Student2Course(student_id=1, course_id=1),
#     Student2Course(student_id=1, course_id=2),
#     Student2Course(student_id=2, course_id=1),
# ])
# session.commit()

# 3.三張表關聯
ret = session.query(Student2Course.id, Student.name, Course.title).join(Student, Student2Course.student_id==Student.id).join(Course, Student2Course.course_id==Course.id, isouter=True).order_by(Student2Course.id.asc())
for row in ret:
    print(row)

# 4.大寶貝選的課
# ret = session.query(Student2Course.id, Student.name, Course.title).join(Student, Student2Course.student_id==Student.id).join(Course, Student2Course.course_id==Course.id).filter(Student.name=='大寶貝').all()
# print(ret)
#
# obj = session.query(Student).filter(Student.name=='大寶貝').first()
# for row in obj.course_list:
#     print(row.title)

# 5.選了生物的全部人
# objs = session.query(Course).filter(Course.title=='生物')
# print(objs)
# obj = session.query(Course).filter(Course.title=='生物').first()
# for row in obj.student_list:
#     print(row.name)

# 6.建立一個課程,建立2學生,兩個學生選新建立的課程。
# obj = Course(title='英語')
# obj.student_list = [Student(name='浩哥'),Student(name='夥計')]
# session.add(obj)
# session.commit()


# students = session.query(Student).all()
# for row in students:
#     print(row.id, row.name)
#
# courses = session.query(Course).all()
# for row in courses:
#     print(row.id, row.title)
#
# stu_cous = session.query(Student2Course).all()
# for row in stu_cous:
#     print(row.id, row.student_id, row.course_id)

session.close()
多對多操做

  多線程鏈接                                                             ide

  from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Student,Course,Student2Course

engine = create_engine(
        "mysql+pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8",
        max_overflow=0,  # 超過鏈接池大小外最多建立的鏈接
        pool_size=5,  # 鏈接池大小
        pool_timeout=30,  # 池中沒有線程最多等待的時間,不然報錯
        pool_recycle=-1  # 多久以後對線程池中的線程進行一次鏈接的回收(重置)
    )
SessionFactory = sessionmaker(bind=engine)

def task():
    # 去鏈接池中獲取一個鏈接
    session = SessionFactory()

    ret = session.query(Student).all()

    # 將鏈接交還給鏈接池
    session.close()


from threading import Thread

for i in range(20):
    t = Thread(target=task)
    t.start()
方式一
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session
from models import Student,Course,Student2Course

engine = create_engine(
        "mysql+pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8",
        max_overflow=0,  # 超過鏈接池大小外最多建立的鏈接
        pool_size=5,  # 鏈接池大小
        pool_timeout=30,  # 池中沒有線程最多等待的時間,不然報錯
        pool_recycle=-1  # 多久以後對線程池中的線程進行一次鏈接的回收(重置)
    )
SessionFactory = sessionmaker(bind=engine)
session = scoped_session(SessionFactory)


def task():
    ret = session.query(Student).all()
    # 將鏈接交還給鏈接池
    session.remove()


from threading import Thread

for i in range(20):
    t = Thread(target=task)
    t.start()
方式二(推薦,基於Threading.local實現)

  執行原生sql                                                            函數

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session
from models import Student,Course,Student2Course

engine = create_engine(
    "mysql+pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8",
    max_overflow=0,  # 超過鏈接池大小外最多建立的鏈接
    pool_size=5,  # 鏈接池大小
    pool_timeout=30,  # 池中沒有線程最多等待的時間,不然報錯
    pool_recycle=-1  # 多久以後對線程池中的線程進行一次鏈接的回收(重置)
)
SessionFactory = sessionmaker(bind=engine)
session = scoped_session(SessionFactory)


def task():
""""""
# 方式一:
"""
# 查詢
# cursor = session.execute('select * from users')
# result = cursor.fetchall()

# 添加
cursor = session.execute('INSERT INTO users(name) VALUES(:value)', params={"value": 'wupeiqi'})
session.commit()
print(cursor.lastrowid)
"""
# 方式二:
"""
# conn = engine.raw_connection()
# cursor = conn.cursor()
# cursor.execute(
#     "select * from t1"
# )
# result = cursor.fetchall()
# cursor.close()
# conn.close()
"""

# 將鏈接交還給鏈接池
session.remove()


from threading import Thread

for i in range(20):
    t = Thread(target=task)
    t.start()
執行原生sql
相關文章
相關標籤/搜索