SQLAlchemy(1)

介紹

SQLAlchemy是一個基於Python實現的ORM框架。該框架創建在 DB API之上,使用關係對象映射進行數據庫操做,簡言之即是:將類和對象轉換成SQL,而後使用數據API執行SQL並獲取執行結果,並把獲取的結果轉爲python對象。其中發sql到mysql服務器,從mysql服務器拿結果都是藉助其餘工具來完成的,例如pymysql.html

  • Engine,框架的引擎
  • Connection Pooling ,數據庫鏈接池
  • Dialect,選擇鏈接數據庫的DB API種類
  • Schema/Types,架構和類型
  • SQL Exprression Language,SQL表達式語言

SQLAlchemy自己沒法操做數據庫,其必須以來pymsql等第三方插件,Dialect用於和數據API進行交流,根據配置文件的不一樣調用不一樣的數據庫API,從而實現對數據庫的操做,如:python

shell

MySQL-Python mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>

pymysql
mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]mysql

MySQL-Connector
mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>sql

cx_Oracle
oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]docker

更多:http://docs.sqlalchemy.org/en/latest/dialects/index.html

單表

單表的建立

import datetime import time

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy import Column
from sqlalchemy import Integer, String, Date數據庫

from sqlalchemy.orm import sessionmakerdjango

Base = declarative_base()flask

engine = create_engine(
"mysql+pymysql://root:123456@127.0.0.1:3306/test?charset=utf8",
encoding='utf8',
max_overflow=0,
pool_size=5,
pool_timeout=20,
pool_recycle=-1
)bash

class User(Base):
# tablename 字段必須有,不然會報錯
tablename = 'user'
# 不一樣於django model 會自動加主鍵,sqlalchemy須要手動加主鍵
id = Column(Integer, primary_key=True)
name = Column(String(32), nullable=False)
# 時間類型的default的默認值須要使用datetime.date.today(), 可是使用flask-sqlalchemy的時候使用datetime.date.today
date = Column(Date, default=datetime.date.today())

def create_table():
# 建立全部的表,表若是存在也不會重複建立,只會建立新的表,並且sqlalchemy默認不支持修改表結構
# 要想和django orm同樣能修改表結構並反映到數據庫須要藉助第三方組件
Base.metadata.create_all(engine)

def drop_table():
# 刪除全部的表
Base.metadata.drop_all(engine)

單表的增刪改查

# 增長 # user = User(name='jack') # session.add(user) # session.commit() # session.close() # # 增長多條 # user_list = [User(name='a'), User(name='b'), User(name='c')] # session.add_all(user_list) # session.commit() # # result 是一個列表,裏面存放着對象 # result = session.query(User).all() # for item in result: # print(item.name) # 查詢最後加all() 獲得的是一個存放對象的列表,不加all() 經過print 打印出的是sql語句 # 可是結果還是一個可迭代的對象,只不過對象的__str__ 返回的是sql語句,迭代的時候裏面的對象 # 是一個類元組的對象,可使用下標取值,也能夠經過對象的`.`方式取值 # result = session.query(User.name, User.date).filter(User.id>3) # for item in result: # print(item[0], item.date) # 條件查詢 from sqlalchemy import and_, or_,func ## 邏輯查詢 r0 = session.query(User).filter(User.id.in_([1, 2])) r1 = session.query(User).filter(~User.id.in_([1, 2])) r2 = session.query(User).filter(User.name.startswith('j'), User.id>2) r3 = session.query(User).filter( or_( User.id>3, and_(User.name=='jack', User.id<2) ) ) ## 通配符 r4 = session.query(User).filter(User.name.like('%j')) r5 = session.query(User).filter(~User.name.like('%j')) ## limit 和django orm 同樣都是經過索引來限制 r6 = session.query(User)[0:4] ## 排序, 排序通常是倒數第二的位置,倒數第一是limit r7 = session.query(User).order_by(User.id.desc()) ## 分組和聚合 r8 = session.query(func.max(User.id)).group_by(User.name).all() # 改, 獲得的結果是收到影響的記錄條數 # r9 = session.query(User).filter(User.id==2).update({'name': User.name + User.name.concat('hh')}, synchronize_session=False) # 刪除 session.query(User).delete() ## 子查詢 session.query(User).filter(User.id.in_(session.query(User.id).filter(User.name.startswith('j'))))session.commit()
# 這邊的close並非真實的關閉鏈接,而是完成終止事務和清除工做
session.close()

連表

兩張表

