十五 .Flask SQLAlchemy 使用(單表 一對多表 多對多表 操做) 數據庫鏈接方方式(多線程)原生sql

一 . SQLAlchemy 基本使用  單表  一對多表   多對多表 數據庫鏈接方方式(多線程)原生sql

https://www.cnblogs.com/wupeiqi/articles/8259356.html

1.SQLAlchemy 介紹

pip3 install sqlalchemy

SQLAlchemy是一個基於Python實現的ORM框架。該框架創建在 DB API之上,使用關係對象映射進行數據庫操做,
簡言之即是:將類和對象轉換成SQL,而後使用數據API執行SQL並獲取執行結果。
Engine,框架的引擎
Connection Pooling ,數據庫鏈接池
Dialect,選擇鏈接數據庫的DB API種類
Schema/Types,架構和類型
SQL Exprression Language,SQL表達式語言

2. SQLAlchemy 使用單表經常使用操做

models.py

import
datetime from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index Base = declarative_base() # https://www.cnblogs.com/wupeiqi/articles/8259356.html class Users(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=False) dep_id=Column(Integer, index=True) def create_all(): """ 根據類建立數據庫表 :return: """ engine = create_engine( "mysql+pymysql://root:root@localhost:3306/mydb2?charset=utf8", max_overflow=0, # 超過鏈接池大小外最多建立的鏈接 pool_size=5, # 鏈接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,不然報錯 pool_recycle=-1 # 多久以後對線程池中的線程進行一次鏈接的回收(重置) ) Base.metadata.create_all(engine) def drop_all(): """ 根據類刪除數據庫表 :return: """ engine = create_engine( "mysql+pymysql://root:root@localhost:3306/mydb2?charset=utf8", max_overflow=0, # 超過鏈接池大小外最多建立的鏈接 pool_size=5, # 鏈接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,不然報錯 pool_recycle=-1 # 多久以後對線程池中的線程進行一次鏈接的回收(重置) ) Base.metadata.drop_all(engine) if __name__ == '__main__': create_all() # drop_all()

 

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Users


engine = create_engine(
       "mysql+pymysql://root:root@localhost:3306/mydb2?charset=utf8",
        max_overflow=0,  # 超過鏈接池大小外最多建立的鏈接
        pool_size=5,  # 鏈接池大小
        pool_timeout=30,  # 池中沒有線程最多等待的時間,不然報錯
        pool_recycle=-1  # 多久以後對線程池中的線程進行一次鏈接的回收(重置)
    )
SessionFactory = sessionmaker(bind=engine)
# 根據Users類對users表進行增刪改查
session = SessionFactory()

# ======================增=============================
# # 單條數據
obj=Users(name="張三")
session.add(obj)
session.commit()


# 多條數據
session.add_all([
          Users(name="張裏"),
          Users(name="立刻")
])
session.commit()


# ======================查=============================
# for集合查詢
aa=session.query(Users).all()
for row in  aa:
    print(row.name,row.id)

# # 條件查詢
bb=session.query(Users).filter(Users.id>=3)
for roe in  bb:
    print(roe.name)

# # 條件查詢
res=session.query(Users).filter(Users.id>=2).first()
print(res)


# ======================刪=============================
session.query(Users).filter(Users.id>2).delete()
session.commit()

# ======================改=============================
session.query(Users).filter(Users.id==1).update({Users.name:"哈哈哈哈"})
session.query(Users).filter(Users.id==2).update({"name":"哈哈哈哈哈哈哈哈哈哈企鵝惡趣味去"})
session.query(Users).filter(Users.id == 4).update({'name':Users.name+"DSB"},synchronize_session=False)  # 在原來的數據上進行追加
session.commit()

# ======================其餘經常使用=============================

# 1. 指定列
select id,name as cname from users;
result = session.query(Users.id,Users.name.label('cname')).all()
for item in result:
        print(item,item.id,item.cname)

aa=session.query(Users.id,Users.name.label('cname'))
print(aa) 查看sql語句



