DBUtils是Python的一個用於實現數據庫鏈接池的模塊。
詳情參考官方文檔html
安裝: pip install flask-sessionhtml5
使用方法:先導入 from flask_session import Sessionmysql
建立一個flask的app,app=Flask(__name__) 將app傳入Session便可 Session(app) web
或者先實例化一個session對象 sess = Session() sess.init_app(app)redis
實例化時,將app對象,傳入,而且覆蓋了本來flask中的session_interface sql
而後在這個_get_interface中又作了分類。咱們只須要在配置文件中配置上鍊接的類型,便可使用對應的session數據庫
好比:app.config["SESSION_TYPE"] = "redis" 就會將本來的session覆蓋成redis的session。django
而後,還要有redis的鏈接,這個覆蓋了session的類爲RedisSessionInterfacejson
這個類在實例化的時候,能夠傳入四個參數,config['SESSION_REDIS'], config['SESSION_KEY_PREFIX'],flask
config['SESSION_USE_SIGNER'], config['SESSION_PERMANENT']
這四個參數分別爲redis對象,redis中全部的keys加一個前綴,是否使用簽名對session進行加密,默認爲False,最後一個參數爲是否使用持久化。
因此咱們鏈接redis就須要設置配置文件 app.config["SESSION_REDIS"] = redis對象 便可
小結:flask中使用flask-session
from redis import Redis app.config['SESSION_TYPE'] = 'redis' app.config['SESSION_REDIS'] = Redis(host='127.0.0.1',port=6379) Session(app)
通常狀況下,咱們會把配置添加到配置文件settings中
from redis import Redis class Config(object): DEBUG = False SECRET_KEY = "asdfasdfasdf" SESSION_TYPE = 'redis' SESSION_REDIS = Redis(host='127.0.0.1', port=6379) class ProductionConfig(Config): DEBUG = False class DevelopmentConfig(Config): DEBUG = True class TestingConfig(Config): TESTING = True
flask-session的請求流程:請求進來後,在到達視圖函數以前,先執行flask-session中的open_session方法,這個方法中作的事情:先去用戶的cookie中獲取隨機字符串,若是沒有,表示請求是第一次進來,就生成一個隨機字符串sid(str(uuid.uuid4()))而且生成一個特殊的字典,將這個sid放到特殊字典中,而後返回這個特殊的字典,接着執行視圖函數,這是,視圖函數能夠向這個特殊的字典中寫入session,在返回響應時,會先執行flask-session中的save_session方法,這個方法中,將這個特殊的字典,先轉化爲字典並序列化爲一個json字符串 json.dumps(dict(特殊字典)) ,接着將這個值存入redis中 redis.setex(key,val) ,又將這個特殊字典中的sid(隨機字符串) 設置爲cookie返回給瀏覽器 response.set_cookie(sid),下次用戶在請求進來,先判斷有無這個隨機字符串,有,就從redis中取出對應的特殊字典,而後就能夠從這個特殊字典中獲取設置的session。
咱們使用偏移MySQL時,每次對數據庫的操做就須要鏈接數據庫,表的增刪改查,而後關閉數據庫,這樣每操做一次,就鏈接關閉,就會形成性能上的問題。或許會想到使用單例,鏈接一次,就能夠一直使用,這樣單線程應用徹底沒有問題,但若是涉及到多線程應用那麼就須要加鎖,一旦加鎖那麼鏈接勢必就會排隊等待,當請求比較多時,性能就會下降了。
import pymysql import threading from threading import RLock LOCK = RLock() CONN = pymysql.connect(host='127.0.0.1', port=3306, user='root', password='123', database='pooldb', charset='utf8') def task(arg): with LOCK: cursor = CONN.cursor() cursor.execute('select * from tb1') result = cursor.fetchall() cursor.close() print(result) for i in range(10): t = threading.Thread(target=task, args=(i,)) t.start()
備註:不加鎖,多線程下會報錯。
DBUtils是Python的一個用於實現數據庫鏈接池的模塊。
安裝: pip install DBUtils
使用:此鏈接池有兩種鏈接模式:
from DBUtils import PersistentDB POOL = PersistentDB( creator=pymysql, # 使用連接數據庫的模塊 maxusage=None, # 一個連接最多被重複使用的次數,None表示無限制 setsession=[], # 開始會話前執行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always closeable=False, # 若是爲False時, conn.close() 實際上被忽略,供下次使用,再線程關閉時,纔會自動關閉連接。若是爲True時, conn.close()則關閉連接,那麼再次調用pool.connection時就會報錯,由於已經真的關閉了鏈接(pool.steady_connection()能夠獲取一個新的連接) threadlocal=None, # 本線程獨享值得對象,用於保存連接對象,若是連接對象被重置 host='127.0.0.1', port=3306, user='root', password='123', database='pooldb', charset='utf8' ) def func(): conn = POOL.connection(shareable=False) cursor = conn.cursor() cursor.execute('select * from tb1') result = cursor.fetchall() cursor.close() conn.close() func()
import time import pymysql import threading from DBUtils.PooledDB import PooledDB, SharedDBConnection POOL = PooledDB( creator=pymysql, # 使用連接數據庫的模塊 maxconnections=6, # 鏈接池容許的最大鏈接數,0和None表示不限制鏈接數 mincached=2, # 初始化時,連接池中至少建立的空閒的連接,0表示不建立 maxcached=5, # 連接池中最多閒置的連接,0和None不限制 maxshared=3, # 連接池中最多共享的連接數量,0和None表示所有共享。PS: 無用,由於pymysql和MySQLdb等模塊的 threadsafety都爲1,全部值不管設置爲多少,_maxcached永遠爲0,因此永遠是全部連接都共享。 blocking=True, # 鏈接池中若是沒有可用鏈接後,是否阻塞等待。True,等待;False,不等待而後報錯 maxusage=None, # 一個連接最多被重複使用的次數,None表示無限制 setsession=[], # 開始會話前執行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host='127.0.0.1', port=3306, user='root', password='123', database='pooldb', charset='utf8' ) def func(): # 檢測當前正在運行鏈接數的是否小於最大連接數,若是不小於則:等待或報raise TooManyConnections異常 # 不然 # 則優先去初始化時建立的連接中獲取連接 SteadyDBConnection。 # 而後將SteadyDBConnection對象封裝到PooledDedicatedDBConnection中並返回。 # 若是最開始建立的連接沒有連接,則去建立一個SteadyDBConnection對象,再封裝到PooledDedicatedDBConnection中並返回。 # 一旦關閉連接後,鏈接就返回到鏈接池讓後續線程繼續使用。 conn = POOL.connection() # print(th, '連接被拿走了', conn1._con) # print(th, '池子裏目前有', pool._idle_cache, '\r\n') cursor = conn.cursor() cursor.execute('select * from tb1') result = cursor.fetchall() conn.close() func()
PS: 查看鏈接 show status like 'Threads%';
import pymysql from DBUtils.PooledDB import PooledDB, SharedDBConnection POOL = PooledDB( creator=pymysql, # 使用連接數據庫的模塊 maxconnections=20, # 鏈接池容許的最大鏈接數,0和None表示不限制鏈接數 mincached=2, # 初始化時,連接池中至少建立的空閒的連接,0表示不建立 maxcached=5, # 連接池中最多閒置的連接,0和None不限制 maxshared=0, # 連接池中最多共享的連接數量,0和None表示所有共享。PS: 無用,由於pymysql和MySQLdb等模塊的 threadsafety都爲1,全部值不管設置爲多少,_maxcached永遠爲0,因此永遠是全部連接都共享。 blocking=True, # 鏈接池中若是沒有可用鏈接後,是否阻塞等待。True,等待;False,不等待而後報錯 maxusage=None, # 一個連接最多被重複使用的次數,None表示無限制 setsession=[], # 開始會話前執行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host='127.0.0.1', port=3306, user='zhaopanpan', password='qwe123..', database='code', charset='utf8' ) def on_open(cur=pymysql.cursors.DictCursor): conn = POOL.connection() cursor = conn.cursor(cursor=cur) return conn,cursor def on_close(conn,cursor): cursor.close() conn.close() def fetchone(sql,args,cur=pymysql.cursors.DictCursor): """ 獲取單條數據 :param sql: SQL語句 select title from book where id=%s; :param args: SQL中若是有須要傳的參數,則 args=[參數,參數2] ,不然 args=[] :param cur: 表示返回的值默認是以字典的形式返回,若是設置爲[]或None表示以元組的形式返回 :return: """ conn,cursor = on_open(cur) cursor.execute(sql, args) result = cursor.fetchone() on_close(conn,cursor) return result def fetchall(sql,args,cur=pymysql.cursors.DictCursor): """ 獲取多條數據 :param sql: :param args: :param cur: :return: """ conn, cursor = on_open(cur) cursor.execute(sql, args) result = cursor.fetchall() on_close(conn, cursor) return result def exec_sql(sql,args,cur=pymysql.cursors.DictCursor): """ 添加/刪除/修改 :param sql: insert into table(%s,%s) values(....) :param args: :param cur: :return: """ conn, cursor = on_open(cur) cursor.execute(sql, args) conn.commit() on_close(conn, cursor)
WTForms是一個支持多個web框架的form組件,主要用於對用戶請求數據進行驗證。django中也可使用,可是django自帶
安裝:pip install wtforms
1. 用戶登陸
當用戶登陸時候,須要對用戶提交的用戶名和密碼進行多種格式校驗。如:
密碼不能爲空;密碼長度必須大於12;密碼必須包含 字母、數字、特殊字符等(自定義正則);
from flask import Flask, render_template, request, redirect from wtforms import Form from wtforms.fields import core from wtforms.fields import html5 from wtforms.fields import simple from wtforms import validators from wtforms import widgets app = Flask(__name__, template_folder='templates') app.debug = True class LoginForm(Form): name = simple.StringField( label='用戶名', validators=[ validators.DataRequired(message='用戶名不能爲空.'), validators.Length(min=6, max=18, message='用戶名長度必須大於%(min)d且小於%(max)d') ], widget=widgets.TextInput(), render_kw={'class': 'form-control'} ) pwd = simple.PasswordField( label='密碼', validators=[ validators.DataRequired(message='密碼不能爲空.'), validators.Length(min=8, message='用戶名長度必須大於%(min)d'), validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}", message='密碼至少8個字符,至少1個大寫字母,1個小寫字母,1個數字和1個特殊字符') ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) @app.route('/login', methods=['GET', 'POST']) def login(): if request.method == 'GET': form = LoginForm() return render_template('login.html', form=form) else: form = LoginForm(formdata=request.form) if form.validate(): print('用戶提交數據經過格式驗證,提交的值爲:', form.data) else: print(form.errors) return render_template('login.html', form=form) if __name__ == '__main__': app.run()
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <h1>登陸</h1> <form method="post"> <!--<input type="text" name="name">--> <p>{{form.name.label}} {{form.name}} {{form.name.errors[0] }}</p> <!--<input type="password" name="pwd">--> <p>{{form.pwd.label}} {{form.pwd}} {{form.pwd.errors[0] }}</p> <input type="submit" value="提交"> </form> </body> </html>
2. 用戶註冊
註冊頁面須要讓用戶輸入:用戶名、密碼、密碼重複、性別、愛好等。
from flask import Flask, render_template, request, redirect
from wtforms import Form
from wtforms.fields import core
from wtforms.fields import html5
from wtforms.fields import simple
from wtforms import validators
from wtforms import widgets
app = Flask(__name__, template_folder='templates')
app.debug = True
class RegisterForm(Form):
name = simple.StringField(
label='用戶名',
validators=[
validators.DataRequired()
],
widget=widgets.TextInput(),
render_kw={'class': 'form-control'},
default='alex'
)
pwd = simple.PasswordField(
label='密碼',
validators=[
validators.DataRequired(message='密碼不能爲空.')
],
widget=widgets.PasswordInput(),
render_kw={'class': 'form-control'}
)
pwd_confirm = simple.PasswordField(
label='重複密碼',
validators=[
validators.DataRequired(message='重複密碼不能爲空.'),
validators.EqualTo('pwd', message="兩次密碼輸入不一致")
],
widget=widgets.PasswordInput(),
render_kw={'class': 'form-control'}
)
email = html5.EmailField(
label='郵箱',
validators=[
validators.DataRequired(message='郵箱不能爲空.'),
validators.Email(message='郵箱格式錯誤')
],
widget=widgets.TextInput(input_type='email'),
render_kw={'class': 'form-control'}
)
gender = core.RadioField(
label='性別',
choices=(
(1, '男'),
(2, '女'),
),
coerce=int
)
city = core.SelectField(
label='城市',
choices=(
('bj', '北京'),
('sh', '上海'),
)
)
hobby = core.SelectMultipleField(
label='愛好',
choices=(
(1, '籃球'),
(2, '足球'),
),
coerce=int
)
favor = core.SelectMultipleField(
label='喜愛',
choices=(
(1, '籃球'),
(2, '足球'),
),
widget=widgets.ListWidget(prefix_label=False),
option_widget=widgets.CheckboxInput(),
coerce=int,
default=[1, 2]
)
def __init__(self, *args, **kwargs):
super(RegisterForm, self).__init__(*args, **kwargs)
self.favor.choices = ((1, '籃球'), (2, '足球'), (3, '羽毛球'))
def validate_pwd_confirm(self, field):
"""
自定義pwd_confirm字段規則,例:與pwd字段是否一致
:param field:
:return:
"""
# 最開始初始化時,self.data中已經有全部的值
if field.data != self.data['pwd']:
# raise validators.ValidationError("密碼不一致") # 繼續後續驗證
raise validators.StopValidation("密碼不一致") # 再也不繼續後續驗證
@app.route('/register', methods=['GET', 'POST'])
def register():
if request.method == 'GET':
form = RegisterForm(data={'gender': 1})
return render_template('register.html', form=form)
else:
form = RegisterForm(formdata=request.form)
if form.validate():
print('用戶提交數據經過格式驗證,提交的值爲:', form.data)
else:
print(form.errors)
return render_template('register.html', form=form)
if __name__ == '__main__':
app.run()
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <h1>用戶註冊</h1> <form method="post" novalidate style="padding:0 50px"> {% for item in form %} <p>{{item.label}}: {{item}} {{item.errors[0] }}</p> {% endfor %} <input type="submit" value="提交"> </form> </body> </html>
備註:對於動態的choices,應該重寫其構造方法。
class StudentForm(Form): name = simple.StringField( label='學生姓名', widget=widgets.TextInput(), render_kw={'class': 'form-control', 'placeholder': '請輸入學生姓名'}, validators=[validators.DataRequired(message='學生姓名不能爲空')] ) class_id = core.SelectField( label='請選擇班級', choices=(), coerce=int ) def __init__(self, *args, **kwargs): super(StudentForm, self).__init__(*args, **kwargs) self.class_id.choices = operate.fetchall(sql='select id,name from classes', args=[], cur=None)
能夠生成csrf標籤
from flask import Flask, render_template, request, redirect, session from wtforms import Form from wtforms.csrf.core import CSRF from wtforms.fields import core from wtforms.fields import html5 from wtforms.fields import simple from wtforms import validators from wtforms import widgets from hashlib import md5 app = Flask(__name__, template_folder='templates') app.debug = True class MyCSRF(CSRF): """ Generate a CSRF token based on the user's IP. I am probably not very secure, so don't use me. """ def setup_form(self, form): self.csrf_context = form.meta.csrf_context() self.csrf_secret = form.meta.csrf_secret return super(MyCSRF, self).setup_form(form) def generate_csrf_token(self, csrf_token): gid = self.csrf_secret + self.csrf_context token = md5(gid.encode('utf-8')).hexdigest() return token def validate_csrf_token(self, form, field): print(field.data, field.current_token) if field.data != field.current_token: raise ValueError('Invalid CSRF') class TestForm(Form): name = html5.EmailField(label='用戶名') pwd = simple.StringField(label='密碼') class Meta: # -- CSRF # 是否自動生成CSRF標籤 csrf = True # 生成CSRF標籤name csrf_field_name = 'csrf_token' # 自動生成標籤的值,加密用的csrf_secret csrf_secret = 'xxxxxx' # 自動生成標籤的值,加密用的csrf_context csrf_context = lambda x: request.url # 生成和比較csrf標籤 csrf_class = MyCSRF # -- i18n # 是否支持本地化 # locales = False locales = ('zh', 'en') # 是否對本地化進行緩存 cache_translations = True # 保存本地化緩存信息的字段 translations_cache = {} @app.route('/index/', methods=['GET', 'POST']) def index(): if request.method == 'GET': form = TestForm() else: form = TestForm(formdata=request.form) if form.validate(): print(form) return render_template('index.html', form=form) if __name__ == '__main__': app.run()
實例化流程分析
# 源碼流程 1. 執行type的 __call__ 方法,讀取字段到靜態字段 cls._unbound_fields 中; meta類讀取到cls._wtforms_meta中 2. 執行構造方法 a. 循環cls._unbound_fields中的字段,並執行字段的bind方法,而後將返回值添加到 self._fields[name] 中。 即: _fields = { name: wtforms.fields.core.StringField(), } PS:因爲字段中的__new__方法,實例化時:name = simple.StringField(label='用戶名'),建立的是UnboundField(cls, *args, **kwargs),當執行完bind以後,才變成執行 wtforms.fields.core.StringField() b. 循環_fields,爲對象設置屬性 for name, field in iteritems(self._fields): # Set all the fields to attributes so that they obscure the class # attributes with the same names. setattr(self, name, field) c. 執行process,爲字段設置默認值:self.process(formdata, obj, data=data, **kwargs) 優先級:obj,data,formdata; 再循環執行每一個字段的process方法,爲每一個字段設置值: for name, field, in iteritems(self._fields): if obj is not None and hasattr(obj, name): field.process(formdata, getattr(obj, name)) elif name in kwargs: field.process(formdata, kwargs[name]) else: field.process(formdata) 執行每一個字段的process方法,爲字段的data和字段的raw_data賦值 def process(self, formdata, data=unset_value): self.process_errors = [] if data is unset_value: try: data = self.default() except TypeError: data = self.default self.object_data = data try: self.process_data(data) except ValueError as e: self.process_errors.append(e.args[0]) if formdata: try: if self.name in formdata: self.raw_data = formdata.getlist(self.name) else: self.raw_data = [] self.process_formdata(self.raw_data) except ValueError as e: self.process_errors.append(e.args[0]) try: for filter in self.filters: self.data = filter(self.data) except ValueError as e: self.process_errors.append(e.args[0]) d. 頁面上執行print(form.name) 時,打印標籤 由於執行了: 字段的 __str__ 方法 字符的 __call__ 方法 self.meta.render_field(self, kwargs) def render_field(self, field, render_kw): other_kw = getattr(field, 'render_kw', None) if other_kw is not None: render_kw = dict(other_kw, **render_kw) return field.widget(field, **render_kw) 執行字段的插件對象的 __call__ 方法,返回標籤字符串
鉤子函數
校驗單個字段,可使用 def validate_字段名(filed) 能夠經過 field.data 得到輸入的值。
校驗全局使用 validate
a. 執行form的validate方法,獲取鉤子方法 def validate(self): extra = {} for name in self._fields: inline = getattr(self.__class__, 'validate_%s' % name, None) if inline is not None: extra[name] = [inline] return super(Form, self).validate(extra) b. 循環每個字段,執行字段的 validate 方法進行校驗(參數傳遞了鉤子函數) def validate(self, extra_validators=None): self._errors = None success = True for name, field in iteritems(self._fields): if extra_validators is not None and name in extra_validators: extra = extra_validators[name] else: extra = tuple() if not field.validate(self, extra): success = False return success c. 每一個字段進行驗證時候 字段的pre_validate 【預留的擴展】 字段的_run_validation_chain,對正則和字段的鉤子函數進行校驗 字段的post_validate【預留的擴展】