SQLAlchemy 是一個 ORM 框架,能夠幫助咱們使用面向對象的方式快速實現數據庫操做。html
組成部分:mysql
SQLAlchemy 自己沒法操做數據庫,其必須依賴 pymysql 等第三方插件,Dialect(方言)用於和數據 API 進行交流,根據配置文件的不一樣調用不一樣的數據庫 API ,從而實現對數據庫的操做,如:sql
MySQLDB mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname> pymysql mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>] MySQL-Connector mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname> cx_Oracle oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...] 更多:http://docs.sqlalchemy.org/en/latest/dialects/index.html
pip3 install sqlalchemy
from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index Base = declarative_base() class Users(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) age = Column(Integer, nullable=False) name = Column(String(32), index=True, nullable=False) """ 根據類建立數據庫表 """ engine = create_engine( "mysql+pymysql://root:root@127.0.0.1:3306/1221?charset=utf8", max_overflow=0, # 超過鏈接池大小外最多建立的鏈接 pool_size=5, # 鏈接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,不然報錯 pool_recycle=-1 # 多久以後對線程池中的線程進行一次鏈接的回收(重置) ) def init_db(): # 建立表 Base.metadata.create_all(engine) def drop_db(): Base.metadata.drop_all(engine) if __name__ == '__main__': drop_db() init_db()
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index Base = declarative_base() class Users(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True)
age = Column(Integer, nullable=False) name = Column(String(32), index=True, nullable=False) engine = create_engine("mysql+pymysql://root:root@127.0.0.1:3306/1221", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) # 每次執行數據庫操做時,都須要建立一個session session = Session()
# 添加單條 obj = Users(name="zhangsan",age=20) session.add(obj) # 添加多條 obj1 = Users(name='lisi',age=12) obj2 = Users(name='wangwu',age=14) session.add_all([obj1, obj2]) # 提交事務 session.commit()
# 查詢全部 user_query = session.query(Users) print(user_query) # SELECT users.id AS users_id, users.name AS users_name FROM users print(user_query.all()) # [<__main__.Users object at 0x0000000003A3E2B0>, <__main__.Users object at 0x0000000003A3E320>, <__main__.Users object at 0x0000000003A3E390>] # 過濾條件 user_query = user_query.filter(Users.id > 1) print(user_query) # SELECT users.id AS users_id, users.name AS users_name FROM users WHERE users.id > %(id_1)s print(user_query.all()) # [<__main__.Users object at 0x000000000395E4A8>, <__main__.Users object at 0x000000000395E518>] # 取第一個 print(user_query.first()) # <__main__.Users object at 0x000000000392DC88>
# 刪除 返回刪除條數 print(session.query(Users).filter(Users.id > 1).delete()) # 2 session.commit()
# 修改方式一 user_obj = session.query(Users).filter(Users.id == 1).first() user_obj.name = '張大三' session.commit() # 修改方式二 session.query(Users).filter(Users.id == 1).update({Users.name: '張小三'}) session.commit() # 在原來值基礎修改 session.query(Users).filter(Users.id == 1).update({Users.name: Users.name + '1'}, synchronize_session=False) session.query(Users).filter(Users.id == 1).update({'age': Users.age + 2}, synchronize_session='evaluate') session.commit()
# 經過字段過濾 session.query(Users).filter_by(name='zhangsan').all() # filter 中過濾多條件時它們的關係是 and session.query(Users).filter(Users.id > 1, Users.name == 'lisi').all() # between session.query(Users).filter(Users.id.between(1,2)).all() # in session.query(Users).filter(Users.id.in_([1, 3])).all() # not in session.query(Users).filter(~Users.id.in_([1, 3])).all() # 子查詢 session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='zhangsan'))).all() # and 和 or 操做 from sqlalchemy import and_, or_ session.query(Users).filter(and_(Users.id > 3, Users.name == 'lisi')).all() session.query(Users).filter(or_(Users.id < 2, Users.name == 'zhangsan')).all() session.query(Users).filter( or_( Users.id < 2, and_(Users.name == 'zhangsan', Users.id > 3), )).all() # like session.query(Users).filter(Users.name.like('z%')).all() # not like session.query(Users).filter(~Users.name.like('z%')).all() # 切片 含首不含尾 session.query(Users)[0:1] # order by session.query(Users).order_by(Users.name).all() # order by desc session.query(Users).order_by(Users.name.desc(),Users.id.asc()).all() # group by & having & max & sum & min & avg from sqlalchemy.sql import func session.query(func.max(Users.age), func.sum(Users.age), func.min(Users.age), func.avg(Users.age)).group_by(Users.age).having(func.min(Users.age) > 14).all() # [(26, Decimal('26'), 26, Decimal('26.0000'))] # 查詢指定列 & as ret = session.query(Users.name,Users.age,Users.name.label('nick_name')).all() [print(user_item[0],user_item.age,user_item.nick_name) for user_item in ret] # 連表 # where xx.id = xx.id session.query(Users, Favor).filter(Users.id == Favor.nid).all() # inner join session.query(Person).join(Favor).all() # outer join session.query(Person).join(Favor, isouter=True).all() # 組合 q1 = session.query(Users.name).filter(Users.id > 2) q2 = session.query(Favor.caption).filter(Favor.nid < 2) # 合併去重 ret = q1.union(q2).all() # 合併不去重 ret = q1.union_all(q2).all()
from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index Base = declarative_base() class Depart(Base): __tablename__ = 'depart' id = Column(Integer, primary_key=True) title = Column(String(32), nullable=True) class Users(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=False) dept_id = Column(Integer, ForeignKey('depart.id')) """ 根據類建立數據庫表 """ engine = create_engine( "mysql+pymysql://root:root@127.0.0.1:3306/1221?charset=utf8", max_overflow=0, # 超過鏈接池大小外最多建立的鏈接 pool_size=5, # 鏈接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,不然報錯 pool_recycle=-1 # 多久以後對線程池中的線程進行一次鏈接的回收(重置) ) def init_db(): # 建立表 Base.metadata.create_all(engine) def drop_db(): Base.metadata.drop_all(engine) if __name__ == '__main__': drop_db() init_db()
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index Base = declarative_base() class Depart(Base): __tablename__ = 'depart' id = Column(Integer, primary_key=True) title = Column(String(32), nullable=True) class Users(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=False) dept_id = Column(Integer, ForeignKey('depart.id')) engine = create_engine("mysql+pymysql://root:root@127.0.0.1:3306/1221", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) # 每次執行數據庫操做時,都須要建立一個session session = Session()
# 查詢全部用戶名稱 & 所屬部門名稱 user_list = session.query(Users, Depart).join(Depart, Users.dept_id == Depart.id).all() print(user_list) ''' [(<__main__.Users object at 0x000000000395C8D0>, <__main__.Depart object at 0x000000000395C940>), (<__main__.Users object at 0x000000000395C9B0>, <__main__.Depart object at 0x000000000395C940>), (<__main__.Users object at 0x000000000395CA20>, <__main__.Depart object at 0x000000000395CA90>)] ''' [print(user[0].name, user[1].title) for user in user_list] ''' 張三 後勤部 李四 後勤部 王五 財務部 ''' # SELECT users.name AS users_name, depart.title AS depart_title FROM users INNER JOIN depart ON users.dept_id = depart.id user_list = session.query(Users.name, Depart.title).join(Depart, Users.dept_id == Depart.id).all() print(user_list, type(user_list)) # [('張三', '後勤部'), ('李四', '後勤部'), ('王五', '財務部')] [print(user.name, user.title) for user in user_list] ''' 張三 後勤部 李四 後勤部 王五 財務部 ''' # SELECT users.name AS users_name, depart.title AS depart_title FROM users LEFT OUTER JOIN depart ON users.dept_id = depart.id user_list = session.query(Users.name, Depart.title).join(Depart, Users.dept_id == Depart.id, isouter=True) print(user_list)
# 正向查詢 :查詢全部用戶關聯的部門名稱 user_list = session.query(Users).all() [print(user.name, user.depart, user.depart.title) for user in user_list] ''' 張三 <__main__.Depart object at 0x00000000039697F0> 後勤部 李四 <__main__.Depart object at 0x00000000039697F0> 後勤部 王五 <__main__.Depart object at 0x0000000003969898> 財務部 ''' # 反向查詢 :查詢後勤部全部用戶名稱 depart_obj = session.query(Depart).filter_by(id=1).first() [print(user.name, user) for user in depart_obj.users] ''' 張三 <__main__.Users object at 0x000000000395FA20> 李四 <__main__.Users object at 0x000000000395F978> ''' # 關聯新增 :建立一個部門'醬油部',再在該部門中添加一個員工'李德剛' # 方式一: user_obj = Users(name='李德剛') depart_obj = Depart(title='醬油部', users=[user_obj]) session.add(depart_obj) session.commit() # 方式二: depart_obj = Depart(title='醬油部') user_obj = Users(name='李德剛', depart=depart_obj) session.add(user_obj) session.commit()
from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index Base = declarative_base() class Permission(Base): '''權限''' __tablename__ = 'permission' id = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=False) class User(Base): '''用戶''' __tablename__ = 'user' id = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=False) class Permission2User(Base): __tablename__ = 'permission2user' id = Column(Integer, primary_key=True, autoincrement=True) user_id = Column(Integer, ForeignKey('user.id')) permission_id = Column(Integer, ForeignKey('permission.id')) __table_args__ = ( # 聯合惟一索引 UniqueConstraint('user_id', 'permission_id', name='uc_user_permission'), ) """ 根據類建立數據庫表 """ engine = create_engine( "mysql+pymysql://root:root@127.0.0.1:3306/test?charset=utf8", max_overflow=0, # 超過鏈接池大小外最多建立的鏈接 pool_size=5, # 鏈接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,不然報錯 pool_recycle=-1 # 多久以後對線程池中的線程進行一次鏈接的回收(重置) ) def init_db(): # 建立表 Base.metadata.create_all(engine) def drop_db(): Base.metadata.drop_all(engine) if __name__ == '__main__': drop_db() init_db()
from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship Base = declarative_base() class Permission(Base): '''權限''' __tablename__ = 'permission' id = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=False) # 第一個參數:指定關聯的模型類,secondary :指定經過該表建立多對多關係,backref :反向查詢時字段 user_list = relationship('User', secondary='permission2user', backref='permission_list') class User(Base): '''用戶''' __tablename__ = 'user' id = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=False) class Permission2User(Base): __tablename__ = 'permission2user' id = Column(Integer, primary_key=True, autoincrement=True) user_id = Column(Integer, ForeignKey('user.id')) permission_id = Column(Integer, ForeignKey('permission.id')) __table_args__ = ( # 聯合惟一索引 UniqueConstraint('user_id', 'permission_id', name='uc_user_permission'), ) engine = create_engine("mysql+pymysql://root:root@127.0.0.1:3306/test", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) session = Session()
# 初始化數據 session.add_all( [ Permission(name='新增'), Permission(name='查詢'), User(name='張三'), User(name='吳剛') ] ) session.commit() session.add_all([ Permission2User(user_id=1,permission_id=1), Permission2User(user_id=1,permission_id=2), Permission2User(user_id=2,permission_id=2), ]) session.commit()
list = session.query(Permission2User.id, User.name, Permission.name) \ .join(Permission, Permission2User.permission_id == Permission.id) \ .join(User, Permission2User.user_id == User.id).all() print(list) # [(3, '吳剛', '查詢'), (1, '張三', '新增'), (2, '張三', '查詢')] # 查詢擁有新增權限的用戶 permission_obj = session.query(Permission).filter_by(name='新增').first() [print(user_obj.name) for user_obj in permission_obj.user_list] # 張三 # 查詢吳剛擁有的權限 user_obj = session.query(User).filter_by(name='吳剛').first() [print(permission_obj.name) for permission_obj in user_obj.permission_list] # 查詢 # 建立一個權限,再建立兩個用戶,讓這兩個用戶擁有這個權限 permission_obj = Permission(name='修改') permission_obj.user_list = [User(name='王五'), User(name='趙柳')] session.add(permission_obj) session.commit()
from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship, scoped_session Base = declarative_base() class User(Base): '''用戶''' __tablename__ = 'user' id = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=False) engine = create_engine("mysql+pymysql://root:root@127.0.0.1:3306/test", max_overflow=0, pool_size=5) SessionFactory = sessionmaker(bind=engine) def task1(): # 去鏈接池獲取一個鏈接。 session = SessionFactory() ret = session.query(User).all() # 將鏈接交還給鏈接池。 session.close() session = scoped_session(session_factory=SessionFactory) def task2(): # 當真正使用 session 時,才從鏈接池中取一個鏈接放在 threading.Local 對象的當前線程域中,實現多線程訪問時的 session 隔離。 ret = session.query(User).all() # 將鏈接交還給鏈接池 session.remove() from threading import Thread for i in range(20): t1 = Thread(target=task1) t1.start() t2 = Thread(target=task2) t2.start()
# 添加 cursor = session.execute('insert into user(name) values(:name)', params={"name": '劉能'}) session.commit() print(cursor.lastrowid) # 5 # 查詢 cursor = session.execute('select * from user') result = cursor.fetchall() print(result) # [(5, '劉能'), (2, '吳剛'), (1, '張三'), (3, '王五'), (4, '趙柳')]