# 2. 默認條件and
aa=session.query(Users).filter(Users.id > 1, Users.name == '李四').all()
for i in aa:
    print(i.name)



# 3. between 在什麼之間
aa=session.query(Users).filter(Users.id.between(4, 7), Users.name == '李四').all()
for i in aa:
    print(i.name)




# in 包含
aa=session.query(Users).filter(Users.id.in_([1,5,2])).all() #在這個範圍內 查詢的結構順序會亂
for i in aa:
    print(i.name)
print("--------------------------") cc=session.query(Users).filter(~Users.id.in_([1,3,4])).all() # ~不在這個範圍內 for i in cc: print(i.name) # 5. 子查詢 aa=session.query(Users).filter(Users.id.in_(session.query(Users.id).filter(Users.name=='李四'))).all() for i in aa: print(i.name) # 6. and 和 or from sqlalchemy import and_, or_ aa=session.query(Users).filter(Users.id > 3, Users.name == '李四').all() # 默認就是and for i in aa: print(i.name) bb=session.query(Users).filter(and_(Users.id > 3, Users.name == '立刻')).all() #and #id大於3 而且name=立刻 for i in bb: print(i.name) cc=session.query(Users).filter(or_(Users.id < 2, Users.name == '黃色')).all() # or_(Users.id < 2, Users.name == '黃色')表示id小於2 或者name等於黃色 for i in cc: print(i.name) dd=session.query(Users).filter( or_( Users.id < 2, and_(Users.name == 'eric', Users.id > 3), Users.extra != "" )).all() for i in dd: print(i.name) # 7. filter_by aa=session.query(Users).filter_by(name='李四').all() print(aa) for i in aa: print(i.name) # 8. 通配符 ret1 = session.query(Users).filter(Users.name.like('李%')).all() for i in ret1: print(i.name) ret = session.query(Users).filter(~Users.name.like('黃%')).all() for i in ret: print(i.name) # 9. 切片 result = session.query(Users)[1:3] 切片 (限制/分頁) print(result) for i in result: print(i.name) # 10.排序 ret1 = session.query(Users).order_by(Users.name.desc()).all() #升序 for i in ret1: print(i.name) print("-----------------") ret2= session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all() # 降序 for i in ret2: print(i.name) # 11. group by from sqlalchemy.sql import func # 導入 func,func 中有聚合函數 ret = session.query( Users.dep_id, func.count(Users.id), ).group_by(Users.dep_id).all() for item in ret: print(item) # group_by(字段)以後不要 query() 全部字段 print("-----------------------------------------------------") ret = session.query( Users.dep_id, func.count(Users.id), ).group_by(Users.dep_id).having(func.count(Users.id) >=2).all() 根據 name 分組,func.count(Users.id) > 2 ;根據聚合函數進行二次篩選:having for item in ret: print(item) # 12.union 和 union all 組合(垂直/上下連表):union 和 union all --- union all 去重,union 不去重 # Union All:對兩個結果集進行並集操做,包括重複行,不進行排序 # Union:對兩個結果集進行並集操做,不包括重複行,同時進行默認規則的排序; # 有關union和union all關鍵字須要注意的問題是: # union 和 union all均可以將多個結果集合並,而不單單是兩個,你能夠將多個結果集串起來。 # 使用union和union all必須保證各個select 集合的結果有相同個數的列,而且每一個列的類型是同樣的。但列名則不必定需 # union 和 union_all 的區別 # union 去重 # union_all 不去重 # 相同點:合併的兩張表的列要相同 """ select id,name from users UNION select id,name from users; """ q1 = session.query(Users.name).filter(Users.id > 2) q2 = session.query(Favor.caption).filter(Favor.nid < 2) ret = q1.union(q2).all() print(ret) q1 = session.query(Users.name).filter(Users.id > 2) q2 = session.query(Favor.caption).filter(Favor.nid < 2) ret = q1.union_all(q2).all() print(ret) session.close()

3. SQLAlchemy 一對多(ForeignKey)

 
models.py

