https://www.cnblogs.com/wupeiqi/articles/8259356.html
pip3 install sqlalchemy
SQLAlchemy是一個基於Python實現的ORM框架。該框架創建在 DB API之上,使用關係對象映射進行數據庫操做,
簡言之即是:將類和對象轉換成SQL,而後使用數據API執行SQL並獲取執行結果。
Engine,框架的引擎 Connection Pooling ,數據庫鏈接池 Dialect,選擇鏈接數據庫的DB API種類 Schema/Types,架構和類型 SQL Exprression Language,SQL表達式語言
models.py
import datetime from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index Base = declarative_base() # https://www.cnblogs.com/wupeiqi/articles/8259356.html class Users(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=False) dep_id=Column(Integer, index=True) def create_all(): """ 根據類建立數據庫表 :return: """ engine = create_engine( "mysql+pymysql://root:root@localhost:3306/mydb2?charset=utf8", max_overflow=0, # 超過鏈接池大小外最多建立的鏈接 pool_size=5, # 鏈接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,不然報錯 pool_recycle=-1 # 多久以後對線程池中的線程進行一次鏈接的回收(重置) ) Base.metadata.create_all(engine) def drop_all(): """ 根據類刪除數據庫表 :return: """ engine = create_engine( "mysql+pymysql://root:root@localhost:3306/mydb2?charset=utf8", max_overflow=0, # 超過鏈接池大小外最多建立的鏈接 pool_size=5, # 鏈接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,不然報錯 pool_recycle=-1 # 多久以後對線程池中的線程進行一次鏈接的回收(重置) ) Base.metadata.drop_all(engine) if __name__ == '__main__': create_all() # drop_all()
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from models import Users engine = create_engine( "mysql+pymysql://root:root@localhost:3306/mydb2?charset=utf8", max_overflow=0, # 超過鏈接池大小外最多建立的鏈接 pool_size=5, # 鏈接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,不然報錯 pool_recycle=-1 # 多久以後對線程池中的線程進行一次鏈接的回收(重置) ) SessionFactory = sessionmaker(bind=engine) # 根據Users類對users表進行增刪改查 session = SessionFactory() # ======================增============================= # # 單條數據 obj=Users(name="張三") session.add(obj) session.commit() # 多條數據 session.add_all([ Users(name="張裏"), Users(name="立刻") ]) session.commit() # ======================查============================= # for集合查詢 aa=session.query(Users).all() for row in aa: print(row.name,row.id) # # 條件查詢 bb=session.query(Users).filter(Users.id>=3) for roe in bb: print(roe.name) # # 條件查詢 res=session.query(Users).filter(Users.id>=2).first() print(res) # ======================刪============================= session.query(Users).filter(Users.id>2).delete() session.commit() # ======================改============================= session.query(Users).filter(Users.id==1).update({Users.name:"哈哈哈哈"}) session.query(Users).filter(Users.id==2).update({"name":"哈哈哈哈哈哈哈哈哈哈企鵝惡趣味去"}) session.query(Users).filter(Users.id == 4).update({'name':Users.name+"DSB"},synchronize_session=False) # 在原來的數據上進行追加 session.commit() # ======================其餘經常使用============================= # 1. 指定列 select id,name as cname from users; result = session.query(Users.id,Users.name.label('cname')).all() for item in result: print(item,item.id,item.cname) aa=session.query(Users.id,Users.name.label('cname')) print(aa) 查看sql語句 # 2. 默認條件and aa=session.query(Users).filter(Users.id > 1, Users.name == '李四').all() for i in aa: print(i.name) # 3. between 在什麼之間 aa=session.query(Users).filter(Users.id.between(4, 7), Users.name == '李四').all() for i in aa: print(i.name) # in 包含 aa=session.query(Users).filter(Users.id.in_([1,5,2])).all() #在這個範圍內 查詢的結構順序會亂 for i in aa: print(i.name)
print("--------------------------") cc=session.query(Users).filter(~Users.id.in_([1,3,4])).all() # ~不在這個範圍內 for i in cc: print(i.name) # 5. 子查詢 aa=session.query(Users).filter(Users.id.in_(session.query(Users.id).filter(Users.name=='李四'))).all() for i in aa: print(i.name) # 6. and 和 or from sqlalchemy import and_, or_ aa=session.query(Users).filter(Users.id > 3, Users.name == '李四').all() # 默認就是and for i in aa: print(i.name) bb=session.query(Users).filter(and_(Users.id > 3, Users.name == '立刻')).all() #and #id大於3 而且name=立刻 for i in bb: print(i.name) cc=session.query(Users).filter(or_(Users.id < 2, Users.name == '黃色')).all() # or_(Users.id < 2, Users.name == '黃色')表示id小於2 或者name等於黃色 for i in cc: print(i.name) dd=session.query(Users).filter( or_( Users.id < 2, and_(Users.name == 'eric', Users.id > 3), Users.extra != "" )).all() for i in dd: print(i.name) # 7. filter_by aa=session.query(Users).filter_by(name='李四').all() print(aa) for i in aa: print(i.name) # 8. 通配符 ret1 = session.query(Users).filter(Users.name.like('李%')).all() for i in ret1: print(i.name) ret = session.query(Users).filter(~Users.name.like('黃%')).all() for i in ret: print(i.name) # 9. 切片 result = session.query(Users)[1:3] 切片 (限制/分頁) print(result) for i in result: print(i.name) # 10.排序 ret1 = session.query(Users).order_by(Users.name.desc()).all() #升序 for i in ret1: print(i.name) print("-----------------") ret2= session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all() # 降序 for i in ret2: print(i.name) # 11. group by from sqlalchemy.sql import func # 導入 func,func 中有聚合函數 ret = session.query( Users.dep_id, func.count(Users.id), ).group_by(Users.dep_id).all() for item in ret: print(item) # group_by(字段)以後不要 query() 全部字段 print("-----------------------------------------------------") ret = session.query( Users.dep_id, func.count(Users.id), ).group_by(Users.dep_id).having(func.count(Users.id) >=2).all() 根據 name 分組,func.count(Users.id) > 2 ;根據聚合函數進行二次篩選:having for item in ret: print(item) # 12.union 和 union all 組合(垂直/上下連表):union 和 union all --- union all 去重,union 不去重 # Union All:對兩個結果集進行並集操做,包括重複行,不進行排序 # Union:對兩個結果集進行並集操做,不包括重複行,同時進行默認規則的排序; # 有關union和union all關鍵字須要注意的問題是: # union 和 union all均可以將多個結果集合並,而不單單是兩個,你能夠將多個結果集串起來。 # 使用union和union all必須保證各個select 集合的結果有相同個數的列,而且每一個列的類型是同樣的。但列名則不必定需 # union 和 union_all 的區別 # union 去重 # union_all 不去重 # 相同點:合併的兩張表的列要相同 """ select id,name from users UNION select id,name from users; """ q1 = session.query(Users.name).filter(Users.id > 2) q2 = session.query(Favor.caption).filter(Favor.nid < 2) ret = q1.union(q2).all() print(ret) q1 = session.query(Users.name).filter(Users.id > 2) q2 = session.query(Favor.caption).filter(Favor.nid < 2) ret = q1.union_all(q2).all() print(ret) session.close()
models.py
import datetime from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index from sqlalchemy.orm import relationship Base = declarative_base() # ##################### 一對多示例表 ######################### class Dep(Base): __tablename__ = 'dep' id = Column(Integer, primary_key=True) title = Column(String(50),index=True,nullable=False) class Users(Base): __tablename__ = 'users' nid = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=False) dep_id = Column(Integer, ForeignKey("dep.id")) # 與生成表結構無關,僅用於查詢方便 dp = relationship("Dep", backref='pers') # 這個會在數據庫建立 只會建立一個簡單的關係 (跨表操做) def create_all(): """ 根據類建立數據庫表 :return: """ engine = create_engine( "mysql+pymysql://root:root@localhost:3306/mydb3?charset=utf8", max_overflow=0, # 超過鏈接池大小外最多建立的鏈接 pool_size=5, # 鏈接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,不然報錯 pool_recycle=-1 # 多久以後對線程池中的線程進行一次鏈接的回收(重置) ) Base.metadata.create_all(engine) def drop_all(): """ 根據類刪除數據庫表 :return: """ engine = create_engine( "mysql+pymysql://root:root@localhost:3306/mydb3?charset=utf8", max_overflow=0, # 超過鏈接池大小外最多建立的鏈接 pool_size=5, # 鏈接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,不然報錯 pool_recycle=-1 # 多久以後對線程池中的線程進行一次鏈接的回收(重置) ) Base.metadata.drop_all(engine) if __name__ == '__main__': create_all() # drop_all()
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from models import Users,Dep engine = create_engine( "mysql+pymysql://root:root@localhost:3306/mydb3?charset=utf8", max_overflow=0, # 超過鏈接池大小外最多建立的鏈接 pool_size=5, # 鏈接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,不然報錯 pool_recycle=-1 # 多久以後對線程池中的線程進行一次鏈接的回收(重置) ) SessionFactory = sessionmaker(bind=engine) # 根據Users類對users表進行增刪改查 session = SessionFactory() # 1. 查詢全部用戶 ret=session.query(Users).all() for roe in ret: print(roe.nid,roe.name,roe.dep_id) # 2. 查詢全部用戶+所屬部門名稱 print("————————————————————————————————————————————") res=session.query(Users,Dep).join(Dep).all() for roe in res: print(roe[0].name,roe[1].title) print("————————————————————————————————————————————") aa=session.query(Users.nid ,Users.name,Dep.title,Users.dep_id).join(Dep,Users.dep_id==Dep.id).all() for roe in aa: print(roe) print("2222222222222222") print(roe.nid,roe.name,roe.title) print("————————————————————————————————————————————") bb=session.query(Users.nid ,Users.name,Dep.title).join(Dep,Users.dep_id==Dep.id,isouter=True) print(bb) print("————————————————————————————————————————————") # 查詢全部用戶+所屬部門名稱 # relationship 字段 # 這個會在數據庫建立 只會建立一個簡單的關係 (能夠正向和反向查找) cc=session.query(Users).all() for roe in cc: print(roe.nid,roe.name,roe.dep_id,roe.dp.title) # 反查 部門對應的人 查詢工程部全部的人員 ob=session.query(Dep).filter(Dep.title=="工程部").first() for roe in ob.pers: print(roe.nid,roe.name,ob.title) # 5. 建立一個名稱叫:IT部門,再在該部門中添加一個員工:田碩 # 方式一: d1 = Dep(title='企劃部') session.add(d1) session.commit() u1 = Users(name='張無忌哈哈哈',dep_id=d1.id) session.add(u1) session.commit() # 方式二: u1 = Users(name='高聲喊',dp=Dep(title='物業部')) session.add(u1) session.commit() # 6. 建立一個名稱叫遊戲,再在該部門中添加一個員工:龔林峯/長美夢/王爺們 d1 = Dep(title='遊戲部') #一個部門建立了三我的 d1.pers = [Users(name='日地'),Users(name='趙日天'),Users(name='王尼瑪'),] session.add(d1) session.commit() session.close()
models.py
import datetime from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index from sqlalchemy.orm import relationship Base = declarative_base() # ##################### 多對多示例表 ######################### class Student(Base): __tablename__ = 'student' id = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=False) # 與生成表結構無關,僅用於查詢方便 course_list = relationship('Course', secondary='student2course', backref='student_list') class Course(Base): __tablename__ = 'course' id = Column(Integer, primary_key=True) title = Column(String(32), index=True, nullable=False) class Student2Course(Base): __tablename__ = 'student2course' id = Column(Integer, primary_key=True, autoincrement=True) student_id = Column(Integer, ForeignKey('student.id')) course_id = Column(Integer, ForeignKey('course.id')) __table_args__ = ( UniqueConstraint('student_id', 'course_id', name='uix_stu_cou'), # 聯合惟一索引 # Index('ix_id_name', 'name', 'extra'), # 聯合索引 ) def create_all(): """ 根據類建立數據庫表 :return: """ engine = create_engine( "mysql+pymysql://root:root@localhost:3306/mydb4?charset=utf8", max_overflow=0, # 超過鏈接池大小外最多建立的鏈接 pool_size=5, # 鏈接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,不然報錯 pool_recycle=-1 # 多久以後對線程池中的線程進行一次鏈接的回收(重置) ) Base.metadata.create_all(engine) def drop_all(): """ 根據類刪除數據庫表 :return: """ engine = create_engine( "mysql+pymysql://root:root@localhost:3306/mydb4?charset=utf8", max_overflow=0, # 超過鏈接池大小外最多建立的鏈接 pool_size=5, # 鏈接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,不然報錯 pool_recycle=-1 # 多久以後對線程池中的線程進行一次鏈接的回收(重置) ) Base.metadata.drop_all(engine) if __name__ == '__main__': create_all() # drop_all()
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from models import Student,Course,Student2Course engine = create_engine( "mysql+pymysql://root:root@localhost:3306/mydb4?charset=utf8", max_overflow=0, # 超過鏈接池大小外最多建立的鏈接 pool_size=5, # 鏈接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,不然報錯 pool_recycle=-1 # 多久以後對線程池中的線程進行一次鏈接的回收(重置) ) SessionFactory = sessionmaker(bind=engine) # 根據Users類對users表進行增刪改查 session = SessionFactory() # 1. 錄入數據 session.add_all([ Student(name='隔壁老王'), Student(name='唐僧'), Course(title='語文'), Course(title='地理'), ]) session.commit() session.add_all([ Student2Course(student_id=3,course_id=1) ]) session.commit() # 2. 三張表關聯 ret = session.query(Student2Course.id,Student.name,Course.title).join(Student,Student2Course.student_id==Student.id,isouter=True).join(Course,Student2Course.course_id==Course.id,isouter=True).order_by(Student2Course.id.asc()) for row in ret: print(row) print("________________________________----") # (1, '王尼瑪', '生物') # (2, '隔壁老王', '體育') # (3, '唐僧', '地理') # (4, '王尼瑪', '語文') # (5, '李四', '體育') # (6, '李四', '地理') # (7, '李四', '語文') # (8, '隔壁老王', '生物') # (9, '唐僧', '體育') # (10, '唐僧', '生物') # 3. 「王尼瑪」選的全部課 ret = session.query(Student2Course.id,Student.name,Course.title).join(Student,Student2Course.student_id==Student.id,isouter=True).join(Course,Student2Course.course_id==Course.id,isouter=True).filter(Student.name=='李四').order_by(Student2Course.id.asc()).all() print(ret) # [(5, '李四', '體育'), (6, '李四', '地理'), (7, '李四', '語文')] # 4. 「唐僧」選的全部課 # relationship # 與生成表結構無關,僅用於查詢方便 print("________________________________----") obj = session.query(Student).filter(Student.name=='唐僧').first() for item in obj.course_list: print(item.title) # 生物 # 體育 # 地理 print("________________________________----") # relationship # 與生成表結構無關,僅用於查詢方便 # 4. 選了「生物」的全部人 反向查找 obj1 = session.query(Course).filter(Course.title=='生物').first() for item in obj1.student_list: print(item.name) # 王尼瑪 # 隔壁老王 # 唐僧 # 5. 建立一個課程,建立2學生,兩個學生選新建立的課程。 obj = Course(title='英語') obj.student_list = [Student(name='老司機'),Student(name='幺二二')] session.add(obj) session.commit() session.close()
方式一(推薦) :每次線程來都去獲取一個鏈接(內部是基於threading.local) 方式二:也差很少
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from sqlalchemy.orm import scoped_session from models import Student,Course,Student2Course engine = create_engine( "mysql+pymysql://root:root@localhost:3306/mydb4?charset=utf8", max_overflow=0, # 超過鏈接池大小外最多建立的鏈接 pool_size=5, # 鏈接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,不然報錯 pool_recycle=-1 # 多久以後對線程池中的線程進行一次鏈接的回收(重置) ) SessionFactory = sessionmaker(bind=engine) session = scoped_session(SessionFactory) def task(): ret = session.query(Student).all() for row in ret: print(row.name) # 李四 # 王尼瑪 # 隔壁老王 # 唐僧 # 李四 # 將鏈接交還給鏈接池 session.remove() from threading import Thread for i in range(20): t = Thread(target=task) t.start()
方式二
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from sqlalchemy.orm import scoped_session from models import Student,Course,Student2Course engine = create_engine( "mysql+pymysql://root:root@localhost:3306/mydb4?charset=utf8", max_overflow=0, # 超過鏈接池大小外最多建立的鏈接 pool_size=5, # 鏈接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,不然報錯 pool_recycle=-1 # 多久以後對線程池中的線程進行一次鏈接的回收(重置) ) SessionFactory = sessionmaker(bind=engine) def task(): # 去鏈接池中獲取一個鏈接 session = SessionFactory() ret = session.query(Student).all() for row in ret: print(row.name) # 唐僧 # 李四 # 李四 # 將鏈接交還給鏈接池 session.close() from threading import Thread for i in range(20): t = Thread(target=task) t.start()
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from sqlalchemy.orm import scoped_session from models import Student,Course,Student2Course engine = create_engine( "mysql+pymysql://root:root@localhost:3306/mydb4?charset=utf8", max_overflow=0, # 超過鏈接池大小外最多建立的鏈接 pool_size=5, # 鏈接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,不然報錯 pool_recycle=-1 # 多久以後對線程池中的線程進行一次鏈接的回收(重置) ) SessionFactory = sessionmaker(bind=engine) session = scoped_session(SessionFactory) def task(): """""" # 方式一 原生sql: """ # 查詢 # cursor = session.execute('select * from users') # result = cursor.fetchall() # 添加 cursor = session.execute('INSERT INTO users(name) VALUES(:value)', params={"value": 'wupeiqi'}) session.commit() print(cursor.lastrowid) """ # 方式二 原生sql: conn = engine.raw_connection() cursor = conn.cursor() cursor.execute( "select * from t1" ) result = cursor.fetchall() cursor.close() conn.close() # 將鏈接交還給鏈接池 session.remove() from threading import Thread for i in range(20): t = Thread(target=task) t.start()