sqlalchemy在pythonweb中開發的使用(基於tornado的基礎上)

1、關於SQLAlchemy的安裝
pip install SQLAlchemy安裝
若是上面的方式安裝不成功的狀況可使用下面的方法
百度下載window或者linux下面對應的sqlalchemy的版本下載地址
解壓下載的壓縮包
進去該目錄下使用python setup.py install
測試安裝是否成功
2、開發基本的配置(以tornado開發爲參考)
一、新建一個包取名爲models
二、在__init__.py文件中寫上基本的配置python


#!/usr/bin/env pythonmysql


# encoding: utf-8linux


#引入基本的包web

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmakersql

 

# 鏈接數據庫的數據數據庫

HOSTNAME = '127.0.0.1'
PORT = '3306'
DATABASE = 'tornadotest'
USERNAME = 'root'
PASSWORD = 'root'服務器

# DB_URI的格式:dialect(mysql/sqlite)+driver://username:password@host:port/database?charset=utf8session

DB_URI = 'mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8'.format(USERNAME,PASSWORD,HOSTNAME,PORT,DATABASE)app


# 建立引擎tornado

engine = create_engine(DB_URI, echo=False )


# sessionmaker生成一個session類

Session = sessionmaker(bind=engine)
dbSession = Session()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
三、在models包下建立一個實體的類(取名User.py)


# coding=utf-8

from datetime import datetime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine, Column, Integer, String, Boolean, DateTime, ForeignKey
from models import engine
from models import dbSession
from sqlalchemy.orm import relationship

Base = declarative_base(engine)

# 定義好一些屬性,與user表中的字段進行映射,而且這個屬性要屬於某個類型

class User(Base):
__tablename__ = 'user1'

id = Column(Integer, primary_key=True, autoincrement=True)
username = Column(String(50), nullable=False)
password = Column(String(100))
createtime = Column(DateTime, default=datetime.now)
last_login = Column(DateTime)
loginnum = Column(Integer, default=0)
_locked = Column(Boolean, default=False, nullable=False)

#能夠在類裏面寫別的方法,相似查詢方法
@classmethod
def all(cls):
return dbSession.query(cls).all()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
四、在tornado的.py文件中


# coding:utf8

import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
from pycket.session import SessionMixin
from tornado.options import define, options
from models import dbSession
from models.User import User1
import creat_tables #單獨寫一個文件用來建立表的

define("port", default=8000, help="run tornado service", type=int)
define("tables", default=False, group="application", help="creat tables", type=bool)
....
....
if __name__ == "__main__":
tornado.options.parse_command_line()
if options.tables:
creat_tables.run()
app = tornado.web.Application(handlers=[
(r"/", IndexHandle),
(r"/test01", Test01)
], **settings)
app.listen(options.port)
tornado.httpserver.HTTPServer(app)
tornado.ioloop.IOLoop.instance().start()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
五、建立creat_tables.py文件,裏面的代碼以下:


#coding=utf-8

from models import engine
from models.User import Base


#將建立好的User類,映射到數據庫的users表中

def run():
print '------------create_all-------------'
Base.metadata.create_all(engine)
print '------------create_end-------------'
1
2
3
4
5
6
7
8
9
10
11
12
13
六、運行代碼建立表

前提是在代碼的服務器上建立了數據庫
進入對應的工做空間
直接運行python 項目文件名稱 --tables[這個地方的名字由這個地方決定:define("tables", default=False, group="application", help="creat tables", type=bool)]
3、關於sqlalchemy的基本操做
一、基本的配置

class BaseHandle(tornado.web.RequestHandler, SessionMixin):
def initialize(self):
self.db = dbSession
print "-----------initialize方法---------"

def get_current_user(self):
username = self.session.get("username")
return username if username else None

def on_finish(self):
self.db.close()
print "-------------on_finish方法----------------"
1
2
3
4
5
6
7
8
9
10
11
12
二、查詢數據(在建立數據的實體類中新增類方法)

@classmethod
def all(cls):
return dbSession.query(cls).all()

@classmethod
def by_id(cls, id):
return dbSession.query(cls).filter_by(id=id).first()

@classmethod
def by_name(cls, name):
return dbSession.query(cls).filter_by(username=name).first()
1
2
3
4
5
6
7
8
9
10
11
使用方式


# 測試sqlalchemy的test視圖(all返回的是一個`list`,可是`first`返回的是一個對象)