import
datetime from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index from sqlalchemy.orm import relationship Base = declarative_base() # ##################### 一對多示例表 ######################### class Dep(Base): __tablename__ = 'dep' id = Column(Integer, primary_key=True) title = Column(String(50),index=True,nullable=False) class Users(Base): __tablename__ = 'users' nid = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=False) dep_id = Column(Integer, ForeignKey("dep.id")) # 與生成表結構無關,僅用於查詢方便 dp = relationship("Dep", backref='pers') # 這個會在數據庫建立 只會建立一個簡單的關係 (跨表操做) def create_all(): """ 根據類建立數據庫表 :return: """ engine = create_engine( "mysql+pymysql://root:root@localhost:3306/mydb3?charset=utf8", max_overflow=0, # 超過鏈接池大小外最多建立的鏈接 pool_size=5, # 鏈接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,不然報錯 pool_recycle=-1 # 多久以後對線程池中的線程進行一次鏈接的回收(重置) ) Base.metadata.create_all(engine) def drop_all(): """ 根據類刪除數據庫表 :return: """ engine = create_engine( "mysql+pymysql://root:root@localhost:3306/mydb3?charset=utf8", max_overflow=0, # 超過鏈接池大小外最多建立的鏈接 pool_size=5, # 鏈接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,不然報錯 pool_recycle=-1 # 多久以後對線程池中的線程進行一次鏈接的回收(重置) ) Base.metadata.drop_all(engine) if __name__ == '__main__': create_all() # drop_all()

 

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Users,Dep

engine = create_engine(
    "mysql+pymysql://root:root@localhost:3306/mydb3?charset=utf8",
        max_overflow=0,  # 超過鏈接池大小外最多建立的鏈接
        pool_size=5,  # 鏈接池大小
        pool_timeout=30,  # 池中沒有線程最多等待的時間,不然報錯
        pool_recycle=-1  # 多久以後對線程池中的線程進行一次鏈接的回收(重置)
    )
SessionFactory = sessionmaker(bind=engine)

# 根據Users類對users表進行增刪改查
session = SessionFactory()



# 1. 查詢全部用戶
ret=session.query(Users).all()
for roe in  ret:
    print(roe.nid,roe.name,roe.dep_id)

 # 2. 查詢全部用戶+所屬部門名稱
print("————————————————————————————————————————————")
res=session.query(Users,Dep).join(Dep).all()
for roe in  res:
    print(roe[0].name,roe[1].title)

print("————————————————————————————————————————————")
aa=session.query(Users.nid ,Users.name,Dep.title,Users.dep_id).join(Dep,Users.dep_id==Dep.id).all()
for roe in  aa:
    print(roe)
    print("2222222222222222")
    print(roe.nid,roe.name,roe.title)


print("————————————————————————————————————————————")
bb=session.query(Users.nid ,Users.name,Dep.title).join(Dep,Users.dep_id==Dep.id,isouter=True)
print(bb)


print("————————————————————————————————————————————")
# 查詢全部用戶+所屬部門名稱 # relationship 字段       # 這個會在數據庫建立  只會建立一個簡單的關係  (能夠正向和反向查找)
cc=session.query(Users).all()
for roe in  cc:
    print(roe.nid,roe.name,roe.dep_id,roe.dp.title)


# 反查 部門對應的人               查詢工程部全部的人員
ob=session.query(Dep).filter(Dep.title=="工程部").first()
for roe in  ob.pers:
    print(roe.nid,roe.name,ob.title)


# 5. 建立一個名稱叫:IT部門,再在該部門中添加一個員工:田碩 # 方式一:
d1 = Dep(title='企劃部')
session.add(d1)
session.commit()

u1 = Users(name='張無忌哈哈哈',dep_id=d1.id)
session.add(u1)
session.commit()


# 方式二:
u1 = Users(name='高聲喊',dp=Dep(title='物業部'))
session.add(u1)
session.commit()
# 6. 建立一個名稱叫遊戲,再在該部門中添加一個員工:龔林峯/長美夢/王爺們
d1 = Dep(title='遊戲部')     #一個部門建立了三我的
d1.pers = [Users(name='日地'),Users(name='趙日天'),Users(name='王尼瑪'),]
session.add(d1)
session.commit()

