使用鏈接池的兩種方式
第一種方式:
直接從SessionFactory裏獲取,此時若是須要開啓多個進程,那麼建立鏈接池的代碼必定要放在循環裏面html
否則的話每一個進程都是用一個session了python
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from models import Student,Course,Student2Course engine = create_engine( "mysql+pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8", max_overflow=0, # 超過鏈接池大小外最多建立的鏈接 pool_size=5, # 鏈接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,不然報錯 pool_recycle=-1 # 多久以後對線程池中的線程進行一次鏈接的回收(重置) ) SessionFactory = sessionmaker(bind=engine) def task(): # 去鏈接池中獲取一個鏈接 session = SessionFactory() ret = session.query(Student).all() # 將鏈接交還給鏈接池 session.close() from threading import Thread for i in range(20): t = Thread(target=task) t.start()
第二種方式:
使用scoped_session建立sessionmysql
這樣建立的session在多個線程裏仍是不一樣的sql
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from sqlalchemy.orm import scoped_session from models import Student,Course,Student2Course engine = create_engine( "mysql+pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8", max_overflow=0, # 超過鏈接池大小外最多建立的鏈接 pool_size=5, # 鏈接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,不然報錯 pool_recycle=-1 # 多久以後對線程池中的線程進行一次鏈接的回收(重置) ) SessionFactory = sessionmaker(bind=engine) session = scoped_session(SessionFactory) def task(): ret = session.query(Student).all() # 將鏈接交還給鏈接池 session.remove() from threading import Thread for i in range(20): t = Thread(target=task) t.start()
執行原生SQL的兩種方式
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session
from models import Student,Course,Student2Course
engine = create_engine(
"mysql+pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8",
max_overflow=0, # 超過鏈接池大小外最多建立的鏈接
pool_size=5, # 鏈接池大小
pool_timeout=30, # 池中沒有線程最多等待的時間,不然報錯
pool_recycle=-1 # 多久以後對線程池中的線程進行一次鏈接的回收(重置)
)
SessionFactory = sessionmaker(bind=engine)
session = scoped_session(SessionFactory)
def task():
""""""
# 方式一:
# 查詢
# cursor = session.execute('select * from users')
# result = cursor.fetchall()
# 添加
cursor = session.execute('INSERT INTO users(name) VALUES(:value)', params={"value": 'wupeiqi'})
session.commit()
print(cursor.lastrowid)
# 方式二:
conn = engine.raw_connection()
cursor = conn.cursor()
cursor.execute(
"select * from t1"
)
result = cursor.fetchall()
cursor.close()
conn.close()
# 將鏈接交還給鏈接池
session.remove()
一對多操做
models.py
from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column from sqlalchemy import Integer,String,Text,Date,DateTime,ForeignKey,UniqueConstraint, Index from sqlalchemy import create_engine from sqlalchemy.orm import relationship Base = declarative_base() class Depart(Base): __tablename__ = 'depart' id = Column(Integer, primary_key=True) title = Column(String(32), index=True, nullable=False) class Users(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=False) depart_id = Column(Integer,ForeignKey("depart.id")) dp = relationship("Depart", backref='pers') # 不在數據庫中生成字段 只作查詢使用,反向查詢用user.pers
views.py
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from models import Users,Depart engine = create_engine( "mysql+pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8", max_overflow=0, # 超過鏈接池大小外最多建立的鏈接 pool_size=5, # 鏈接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,不然報錯 pool_recycle=-1 # 多久以後對線程池中的線程進行一次鏈接的回收(重置) ) SessionFactory = sessionmaker(bind=engine) # 根據Users類對users表進行增刪改查 session = SessionFactory() # 1. 查詢全部用戶 ret = session.query(Users).all() for row in ret: print(row.id,row.name,row.depart_id) # 2. 查詢全部用戶+所屬部門名稱 ret = session.query(Users.id,Users.name,Depart.title).join(Depart,Users.depart_id == Depart.id).all() for row in ret: print(row.id,row.name,row.title) query = session.query(Users.id,Users.name,Depart.title).join(Depart,Users.depart_id == Depart.id,isouter=True) print(query) # 3. relation字段:查詢全部用戶+所屬部門名稱 ret = session.query(Users).all() for row in ret: print(row.id,row.name,row.depart_id,row.dp.title) # 4. relation字段:查詢銷售部全部的人員 obj = session.query(Depart).filter(Depart.title == '銷售').first() for row in obj.pers: print(row.id,row.name,obj.title) # 5. 建立一個名稱叫:IT部門,再在該部門中添加一個員工:田碩 # 方式一: d1 = Depart(title='IT') session.add(d1) session.commit() u1 = Users(name='付勇',depart_id=d1.id) session.add(u1) session.commit() # 方式二: u1 = Users(name='付勇',dp=Depart(title='IT')) session.add(u1) session.commit() # 6. 建立一個名稱叫:王者榮耀,再在該部門中添加一個員工:龔林峯/長美夢/王爺們 d1 = Depart(title='王者榮耀') d1.pers = [Users(name='小A'),Users(name='小B'),Users(name='王爺們'),] session.add(d1) session.commit() session.close()
多對多操做
models.py
class Student(Base): __tablename__ = 'student' id = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=False) course_list = relationship('Course', secondary='student2course', backref='student_list') class Course(Base): __tablename__ = 'course' id = Column(Integer, primary_key=True) title = Column(String(32), index=True, nullable=False) class Student2Course(Base): __tablename__ = 'student2course' id = Column(Integer, primary_key=True, autoincrement=True) student_id = Column(Integer, ForeignKey('student.id')) course_id = Column(Integer, ForeignKey('course.id')) __table_args__ = ( UniqueConstraint('student_id', 'course_id', name='uix_stu_cou'), # 聯合惟一索引 # Index('ix_id_name', 'name', 'extra'), # 聯合索引 )
views.py
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from models import Student,Course,Student2Course engine = create_engine( "mysql+pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8", max_overflow=0, # 超過鏈接池大小外最多建立的鏈接 pool_size=5, # 鏈接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,不然報錯 pool_recycle=-1 # 多久以後對線程池中的線程進行一次鏈接的回收(重置) ) SessionFactory = sessionmaker(bind=engine) # 根據Users類對users表進行增刪改查 session = SessionFactory() # 1. 錄入數據 session.add_all([ Student(name='先用'), Student(name='佳俊'), Course(title='生物'), Course(title='體育'), ]) session.commit() session.add_all([ Student2Course(student_id=2,course_id=1) ]) session.commit() # 2. 三張表關聯 ret = session.query(Student2Course.id,Student.name,Course.title).join(Student,Student2Course.student_id==Student.id,isouter=True).join(Course,Student2Course.course_id==Course.id,isouter=True).order_by(Student2Course.id.asc()) for row in ret: print(row) # 3. 「先用」選的全部課 ret = session.query(Student2Course.id,Student.name,Course.title).join(Student,Student2Course.student_id==Student.id,isouter=True).join(Course,Student2Course.course_id==Course.id,isouter=True).filter(Student.name=='先用').order_by(Student2Course.id.asc()).all() print(ret) obj = session.query(Student).filter(Student.name=='先用').first() for item in obj.course_list: print(item.title) # 4. 選了「生物」的全部人 obj = session.query(Course).filter(Course.title=='生物').first() for item in obj.student_list: print(item.name) # 5. 建立一個課程,建立2學生,兩個學生選新建立的課程。 obj = Course(title='英語') obj.student_list = [Student(name='爲名'),Student(name='廣宗')] session.add(obj) session.commit() session.close()