class Test01(BaseHandle):
def get(self):
# ===============寫sql語句start==============#
aa = self.db.query(User1).filter(User1.id == 1).all()
print aa
#或者直接使用定義好的方法
print User.all();
# ===============寫sql語句end==============#
self.write("我是測試數據")
1
2
3
4
5
6
7
8
9
10
11
12
三、關於增長數據(注意關於增刪改的操做要進行commit提交)

user = User()
user.username = xxx
user.password = xxx
self.db.add(user)
self.db.commit()
1
2
3
4
5
四、關於修改數據


#先查詢出,而後從新賦值
1
2
3
五、查詢User1表中所有數據

user = self.db.query(User1).all()
1
六、查詢某些字段

user = self.db.query(User1.username,User1.password).all()
1
七、根據條件查詢返回一個對象

user = self.db.query(User1).filter(User1.password == "123").first()
1
八、查詢計算總條數

user = self.db.query(User1).count()
1
九、使用limit分頁查找

user = self.db.query(User1).limit(3).all()
1
十、slice切片操做

user = self.db.query(User1).slice(1,3).all()
1
十一、使用like進行模糊查詢

user = self.db.query(User1).filter(User1.username.like("zhan%")).all()
1
十二、and_的使用(須要先導包from sqlalchemy import text, and_,func)

user = self.db.query(User1).filter(and_(User1.username.like("zhan%"),
User1.password.like("2%"))).all()
1
2
1三、查找大於多少的值

user = self.db.query(User1).filter(User1.loginnum >= 3).all()
1
1四、排序desc的使用

user = self.db.query(User1).order_by(User1.loginnum.desc()).all()
1
1五、filter_by() 使用關鍵字進行條件查詢,不須要用類名指定字段名

user = self.db.query(User).filter_by(username='zhansan').all()
user = self.db.query(User).filter_by(password='222').all()
1
2
1六、update() 更新操做,至關於執行了add操做

self.dbSession.query(User).filter(User.id==1)
.update({User.username:"aaa"})
self.dbSession.commit()
1
2
3
1七、delete刪除數據

aa = self.db.query(User).filter(User.id==3).first()
self.db.delete(aa)
1
2
4、關於sqlalchemy中使用mysql的約束
一、主鍵約束:primary key 不能爲空且不能重複
二、主鍵的自動增加:auto_increment
三、惟一約束:unique
四、非空約束:not null
五、外鍵約束:foreign key
5、使用execute批量建立數據
self.db.execute(表名.__table__.insert(), [
{'字段名1': randint(1, 100), '字段名2':'aaa','字段名3': randint(1, 100)}
for i in xrange(500)
])
#提交數據
self.db.commit()
1
2
3
4
5
6
6、關於sqlalchemy的一對多的查詢(重點也是經常使用的)
一、在一對多的處理中主要使用是一個表的一個字段是別的表某一列中指定的值(所謂的外鍵約束)
二、在一對多的關係表中子表中使用外鍵約束[ForeignKey(主表.列名)]注意點主表與子表關聯的列的屬性要一致
三、父與子表中互相查詢使用relationship
四、新建一個學生表(student)和一個班級表(classes)

from sqlalchemy import create_engine, Column, Integer, String, Boolean, DateTime, ForeignKey


#導入relationship

from sqlalchemy.orm import relationship


#新建立一個學生表的類(子表)

class Student(Base):
__tablename__ = "student"
id = Column(Integer, primary_key=True, autoincrement=True)
username = Column(String(100), nullable=False)
create_time = Column(DateTime, default=datetime.now)
# 建立一個外鍵約束(在子表中建立外鍵約束,只能是父表中可枚舉的值)
class_id = Column(Integer, ForeignKey('classess.id'))
"""
使用relationship()反向查找
理解:
在student表中能夠經過classes查找到classess表裏面的數據
在classess表裏面能夠經過students查找到student表裏面的數據
"""
classes = relationship("Classess", backref="students")

 

# 建立一個班級表的類(父表)

class Classess(Base):
__tablename__ = "classess"
id = Column(Integer, primary_key=True, autoincrement=True)
classname = Column(String(100), nullable=False)
createtime = Column(DateTime, default=datetime.now)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
四、批量插入數據


#班級表中批量插入

self.db.execute(Classess.__table__.insert(), [
{'classname': str(i) + 'class'} for i in xrange(1, 4)
])
self.db.commit()

#學生表中批量插入

self.db.execute(Student.__table__.insert(), [
{'username': 'username' + str(i),
'class_id': randint(1, 3)} for i in xrange(20)
])
self.db.commit()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
五、關於一對多的查詢的使用


#查詢語句跟以前介紹的同樣的