建立表

import datetime import time

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy import Column
from sqlalchemy import Integer, String, Date
from sqlalchemy import ForeignKey
from sqlalchemy.orm import sessionmaker

Base = declarative_base()

engine = create_engine(
"mysql+pymysql://root:123456@127.0.0.1:3306/test?charset=utf8",
encoding='utf8',
max_overflow=0,
pool_size=5,
pool_timeout=20,
pool_recycle=-1
)

class User(Base):
# tablename 字段必須有,不然會報錯
tablename = 'user'
# 不一樣於django model 會自動加主鍵,sqlalchemy須要手動加主鍵
id = Column(Integer, primary_key=True)
name = Column(String(32), nullable=False)
# 時間類型的default的默認值須要使用datetime.date.today(), 可是使用flask-sqlalchemy的時候使用datetime.date.today
date = Column(Date, default=datetime.date.today())

# 由於外鍵的sh設置更偏向於數據庫底層,因此這裏使用了表名,而不是類名
depart_id = <span class="hljs-type">Column</span>(<span class="hljs-type">Integer</span>, <span class="hljs-type">ForeignKey</span>('<span class="hljs-title">department</span>.<span class="hljs-title">id'</span>))
# 由於外鍵的sh設置更偏向於數據庫底層,因此這裏使用了表名,而不是類名 depart_id = <span class="hljs-type">Column</span>(<span class="hljs-type">Integer</span>, <span class="hljs-type">ForeignKey</span>('<span class="hljs-title">department</span>.<span class="hljs-title">id'</span>))class Department(Base):
tablename = 'department'
id = Column(Integer, primary_key=True)
# 默認的nullable 是True
title = Column(String(32), nullable=False)

查詢

# 默認根據在類裏面定義的外鍵進行on, 此時獲得的結果是[(userobj, departmnetobj),()] 這種形式,默認是inner join r1 = session.query(User, Department).join(Department).all() r2 = session.query(User.name, Department.title).join(Department, Department.id==User.depart_id).all()

有了 isouter 參數,inner join 就變成 left join

r3 = session.query(User.name, Department.title).join(Department, Department.id==User.depart_id, isouter=True).all()

relationship

如今問題來了,想要查name是jack所屬的部門名,兩種方式

  1. 分兩次sql查詢
user = session.query(User).filter(User.name == 'jack').first()
title = session.query(Department.title).filter(Department.id == user.depart_id).first().title
  1. 一次連表查詢
r1 = session.query(Department.title).join(User).filter(User.name == 'jack').first().title
print(r1)

這樣的方式在python代碼的級別貌似沒有django的方便,django 的 orm 拿到一個對象obj, obj.deaprtment.title 就能拿到結果。sqlalchemy也有相似功能,經過relationship來實現。

# 注意,導入的是relationship,而不是relationships from sqlalchemy.orm import relationship class Department(Base): __tablename__ = 'department' id = Column(Integer, primary_key=True) # 默認的nullable 是True title = Column(String(32), nullable=False)
<span class="hljs-meta"># 若是backref 的那張表和這張表是一對一關係,加上一個uselist=False參數就行</span>
user = relationship(<span class="hljs-string">"User"</span>, backref=<span class="hljs-comment">'department')</span>

class User(Base):
# tablename 字段必須有,不然會報錯
tablename = 'user'
# 不一樣於django model 會自動加主鍵,sqlalchemy須要手動加主鍵
id = Column(Integer, primary_key=True)
name = Column(String(32), nullable=False)
# 時間類型的default的默認值須要使用datetime.date.today(), 可是使用flask-sqlalchemy的時候使用datetime.date.today
date = Column(Date, default=datetime.date.today())

<span class="hljs-meta"># 由於外鍵的sh設置更偏向於數據庫底層,因此這裏使用了表名,而不是類名</span>
depart_id = Column(<span class="hljs-built_in">Integer</span>, ForeignKey(<span class="hljs-comment">'department.id'))</span>
<span class="hljs-meta"># 神奇的一點是,SQLAlchemy會根據關係的對應狀況自動給關係相關屬性的類型</span>
<span class="hljs-meta"># 好比這裏的Department下面的user自動是一個list類型,而User因爲設定了外鍵的緣故</span>
<span class="hljs-meta"># 一個user最多隻能應對一個用戶,因此自動識別成一個非列表類型</span>
<span class="hljs-meta"># 這樣寫兩個relationship比較麻煩,在設置了外鍵的一邊使用relationship,而且加上backref參數</span>
<span class="hljs-meta"># department = relationship("Department")</span>