session.close()

4. SQLAlchemy 多對多(ForeignKey+ForeignKey)

     
 
 
models.py

import
datetime from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index from sqlalchemy.orm import relationship Base = declarative_base() # ##################### 多對多示例表 ######################### class Student(Base): __tablename__ = 'student' id = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=False) # 與生成表結構無關,僅用於查詢方便 course_list = relationship('Course', secondary='student2course', backref='student_list') class Course(Base): __tablename__ = 'course' id = Column(Integer, primary_key=True) title = Column(String(32), index=True, nullable=False) class Student2Course(Base): __tablename__ = 'student2course' id = Column(Integer, primary_key=True, autoincrement=True) student_id = Column(Integer, ForeignKey('student.id')) course_id = Column(Integer, ForeignKey('course.id')) __table_args__ = ( UniqueConstraint('student_id', 'course_id', name='uix_stu_cou'), # 聯合惟一索引 # Index('ix_id_name', 'name', 'extra'), # 聯合索引 ) def create_all(): """ 根據類建立數據庫表 :return: """ engine = create_engine( "mysql+pymysql://root:root@localhost:3306/mydb4?charset=utf8", max_overflow=0, # 超過鏈接池大小外最多建立的鏈接 pool_size=5, # 鏈接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,不然報錯 pool_recycle=-1 # 多久以後對線程池中的線程進行一次鏈接的回收(重置) ) Base.metadata.create_all(engine) def drop_all(): """ 根據類刪除數據庫表 :return: """ engine = create_engine( "mysql+pymysql://root:root@localhost:3306/mydb4?charset=utf8", max_overflow=0, # 超過鏈接池大小外最多建立的鏈接 pool_size=5, # 鏈接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,不然報錯 pool_recycle=-1 # 多久以後對線程池中的線程進行一次鏈接的回收(重置) ) Base.metadata.drop_all(engine) if __name__ == '__main__': create_all() # drop_all()

 

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Student,Course,Student2Course


engine = create_engine(
    "mysql+pymysql://root:root@localhost:3306/mydb4?charset=utf8",
        max_overflow=0,  # 超過鏈接池大小外最多建立的鏈接
        pool_size=5,  # 鏈接池大小
        pool_timeout=30,  # 池中沒有線程最多等待的時間,不然報錯
        pool_recycle=-1  # 多久以後對線程池中的線程進行一次鏈接的回收(重置)
    )
SessionFactory = sessionmaker(bind=engine)
# 根據Users類對users表進行增刪改查
session = SessionFactory()

# 1. 錄入數據
session.add_all([
    Student(name='隔壁老王'),
    Student(name='唐僧'),
    Course(title='語文'),
    Course(title='地理'),
])
session.commit()
session.add_all([
    Student2Course(student_id=3,course_id=1)
])
session.commit()


# 2. 三張表關聯
ret = session.query(Student2Course.id,Student.name,Course.title).join(Student,Student2Course.student_id==Student.id,isouter=True).join(Course,Student2Course.course_id==Course.id,isouter=True).order_by(Student2Course.id.asc())
for row in ret:
    print(row)
print("________________________________----")
# (1, '王尼瑪', '生物')
# (2, '隔壁老王', '體育')
# (3, '唐僧', '地理')
# (4, '王尼瑪', '語文')
# (5, '李四', '體育')
# (6, '李四', '地理')
# (7, '李四', '語文')
# (8, '隔壁老王', '生物')
# (9, '唐僧', '體育')
# (10, '唐僧', '生物')



# 3. 「王尼瑪」選的全部課
ret = session.query(Student2Course.id,Student.name,Course.title).join(Student,Student2Course.student_id==Student.id,isouter=True).join(Course,Student2Course.course_id==Course.id,isouter=True).filter(Student.name=='李四').order_by(Student2Course.id.asc()).all()
print(ret)
# [(5, '李四', '體育'), (6, '李四', '地理'), (7, '李四', '語文')]