#經過學生查詢到該學生的班級

aa = self.db.query(Student).filter_by(id=1).first()
print aa.classes.classname


#查詢班級下全部的學生

bb = self.db.query(Classess).filter_by(id=2).first()
for stu in bb.students:
print stu.username
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
7、關於sqlalchemy的一對一的使用(擴展一張表的信息)
一、新建一個用戶表和用戶擴展表


# 建立一個用戶表的類

class User1(Base):
__tablename__ = 'user1'

id = Column(Integer, primary_key=True, autoincrement=True)
username = Column(String(50), nullable=False)
password = Column(String(100))
createtime = Column(DateTime, default=datetime.now)
last_login = Column(DateTime)
loginnum = Column(Integer, default=0)
_locked = Column(Boolean, default=False, nullable=False)
#添加一個字段關聯到子表
user1Ext = relationship("User1Ext",uselist=False)


#建立一個用戶表的擴展表

class User1Ext(Base):
__tablename__ = "user1ext"
id = Column(Integer,primary_key=True)
sex = Column(String(10))
age = Column(Integer)
#擴展表當作是一個子表添加一個外鍵約束
user_id = Column(Integer,ForeignKey("user1.id"),unique=True)
#添加一個與user關聯的字段
user = relationship("User1",uselist=False)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
二、添加數據

用戶表

用戶擴展表

 


#根據用戶表查詢到用戶擴展表

aa = self.db.query(User1).filter_by(id=17).first()
print aa.user1Ext.sex


#根據用戶擴展表查詢用戶信息

bb = self.db.query(User1Ext).filter_by(id=1).first()
print bb.user.username
1
2
3
4
5
6
7
8
9
10
11
8、多對多的查詢(須要一張中間表,而後根據一對多的方式查詢)
案例:一個學生能夠有多個學科,一個學科有多個學生

一、建立一張學生表的試視圖類


# 建立學生表

class Student(Base):
__tablename__ = "student"
id = Column(Integer, primary_key=True, autoincrement=True)
username = Column(String(100), nullable=False)
createtime = Column(DateTime, default=datetime.now)
# 新增關聯到學科表的字段(secondary表示第二張表的意思,StudentToScore是中間表)
student_core = relationship("Core", secondary=StudentToScore.__table__)
1
2
3
4
5
6
7
8
9
10
二、建立一個學科表的視圖類


# 建立學科表

class Core(Base):
__tablename__ = "score"
id = Column(Integer, primary_key=True, autoincrement=True)
corename = Column(String(100), nullable=False)
core = Column(Integer)
# 新增關聯到學生表的字段(secondary表示第二張表的意思,StudentToScore是中間表)
student_core = relationship("Student", secondary=StudentToScore.__table__)
1
2
3
4
5
6
7
8
9
10
三、建立一箇中間表來關聯學生表與學科表


# 建立中間表

class StudentToScore(Base):
__tablename__ = "student_to_score"
student_id = Column(Integer, ForeignKey("student.id"), primary_key=True)
score_id = Column(Integer, ForeignKey("score.id"), primary_key=True)
1
2
3
4
5
6
7
四、總結上面建立的三張表

一、多對多的關係必須建立一箇中間表做爲橋樑
二、中間表是這兩張表的子表,經過外鍵約束到對應,而後經過雙主鍵約束
三、在各自的表中加上relationship()去關聯對應的表
五、批量建立數據(注意先插入學生表與學科表數據後插入中間表的)


#批量插入學生表數據

self.db.execute(Student.__table__.insert(), [
{'username': 'username' + str(i + 1),'class_id': randint(1, 3)} for i in xrange(20)
])
self.db.commit()

#批量插入學科表數據

self.db.execute(Core.__table__.insert(), [
{'corename': 'coursename' + str(i + 1),'score': randint(60, 80)} for i in xrange(5)
])

#批量插入中間表數據

self.db.execute(StudentToScore.__table__.insert(), [
{'student_id': randint(1, 20),'score_id': randint(1, 5)} for i in xrange(10)])
self.db.commit()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
六、測試代碼


#查詢學生表id=1的學科

aa = self.db.query(Student).filter_by(id=1).first()
for a in aa.student_core:
print a.corename


#查詢學科id=1下所有的學生

bb = self.db.query(Core).filter_by(id=1).first()for b in bb.student_score: print b.username--------------------- 做者:水痕01 來源:CSDN 原文:https://blog.csdn.net/kuangshp128/article/details/73413584 版權聲明:本文爲博主原創文章,轉載請附上博文連接!

相關文章
相關標籤/搜索