session_factory = sessionmaker(engine)
session = session_factory()

user = session.query(User).first()
print(user.department)

<span class="hljs-meta"># 若是backref 的那張表和這張表是一對一關係,加上一個uselist=False參數就行</span> user = relationship(<span class="hljs-string">"User"</span>, backref=<span class="hljs-comment">'department')</span><span class="hljs-meta"># 由於外鍵的sh設置更偏向於數據庫底層,因此這裏使用了表名,而不是類名</span> depart_id = Column(<span class="hljs-built_in">Integer</span>, ForeignKey(<span class="hljs-comment">'department.id'))</span> <span class="hljs-meta"># 神奇的一點是,SQLAlchemy會根據關係的對應狀況自動給關係相關屬性的類型</span> <span class="hljs-meta"># 好比這裏的Department下面的user自動是一個list類型,而User因爲設定了外鍵的緣故</span> <span class="hljs-meta"># 一個user最多隻能應對一個用戶,因此自動識別成一個非列表類型</span> <span class="hljs-meta"># 這樣寫兩個relationship比較麻煩,在設置了外鍵的一邊使用relationship,而且加上backref參數</span> <span class="hljs-meta"># department = relationship("Department")</span>department = session.query(Department).first()
print(department.user)

有了relationship,不只查詢方便,增長數據也更方便。

# 增長一個用戶ppp,並新建這個用戶的部門叫IT
 ## 方式一
# d = Department(title='IT')
# session.add(d)
# session.commit() # 只有commit以後才能取d的id
#
# session.add(User(name='ppp', depart_id=d.id))
# session.commit()
 ## 方式二
 # session.add(User(name='ppp', department=Department(title='IT')))
# session.commit()
 # 增長一個部門xx,並在部門裏添加員工:aa/bb/cc
# session.add(Department(title='xx', users=[User(name='aa'), User(name='bb'),User(name='cc')]))
# session.commit()

三張表

建立表

結果是每5個一塊兒打印

在全局建立一個特殊的session,各個線程去使用這個特殊的session

scoped_session 這個類還真是神奇,名字居然還不是大寫,並且原先的session有的,這個類實例化的對象也會有。咱們第一反應是繼承,其實它也不是繼承。它的實現原理是這樣的
執行導入語句的的時候,點進去看源碼發現執行了一個scoping.py的文件。



最終self.registry()就是session_factory() 對象,並且是線程隔離的,每一個線程有本身的會話對象

from sqlalchemy.orm import relationship from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import create_engine from sqlalchemy import Column from sqlalchemy import Integer, String, Date from sqlalchemy import ForeignKey, UniqueConstraint, Index class Student(Base): __tablename__ = 'student' id = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=False)
course_list = relationship('<span class="hljs-type">Course</span>', <span class="hljs-title">secondary</span>='<span class="hljs-title">student2course'</span>, <span class="hljs-title">backref</span>='<span class="hljs-title">student_list'</span>)

class Course(Base):
tablename = 'course'
id = Column(Integer, primary_key=True)
title = Column(String(32), index=True, nullable=False)

class Student2Course(Base):
tablename = 'student2course'
id = Column(Integer, primary_key=True, autoincrement=True)
student_id = Column(Integer, ForeignKey('student.id'))
course_id = Column(Integer, ForeignKey('course.id'))

__table_args__ = (
    <span class="hljs-type">UniqueConstraint</span>('<span class="hljs-title">student_id'</span>, '<span class="hljs-title">course_id'</span>, <span class="hljs-title">name</span>='<span class="hljs-title">uix_stu_cou'</span>), # 聯合惟一索引
    # <span class="hljs-type">Index</span>('<span class="hljs-title">student_id'</span>, '<span class="hljs-title">course_id'</span>, <span class="hljs-title">name</span>='<span class="hljs-title">stu_cou'</span>),                          # 聯合索引
)</span></code></pre>

查詢

查詢方式和只有兩張表的狀況相似,例如查詢jack選擇的全部課

# obj = session.query(Student).filter(Student.name=='jack').first()
# for item in obj.course_list:
# print(item.title)

建立一個課程,建立2學生,兩個學生選新建立的課程

# obj = Course(title='英語')
# obj.student_list = [Student(name='haha'),Student(name='hehe')]
#
# session.add(obj)
# session.commit()

執行原生sql

方式一

