Object-Relational Mapping,把關係數據庫的表結構映射到對象上。使用面向對象的方式來操做數據庫。python
下面是一個關係模型與Python對象之間的映射關係:mysql
SQLAlchemy是一個ORM框架。內部是使用了鏈接池
來管理數據庫鏈接。其自己只是作了關係映射,不能鏈接數據庫,也不能執行sql語句,它在底層須要使用pymysql等模塊來鏈接並執行sql語句,要使用sqlalchemy,那麼須要先進行安裝:nginx
pip3 install sqlalchemy
查看版本sql
In [1]: import sqlalchemy In [2]: print(sqlalchemy.__version__) 1.3.1
先來總結一下使用sqlalchemy框架操做數據庫的通常流程:數據庫
建立引擎
(不一樣類型數據庫使用不一樣的鏈接方式)建立基類
(類對象要繼承,由於基類會利用元編程爲咱們的子類綁定關於表的其餘屬性信息)建立實體類
(用來對應數據庫中的表)編寫實體類屬性
(用來對應表中的字段/屬性)建立表
(若是表不存在,則須要執行語句在數據庫中建立出對應的表)實例化
(具體的一條record記錄)建立會話session
(用於執行sql語句的鏈接)使用會話執行SQL語句
關閉會話
sqlalchemy 使用引擎管理數據庫鏈接(DATABASE URLS),鏈接的通常格式爲:django
dialect+driver://username:password@host:port/database
dialect
:表示什麼數據庫(好比,mysql,sqlite,oracle等)driver
:用於鏈接數據庫的模塊(好比pymysql,mysqldb等)username
:鏈接數據庫的用戶名password
:鏈接數據庫的密碼host
: 數據庫的主機地址port
: 數據庫的端口database
: 要鏈接的數據庫名稱pymysql模塊是較長用於鏈接mysql的模塊,使用pymysql的鏈接的語句爲:編程
mysql+pymysql://dahl:123456@10.0.0.13:3306/test
建立引擎用於進行數據庫的鏈接:create_engine(urls)
安全
import sqlalchemy db_url = 'mysql+pymysql://dahl:123456@10.0.0.13:3306/test' engine = sqlalchemy.create_engine(db_url,echo=True)
特別注意
:建立引擎並不會立刻鏈接數據庫,直到讓數據庫執行任務是才鏈接。session
sqlalchemy內部是原生支持鏈接池的,咱們能夠僅僅利用它的鏈接池功能。經過engie的raw_connection
方法就能夠獲取到一個鏈接,而後就能夠執行sql語句了(基本上就是Pymysql+DBUtils的實現)
import threading import time import sqlalchemy import pymysql engine = sqlalchemy.create_engine( 'mysql+pymysql://dahl:123456@10.0.0.13:3306/test', max_overflow=5, pool_size=1, pool_timeout=30, pool_recycle=-1 ) def func(): conn = engine.raw_connection() cursor = conn.cursor(pymysql.cursors.DictCursor) time.sleep(2) cursor.execute('select * from employees;') res = cursor.fetchall() print(res) cursor.close() conn.close() if __name__ == '__main__': for i in range(10): threading.Thread(target=func).start()
上面是經過連接池來執行sql的,其實也能夠經過session來執行。
import threading from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker engine = sqlalchemy.create_engine( 'mysql+pymysql://dahl:123456@10.0.0.13:3306/test', max_overflow=5, pool_size=1, pool_timeout=30, pool_recycle=-1 ) DBsession = sessionmaker(bind=engine) def func(): session = DBsession() cursor = session.execute('select * from employees;') res = cursor.fetchall() print(res) cursor.close() session.close() if __name__ == '__main__': for i in range(10): threading.Thread(target=func).start()
使用: sqlalchemy.ext.declarative.declarative_base
來構造聲明性類定義的基類。由於sqlalchemy內部大量使用了元編程,爲實例化的子類注入映射所需的屬性,因此咱們定義的映射要繼承自它(必須繼承)
通常只須要一個這樣的基類
Base = sqlalchemy.ext.declarative.declarative_base() # 或者 from sqlalchemy.ext import declarative Base = declarative.declarative_base()
現數據庫存在以下表
CREATE TABLE `student` ( `id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(30) DEFAULT NULL, `age` int(11) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8mb4;
建立對應的實體類:
import sqlalchemy from sqlalchemy.ext import declarative from sqlalchemy import Column, String, Integer db_url = 'mysql+pymysql://dahl:123456@10.0.0.13:3306/test' engine = sqlalchemy.create_engine(db_url, echo=True) Base = declarative.declarative_base() class Student(Base): __tablename__ = 'student' id = Column(Integer, primary_key=True, nullable=False, autoincrement=True) name = Column(String, default='Null') age = Column(Integer, default='Null') print(Student.__dict__) # Table('student', MetaData(bind=None), 這裏沒有綁定engine,因此是None # Column('id', Integer(), table=<student>, primary_key=True, nullable=False), # Column('name', String(), table=<student>, default=ColumnDefault('Null')), # Column('age', Integer(), table=<student>, default=ColumnDefault('Null')), schema=None), # Column('data',DateTime,default=datetime.datetime.now) # 這裏不能加括號
type
:字段類型(好比String、Integer,這裏是sqlalchemy包裝的類型,對應的是數據庫的varchar、int等),來自TypeEngine的子類autoincrement
:是否自增nullable
: 是否能夠爲空primary_key
: 主鍵注意:Column和String、Integer等都來自於sqlalchemy下的方法,要麼直接導入,要麼就使用sqlalchemy.String來引用。
類型名 | python中類型 | 說明 |
---|---|---|
Integer | int | 普通整數,通常是32位 |
SmallInteger | int | 取值範圍小的整數,通常是16位 |
BigInteger | int或long | 不限制精度的整數 |
Float | float | 浮點數 |
Numeric | decimal.Decimal | 普通整數,通常是32位 |
String | str | 變長字符串 |
Text | str | 變長字符串,對較長或不限長度的字符串作了優化 |
Unicode | unicode | 變長Unicode字符串 |
UnicodeText | unicode | 變長Unicode字符串,對較長或不限長度的字符串作了優化 |
Boolean | bool | 布爾值 |
Date | datetime.date | 時間 |
Time | datetime.datetime | 日期和時間 |
LargeBinary | str | 二進制文件 |
經過咱們構建的類,來實例化的對象,在未來就是數據庫中的一條條記錄。
student = Student() student.name = 'daxin' student.id = 1 student.age = 20 print(student) # <1 daxin 20>
咱們本身寫的類都是繼承自Base,每繼承一次Base類,在Base類的metadata屬性中就會記錄當前子類,metadata提供了方法用於刪除/建立表。若是數據庫中已經存在對應的表,那麼將不會繼續建立
Base = declarative.declarative_base() Base.metadata.create_all(bind=engine) # 須要經過引擎去執行 # 下面是engine的echo爲true時的輸出信息 # 2019-03-16 16:53:45,922 INFO sqlalchemy.engine.base.Engine # CREATE TABLE hello ( # id INTEGER NOT NULL AUTO_INCREMENT, # name VARCHAR(24), # age INTEGER, # PRIMARY KEY (id) # ) # # # 2019-03-16 16:53:45,922 INFO sqlalchemy.engine.base.Engine {} # 2019-03-16 16:53:45,926 INFO sqlalchemy.engine.base.Engine COMMIT # 2019-03-16 16:53:45,927 INFO sqlalchemy.engine.base.Engine # CREATE TABLE world ( # id INTEGER NOT NULL AUTO_INCREMENT, # name VARCHAR(24), # age INTEGER, # PRIMARY KEY (id) # ) # # # 2019-03-16 16:53:45,927 INFO sqlalchemy.engine.base.Engine {} # 2019-03-16 16:53:45,928 INFO sqlalchemy.engine.base.Engine COMMIT
注意:sqlalchemy 只能建立和刪除表,不能修改表結構。只能手動的在數據庫中修改而後在代碼中添加便可。
在一個會話中操做數據庫,繪畫創建在鏈接上,鏈接被引擎管理,當第一次使用數據庫時,從引擎維護的鏈接池中取出一個鏈接使用。
from sqlalchemy.orm import sessionmaker Session = sessionmaker(bind=engine) session = Session() # 實例化一個session對象
scoped_session
是sqlalchemy提供的線程安全的session,利用的是ThreadLocal實現的。
方法 | 含義 |
---|---|
add() | 增長一個對象 |
add_all() | 增長多個對象,類型爲可迭代 |
import sqlalchemy from sqlalchemy.ext import declarative from sqlalchemy import Column, String, Integer from sqlalchemy.orm import sessionmaker db_url = 'mysql+pymysql://dahl:123456@10.0.0.13:3306/test' engine = sqlalchemy.create_engine(db_url, echo=True) Base = declarative.declarative_base() DBSession = sessionmaker(bind=engine) session = DBSession() class Student(Base): __tablename__ = 'student' id = Column(Integer, primary_key=True, nullable=False, autoincrement=True) name = Column(String(24), default='Null') age = Column(Integer, default='Null') def __repr__(self): return '<{} {} {}>'.format( self.id, self.name, self.age ) daxin = Student(id=12, name='daxin', age=20) session.add(daxin) dachenzi = Student(id=13,name='dachenzi', age=21) xiaobai = Student(id=14,name='xiaobai', age=22) session.add_all((dachenzi,xiaobai)) # 新增三條數據 session.commit()
須要注意的是下面的狀況:
daxin = Student(name='daxin') daxin.age = 40 session.add(daxin) # 1 session.commit() daxin.age = 20 session.add(daxin) # 2 session.commit() daxin.age = 10 session.add_all([daxin, daxin, daxin, daxin]) # 3 session.commit() # <30 daxin 10>
結果生成個1條數據,<30 daxin 10>,爲何呢?因爲id屬於自增列,咱們在執行#1時,id是沒有固定下來的。
執行時,因爲都是daxin的,id,name,age都沒有改變,因此只會執行1條語句
當engine的echo等於true時,看到具體的sql語句,一切就很明白了。
使用session的query方法進行簡單查詢,格式爲:
session.query(student)
: 等同於select * from student;session.query(student).get(2)
: 等同於select * from student where id = 2,這裏的get方法只能主鍵查詢std_list = session.query(Student) print(std_list) # SELECT student.id AS student_id, student.name AS student_name, student.age AS student_age FROM student print(type(std_list)) # <class 'sqlalchemy.orm.query.Query'>
這裏直接打印並不會結果,由於它太懶了,你不迭代它,它就不會真的去數據庫查詢。
std_list = session.query(Student) for std in std_list: print(std) # 經過get來過濾主鍵,是能夠直接執行返回結果的。 std_list = session.query(Student).get(30) print(std_list)
修改的數據的流程分爲兩步:
std = session.query(Student).get(30) print(std) # <30 daxin 200> std.age = 1000 session.add(std) session.commit() std = session.query(Student).get(30) print(std) # <30 daxin 1000> # 或者 std = session.query(Student).filter(Student.id > 10).update({'name':'daxin'}) std.commit()
大部分ORM是都是這樣,必須先查才能改。
在原有數據的基礎上批量修改,好比在全部名稱後面添加特定的後綴
session.query(Users).filter(Users.id > 0).update({Users.name: Users.name + "099"}, synchronize_session=False) # 必須爲synchronize_session=False session.query(Users).filter(Users.id > 0).update({"age": Users.age + 1}, synchronize_session="evaluate") # 必須synchronize_session="evaluate",表示要進行計算
使用session.delete來刪除數據
std = session.query(Student).get(30) session.delete(std) session.commit() # 提交刪除操做 std = session.query(Student).get(30) print(std) # None
當咱們建立一條數據,或是從數據庫中獲取一條數據時,數據自己是存在一個狀態屬性的,用來標識當前數據是否持久化,或者其餘狀態,在sqlalchemy中,inspect(obj)
能夠用來窺探數據(obj)的狀態
daxin = Student(name='daxin') daxin.age = 10000 state = sqlalchemy.inspect(daxin) print(state) # <sqlalchemy.orm.state.InstanceState object at 0x00000186EEC38EB8> std = session.query(Student).get(28) state = sqlalchemy.inspect(std) print(state) # <sqlalchemy.orm.state.InstanceState object at 0x00000186EFDCDA20>
咱們看到,inspect返回的是一個InstanceState對象。這個對象有如下幾個屬性:
key:key是多少
InstanceState對象,能夠經過sqlalchemy.orm.state導入
具體的狀態信息與含義以下:
狀態 | 說明 |
---|---|
transient |
實體類還沒有加入到session中,同時並無保存到數據庫中 |
pending |
transient的實體被加入到session中,狀態切換到pending,但它尚未被flsh到數據庫中 |
persistent |
session中的實體對象對應着數據庫中的真實記錄。pending狀態在提交成功後能夠變成persistent狀態,或者查詢成功返回的實體也是persistent狀態 |
deleted |
實體被刪除且已經flush但未commit完成。事物提交成功了,實體變成detached,事物失敗返回persistent狀態 |
detached |
刪除成功的實體進入這個狀態 |
因此數據的狀態變化以下:
commit()之後,變爲detached,提交失敗,回退到persistent狀態
flush()方法,主動把改變應用到數據庫中去
刪除、修改操做,須要對應一個真實存在的數據,也就是說數據的狀態是persistent才行。當使用add語句新增信息時,若是這個對象已經添加過數據庫了,那麼它的狀態會變爲persistent,若是對persistent的數據進行修改繼續提交的話,那麼使用的將會是update語句而非insert。這也是前面爲啥屢次對一個數據進行add,提交了屢次只會插入1次的緣由。
import sqlalchemy from sqlalchemy.ext import declarative from sqlalchemy import Column, String, Integer from sqlalchemy.orm import sessionmaker db_url = 'mysql+pymysql://dahl:123456@10.0.0.13:3306/test' engine = sqlalchemy.create_engine(db_url, echo=True) Base = declarative.declarative_base() DBSession = sessionmaker(bind=engine) session = DBSession() class Student(Base): __tablename__ = 'student' id = Column(Integer, primary_key=True, nullable=False, autoincrement=True) name = Column(String(24), default='Null') age = Column(Integer, default='Null') def __repr__(self): return '<{} {} {}>'.format( self.id, self.name, self.age ) def getstate(state: InstanceState): print(""" session_id={} transient={} _attached={} pending={} persistent={} deleted={} detached={} """.format(state.session_id, state.transient, state._attached, state.pending, state.persistent, state.deleted, state.detached)) daxin = Student(name='daxin') daxin.age = 10000 state = sqlalchemy.inspect(daxin) getstate(state) # session_id=None transient=True _attached=False pending=False persistent=False deleted=False detached=False session.add(daxin) getstate(state) # session_id=1 transient=False _attached=True pending=True persistent=False deleted=False detached=False session.commit() getstate(state) # session_id=1 transient=False _attached=True pending=False persistent=True deleted=False detached=False std = session.query(Student).get(28) state = sqlalchemy.inspect(std) getstate(state) # session_id=1 transient=False _attached=True pending=False persistent=True deleted=False detached=False std = session.query(Student).get(31) session.delete(std) state = sqlalchemy.inspect(std) getstate(state) # session_id=1 transient=False _attached=True pending=False persistent=True deleted=False detached=False session.flush() getstate(state) # session_id=1 transient=False _attached=True pending=False persistent=False deleted=True detached=False session.commit() getstate(state) # session_id=None transient=False _attached=False pending=False persistent=False deleted=False detached=True
在數據庫的字段類型中,好比性別字段,咱們可能會限制數據來源爲M(male),F(Female),這個時候字段類型能夠是枚舉的,可是在sqlalchemy中,原生的字段類型沒有枚舉類型,那麼就須要藉助enum類了。
import sqlalchemy from sqlalchemy.ext import declarative from sqlalchemy import Column, String, Integer from sqlalchemy.orm import sessionmaker import enum db_url = 'mysql+pymysql://dahl:123456@10.0.0.13:3306/test' engine = sqlalchemy.create_engine(db_url, echo=True) Base = declarative.declarative_base() DBSession = sessionmaker(bind=engine) session = DBSession() class GenderEnum(enum.Enum): M = 'M' F = 'F' class Student(Base): __tablename__ = 'student' id = Column(Integer, primary_key=True, nullable=False, autoincrement=True) name = Column(String(24), default='Null') age = Column(Integer, default='Null') gender = Column(sqlalchemy.Enum(GenderEnum),nullable=False) def __repr__(self): return '<{} {} {}>'.format( self.id, self.name, self.age )
用起來很麻煩,因此建議性別使用1或者0來存儲,顯示的時候作對於轉換便可
使用filter方法進行條件過濾查詢:
session.query(student).filter(student.id > 10)
:至關於select * from student where student.id > 10
同時還存在一個filter_by,它的不一樣之處在於括號中的不是表達式,而是參數。好比:filter(user.id == 10) -> filter_by(user.id = 10)
where條件中的關係:
AND
(與) 對應 and_
OR
(或) 對應 or_
not
(非) 對應 not_
in
對應字段的 in_
not in
對應字段的 notin_
like
對應 字段的like方法not like
對應 字段的notlike方法想要使用與或非,須要先行導入
import sqlalchemy from sqlalchemy.ext import declarative from sqlalchemy import Column, String, Integer from sqlalchemy.orm import sessionmaker from sqlalchemy import and_, or_, not_ db_url = 'mysql+pymysql://dahl:123456@10.0.0.13:3306/test' engine = sqlalchemy.create_engine(db_url, echo=True) Base = declarative.declarative_base() DBSession = sessionmaker(bind=engine) session = DBSession() class Student(Base): __tablename__ = 'student' id = Column(Integer, primary_key=True, nullable=False, autoincrement=True) name = Column(String(24), default='Null') age = Column(Integer, default='Null') def __repr__(self): return '<{} {} {}>'.format( self.id, self.name, self.age ) def getresulte(stds): for std in stds: print(std)
條件判斷之AND
(&,and_)
# and std_list = session.query(Student).filter(and_(Student.id < 30, Student.id > 27)) getresulte(std_list) std_list = session.query(Student).filter(Student.id < 30).filter(Student.id > 27) # filter返回的仍是一個結果集,因此還能夠繼續使用filter進行過濾 getresulte(std_list) std_list = session.query(Student).filter(Student.id < 30, Student.age > 100) # 多個條件一塊兒寫,也是and的關係 getresulte(std_list) std_list = session.query(Student).filter((Student.name == 'daxin') & (Student.age > 28)) getresulte(std_list)
條件判斷之OR
(or_,|)
# or std_list = session.query(Student).filter(or_(Student.id > 27, Student.age < 50)) getresulte(std_list) std_list = session.query(Student).filter((Student.id > 27) | (Student.age < 50 )) getresulte(std_list)
條件判斷之NOT
(not_,~)
# not std_list = session.query(Student).filter(not_(Student.id == 32)) getresulte(std_list) std_list = session.query(Student).filter(~(Student.id == 32)) getresulte(std_list)
like
和in
及not in
# like std_list = session.query(Student).filter(Student.name.like('da%')) getresulte(std_list) # not like std_list = session.query(Student).filter(Student.name.notlike('da%')) getresulte(std_list) # in std_list = session.query(Student).filter(Student.age.in_([10,30,50])) getresulte(std_list) # not in std_list = session.query(Student).filter(Student.age.notin_([10,30,50])) getresulte(std_list)
補充:
r6 = session.query(Users).filter(text("id<:value and name=:name")).params(value=224, name='fred').order_by(Users.id).all() # 傳參的方式查詢 r7 = session.query(Users).from_statement(text("SELECT * FROM users where name=:name")).params(name='ed').all() # 用的很少
# 升序 std_list = session.query(Student).filter(Student.name.like('da%')).order_by(Student.age) getresulte(std_list) # 降序 std_list = session.query(Student).filter(Student.name.like('da%')).order_by(Student.age.desc()) getresulte(std_list)
# limit std_list = session.query(Student).filter(Student.name.like('da%')).order_by(Student.age.desc()).limit(3).offset(2) getresulte(std_list) # select * from Student where name like 'da%' order_by age desc limit 3 offset 2
# count std_list = session.query(Student) print(std_list.count()) # first std_list = session.query(Student).first() print(std_list)
first本質上就是limit。
sqlalchemy一樣提供了聚合方法,使用sqlalchemy.func來調用
func提供的方法有
# max std_list = session.query(Student.name,sqlalchemy.func.max(Student.age)) # select name,max(age) from Student; getresulte(std_list) # count std_list = session.query(Student.name,sqlalchemy.func.count(Student.id)).group_by(Student.name) # SELECT name, count(id) FROM student GROUP BY name getresulte(std_list)
sqlalchemy提供ForeignKey用來進行外鍵關聯,它的格式爲:
sqlalchemy.ForeignKey(表名.字段名,ondelete='更新規則') # 若是填寫映射後的class,那麼能夠直接寫:類.字段 # 若是填寫數據庫中的表,那麼須要使用引號:'數據庫表名.字段名'
更新規則和刪除規則,可選項以下:
CASCADE
:級聯刪除,刪除被關聯數據時,從表關聯的數據所有刪除。SET NULL
:從父表刪除或更新行,會設置子表中的外鍵列爲NULL,但必須保證子表沒有指定 NOT NULL,也就是說子表的字段能夠爲NULL才行。RESTRICT
:若是從父表刪除主鍵,若是子表引用了,則拒絕對父表的刪除或更新操做。(保護數據)NO ACTION
:表中SQL的關鍵字,在MySQL中與RESTRICT相同。拒絕對父表的刪除或更新操做。現有以下關係表
CREATE TABLE `departments` ( `dept_no` char(4) NOT NULL, `dept_name` varchar(40) NOT NULL, PRIMARY KEY (`dept_no`), UNIQUE KEY `dept_name` (`dept_name`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; CREATE TABLE `employees` ( `emp_no` int(11) NOT NULL, `birth_date` date NOT NULL, `first_name` varchar(14) NOT NULL, `last_name` varchar(16) NOT NULL, `gender` enum('M','F') NOT NULL, `hire_date` date NOT NULL, PRIMARY KEY (`emp_no`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; CREATE TABLE `dept_emp` ( `emp_no` int(11) NOT NULL, `dept_no` char(4) NOT NULL, `from_date` date NOT NULL, `to_date` date NOT NULL, PRIMARY KEY (`emp_no`,`dept_no`), KEY `dept_no` (`dept_no`), CONSTRAINT `dept_emp_ibfk_1` FOREIGN KEY (`emp_no`) REFERENCES `employees` (`emp_no`) ON DELETE CASCADE, CONSTRAINT `dept_emp_ibfk_2` FOREIGN KEY (`dept_no`) REFERENCES `departments` (`dept_no`) ON DELETE CASCADE ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
建立對應的映射實體類
import sqlalchemy from sqlalchemy.ext import declarative from sqlalchemy import Column, String, Integer, Date from sqlalchemy.orm import sessionmaker import enum db_url = 'mysql+pymysql://dahl:123456@10.0.0.13:3306/test' engine = sqlalchemy.create_engine(db_url, echo=True) Base = declarative.declarative_base() DBSession = sessionmaker(bind=engine) session = DBSession() class Departments(Base): __tablename__ = 'departments' dept_no = Column(String(4), nullable=False, primary_key=True) dept_name = Column(String(40), nullable=False, unique=True) def __repr__(self): return '<{} {} {}>'.format(self.__class__.__name__, self.dept_no, self.dept_name) class GenderEnum(enum.Enum): M = 'M' F = 'F' class Employees(Base): __tablename__ = 'employees' emp_no = Column(Integer, nullable=False, primary_key=True) birth_date = Column(Date, nullable=False) first_name = Column(String(14), nullable=False) last_name = Column(String(16), nullable=False) gender = Column(sqlalchemy.Enum(GenderEnum), nullable=False) hire_date = Column(Date, nullable=False) def __repr__(self): return '<{} {} {} {} {} {}>'.format( self.__class__.__name__, self.emp_no, self.birth_date, self.first_name, self.last_name, self.gender, self.hire_date ) class Dept_emp(Base): __tablename__ = 'dept_emp' emp_no = Column(Integer, sqlalchemy.ForeignKey(Employees.emp_no, ondelete='CASCADE'), primary_key=True, ) dept_no = Column(String(4), sqlalchemy.ForeignKey(Departments.dept_no, ondelete='CASCADE'), nullable=False) from_date = Column(Date, nullable=False) to_date = Column(Date, nullable=False) def __repr__(self): return '<{} emp_no={} dept_no={}>'.format( self.__class__.__name__, self.emp_no, self.dept_no ) def getres(emps): for emp in emps: print(emp)
查詢10010員工所在的部門編號及員工信息
emps = session.query(Employees, Dept_emp).filter(and_(Employees.emp_no == Dept_emp.emp_no, Employees.emp_no == '10010')).all() getres(emps) emps = session.query(Employees, Dept_emp).filter(Employees.emp_no == Dept_emp.emp_no).filter(Employees.emp_no == '10010').all() getres(emps) # 至關於:select * from employees,dept_emp where employees.emp_no = dept_emp.emp_no and employees.emp_no = '10010';
使用join()關鍵字來進行連表查詢,其isouter參數用於指定join的類型,默認狀況下使用的是inner join,當isouter=True時,就表示是left join(right join沒有實現,須要本身交換前面表的順序)
# join鏈接 std_list = session.query(Employees).join(Dept_emp,Employees.emp_no == Dept_emp.emp_no,isouter=True).filter(Employees.emp_no == '10010') getres(std_list) std_list = session.query(Employees).join(Dept_emp).filter((Employees.emp_no == Dept_emp.emp_no) & (Employees.emp_no == '10010')) getres(std_list) # 若是query中,僅列出一個代表,至關於 # select employees.* from employees inner join dept_emp on employees.emp_no = dept_emp.emp_no where employees.emp_no = '10010';
一對可能是一種表與表的關係,在orm中建立和使用方式有一寫特色,這裏單獨描述
經過ForeignKey來建立一對多關係,須要注意的是它內部須要填寫對應的真實的表名和字段(非映射的類對象)
class Deptment(base): __tablename__ = 'deptment' id = Column(Integer, autoincrement=True, primary_key=True) dep_name = Column(String(32), nullable=False) class User(base): __tablename__ = 'user' id = Column(Integer, autoincrement=True, primary_key=True) name = Column(String(32), nullable=False) dept_id = Column(Integer, ForeignKey('deptment.id')) # CREATE TABLE `user` ( # `id` int(11) NOT NULL AUTO_INCREMENT, # `name` varchar(32) NOT NULL, # `dept_id` int(11) DEFAULT NULL, # PRIMARY KEY (`id`), # KEY `dept_id` (`dept_id`), # CONSTRAINT `user_ibfk_1` FOREIGN KEY (`dept_id`) REFERENCES `deptment` (`id`) # ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; # CREATE TABLE `deptment` ( # `id` int(11) NOT NULL AUTO_INCREMENT, # `dep_name` varchar(32) NOT NULL, # PRIMARY KEY (`id`) # ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
添加部門數據時,只能使用id來插入,當插入的dept_id在deptment表中不存在時,會直接爆出異常
# 批量添加部門數據 session.add_all([ Deptment(dep_name='運維部'), Deptment(dep_name='開發部'), Deptment(dep_name='產品部'), Deptment(dep_name='測試部')] ) # 添加用戶數據 session.add_all([ User(name='daxin', dept_id=1), User(name='dachenzi', dept_id=2), User(name='dahl', dept_id=3)] ) session.commit()
在表中使用relationship來建立一個關聯的對象,便於查詢(和django的一對多隱含的對象相同,但在sqlalchemy中必須經過relationship才能夠生成),並不會生成新的字段,僅僅是產生一個關聯關係。
注意relationship對象不能做爲條件直接進行表查詢:session.query(User.name,User.deptment.dept_name) 這樣是不行的。
relationship(obj,backref='')
下面是一個例子:
class Deptment(base): __tablename__ = 'deptment' id = Column(Integer, autoincrement=True, primary_key=True) dep_name = Column(String(32), nullable=False) class User(base): __tablename__ = 'user' id = Column(Integer, autoincrement=True, primary_key=True) name = Column(String(32), nullable=False) dept_id = Column(Integer, ForeignKey('deptment.id')) deptment = relationship(Deptment, backref='user') # 建立relationship關係 # 正向查(經過User查deptment) std = session.query(User).filter(User.id == 1).first() print(std.name) print(std.deptment.dep_name) # 直接經過deptment對象,讀取Deptment表中對於的信息 # 反向查(經過depement查User) dep = session.query(Deptment).filter(Deptment.id == 1).first() print(dep.user[0].name) # 反向查,經過user獲取到的數據是一個列表
relationship,就是經過在一個映射類中增長一個屬性,該屬性用於表示鏈接關係,能夠在結果中,訪問該屬性來訪問關聯的表信息
存在relationship的映射關係時,咱們添加數據時,就能夠經過relationship使用對象來添加關聯數據了
dep = session.query(Deptment).filter(Deptment.id == 3).first() session.add_all([ User(name='hello', deptment=dep), User(name='world', deptment=dep)] ) session.commit()
直接建立新的部門對象也是能夠的
user = User(name='dahlhin',deptment=Deptment(dep_name='管理員')) session.add(user) session.commit()
和django不一樣sqlalchemy的第三張表須要手動建立。
模擬主機與業務線的歸屬問題。
這是一個典型的多對多關係,須要額外一張關係表來記錄。
class Service2host(base): __tablename__ = 'service_to_host' id = Column(Integer, primary_key=True, nullable=False, unique=True, autoincrement=True) s_id = Column(Integer, ForeignKey('service.id')) h_id = Column(Integer, ForeignKey('host.id')) __table_args__ = ( # 聯合惟一索引 UniqueConstraint('s_id', 'h_id', name='serivce_to_host'), ) # 業務id和主機id不能重複,這裏建立惟一索引來約束 class Service(base): __tablename__ = 'service' id = Column(Integer, primary_key=True, nullable=False, unique=True, autoincrement=True) ser_name = Column(String(16), nullable=False) class Host(base): __tablename__ = 'host' id = Column(Integer, primary_key=True, nullable=False, unique=True, autoincrement=True) hostname = Column(String(16), nullable=False) datetime = Column(DateTime, default=datetime.datetime.now, nullable=False) # CREATE TABLE `host` ( # `id` int(11) NOT NULL AUTO_INCREMENT, # `hostname` varchar(16) NOT NULL, # `datetime` date NOT NULL, # PRIMARY KEY (`id`), # UNIQUE KEY `id` (`id`) # ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; # CREATE TABLE `service` ( # `id` int(11) NOT NULL AUTO_INCREMENT, # `ser_name` varchar(16) NOT NULL, # PRIMARY KEY (`id`), # UNIQUE KEY `id` (`id`) # ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; # CREATE TABLE `service_to_host` ( # `id` int(11) NOT NULL AUTO_INCREMENT, # `s_id` int(11) DEFAULT NULL, # `h_id` int(11) DEFAULT NULL, # PRIMARY KEY (`id`), # UNIQUE KEY `id` (`id`), # UNIQUE KEY `serivce_to_host` (`s_id`,`h_id`), # KEY `h_id` (`h_id`), # CONSTRAINT `service_to_host_ibfk_1` FOREIGN KEY (`s_id`) REFERENCES `service` (`id`), # CONSTRAINT `service_to_host_ibfk_2` FOREIGN KEY (`h_id`) REFERENCES `host` (`id`) # ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
若是沒有relationship關聯對象,那麼咱們將須要分別操做三張表,使用relationship將會大大簡化這個過程,那麼在Service中建立關係:
hosts = relationship('host', secondary='service_to_host', backref='services')
建立後:
# 添加一個業務,並建立幾臺主機 session.add( Service(ser_name='運營', hosts=[ Host(hostname='openstack'), Host(hostname='nginx') ]) ) # 添加一個主機,並關聯幾個業務線 service = session.query(Service).filter(or_(Service.ser_name == '運營', Service.ser_name == '運維')).all() session.add( Host(hostname='Tomcat', services=service) ) # 查詢一個業務線下都有哪些主機 service = session.query(Service).filter(Service.ser_name == '運營').first() for host in service.hosts: print(host.id, host.hostname) # 查詢一臺主機都歸屬哪些業務線 host = session.query(Host).filter(Host.hostname == 'openstack').first() for servcie in host.services: print(service.id, service.ser_name)
當咱們啓動多線程來執行數據庫操做時,每一個線程都會用到session來執行sql語句,通常狀況下,咱們會在線程內不建立新的session來執行sql語句,下面是一個利用session完成原生sql的執行。
import threading from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker engine = create_engine( 'mysql+pymysql://dahl:123456@10.0.0.10:3306/test', max_overflow=5, pool_size=1, pool_timeout=30, pool_recycle=-1 ) DBsession = sessionmaker(bind=engine) def func(): session = DBsession() cursor = session.execute('show tables') res = cursor.fetchall() print(res) cursor.close() session.close() if __name__ == '__main__': for i in range(10): threading.Thread(target=func).start()
這裏會產生一個問題,每一個線程啓動時都會建立一個session,用完又會關閉,這樣太麻煩了,這裏可使用scoped_session對象來完成,它相似與threading.Local的實現方式。做用是,當線程調用scoped_session對象的功能時,好比各類sql查詢,在其內部會爲每一個線程建立一個新的session對象,讓它來使用。
# 1 引入scoped_session from sqlalchemy.orm import scoped_session # 2 全局建立session對象 session = scoped_session(DBsession) # 3 多進程內直接使用 def func(): cursor = session.execute('show tables') res = cursor.fetchall() print(res) cursor.close() session.remove() # 使用完畢須要remove
不須要關閉,直接使用scoped_session對象的remove方法便可。
實現過程:
源碼以下:
# scoping def instrument(name): def do(self, *args, **kwargs): return getattr(self.registry(), name)(*args, **kwargs) return do for meth in Session.public_methods: setattr(scoped_session, meth, instrument(meth)) # registry是ThreadLocalRegistry對象 self.registry = ThreadLocalRegistry(session_factory) # ThreadLocalRegistry對象內部經過threading.local實現的 def __init__(self, createfunc): self.createfunc = createfunc # Session對象 self.registry = threading.local() # registry()其實調用的就是__call__方法 def __call__(self): try: return self.registry.value except AttributeError: val = self.registry.value = self.createfunc() # 第一次執行,就會實例化一個Session對象 return val
當使用join語句連表查詢時,不免會碰到兩個表重名的字段,這裏就可使用label
來對字段進行別名顯示。
stds = session.query(User.name.label('username'), User.id, Deptment.dep_name).join(Deptment).all() for std in stds: print(std.username, std.id, std.dep_name)