# 4. 「唐僧」選的全部課
# relationship  # 與生成表結構無關,僅用於查詢方便
print("________________________________----")
obj = session.query(Student).filter(Student.name=='唐僧').first()
for item in obj.course_list:
    print(item.title)
# 生物
# 體育
# 地理


print("________________________________----")
# relationship # 與生成表結構無關,僅用於查詢方便
# 4. 選了「生物」的全部人     反向查找
obj1 = session.query(Course).filter(Course.title=='生物').first()
for item in obj1.student_list:
    print(item.name)
# 王尼瑪
# 隔壁老王
# 唐僧


# 5. 建立一個課程,建立2學生,兩個學生選新建立的課程。
obj = Course(title='英語')
obj.student_list = [Student(name='老司機'),Student(name='幺二二')]
session.add(obj)
session.commit()


session.close()

 5. 數據庫鏈接方式(多線程 每次都要去數據庫獲取鏈接 )   注意:鏈接放在全局只會獲取一次

方式一(推薦) :每次線程來都去獲取一個鏈接(內部是基於threading.local)  方式二:也差很少

from
sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from sqlalchemy.orm import scoped_session from models import Student,Course,Student2Course engine = create_engine( "mysql+pymysql://root:root@localhost:3306/mydb4?charset=utf8", max_overflow=0, # 超過鏈接池大小外最多建立的鏈接 pool_size=5, # 鏈接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,不然報錯 pool_recycle=-1 # 多久以後對線程池中的線程進行一次鏈接的回收(重置) ) SessionFactory = sessionmaker(bind=engine) session = scoped_session(SessionFactory) def task(): ret = session.query(Student).all() for row in ret: print(row.name) # 李四 # 王尼瑪 # 隔壁老王 # 唐僧 # 李四 # 將鏈接交還給鏈接池 session.remove() from threading import Thread for i in range(20): t = Thread(target=task) t.start()

 

方式二 
from
sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from sqlalchemy.orm import scoped_session from models import Student,Course,Student2Course engine = create_engine( "mysql+pymysql://root:root@localhost:3306/mydb4?charset=utf8", max_overflow=0, # 超過鏈接池大小外最多建立的鏈接 pool_size=5, # 鏈接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,不然報錯 pool_recycle=-1 # 多久以後對線程池中的線程進行一次鏈接的回收(重置) ) SessionFactory = sessionmaker(bind=engine) def task(): # 去鏈接池中獲取一個鏈接 session = SessionFactory() ret = session.query(Student).all() for row in ret: print(row.name) # 唐僧 # 李四 # 李四 # 將鏈接交還給鏈接池 session.close() from threading import Thread for i in range(20): t = Thread(target=task) t.start()

 6. 原生sql

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session
from models import Student,Course,Student2Course

engine = create_engine(
        "mysql+pymysql://root:root@localhost:3306/mydb4?charset=utf8",
        max_overflow=0,  # 超過鏈接池大小外最多建立的鏈接
        pool_size=5,  # 鏈接池大小
        pool_timeout=30,  # 池中沒有線程最多等待的時間,不然報錯
        pool_recycle=-1  # 多久以後對線程池中的線程進行一次鏈接的回收(重置)
    )
SessionFactory = sessionmaker(bind=engine)
session = scoped_session(SessionFactory)


def task():
    """"""
    # 方式一 原生sql:
    """
    # 查詢
    # cursor = session.execute('select * from users')
    # result = cursor.fetchall()

    # 添加
    cursor = session.execute('INSERT INTO users(name) VALUES(:value)', params={"value": 'wupeiqi'})
    session.commit()
    print(cursor.lastrowid)
    """
    # 方式二 原生sql:
    
    conn = engine.raw_connection()
    cursor = conn.cursor()
    cursor.execute(
        "select * from t1"
    )
    result = cursor.fetchall()
    cursor.close()
    conn.close()
    

    # 將鏈接交還給鏈接池
    session.remove()

from threading import Thread

for i in range(20):
    t = Thread(target=task)
    t.start()
相關文章
相關標籤/搜索