SQLAlchemy是一個ORM框架(Object Rational Mapping,對象關係映射),它能夠幫助咱們更加優雅、更加高效的實現數據庫操做,並且還不限於mysql。html
SQLAlchemy自己沒法操做數據庫,其必須以來pymsql等第三方插件,Dialect用於和數據API進行交流,根據配置文件的不一樣調用不一樣的數據庫API,從而實現對數據庫的操做,如:python
MySQL-Python mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname> pymysql mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>] MySQL-Connector mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname> cx_Oracle oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...] pyodbc sybase+pyodbc://<username>:<password>@<dsn>[/<database>] Python-Sybase sybase+pysybase://<username>:<password>@<dsn>/[database name] mxODBC sybase+mxodbc://<username>:<password>@<dsnname>
pip install sqlalchemy #安裝mysql pip install pymysql #安裝mysql-connector2.2.3版本會報錯:Unable to find Protobuf include directory. #因此咱們指定安裝的版本 pip install mysql-connector==2.1.4
# python3 # author lizm # datetime 2018-01-28 12:00:00 ''' Demo:sqlalchemy對mysql數據庫的操做 ''' from sqlalchemy import Column,Integer, String, create_engine from sqlalchemy.orm import sessionmaker from sqlalchemy.ext.declarative import declarative_base import pymysql # 建立對象的基類: Base = declarative_base() # 定義Channel對象: class Channel(Base): # 表名 __tablename__ = 'playback' # 表結構 # Column:行聲明,可指定主鍵 Integer:數據類型 String:數據類型,可指定長度 id = Column(Integer,primary_key=True,autoincrement=True) channel_name = Column(String(45),unique=True, nullable=False) address = Column(String(80),unique=True, nullable=False) service_name = Column(String(45),unique=True, nullable=False) def __init__(self,id,channel_name,address,service_name): self.id = id self.channel_name = channel_name self.address = address self.service_name = service_name # 初始化數據庫鏈接, # 傳入參數:數據庫類型+鏈接庫+用戶名+密碼+主機,字符編碼,是否打印建表細節 engine = create_engine('mysql+mysqlconnector://root:lizm@localhost:3306/pythondb',encoding='utf-8') # 建立表 Base.metadata.create_all(engine) # 刪除表 # Base.metadata.drop_all(engine) # 建立DBSession類型: DBSession = sessionmaker(bind=engine) session = DBSession() try: # 增操做 item1 = Channel(id='1',channel_name='cctv8',address='http://10.10.10.1/cctv8',service_name='news') session.add(item1) item2 = Channel(id='2',channel_name='cctv10',address='http://10.10.10.1/cctv10',service_name='sports') session.add(item2) item3 = Channel(id='3',channel_name='cctv12',address='http://10.10.10.1/cctv12',service_name='economics') session.add(item3) #提交數據 session.commit() except Exception as e: session.rollback() finally: #關閉 session.close() # 查操做 session1 = DBSession() # 輸出sql 語句 print("查詢sql語句:%s"%session1.query(Channel).filter(Channel.id < '3')) # 返回的是一個相似列表的對象 channel = session1.query(Channel).filter(Channel.id < '3').all() for i in range(len(channel)): print(channel[i].id) print(channel[i].channel_name) print(channel[i].address) print(channel[i].service_name) session1.close() # 改操做 session2 = DBSession() session2.query(Channel).filter(Channel.id == '2').update({Channel.service_name: 'movie',Channel.address: '127.0.0.1'}, synchronize_session=False) session2.commit() session2.close() ## 查看修改結果 session3 = DBSession() print(session3.query(Channel).filter(Channel.id == '2').one().service_name) session3.close() # 刪操做 session4 = DBSession() session4.query(Channel).filter(Channel.id == '3').delete() session4.commit() session4.close()
# python3 # author lizm # datetime 2018-02-28 12:00:00 ''' Demo:sqlalchemy對sybase數據庫的操做 ''' from selenium import webdriver from sqlalchemy import Column,Integer,String,DateTime,create_engine from sqlalchemy.orm import sessionmaker from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import and_,func import pyodbc import time,datetime # 建立對象的基類: Base = declarative_base() # 定義Channel對象: class Channel(Base): # 表名 __tablename__ = 'shrjj' # 表結構 # Column:行聲明,可指定主鍵 Integer:數據類型 String:數據類型,可指定長度 id = Column(Integer,primary_key=True,autoincrement=True) rpname = Column(String(500), nullable=False) rpdate = Column(String(50)) jjzt = Column(String(255)) fbsjj = Column(String(255)) etf = Column(String(255)) lof = Column(String(255)) fjlof = Column(String(255)) create_date = Column(DateTime,nullable=False) update_date = Column(DateTime,nullable=False) def __init__(self,rpname,rpdate,jjzt,fbsjj,etf,lof,fjlof,create_date,update_date): self.rpname = rpname self.rpdate = rpdate self.jjzt = jjzt self.fbsjj = fbsjj self.etf = etf self.lof = lof self.fjlof = fjlof self.create_date = create_date self.update_date = update_date # 初始化數據庫鏈接, # 傳入參數:數據庫類型+鏈接庫+用戶名+密碼+主機,字符編碼,是否打印建表細節 engine = create_engine('sybase+pyodbc://username:password@dns') # 建立表 #Base.metadata.create_all(engine) # 建立DBSession類型: DBSession = sessionmaker(bind=engine) # 查操做 session1 = DBSession() # 輸出sql 語句 print("查詢sql語句:%s"%session1.query(Channel).filter(Channel.rpdate == '2018-02-12')) # 返回的是一個相似列表的對象 channel = session1.query(Channel).filter(Channel.rpdate == '2018-02-12').all() for i in range(len(channel)): print(channel[i].rpname) print(channel[i].rpdate) print(channel[i].jjzt) session1.close() # 改操做 session2 = DBSession() session2.query(Channel).filter(Channel.rpdate == '2018-02-12').update({Channel.jjzt: '0'}, synchronize_session=False) session2.commit() session2.close() # 查看修改結果 session3 = DBSession() print(session3.query(Channel).filter(Channel.rpdate == '2018-02-12').one().jjzt) session3.close() #新增數據操做 session4 = DBSession() # 增操做 item1 = Channel(rpname='市價總值',rpdate='2018-02-29',jjzt='1',fbsjj='1',etf='1',lof='1',fjlof='1',create_date=time.strftime('%Y-%m-%d %H:%M:%S'),update_date=time.strftime('%Y-%m-%d %H:%M:%S')) session4.add(item1) #提交數據 session4.commit() #關閉 session4.close()
常見的列類型:
類型名稱 python類型 描述
Integer int 常規整形,一般爲32位
SmallInteger int 短整形,一般爲16位
BigInteger int或long 精度不受限整形
Float float 浮點數
Numeric decimal.Decimal 定點數
String str 可變長度字符串
Text str 可變長度字符串,適合大量文本
Unicode unicode 可變長度Unicode字符串
Boolean bool 布爾型
Date datetime.date 日期類型
Time datetime.time 時間類型
Interval datetime.timedelta 時間間隔
Enum str 字符列表
PickleType 任意Python對象 自動Pickle序列化
LargeBinary str 二進制
常見的SQLALCHEMY列選項:
可選參數 描述
primary_key 若是設置爲True,則爲該列表的主鍵
unique 若是設置爲True,該列不容許相同值
index 若是設置爲True,爲該列建立索引,查詢效率會更高
nullable 若是設置爲True,該列容許爲空。若是設置爲False,該列不容許空值
default 定義該列的默認值
#簡單查詢 print(session.query(User).all()) print(session.query(User.name, User.fullname).all()) print(session.query(User, User.name).all()) #帶條件查詢 print(session.query(User).filter_by(name='user1').all()) print(session.query(User).filter(User.name == "user").all()) print(session.query(User).filter(User.name.like("user%")).all()) #多條件查詢 from sqlalchemy import and_ print(session.query(User).filter(and_(User.name.like("user%"), User.fullname.like("first%"))).all()) from sqlalchemy import or_ print(session.query(User).filter(or_(User.name.like("user%"), User.password != None)).all()) #sql過濾 print(session.query(User).filter("id>:id").params(id=1).all()) #關聯查詢 print(session.query(User, Address).filter(User.id == Address.user_id).all()) print(session.query(User).join(User.addresses).all()) print(session.query(User).outerjoin(User.addresses).all()) #聚合查詢 print(session.query(User.name, func.count('*').label("user_count")).group_by(User.name).all()) print(session.query(User.name, func.sum(User.id).label("user_id_sum")).group_by(User.name).all()) #子查詢 stmt = session.query(Address.user_id, func.count('*').label("address_count")).group_by(Address.user_id).subquery() print(session.query(User, stmt.c.address_count).outerjoin((stmt, User.id == stmt.c.user_id)).order_by(User.id).all()) #exists print(session.query(User).filter(exists().where(Address.user_id == User.id))) print(session.query(User).filter(User.addresses.any())) # 限制 ret = session.query(Users)[1:2] # 排序 ret = session.query(Users).order_by(Users.name.desc()).all() ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all() # 分組 from sqlalchemy.sql import func ret = session.query(Users).group_by(Users.extra).all() ret = session.query( func.max(Users.id), func.sum(Users.id), func.min(Users.id)).group_by(Users.name).all() ret = session.query( func.max(Users.id), func.sum(Users.id), func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all() # 連表 ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all() ret = session.query(Person).join(Favor).all() ret = session.query(Person).join(Favor, isouter=True).all() # 組合 q1 = session.query(Users.name).filter(Users.id > 2) q2 = session.query(Favor.caption).filter(Favor.nid < 2) ret = q1.union(q2).all() q1 = session.query(Users.name).filter(Users.id > 2) q2 = session.query(Favor.caption).filter(Favor.nid < 2) ret = q1.union_all(q2).all()
注意:SQLAlchemy中的model必需要有主鍵,纔可使用;mysql
做者:整合俠
連接:http://www.cnblogs.com/lizm166/p/8370633.html
來源:博客園
著做權歸做者全部。商業轉載請聯繫做者得到受權,非商業轉載請註明出處。web