sqlalchemy是python下一個著名的數據庫orm庫,能夠方便地進行數據表建立、數據增刪改查等操做html
最詳細的教程,見官方:https://docs.sqlalchemy.orgpython
這裏列舉一些經常使用操做:mysql
代碼以及相關的註釋:git
import datetime import sqlalchemy as db from sqlalchemy.orm import sessionmaker from sqlalchemy.ext.declarative import declarative_base # 爲了使用MySQL支持的類型,如unsigned bigint import sqlalchemy.dialects.mysql as mysql # 建立對象的基類: Base = declarative_base() # 定義User對象, 儘可能展現一些建立選項: class User(Base): # 表的名字: __tablename__ = 't_user' # 建立表的擴展設置,這裏設置表的字符集爲utf8 __table_args__ = ( # 聯合惟一索引 db.UniqueConstraint('id', 'name', name='uix_id_name'), # 聯合索引 db.Index('sex_createtime', 'sex', 'createtime'), # 設置引擎和字符集,注意:這個map只能放到元組的最後,不然會報錯 { 'mysql_engine': 'InnoDB', 'mysql_charset': 'utf8', } ) # 表的結構字段定義 # id字段,類型爲unsigned bigint,定義爲主鍵,自增,db.BigInteger不支持unsigned,因此使用mysql特別支持的類型 # id = db.Column(db.BigInteger(), primary_key=True, autoincrement=True) id = db.Column(mysql.BIGINT(unsigned=True), primary_key=True, autoincrement=True) # name字段,定義爲varchar(20),默認容許空,帶註釋 name = db.Column(db.String(20), comment='姓名') # phone字段,定義爲varchar(11),不容許空,有惟一限制,帶註釋 phone = db.Column(db.String(11), nullable=False, unique=True, comment='電話') # score, float類型,帶索引, 容許空 score = db.Column(db.Float, index=True, comment='成績') # sex,性別, int類型, 默認值爲1 sex = db.Column(db.Integer, default=1, comment='性別,1-男;2-女') # createtime, datetime類型, 不容許空 createtime = db.Column(db.DateTime, nullable=False, comment='建立時間') # modifytime, datetime類型, 帶索引, 默認使用datetime.now()生成當前時間,注意,不帶() modifytime = db.Column(db.DateTime, default=datetime.datetime.now, comment='修改時間') # 只是用於打印對象 def __str__(self): return "(" + ', '.join(['%s:%s' % item for item in self.__dict__.items()]) + ")" # 初始化數據庫鏈接: # 鏈接字符串模式:數據庫類型+鏈接庫+用戶名+密碼+主機,字符編碼,是否打印建表細節 # 其中,鏈接庫是當前用於操做數據庫的庫,對於python2.7,通常使用MysqlDb,對於Python3,通常使用pymysql # 鏈接的例子如:create_engine("mysql+pymysql://cai:123@localhost/test?charset=utf8", echo=True) engine = db.create_engine('mysql+pymysql://user:pass@localhost:3306/test') # 刪除現有的表,謹慎決定是否須要這樣操做 Base.metadata.drop_all(engine) # 建立表 Base.metadata.create_all(engine)
在mysql中生成的表結構以下:github
CREATE TABLE `t_user` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, `name` varchar(20) DEFAULT NULL COMMENT '姓名', `phone` varchar(11) NOT NULL COMMENT '電話', `score` float DEFAULT NULL COMMENT '成績', `sex` int(11) DEFAULT NULL COMMENT '性別,1-男;2-女', `createtime` datetime NOT NULL COMMENT '建立時間', `modifytime` datetime DEFAULT NULL COMMENT '修改時間', PRIMARY KEY (`id`), UNIQUE KEY `phone` (`phone`), UNIQUE KEY `uix_id_name` (`id`,`name`), KEY `sex_createtime` (`sex`,`createtime`), KEY `ix_t_user_score` (`score`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
最經常使用的SQLAlchemy Column類型sql
類型名 | Python類型 | 說 明 |
---|---|---|
Integer | int | 普通整數,通常是 32 位 |
SmallInteger | int | 取值範圍小的整數,通常是 16 位 |
BigInteger | int 或 long | 不限制精度的整數 |
Float | float | 浮點數 |
Numeric | decimal.Decimal | 定點數 |
String | str | 變長字符串 |
Text | str | 變長字符串,對較長或不限長度的字符串作了優化 |
Unicode | unicode | 變長 Unicode 字符串 |
UnicodeText | unicode | 變長 Unicode 字符串,對較長或不限長度的字符串作了優化 |
Boolean | bool | 布爾值 |
Date | datetime.date | 日期 |
Time | datetime.time | 時間 |
DateTime | datetime.datetime | 日期和時間 |
Interval | datetime.timedelta | 時間間隔 |
Enum | str | 一組字符串 |
PickleType | 任何 Python 對象 | 自動使用 Pickle 序列化 |
LargeBinary | str | 二進制文件 |
最常使用的SQLAlchemy列選項數據庫
選項名 | 說 明 |
---|---|
primary_key | 若是設爲 True ,這列就是表的主鍵 |
unique | 若是設爲 True ,這列不容許出現重複的值 |
index | 若是設爲 True ,爲這列建立索引,提高查詢效率 |
nullable | 若是設爲 True ,這列容許使用空值;若是設爲 False ,這列不容許使用空值 |
default | 爲這列定義默認值 |
mysql的特別定義列類型選項express
BIGINT, BINARY, BIT, BLOB, BOOLEAN, CHAR, DATE, DATETIME, DECIMAL, DECIMAL, DOUBLE, ENUM, FLOAT, INTEGER, LONGBLOB, LONGTEXT, MEDIUMBLOB, MEDIUMINT, MEDIUMTEXT, NCHAR, NUMERIC, NVARCHAR, REAL, SET, SMALLINT, TEXT, TIME, TIMESTAMP, TINYBLOB, TINYINT, TINYTEXT, VARBINARY, VARCHAR, YEAR
對數據操做以前,須要定義一個會話類型,並在操做數據時,生成一個會話實例進行操做session
# 定義一個會話類型,用來進行數據操做(engine是前面步驟生成的引擎對象) DBSession = sessionmaker(bind=engine) #實例一個會話 session = DBSession() # 數據的操做,如session.add(..) # 這一句不能少,不然不會提交 session.commit() session.close()
# 插入一條數據 user1 = User(id=1,name="小明", phone="13111223344", score=98.2, sex=1, createtime=datetime.datetime.now()) session.add(user1) # 插入多條數據 users = [ User(id=2,name="小芳", phone="13111223345", score=83.1, sex=2, createtime=datetime.datetime.now()), User(id=3,name="小李", phone="13111223346", score=100, sex=2, createtime=datetime.datetime.now()), User(id=4,name="小牛", phone="13111223347", score=62.5, sex=1, createtime=datetime.datetime.now()), ] session.add_all(users) # 非orm方式,特色是快,可定製,如支持ON DUPLICATE KEY UPDATE session.execute(User.__table__.insert(), [ {"id":5,"name":"小五", "phone":"13111223348", "score":72.5, "sex":1, "createtime":datetime.datetime.now()}, ] )
如何實現ON DUPLICATE KEY UPDATE?app
sqlalchemy不支持ON DUPLICATE KEY UPDATE, 能夠本身實現一個。
基本的實現方式:
from sqlalchemy.ext.compiler import compiles from sqlalchemy.sql.expression import Insert @compiles(Insert) def append_string(insert, compiler, **kw): s = compiler.visit_insert(insert, **kw) if 'append_string' in insert.kwargs: return s + " " + insert.kwargs['append_string'] return s my_connection.execute(my_table.insert(append_string = 'ON DUPLICATE KEY UPDATE foo=foo'), my_values)
在實際使用中,這種使用方式顯得比較粗糙,通常來講,能夠枚舉對象的字段,並對每一個字段設置xxx=VALUES(xxx)進行更新。
三者的操做直接合並來講了
query = session.query(User) print query # 顯示SQL 語句 print query.statement # 同上 for user in query: # 遍歷時查詢 print user.name print query.all() # 返回的是一個相似列表的對象 print query.first().name # 記錄不存在時,first() 會返回 None # print query.one().name # 不存在,或有多行記錄時會拋出異常 print query.filter(User.id == 2).first().name print query.get(2).name # 以主鍵獲取,等效於上句 print query.filter('id = 2').first().name # 支持字符串 query2 = session.query(User.name) print query2.all() # 每行是個元組 print query2.limit(1).all() # 最多返回 1 條記錄 print query2.offset(1).all() # 從第 2 條記錄開始返回 print query2.order_by(User.name).all() print query2.order_by('name').all() print query2.order_by(User.name.desc()).all() print query2.order_by('name desc').all() print session.query(User.id).order_by(User.name.desc(), User.id).all() print query2.filter(User.id == 1).scalar() # 若是有記錄,返回第一條記錄的第一個元素 print session.query('id').select_from(User).filter('id = 1').scalar() print query2.filter(User.id > 1, User.name != 'a').scalar() # and query3 = query2.filter(User.id > 1) # 屢次拼接的 filter 也是 and query3 = query3.filter(User.name != 'a') print query3.scalar() print query2.filter(or_(User.id == 1, User.id == 2)).all() # or print query2.filter(User.id.in_((1, 2))).all() # in query4 = session.query(User.id) print query4.filter(User.name == None).scalar() print query4.filter('name is null').scalar() print query4.filter(not_(User.name == None)).all() # not print query4.filter(User.name != None).all() print query4.count() print session.query(func.count('*')).select_from(User).scalar() print session.query(func.count('1')).select_from(User).scalar() print session.query(func.count(User.id)).scalar() print session.query(func.count('*')).filter(User.id > 0).scalar() # filter() 中包含 User,所以不須要指定表 print session.query(func.count('*')).filter(User.name == 'a').limit(1).scalar() == 1 # 能夠用 limit() 限制 count() 的返回數 print session.query(func.sum(User.id)).scalar() print session.query(func.now()).scalar() # func 後能夠跟任意函數名,只要該數據庫支持 print session.query(func.current_timestamp()).scalar() print session.query(func.md5(User.name)).filter(User.id == 1).scalar() query.filter(User.id == 1).update({User.name: 'c'}) user = query.get(1) print user.name user.name = 'd' session.flush() # 寫數據庫,但並不提交 print query.get(1).name session.delete(user) session.flush() print query.get(1) session.rollback() print query.get(1).name query.filter(User.id == 1).delete() session.commit() print query.get(1)
如何批量插入大批數據?
可使用非 ORM 的方式:
session.execute( User.__table__.insert(), [{'name': `randint(1, 100)`,'age': randint(1, 100)} for i in xrange(10000)] )
session.commit()
上面我批量插入了 10000 條記錄,半秒內就執行完了;而 ORM 方式會花掉很長時間。
如何讓執行的 SQL 語句增長前綴?
使用 query 對象的 prefix_with() 方法:
session.query(User.name).prefix_with('HIGH_PRIORITY').all() session.execute(User.__table__.insert().prefix_with('IGNORE'), {'id': 1, 'name': '1'})
如何替換一個已有主鍵的記錄?
使用 session.merge() 方法替代 session.add(),其實就是 SELECT + UPDATE:
user = User(id=1, name='ooxx') session.merge(user) session.commit()
或者使用 MySQL 的 INSERT … ON DUPLICATE KEY UPDATE,須要用到 @compiles 裝飾器,有點難懂,本身看吧:《SQLAlchemy ON DUPLICATE KEY UPDATE》 和 sqlalchemy_mysql_ext。
如何使用無符號整數?
可使用 MySQL 的方言:
如何使用無符號整數?
可使用 MySQL 的方言:
模型的屬性名須要和表的字段名不同怎麼辦?
開發時遇到過一個奇怪的需求,有個其餘系統的表裏包含了一個「from」字段,這在 Python 裏是關鍵字,因而只能這樣處理了:
from_ = Column('from', CHAR(10))
如何獲取字段的長度?
Column 會生成一個很複雜的對象,想獲取長度比較麻煩,這裏以 User.name 爲例:
User.name.property.columns[0].type.length
如何指定使用 InnoDB,以及使用 UTF-8 編碼?
最簡單的方式就是修改數據庫的默認配置。若是非要在代碼裏指定的話,能夠這樣:
class User(BaseModel): __table_args__ = { 'mysql_engine': 'InnoDB', 'mysql_charset': 'utf8' }
MySQL 5.5 開始支持存儲 4 字節的 UTF-8 編碼的字符了,iOS 裏自帶的 emoji(如 � 字符)就屬於這種。
若是是對錶來設置的話,能夠把上面代碼中的 utf8 改爲 utf8mb4,DB_CONNECT_STRING 裏的 charset 也這樣更改。
若是對庫或字段來設置,則仍是本身寫 SQL 語句比較方便,具體細節可參考《How to support full Unicode in MySQL databases》。
不建議全用 utf8mb4 代替 utf8,由於前者更慢,索引會佔用更多空間。
如何設置外鍵約束?
from random import randint from sqlalchemy import ForeignKey class User(BaseModel): __tablename__ = 'user' id = Column(Integer, primary_key=True) age = Column(Integer) class Friendship(BaseModel): __tablename__ = 'friendship' id = Column(Integer, primary_key=True) user_id1 = Column(Integer, ForeignKey('user.id')) user_id2 = Column(Integer, ForeignKey('user.id')) for i in xrange(100): session.add(User(age=randint(1, 100))) session.flush() # 或 session.commit(),執行完後,user 對象的 id 屬性才能夠訪問(由於 id 是自增的) for i in xrange(100): session.add(Friendship(user_id1=randint(1, 100), user_id2=randint(1, 100))) session.commit() session.query(User).filter(User.age < 50).delete()
執行這段代碼時,你應該會遇到一個錯誤:
sqlalchemy.exc.IntegrityError: (IntegrityError) (1451, 'Cannot delete or update a parent row: a foreign key constraint fails (`ooxx`.`friendship`, CONSTRAINT `friendship_ibfk_1` FOREIGN KEY (`user_id1`) REFERENCES `user` (`id`))') 'DELETE FROM user WHERE user.age < %s' (50,)
緣由是刪除 user 表的數據,可能會致使 friendship 的外鍵不指向一個真實存在的記錄。在默認狀況下,MySQL 會拒絕這種操做,也就是 RESTRICT。InnoDB 還容許指定 ON DELETE 爲 CASCADE 和 SET NULL,前者會刪除 friendship 中無效的記錄,後者會將這些記錄的外鍵設爲 NULL。
除了刪除,還有可能更改主鍵,這也會致使 friendship 的外鍵失效。因而相應的就有 ON UPDATE 了。其中 CASCADE 變成了更新相應的外鍵,而不是刪除。
而在 SQLAlchemy 中是這樣處理的:
class Friendship(BaseModel): __tablename__ = 'friendship' id = Column(Integer, primary_key=True) user_id1 = Column(Integer, ForeignKey('user.id', ondelete='CASCADE', onupdate='CASCADE')) user_id2 = Column(Integer, ForeignKey('user.id', ondelete='CASCADE', onupdate='CASCADE'))
如何鏈接表?
from sqlalchemy import distinct from sqlalchemy.orm import aliased Friend = aliased(User, name='Friend') print session.query(User.id).join(Friendship, User.id == Friendship.user_id1).all() # 全部有朋友的用戶 print session.query(distinct(User.id)).join(Friendship, User.id == Friendship.user_id1).all() # 全部有朋友的用戶(去掉重複的) print session.query(User.id).join(Friendship, User.id == Friendship.user_id1).distinct().all() # 同上 print session.query(Friendship.user_id2).join(User, User.id == Friendship.user_id1).order_by(Friendship.user_id2).distinct().all() # 全部被別人當成朋友的用戶 print session.query(Friendship.user_id2).select_from(User).join(Friendship, User.id == Friendship.user_id1).order_by(Friendship.user_id2).distinct().all() # 同上,join 的方向相反,但由於不是 STRAIGHT_JOIN,因此 MySQL 能夠本身選擇順序 print session.query(User.id, Friendship.user_id2).join(Friendship, User.id == Friendship.user_id1).all() # 用戶及其朋友 print session.query(User.id, Friendship.user_id2).join(Friendship, User.id == Friendship.user_id1).filter(User.id < 10).all() # id 小於 10 的用戶及其朋友 print session.query(User.id, Friend.id).join(Friendship, User.id == Friendship.user_id1).join(Friend, Friend.id == Friendship.user_id2).all() # 兩次 join,因爲使用到相同的表,所以須要別名 print session.query(User.id, Friendship.user_id2).outerjoin(Friendship, User.id == Friendship.user_id1).all() # 用戶及其朋友(無朋友則爲 None,使用左鏈接)
這裏我沒提到 relationship,雖然它看上去很方便,但須要學習的內容實在太多,還要考慮不少性能上的問題,因此乾脆本身 join 吧。
爲何沒法刪除 in 操做查詢出來的記錄?
session.query(User).filter(User.id.in_((1, 2, 3))).delete()
拋出這樣的異常:
sqlalchemy.exc.InvalidRequestError: Could not evaluate current criteria in Python. Specify 'fetch' or False for the synchronize_session parameter.
但這樣是沒問題的:
session.query(User).filter(or_(User.id == 1, User.id == 2, User.id == 3)).delete()
搜了下找到《Sqlalchemy delete subquery》這個問題,提到了 delete 的一個注意點:刪除記錄時,默認會嘗試刪除 session 中符合條件的對象,而 in 操做估計還不支持,因而就出錯了。解決辦法就是刪除時不進行同步,而後再讓 session 裏的全部實體都過時:
session.query(User).filter(User.id.in_((1, 2, 3))).delete(synchronize_session=False) session.commit() # or session.expire_all()
此外,update 操做也有一樣的參數,若是後面馬上提交了,那麼加上 synchronize_session=False 參數會更快。
如何對一個字段進行自增操做
最簡單的辦法就是獲取時加上寫鎖:
user = session.query(User).with_lockmode('update').get(1) user.age += 1 session.commit()
若是不想多一次讀的話,這樣寫也是能夠的:
session.query(User).filter(User.id == 1).update({ User.age: User.age + 1 }) session.commit() # 其實字段之間也能夠作運算: session.query(User).filter(User.id == 1).update({ User.age: User.age + User.id })
參考資料:
https://blog.csdn.net/tastelife/article/details/25218895
https://www.cnblogs.com/Oliver.net/p/7345647.html