SQLALchemy的其餘經常使用操做

使用鏈接池的兩種方式

第一種方式:

直接從SessionFactory裏獲取,此時若是須要開啓多個進程,那麼建立鏈接池的代碼必定要放在循環裏面html

否則的話每一個進程都是用一個session了python

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Student,Course,Student2Course

engine = create_engine(
        "mysql+pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8",
        max_overflow=0,  # 超過鏈接池大小外最多建立的鏈接
        pool_size=5,  # 鏈接池大小
        pool_timeout=30,  # 池中沒有線程最多等待的時間,不然報錯
        pool_recycle=-1  # 多久以後對線程池中的線程進行一次鏈接的回收(重置)
    )
SessionFactory = sessionmaker(bind=engine)

def task():
    # 去鏈接池中獲取一個鏈接
    session = SessionFactory()

    ret = session.query(Student).all()

    # 將鏈接交還給鏈接池
    session.close()


from threading import Thread

for i in range(20):
    t = Thread(target=task)
    t.start()

 第二種方式:

使用scoped_session建立sessionmysql

這樣建立的session在多個線程裏仍是不一樣的sql

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session
from models import Student,Course,Student2Course

engine = create_engine(
        "mysql+pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8",
        max_overflow=0,  # 超過鏈接池大小外最多建立的鏈接
        pool_size=5,  # 鏈接池大小
        pool_timeout=30,  # 池中沒有線程最多等待的時間,不然報錯
        pool_recycle=-1  # 多久以後對線程池中的線程進行一次鏈接的回收(重置)
    )
SessionFactory = sessionmaker(bind=engine)
session = scoped_session(SessionFactory)


def task():
    ret = session.query(Student).all()
    # 將鏈接交還給鏈接池
    session.remove()


from threading import Thread

for i in range(20):
    t = Thread(target=task)
    t.start()

執行原生SQL的兩種方式

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session
from models import Student,Course,Student2Course

engine = create_engine(
        "mysql+pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8",
        max_overflow=0,  # 超過鏈接池大小外最多建立的鏈接
        pool_size=5,  # 鏈接池大小
        pool_timeout=30,  # 池中沒有線程最多等待的時間,不然報錯
        pool_recycle=-1  # 多久以後對線程池中的線程進行一次鏈接的回收(重置)
    )
SessionFactory = sessionmaker(bind=engine)
session = scoped_session(SessionFactory)


def task():
    """"""
    # 方式一:
    # 查詢
    # cursor = session.execute('select * from users')
    # result = cursor.fetchall()

    # 添加
    cursor = session.execute('INSERT INTO users(name) VALUES(:value)', params={"value": 'wupeiqi'})
    session.commit()
    print(cursor.lastrowid)
    
    
    # 方式二:
    conn = engine.raw_connection()
    cursor = conn.cursor()
    cursor.execute(
        "select * from t1"
    )
    result = cursor.fetchall()
    cursor.close()
    conn.close()
    
    # 將鏈接交還給鏈接池
    session.remove() 

一對多操做

models.py

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column
from sqlalchemy import Integer,String,Text,Date,DateTime,ForeignKey,UniqueConstraint, Index
from sqlalchemy import create_engine
from sqlalchemy.orm import relationship


Base = declarative_base()

class Depart(Base):
    __tablename__ = 'depart'
    id = Column(Integer, primary_key=True)
    title = Column(String(32), index=True, nullable=False)

class Users(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=False)
    depart_id = Column(Integer,ForeignKey("depart.id"))

    dp = relationship("Depart", backref='pers')   # 不在數據庫中生成字段   只作查詢使用,反向查詢用user.pers

views.py

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Users,Depart

engine = create_engine(
        "mysql+pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8",
        max_overflow=0,  # 超過鏈接池大小外最多建立的鏈接
        pool_size=5,  # 鏈接池大小
        pool_timeout=30,  # 池中沒有線程最多等待的時間,不然報錯
        pool_recycle=-1  # 多久以後對線程池中的線程進行一次鏈接的回收(重置)
    )
SessionFactory = sessionmaker(bind=engine)

# 根據Users類對users表進行增刪改查
session = SessionFactory()

# 1. 查詢全部用戶
ret = session.query(Users).all()
for row in ret:
    print(row.id,row.name,row.depart_id)
    
