from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column from sqlalchemy import Integer,String,Text,Date,DateTime from sqlalchemy import create_engine Base = declarative_base() class Users(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=False) def create_all(): engine = create_engine( "mysql+pymysql://root:123456@127.0.0.1:3306/day120?charset=utf8", max_overflow=0, # 超過鏈接池大小外最多建立的鏈接 pool_size=5, # 鏈接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,不然報錯 pool_recycle=-1 # 多久以後對線程池中的線程進行一次鏈接的回收(重置) ) Base.metadata.create_all(engine) def drop_all(): engine = create_engine( "mysql+pymysql://root:123456@127.0.0.1:3306/day120?charset=utf8", max_overflow=0, # 超過鏈接池大小外最多建立的鏈接 pool_size=5, # 鏈接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,不然報錯 pool_recycle=-1 # 多久以後對線程池中的線程進行一次鏈接的回收(重置)from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column from sqlalchemy import Integer,String,Text,Date,DateTime from sqlalchemy import create_engine Base = declarative_base() class Users(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=False) def create_all(): engine = create_engine( "mysql+pymysql://root:123456@127.0.0.1:3306/day120?charset=utf8", max_overflow=0, # 超過鏈接池大小外最多建立的鏈接 pool_size=5, # 鏈接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,不然報錯 pool_recycle=-1 # 多久以後對線程池中的線程進行一次鏈接的回收(重置) ) Base.metadata.create_all(engine) def drop_all(): engine = create_engine( "mysql+pymysql://root:123456@127.0.0.1:3306/day120?charset=utf8", max_overflow=0, # 超過鏈接池大小外最多建立的鏈接 pool_size=5, # 鏈接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,不然報錯 pool_recycle=-1 # 多久以後對線程池中的線程進行一次鏈接的回收(重置) ) Base.metadata.drop_all(engine) if __name__ == '__main__': create_all() ) Base.metadata.drop_all(engine) if __name__ == '__main__': create_all()
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from s1 import Users engine = create_engine( "mysql+pymysql://root:123456@127.0.0.1:3306/day120?charset=utf8", max_overflow=0, # 超過鏈接池大小外最多建立的鏈接 pool_size=5, # 鏈接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,不然報錯 pool_recycle=-1 # 多久以後對線程池中的線程進行一次鏈接的回收(重置) ) SessionFactory = sessionmaker(bind=engine) # 根據Users類對users表進行增刪改查 session = SessionFactory() # 增 # obj=Users(name="alex") # session.add(obj) # session.commit() # 查 # ret=session.query(Users).all() # for item in ret: # print(item.name) # ret=session.query(Users).filter(Users.id==1).first() # ret = session.query(Users).filter_by(id = 1).first() # print(ret.name) # 3.刪 # session.query(Users).filter(Users.id >= 2).delete() # session.commit() # # session.query(Users).filter(Users.id == 1).update({Users.name: '111'}) # session.commit() session.query(Users).filter(Users.id == 4).update({Users.name:"666"}) session.close()