安裝:
pip install sqlalchemy
# 安裝數據庫驅動:
pip install pymysql
pip install cx_oracle
舉例:(在 url 後面加入 ?charset=utf8 能夠防止亂碼)
from sqlalchemy import create_engine

# username, password, hostname, port and dbname are placeholders to be
# replaced with real connection settings.
# echo=True makes the engine print the SQL statements it emits.
engine = create_engine(
    'mysql+pymysql://username:password@hostname:port/dbname',
    echo=True,
)
create_engine
接受一個url,格式爲:
# URL format:
# 'database type+driver name://username:password@host:port/database name'

# Commonly used URLs:
engine = create_engine('sqlite:///:memory:', echo=True)  # in-memory SQLite
engine = create_engine('sqlite:///./cnblogblog.db', echo=True)  # SQLite file
engine = create_engine("mysql+pymysql://username:password@hostname:port/dbname", echo=True)  # mysql+pymysql
engine = create_engine('mssql+pymssql://username:password@hostname:port/dbname', echo=True)  # mssql+pymssql
engine = create_engine('postgresql://scott:tiger@hostname:5432/dbname')  # postgresql example
engine = create_engine('oracle://scott:tiger@hostname:1521/sidname')  # oracle
engine = create_engine('oracle+cx_oracle://scott:tiger@tnsname')  # a PDB can be reached through its TNS name
簡單demo:
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

# echo=True asks the engine to print SQL statements and other debug output.
engine = create_engine('oracle://spark:a@orclpdb', echo=True)
session_maker = sessionmaker(bind=engine)
session = session_maker()

Base = declarative_base()


class Student(Base):
    """ORM model that corresponds to one table, STUDENT."""

    __tablename__ = 'STUDENT'

    # The first argument of each Column is the column name in the table.
    id = Column('STUID', Integer, primary_key=True)
    name = Column('STUNAME', String(32), nullable=False)
    age = Column('STUAGE', Integer)

    def __repr__(self):
        return '<Student(id:%s, name:%s, age:%s)>' % (self.id, self.name, self.age)


# Does nothing if the STUDENT table already exists; creates it otherwise.
Base.metadata.create_all(engine)

queryObject = session.query(Student).order_by(Student.id.desc())
for ins in queryObject:
    print(ins.id, ins.name, ins.age)

# Example output:
# 4 hey 24
# 3 lwtxxs 27
# 2 gyb 89
# 1 ns 23
將查詢結果映射爲DataFrame:
import pandas as pd

# Pass the query's underlying SELECT statement and the engine to pandas,
# which loads the result set into a DataFrame.
df = pd.read_sql(session.query(Student).filter(Student.id > 1).statement, engine)
print(df)

# Example output:
#    STUID STUNAME  STUAGE
# 0      4     hey      24
# 1      2     gyb      89
# 2      3  lwtxxs      27
查詢:
session的query方法除了能夠接受Base子類做爲參數外,還能夠接受字段,如:
# query is a sqlalchemy.orm.query.Query object.
query = session.query(Student.name, Student.age)

# Each result row unpacks into the requested columns, in order.
for stu_name, stu_age in query:
    print(stu_name, stu_age)
查詢條件filter:
from sqlalchemy import and_, or_

# = / like
query.filter(Student.name == 'wendy')
query.filter(Student.name.like('%ed%'))

# in (a literal list, or a sub-query)
query.filter(Student.name.in_(['wendy', 'jack']))
query.filter(Student.name.in_(
    session.query(Student.name).filter(Student.name.like('%ed%'))
))

# not in
query.filter(~Student.name.in_(['ed', 'wendy', 'jack']))

# is null / is not null
# The == None / != None comparisons are intentional here: they are applied
# to a column expression, not to a plain Python value.
query.filter(Student.name == None)
query.filter(Student.name.is_(None))
query.filter(Student.name != None)
query.filter(Student.name.isnot(None))

# and (three equivalent spellings)
query.filter(and_(Student.name == 'ed', Student.age != 23))
query.filter(Student.name == 'ed', Student.age != 23)
query.filter(Student.name == 'ed').filter(Student.age != 23)

# or
query.filter(or_(Student.name == 'ed', Student.name == 'wendy'))

# match
query.filter(Student.name.match('wendy'))
Query的方法:
all()
方法以列表形式返回結果集:
from sqlalchemy import or_, and_

# Querying the mapped class returns a list of Student instances.
queryObject = session.query(Student).filter(or_(Student.id == 1, Student.id == 2))
print(queryObject.all())  # [<Student(id:1, name:ns, age:23)>, <Student(id:2, name:gyb, age:89)>]

# Querying a single column returns a list of one-element rows.
queryObject = session.query(Student.name).filter(or_(Student.id == 1, Student.id == 2))
print(queryObject.all())  # [('ns',), ('gyb',)]
first()
方法返回單個結果。(若結果集爲空則返回None)
# Fetch only the first row of the result set and print it.
first_row = queryObject.first()
print(first_row)  # ('ns',)
one()
方法返回單個結果,與first()
方法不一樣的是:當結果集中沒有元素或有多於一個元素會拋出異常。
one_or_none()
方法同one()
一樣,不同的是結果集爲空則返回None,結果有多個則拋出異常。
查詢數量:
from sqlalchemy import func

# scalar() returns the single value of the single result row.
session.query(func.count(Student.id)).scalar()
# SELECT count("STUDENT"."STUID") AS count_1 FROM "STUDENT"
分組:
# Count the students for each distinct name.
counts_by_name = session.query(func.count(Student.id), Student.name)
grouped = counts_by_name.group_by(Student.name)
grouped.all()
嵌套SQL語句:
from sqlalchemy import column, text

# A raw SQL fragment used as a filter condition.
query = session.query(Student.id, Student.name).filter(text('stuid>2'))

# A complete raw SQL statement with a bound parameter.
# The selected columns are declared with column() because plain-string
# column names are no longer accepted by session.query().
# Because of the trailing .all(), `query` ends up holding the result rows
# rather than a Query object.
query = (
    session.query(column('stuid'), column('stuname'), column('stuage'))
    .from_statement(text("select * from student where stuname=:stuname"))
    .params(stuname='hey')
    .all()
)
# [(4, 'hey', 24)]