# 2. 查詢全部用戶+所屬部門名稱
ret = session.query(Users.id,Users.name,Depart.title).join(Depart,Users.depart_id == Depart.id).all()
for row in ret:
    print(row.id,row.name,row.title)

query = session.query(Users.id,Users.name,Depart.title).join(Depart,Users.depart_id == Depart.id,isouter=True)
print(query)

# 3. relation字段:查詢全部用戶+所屬部門名稱
ret = session.query(Users).all()
for row in ret:
    print(row.id,row.name,row.depart_id,row.dp.title)

# 4. relation字段:查詢銷售部全部的人員
obj = session.query(Depart).filter(Depart.title == '銷售').first()
for row in obj.pers:
    print(row.id,row.name,obj.title)

# 5. 建立一個名稱叫:IT部門,再在該部門中添加一個員工:田碩
# 方式一:
d1 = Depart(title='IT')
session.add(d1)
session.commit()

u1 = Users(name='付勇',depart_id=d1.id)
session.add(u1)
session.commit()

# 方式二:
u1 = Users(name='付勇',dp=Depart(title='IT'))
session.add(u1)
session.commit()

# 6. 建立一個名稱叫:王者榮耀,再在該部門中添加一個員工:龔林峯/長美夢/王爺們
d1 = Depart(title='王者榮耀')
d1.pers = [Users(name='小A'),Users(name='小B'),Users(name='王爺們'),]
session.add(d1)
session.commit()

session.close()

多對多操做

models.py

class Student(Base):
    __tablename__ = 'student'
    id = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=False)

    course_list = relationship('Course', secondary='student2course', backref='student_list')

class Course(Base):
    __tablename__ = 'course'
    id = Column(Integer, primary_key=True)
    title = Column(String(32), index=True, nullable=False)

class Student2Course(Base):
    __tablename__ = 'student2course'
    id = Column(Integer, primary_key=True, autoincrement=True)
    student_id = Column(Integer, ForeignKey('student.id'))
    course_id = Column(Integer, ForeignKey('course.id'))

    __table_args__ = (
        UniqueConstraint('student_id', 'course_id', name='uix_stu_cou'), # 聯合惟一索引
        # Index('ix_id_name', 'name', 'extra'),                          # 聯合索引
    )

views.py

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Student,Course,Student2Course

engine = create_engine(
        "mysql+pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8",
        max_overflow=0,  # 超過鏈接池大小外最多建立的鏈接
        pool_size=5,  # 鏈接池大小
        pool_timeout=30,  # 池中沒有線程最多等待的時間,不然報錯
        pool_recycle=-1  # 多久以後對線程池中的線程進行一次鏈接的回收(重置)
    )
SessionFactory = sessionmaker(bind=engine)

# 根據Users類對users表進行增刪改查
session = SessionFactory()

# 1. 錄入數據
session.add_all([
    Student(name='先用'),
    Student(name='佳俊'),
    Course(title='生物'),
    Course(title='體育'),
])
session.commit()

session.add_all([
    Student2Course(student_id=2,course_id=1)
])
session.commit()

# 2. 三張表關聯
ret = session.query(Student2Course.id,Student.name,Course.title).join(Student,Student2Course.student_id==Student.id,isouter=True).join(Course,Student2Course.course_id==Course.id,isouter=True).order_by(Student2Course.id.asc())
for row in ret:
    print(row)
# 3. 「先用」選的全部課
ret = session.query(Student2Course.id,Student.name,Course.title).join(Student,Student2Course.student_id==Student.id,isouter=True).join(Course,Student2Course.course_id==Course.id,isouter=True).filter(Student.name=='先用').order_by(Student2Course.id.asc()).all()
print(ret)

obj = session.query(Student).filter(Student.name=='先用').first()
for item in obj.course_list:
    print(item.title)

# 4. 選了「生物」的全部人
obj = session.query(Course).filter(Course.title=='生物').first()
for item in obj.student_list:
    print(item.name)

# 5. 建立一個課程,建立2學生,兩個學生選新建立的課程。
obj = Course(title='英語')
obj.student_list = [Student(name='爲名'),Student(name='廣宗')]

session.add(obj)
session.commit()


session.close()
相關文章
相關標籤/搜索