Flask-SQLAlchemy是一個Flask擴展,可以支持多種數據庫後臺,咱們能夠不須要關心SQL的處理細節,操做數據庫,一個基本關係對應一個類,而一個實體對應類的實例對象。Flask是一個輕量級的web框架,而SQLAlchemy 是轉爲Flask定製的ORM框架。mysql
安裝flask-sqlalchemyweb
pip install flask-sqlalchemy
初始化sqlalchemysql
from flask import Flask from flask.ext.sqlalchemy import SQLAlchemy app = Flask(__name__) # 配置 sqlalchemy 數據庫驅動://數據庫用戶名:密碼@主機地址:端口/數據庫?編碼 app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://password:@127.0.0.1:3306/db?charset=utf8' # 初始化 db = SQLAlchemy(app)
注意:數據庫
SQLALCHEMY_DATABASE_URI 是關鍵字,默認數據源配置變量。而咱們在實際操做中,不免要同時鏈接多個數據庫,這個時候就要配置多個數據源,而SQLAlchemy提供了很方便的多數據源配置。
只需在配置文件中這樣寫就能夠:
SQLALCHEMY_DATABASE_URI = 'postgres://localhost/main' SQLALCHEMY_BINDS = { 'users': 'mysqldb://localhost/users', 'appmeta': 'sqlite:////path/to/appmeta.db' }
SQLALCHEMY_DATABASE_URI 是默認數據源,users和appmeta是其餘數據源,用的時候也很是方便,在model中加入 __bind_key__ = 'users' 便可。
數據庫表映射到模型,繼承db.model類便可flask
class RoadReportErrorCheck(db.Model): __bind_key__ = 'mdb_plat' __tablename__ = 'road_report_error_check' order_id = db.Column(db.Integer, nullable=False, default=0, primary_key=True) #訂單ID p_pid = db.Column(db.String(32), nullable=False, default='') #包ID check_state = db.Column(db.Integer, nullable=False, default=0) #0未審覈,1初審,2複審 first_checker = db.Column(db.String(32), nullable=False, default='') #軌跡點初審人 first_check_time = db.Column(db.TIMESTAMP, nullable=False, default=0) #軌跡點初審時間 checker = db.Column(db.String(32), nullable=False, default='') #軌跡點複審人 check_time = db.Column(db.TIMESTAMP, nullable=False, default=0) #軌跡點複審時間 description = db.Column(db.String(200), nullable=False, default='') #軌跡點審覈描述 receive_flag = db.Column(db.Integer, nullable=False, default=0) #外包領取,0未領,1已領 create_time = db.Column(db.TIMESTAMP, nullable=False, default=0) update_time = db.Column(db.TIMESTAMP, nullable=False, default=datetime.now)
只需初始化一個變量,添加到session中便可session
group_info = Group_Check(id=id,package_id=package_id,user=user,upload_time=upload_time,orderid=orderid,group_track_url=group_track_url,xf_links=xf_links,create_time=datetime.datetime.now())
db.session.add(group_info)
db.session。commit()
SQLAlchemy 提供了不少種方便的過濾方法。寫法以下:app
#查詢單個 Group_Check.query.fliter(Group_Check.id == id).first() #查詢全部 Group_Check.query.fliter(Group_Check.id == id).all() #條件查詢
Group_Check.query.fliter(db.and_(Group_Check.id == id, Group_Check.state==1))#求和Group_Check.query.fliter(db.and_(Group_Check.id == id, Group_Check.state==1))