# 查詢
# cursor = session.execute('select * from users')
# 拿到的結果是一個ResultProxy對象,ResultProxy對象裏套着類元組的對象,這些對象能夠經過下標取值,也能夠經過對象.屬性的方式取值
# result = cursor.fetchall()
 # 添加
cursor = session.execute('INSERT( INTO users(name) VALUES(:value)', params={"value": 'wupeiqi'})
session.commit()
print(cursor.lastrowid)

方式二

import pymysql
conn = engine.raw_connection()
cursor = conn.cursor(pymysql.cursors.DictCursor)
cursor.execute(
    "select * from user"
)
result = cursor.fetchall()
# 結果是一個列表,列表裏面套着的對象就是原生的字典對象
print(result)
cursor.close()
conn.close()

多線程狀況下的sqlalchemy

在每一個線程內部建立session並關閉session

course_list = relationship('<span class="hljs-type">Course</span>', <span class="hljs-title">secondary</span>='<span class="hljs-title">student2course'</span>, <span class="hljs-title">backref</span>='<span class="hljs-title">student_list'</span>)__table_args__ = ( <span class="hljs-type">UniqueConstraint</span>('<span class="hljs-title">student_id'</span>, '<span class="hljs-title">course_id'</span>, <span class="hljs-title">name</span>='<span class="hljs-title">uix_stu_cou'</span>), # 聯合惟一索引 # <span class="hljs-type">Index</span>('<span class="hljs-title">student_id'</span>, '<span class="hljs-title">course_id'</span>, <span class="hljs-title">name</span>='<span class="hljs-title">stu_cou'</span>), # 聯合索引 )</span></code></pre># obj = session.query(Student).filter(Student.name=='jack').first() # for item in obj.course_list: # print(item.title)# obj = Course(title='英語') # obj.student_list = [Student(name='haha'),Student(name='hehe')] # # session.add(obj) # session.commit()# 查詢 # cursor = session.execute('select * from users') # 拿到的結果是一個ResultProxy對象,ResultProxy對象裏套着類元組的對象,這些對象能夠經過下標取值,也能夠經過對象.屬性的方式取值 # result = cursor.fetchall() # 添加 cursor = session.execute('INSERT( INTO users(name) VALUES(:value)', params={"value": 'wupeiqi'}) session.commit() print(cursor.lastrowid)import pymysql conn = engine.raw_connection() cursor = conn.cursor(pymysql.cursors.DictCursor) cursor.execute( "select * from user" ) result = cursor.fetchall() # 結果是一個列表,列表裏面套着的對象就是原生的字典對象 print(result) cursor.close() conn.close()session_factory = sessionmaker(engine)

def task(i):
# 建立一個會話對象,沒錯僅僅是建立一個對象這麼簡單
session = session_factory()
# 執行query語句的時候纔會真真去拿鏈接去執行sql語句,若是沒有close那麼沒有空閒鏈接就會等待
result = session.execute('select * from user where id=14')
for i in result:
print(i.name)
time.sleep(1)
# 必需要close,這裏的close能夠理解爲關閉會話,把連接放回鏈接池
# 若是註釋掉這一句代碼,程序會報錯QueuePool limit of size 5 overflow 0 reached, connection timed out, timeout 20
session.close()

if name == 'main':
for i in range(10):
t = Thread(target=task, args=(i,))
t.start()

session_factory = sessionmaker(engine)

def task(i):
# 建立一個會話對象,沒錯僅僅是建立一個對象這麼簡單
session = session_factory()
# 執行query語句的時候纔會真真去拿鏈接去執行sql語句,若是沒有close那麼沒有空閒鏈接就會等待
result = session.execute('select * from user where id=14')
for i in result:
print(i.name)
time.sleep(1)
# 必需要close,這裏的close能夠理解爲關閉會話,把連接放回鏈接池
# 若是註釋掉這一句代碼,程序會報錯QueuePool limit of size 5 overflow 0 reached, connection timed out, timeout 20
session.close()

if name == 'main':
for i in range(10):
t = Thread(target=task, args=(i,))
t.start()from sqlalchemy.orm import scoped_session

session_factory = sessionmaker(engine)
session = scoped_session(session_factory)

def task(i):
result = session.execute('select * from user where id=14')
for i in result:
print(i.name)
time.sleep(1)

session.remove()
session.remove()if name == 'main':
for i in range(10):
t = Thread(target=task, args=(i,))
t.start()from sqlalchemy.orm import scoped_session
本站公眾號
   歡迎關注本站公眾號,獲取更多信息