Flask是一個基於Python開發而且依賴jinja2
模板和Werkzeug WSGI服務的一個微型框架,對於Werkzeug本質是Socket服務端,其用於接收http請求並對請求進行預處理,而後觸發Flask框架,開發人員基於Flask框架提供的功能對請求進行相應的處理,並返回給用戶,若是要返回給用戶複雜的內容時,須要藉助jinja2
模板來實現對模板的處理,即:將模板和數據進行渲染,將渲染後的字符串返回給用戶瀏覽器。html
「微」(micro) 並不表示你須要把整個 Web 應用塞進單個 Python 文件(雖然確實能夠 ),也不意味着 Flask 在功能上有所欠缺。微框架中的「微」意味着 Flask 旨在保持核心簡單而易於擴展。Flask 不會替你作出太多決策——好比使用何種數據庫。而那些 Flask 所選擇的——好比使用何種模板引擎——則很容易替換。除此以外的一切都由可由你掌握。如此,Flask 能夠與您珠聯璧合。html5
默認狀況下,Flask 不包含數據庫抽象層、表單驗證,或是其它任何已有多種庫能夠勝任的功能。然而,Flask 支持用擴展來給應用添加這些功能,如同是 Flask 自己實現的同樣。衆多的擴展提供了數據庫集成、表單驗證、上傳處理、各類各樣的開放認證技術等功能。Flask 也許是「微小」的,但它已準備好在需求繁雜的生產環境中投入使用python
最簡單的Web應用就是先把HTML用文件保存好,用一個現成的HTTP服務器軟件,接收用戶請求,從文件中讀取HTML,返回。mysql
若是要動態生成HTML,就須要把上述步驟本身來實現。不過,接受HTTP請求、解析HTTP請求、發送HTTP響應都是苦力活,若是咱們本身來寫這些底層代碼,還沒開始寫動態HTML呢,就得花個把月去讀HTTP規範。web
正確的作法是底層代碼由專門的服務器軟件實現,咱們用Python專一於生成HTML文檔。由於咱們不但願接觸到TCP鏈接、HTTP原始請求和響應格式,因此,須要一個統一的接口協議來實現這樣的服務器軟件,讓咱們專心用Python編寫Web業務。這個接口就是WSGI:Web Server Gateway Interface。而wsgiref模塊就是python基於wsgi協議開發的服務模塊正則表達式
from wsgiref.simple_server import make_server def mya(environ, start_response): print(environ) start_response('200 OK', [('Content-Type', 'text/html')]) if environ.get('PATH_INFO') == '/index': with open('index.html','rb') as f: data=f.read() elif environ.get('PATH_INFO') == '/login': with open('login.html', 'rb') as f: data = f.read() else: data=b'<h1>Hello, web!</h1>' return [data] if __name__ == '__main__': myserver = make_server('', 8011, mya) print('監聽8010') myserver.serve_forever() wsgiref簡單應用
pip3 install flaskredis
Werkzeug是一個WSGI工具包,他能夠做爲一個Web框架的底層庫。這裏稍微說一下, werkzeug 不是一個web服務器,也不是一個web框架,而是一個工具包,官方的介紹說是一個 WSGI 工具包,它能夠做爲一個 Web 框架的底層庫,由於它封裝好了不少 Web 框架的東西,例如 Request,Response 等等算法
代碼示例:sql
from werkzeug.wrappers import Request, Response @Request.application def hello(request): return Response('Hello World!') if __name__ == '__main__': from werkzeug.serving import run_simple run_simple('localhost', 4000, hello)
from flask import Flask # 實例化產生一個Flask對象 app = Flask(__name__) # 將 '/'和視圖函數hello_workd的對應關係添加到路由中 @app.route('/') # 1. v=app.route('/') 2. v(hello_world) def hello_world(): return 'Hello World!' if __name__ == '__main__': app.run() # 最終調用了run_simple()
main.py數據庫
from flask import Flask,render_template,request,redirect,session,url_for app = Flask(__name__) app.debug = True app.secret_key = 'sdfsdfsdfsdf' USERS = { 1:{'name':'張三','age':18,'gender':'男','text':"道路千萬條"}, 2:{'name':'李四','age':28,'gender':'男','text':"安全第一條"}, 3:{'name':'王五','age':18,'gender':'女','text':"行車不規範"}, } @app.route('/detail/<int:nid>',methods=['GET']) def detail(nid): user = session.get('user_info') if not user: return redirect('/login') info = USERS.get(nid) return render_template('detail.html',info=info) @app.route('/index',methods=['GET']) def index(): user = session.get('user_info') if not user: # return redirect('/login') url = url_for('l1') return redirect(url) return render_template('index.html',user_dict=USERS) @app.route('/login',methods=['GET','POST'],endpoint='l1') def login(): if request.method == "GET": return render_template('login.html') else: # request.query_string user = request.form.get('user') pwd = request.form.get('pwd') if user == 'cxw' and pwd == '123': session['user_info'] = user return redirect('http://www.baidu.com') return render_template('login.html',error='用戶名或密碼錯誤') if __name__ == '__main__': app.run()
detail.html
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <h1>詳細信息 {{info.name}}</h1> <div> {{info.text}} </div> </body> </html>
index.html
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <h1>用戶列表</h1> <table> {% for k,v in user_dict.items() %} <tr> <td>{{k}}</td> <td>{{v.name}}</td> <td>{{v['name']}}</td> <td>{{v.get('name')}}</td> <td><a href="/detail/{{k}}">查看詳細</a></td> </tr> {% endfor %} </table> </body> </html>
login.html
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <h1>用戶登陸</h1> <form method="post"> <input type="text" name="user"> <input type="text" name="pwd"> <input type="submit" value="登陸">{{error}} </form> </body> </html>
-多個裝飾器執行順序
-反向查找的名稱(endpoint),不容許重複
flask中的配置文件是一個flask.config.Config對象(繼承字典),默認配置爲:
{ 'DEBUG': get_debug_flag(default=False), 是否開啓Debug模式 'TESTING': False, 是否開啓測試模式 'PROPAGATE_EXCEPTIONS': None, 'PRESERVE_CONTEXT_ON_EXCEPTION': None, 'SECRET_KEY': None, 'PERMANENT_SESSION_LIFETIME': timedelta(days=31), 'USE_X_SENDFILE': False, 'LOGGER_NAME': None, 'LOGGER_HANDLER_POLICY': 'always', 'SERVER_NAME': None, 'APPLICATION_ROOT': None, 'SESSION_COOKIE_NAME': 'session', 'SESSION_COOKIE_DOMAIN': None, 'SESSION_COOKIE_PATH': None, 'SESSION_COOKIE_HTTPONLY': True, 'SESSION_COOKIE_SECURE': False, 'SESSION_REFRESH_EACH_REQUEST': True, 'MAX_CONTENT_LENGTH': None, 'SEND_FILE_MAX_AGE_DEFAULT': timedelta(hours=12), 'TRAP_BAD_REQUEST_ERRORS': False, 'TRAP_HTTP_EXCEPTIONS': False, 'EXPLAIN_TEMPLATE_LOADING': False, 'PREFERRED_URL_SCHEME': 'http', 'JSON_AS_ASCII': True, 'JSON_SORT_KEYS': True, 'JSONIFY_PRETTYPRINT_REGULAR': True, 'JSONIFY_MIMETYPE': 'application/json', 'TEMPLATES_AUTO_RELOAD': None, }
app.config['DEBUG'] = True PS: 因爲Config對象本質上是字典,因此還可使用app.config.update(...)
#經過py文件配置 app.config.from_pyfile("python文件名稱", silent=True) # 這裏的silent=True就是讓服務器端不爆炸 # 這裏也能夠是txt文件的格式, 可是txt中的配置所有須要大寫 如: settings.py DEBUG = True app.config.from_pyfile("settings.py") #經過環境變量配置 app.config.from_envvar("環境變量名稱") #app.config.from_pyfile(os.environ['YOURAPPLICATION_SETTINGS']) 環境變量的值爲python文件名稱名稱,內部調用from_pyfile方法 app.config.from_json("json文件名稱") JSON文件名稱,必須是json格式,由於內部會執行json.loads app.config.from_mapping({'DEBUG': True}) 字典格式 app.config.from_object("python類或類的路徑") app.config.from_object('pro_flask.settings.TestingConfig') settings.py class Config(object): DEBUG = False TESTING = False DATABASE_URI = 'sqlite://:memory:' class ProductionConfig(Config): DATABASE_URI = 'mysql://user@localhost/foo' class DevelopmentConfig(Config): DEBUG = True class TestingConfig(Config): TESTING = True PS: 從sys.path中已經存在路徑開始寫 PS: settings.py文件默認路徑要放在程序root_path目錄,若是instance_relative_config爲True,則就是instance_path目錄(Flask對象init方法的參數)
@app.route('/detail/<int:nid>',methods=['GET'],endpoint='detail')
DEFAULT_CONVERTERS = { 'default': UnicodeConverter, 'string': UnicodeConverter, 'any': AnyConverter, 'path': PathConverter, 'int': IntegerConverter, 'float': FloatConverter, 'uuid': UUIDConverter, }
""" 1. decorator = app.route('/',methods=['GET','POST'],endpoint='n1') def route(self, rule, **options): # app對象 # rule= / # options = {methods=['GET','POST'],endpoint='n1'} def decorator(f): endpoint = options.pop('endpoint', None) self.add_url_rule(rule, endpoint, f, **options) return f return decorator 2. @decorator decorator(index) """ #同理 def login(): return '登陸' app.add_url_rule('/login', 'n2', login, methods=['GET',"POST"]) #與django路由相似 #django與flask路由:flask路由基於裝飾器,本質是基於:add_url_rule #add_url_rule 源碼中,endpoint若是爲空,endpoint = _endpoint_from_view_func(view_func),最終取view_func.__name__(函數名)
def auth(func): def inner(*args, **kwargs): print('before') result = func(*args, **kwargs) print('after') return result return inner class IndexView(views.View): methods = ['GET'] decorators = [auth, ] def dispatch_request(self): print('Index') return 'Index!' #若是不傳name,這全部返回的都是view,這樣就會報錯,全部人家必須你要傳遞參數 #而後他傳遞給view_func的其實就是你視圖類中的dispatch_request方法。這樣咱們沒有辦法,在一個視圖類中寫多種請求方式 app.add_url_rule('/index', view_func=IndexView.as_view(name='index')) # name=endpoint #或者,一般用此方式 class IndexView(views.MethodView): methods = ['GET'] #cbv添加裝飾,用這個,咱們看as_view中就知道了 decorators = [auth, ] def get(self): return 'Index.GET' def post(self): return 'Index.POST' #若是咱們繼承了MethodView,他幫咱們重寫了,dispatch_request方法,他給咱們作了一個分發,經過請求,來執行不一樣的函數 app.add_url_rule('/index', view_func=IndexView.as_view(name='index')) # name=endpoint
@app.route和app.add_url_rule參數: rule, URL規則 view_func, 視圖函數名稱 defaults = None, 默認值, 當URL中無參數,函數須要參數時,使用defaults = {'k': 'v'} 爲函數提供參數 endpoint = None, 名稱,用於反向生成URL,即: url_for('名稱') methods = None, 容許的請求方式,如:["GET", "POST"] #對URL最後的 / 符號是否嚴格要求 strict_slashes = None ''' @app.route('/index', strict_slashes=False) #訪問http://www.xx.com/index/ 或http://www.xx.com/index都可 @app.route('/index', strict_slashes=True) #僅訪問http://www.xx.com/index ''' #重定向到指定地址 redirect_to = None, ''' @app.route('/index/<int:nid>', redirect_to='/home/<nid>') ''' #子域名訪問 subdomain = None, ''' #C:\Windows\System32\drivers\etc\hosts 127.0.0.1 www.liuqingzheng.com 127.0.0.1 admin.liuqingzheng.com 127.0.0.1 buy.liuqingzheng.com from flask import Flask, views, url_for app = Flask(import_name=__name__) app.config['SERVER_NAME'] = 'liuqingzheng.com:5000' @app.route("/", subdomain="admin") def static_index(): """Flask supports static subdomains This is available at static.your-domain.tld""" return "static.your-domain.tld" #能夠傳入任意的字符串,如傳入的字符串爲aa,顯示爲 aa.liuqingzheng.com @app.route("/dynamic", subdomain="<username>") def username_index(username): """Dynamic subdomains are also supported Try going to user1.your-domain.tld/dynamic""" return username + ".your-domain.tld" if __name__ == '__main__': app.run() 訪問: http://www.liuqingzheng.com:5000/dynamic http://admin.liuqingzheng.com:5000/dynamic http://buy.liuqingzheng.com:5000/dynamic '''
#1 寫類,繼承BaseConverter #2 註冊:app.url_map.converters['regex'] = RegexConverter # 3 使用:@app.route('/index/<regex("\d+"):nid>') 正則表達式會看成第二個參數傳遞到類中 from flask import Flask, views, url_for from werkzeug.routing import BaseConverter app = Flask(import_name=__name__) class RegexConverter(BaseConverter): """ 自定義URL匹配正則表達式 """ def __init__(self, map, regex): super(RegexConverter, self).__init__(map) self.regex = regex def to_python(self, value): """ 路由匹配時,匹配成功後傳遞給視圖函數中參數的值 """ return int(value) def to_url(self, value): """ 使用url_for反向生成URL時,傳遞的參數通過該方法處理,返回的值用於生成URL中的參數 """ val = super(RegexConverter, self).to_url(value) return val # 添加到flask中 app.url_map.converters['regex'] = RegexConverter @app.route('/index/<regex("\d+"):nid>') def index(nid): print(url_for('index', nid='888')) return 'Index' if __name__ == '__main__': app.run()
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <h1>用戶列表</h1> <table> {% for k,v in user_dict.items() %} <tr> <td>{{k}}</td> <td>{{v.name}}</td> <td>{{v['name']}}</td> <td>{{v.get('name')}}</td> <td><a href="/detail/{{k}}">查看詳細</a></td> </tr> {% endfor %} </table> </body> </html>
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <h1>用戶列表</h1> <table> {% for k,v in user_dict.items() %} <tr> <td>{{k}}</td> <td>{{v.name}}</td> <td>{{v['name']}}</td> <td>{{v.get('name')}}</td> <td><a href="/detail/{{k}}">查看詳細</a></td> </tr> {% endfor %} </table> </body> </html>
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <h1>用戶列表</h1> <table> {% if name %} <h1>Hello {{ name }}!</h1> {% else %} <h1>Hello World!</h1> {% endif %} </table> </body> </html>
比django中多能夠加括號,執行函數,傳參數
from flask import Flask,render_template,Markup,jsonify,make_response app = Flask(__name__) def func1(arg): return Markup("<input type='text' value='%s' />" %(arg,)) @app.route('/') def index(): return render_template('index.html',ff = func1) if __name__ == '__main__': app.run()
index.html
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> {{ff('六五')}} {{ff('六五')|safe}} </body> </html>
注意:
1.Markup等價django的mark_safe ,
2.extends,include如出一轍
from flask import Flask from flask import request from flask import render_template from flask import redirect from flask import make_response app = Flask(__name__) @app.route('/login.html', methods=['GET', "POST"]) def login(): # 請求相關信息 # request.method 提交的方法 # request.args get請求說起的數據 # request.form post請求提交的數據 # request.values post和get提交的數據總和 # request.cookies 客戶端所帶的cookie # request.headers 請求頭 # request.path 不帶域名,請求路徑 # request.full_path 不帶域名,帶參數的請求路徑 # request.script_root # request.url 帶域名帶參數的請求路徑 # request.base_url 帶域名請求路徑 # request.url_root 域名 # request.host_url 域名 # request.host 127.0.0.1:500 # request.files # obj = request.files['the_file_name'] # obj.save('/var/www/uploads/' + secure_filename(f.filename)) # 響應相關信息 # return "字符串" # return render_template('html模板路徑',**{}) # return redirect('/index.html') #return jsonify({'k1':'v1'}) # response = make_response(render_template('index.html')) # response是flask.wrappers.Response類型 # response.delete_cookie('key') # response.set_cookie('key', 'value') # response.headers['X-Something'] = 'A value' # return response return "內容" if __name__ == '__main__': app.run()
cookie:存放在客戶端的鍵值對 session:存放在客戶端的鍵值對 token:存放在客戶端,經過算法來校驗
在使用session以前必須如今設置一下密鑰
app.secret_key="asdas" #值隨便
除請求對象以外,還有一個 session 對象。它容許你在不一樣請求間存儲特定用戶的信息。它是在 Cookies 的基礎上實現的,而且對 Cookies 進行密鑰簽名要使用會話,你須要設置一個密鑰。 (app.session_interface對象)
設置:session['username'] = 'xxx' #在django中發什麼三件事,1,生成一個隨機的字符串 2 往數據庫存 3 寫入cookie返回瀏覽器 #在flask中他沒有數據庫,但session是怎樣實現的? # 生成一個密鑰寫入這個cookie,而後下次請求的時候,經過這個cookie解密,而後賦值給session #咱們經過app.session_interface來查看 刪除:session.pop('username', None)
key, 鍵 value='', 值 max_age=None, 超時時間 cookie須要延續的時間(以秒爲單位)若是參數是\ None`` ,這個cookie會延續到瀏覽器關閉爲止 expires=None, 超時時間(IE requires expires, so set it if hasn't been already.) path='/', Cookie生效的路徑,/ 表示根路徑,特殊的:根路徑的cookie能夠被任何url的頁面訪問,瀏覽器只會把cookie回傳給帶有該路徑的頁面,這樣能夠避免將cookie傳給站點中的其餘的應用。 domain=None, Cookie生效的域名 你可用這個參數來構造一個跨站cookie。如, domain=".example.com"所構造的cookie對下面這些站點都是可讀的:www.example.com 、 www2.example.com 和an.other.sub.domain.example.com 。若是該參數設置爲 None ,cookie只能由設置它的站點讀取 secure=False, 瀏覽器將經過HTTPS來回傳cookie httponly=False 只能http協議傳輸,沒法被JavaScript獲取(不是絕對,底層抓包能夠獲取到也能夠被覆蓋)
-save_seesion -響應的時候,把session中的值加密序列化放大到了cookie中,返回到瀏覽器中 -open_session -請求來了,從cookie中取出值,反解,生成session對象,之後再視圖函數中直接用sessoin就能夠了。
-設置:flash('aaa') -取值:get_flashed_message() - -假設在a頁面操做出錯,跳轉到b頁面,在b頁面顯示a頁面的錯誤信息
示例:
from flask import Flask,flash,get_flashed_messages,request,redirect app = Flask(__name__) app.secret_key = 'asdfasdf' @app.route('/index') def index(): # 從某個地方獲取設置過的全部值,並清除。 val = request.args.get('v') if val == 'oldboy': return 'Hello World!' flash('超時錯誤',category="x1") return "ssdsdsdfsd" # return redirect('/error') @app.route('/error') def error(): """ 展現錯誤信息 :return: 若是get_flashed_messages(with_category=True) """ data = get_flashed_messages(category_filter=['x1']) if data: msg = data[0] else: msg = "..." return "錯誤信息:%s" %(msg,) if __name__ == '__main__': app.run()
類比django中間件中的process_request,在請求收到以前綁定一個函數作一些事情
#基於它作用戶登陸認證 @app.before_request def process_request(*args,**kwargs): if request.path == '/login': return None user = session.get('user_info') if user: return None return redirect('/login')
類比django中間件中的process_response,每個請求以後綁定一個函數,若是請求沒有異常
@app.after_request def process_response1(response): print('process_response1 走了') return response
第一次請求時,跟瀏覽器無關
@app.before_first_request def first(): pass
每個請求以後綁定一個函數,即便遇到了異常
@app.teardown_request def ter(e): pass
路徑不存在時404,服務器內部錯誤500
@app.errorhandler(404) def error_404(arg): return "404錯誤了"
標籤
@app.template_global() def sb(a1, a2): return a1 + a2 #{{sb(1,2)}}
過濾器
@app.template_filter() def db(a1, a2, a3): return a1 + a2 + a3 #{{ 1|db(2,3)}}
總結:
1 重點掌握before_request和after_request,
2 注意有多個的狀況,執行順序
3 before_request請求攔截後(也就是有return值),response全部都執行
from flask import Flask app = Flask(__name__) @app.route('/') def index(): return 'Hello World!' # 模擬中間件 class Md(object): def __init__(self,old_wsgi_app): self.old_wsgi_app = old_wsgi_app def __call__(self, environ, start_response): print('開始以前') ret = self.old_wsgi_app(environ, start_response) print('結束以後') return ret if __name__ == '__main__': #1咱們發現當執行app.run方法的時候,最終執行run_simple,最後執行app(),也就是在執行app.__call__方法 #2 在__call__裏面,執行的是self.wsgi_app().那咱們但願在執行他自己的wsgi以前作點事情。 #3 因此咱們先用Md類中__init__,保存以前的wsgi,而後咱們用將app.wsgi轉化成Md的對象。 #4 那執行新的的app.wsgi_app,就是執行Md的__call__方法。 #把原來的wsgi_app替換爲自定義的, app.wsgi_app = Md(app.wsgi_app) app.run()
請求全部的流程
ctx = self.request_context(environ) error = None try: try: ctx.push() #根據路徑去執行視圖函數,視圖類 response = self.full_dispatch_request() except Exception as e: error = e response = self.handle_exception(e) except: # noqa: B001 error = sys.exc_info()[1] raise return response(environ, start_response) finally: #無論出不出異常,都會走這裏 if self.should_ignore_error(error): error = None ctx.auto_pop(error)
對程序進行目錄結構劃分
目錄結構:
-templates -views -__init__.py -user.py -order.py -app.py
app.py
from views import app if __name__ == '__main__': app.run()
init.py
from flask import Flask,request app = Flask(__name__) #不導入這個不行 from . import account from . import order from . import user
user.py
from . import app @app.route('/user') def user(): return 'user'
order.py
from . import app @app.route('/order') def order(): return 'order'
詳見代碼:pro_flask_簡單應用程序目錄示例.zip
目錄結構:
-flask_pro -flask_test -__init__.py -static -templates -views -order.py -user.py -manage.py
__init__.py
from flask import Flask app=Flask(__name__) from flask_test.views import user from flask_test.views import order app.register_blueprint(user.us) app.register_blueprint(order.ord)
manage.py
from flask_test import app if __name__ == '__main__': app.run(port=8008)
user.py
from flask import Blueprint us=Blueprint('user',__name__) @us.route('/login') def login(): return 'login'
order.py
from flask import Blueprint ord=Blueprint('order',__name__) @ord.route('/test') def test(): return 'order test'
詳見代碼:pro_flask_大型應用目錄示例.zip
總結:
1 xxx = Blueprint(‘account’, name,url_prefix=’/xxx’) :藍圖URL前綴,表示url的前綴,在該藍圖下全部url都加前綴
2 xxx = Blueprint(‘account’, name,url_prefix=’/xxx’,template_folder=’tpls’):給當前藍圖單獨使用templates,向上查找,當前找不到,會找總templates
3 藍圖的befort_request,對當前藍圖有效
4 大型項目,能夠模擬出相似於django中app的概念
第一階段:將ctx(request,session)放到Local對象上 第二階段:視圖函數導入:request/session request.method -LocalProxy對象.method,執行getattr方法,getattr(self._get_current_object(), name) -self._get_current_object()返回return self.__local(),self.__local(),在LocakProxy實例化的時候,object.__setattr__(self, '_LocalProxy__local', local),此處local就是:partial(_lookup_req_object, 'request') -def _lookup_req_object(name): top = _request_ctx_stack.top #_request_ctx_stack 就是LocalStack()對象,top方法把ctx取出來 if top is None: raise RuntimeError(_request_ctx_err_msg) return getattr(top, name)#獲取ctx中的request或session對象 第三階段:請求處理完畢 - 獲取session並保存到cookie - 將ctx刪除
程序運行,兩個LocalStack()對象,一個裏面放request和session,另外一個放g和current_app
專門用來存儲用戶信息的g對象,g的全稱的爲global
g對象在一次請求中的全部的代碼的地方,都是可使用的
session對象是能夠跨request的,只要session還未失效,不一樣的request的請求會獲取到同一個session,可是g對象不是,g對象不須要管過時時間,請求一次就g對象就改變了一次,或者從新賦值了一次
g.name=cxw
做用:將默認保存的簽名cookie中的值 保存到 redis/memcached/file/Mongodb/SQLAlchemy
安裝:pip3 install flask-session
使用1:
from flask import Flask,session from flask_session import RedisSessionInterface import redis app = Flask(__name__) conn=redis.Redis(host='127.0.0.1',port=6379) #use_signer是否對key簽名 #若是use_siginer爲False,這表示不須要配置app.secret_key app.session_interface=RedisSessionInterface(conn,key_prefix='lqz') @app.route('/') def hello_world(): session['name']='lqz' return 'Hello World!' if __name__ == '__main__': app.run()
使用2:
from redis import Redis from flask_session import Session app.config['SESSION_TYPE'] = 'redis' app.config['SESSION_REDIS'] = Redis(host='192.168.0.94',port='6379') Session(app)
問題:設置cookie時,如何設定關閉瀏覽器則cookie失效。
response.set_cookie('k','v',exipre=None)#這樣設置便可 #在session中設置 app.session_interface=RedisSessionInterface(conn,key_prefix='lqz',permanent=False) #通常不用,咱們通常都設置超時時間,多長時間後失效
問題:cookie默認超時時間是多少?如何設置超時時間
#源碼expires = self.get_expiration_time(app, session) 'PERMANENT_SESSION_LIFETIME': timedelta(days=31),#這個配置文件控制
import pymysql conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123456', db='s8day127db') cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) # cursor.execute("select id,name from users where name=%s and pwd=%s",['lqz','123',]) cursor.execute("select id,name from users where name=%(user)s and pwd=%(pwd)s",{'user':'lqz','pwd':'123'}) obj = cursor.fetchone() conn.commit() cursor.close() conn.close() print(obj)
爲每一個線程建立一個鏈接,線程即便調用了close方法,也不會關閉,只是把鏈接從新放到鏈接池,供本身線程再次使用。當線程終止時,鏈接自動關閉
from DBUtils.PersistentDB import PersistentDB import pymysql POOL = PersistentDB( creator=pymysql, # 使用連接數據庫的模塊 maxusage=None, # 一個連接最多被重複使用的次數,None表示無限制 setsession=[], # 開始會話前執行的命令列表。 ping=0, # ping MySQL服務端,檢查是否服務可用。 closeable=False, # 若是爲False時, conn.close() 實際上被忽略,供下次使用,再線程關閉時,纔會自動關閉連接。若是爲True時, conn.close()則關閉連接,那麼再次調用pool.connection時就會報錯,由於已經真的關閉了鏈接(pool.steady_connection()能夠獲取一個新的連接) threadlocal=None, # 本線程獨享值得對象,用於保存連接對象,若是連接對象被重置 host='127.0.0.1', port=3306, user='root', password='123456', database='test', charset='utf8' ) def func(): conn = POOL.connection(shareable=False) cursor = conn.cursor() cursor.execute('select * from user') result = cursor.fetchall() print(result) cursor.close() conn.close() if __name__ == '__main__': func()
模式二:建立一批鏈接到鏈接池,供全部線程共享使用
setting.py
from datetime import timedelta from redis import Redis import pymysql from DBUtils.PooledDB import PooledDB, SharedDBConnection class Config(object): DEBUG = True SECRET_KEY = "umsuldfsdflskjdf" PERMANENT_SESSION_LIFETIME = timedelta(minutes=20) SESSION_REFRESH_EACH_REQUEST= True SESSION_TYPE = "redis" PYMYSQL_POOL = PooledDB( creator=pymysql, # 使用連接數據庫的模塊 maxconnections=6, # 鏈接池容許的最大鏈接數,0和None表示不限制鏈接數 mincached=2, # 初始化時,連接池中至少建立的空閒的連接,0表示不建立 maxcached=5, # 連接池中最多閒置的連接,0和None不限制 maxshared=3, # 連接池中最多共享的連接數量,0和None表示所有共享。PS: 無用,由於pymysql和MySQLdb等模塊的 threadsafety都爲1,全部值不管設置爲多少,_maxcached永遠爲0,因此永遠是全部連接都共享。 blocking=True, # 鏈接池中若是沒有可用鏈接後,是否阻塞等待。True,等待;False,不等待而後報錯 maxusage=None, # 一個連接最多被重複使用的次數,None表示無限制 setsession=[], # 開始會話前執行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host='127.0.0.1', port=3306, user='root', password='123456', database='s8day127db', charset='utf8' ) class ProductionConfig(Config): SESSION_REDIS = Redis(host='192.168.0.94', port='6379') class DevelopmentConfig(Config): SESSION_REDIS = Redis(host='127.0.0.1', port='6379') class TestingConfig(Config): pass
utils/sql.py
import pymysql from settings import Config class SQLHelper(object): @staticmethod def open(cursor): POOL = Config.PYMYSQL_POOL conn = POOL.connection() cursor = conn.cursor(cursor=cursor) return conn,cursor @staticmethod def close(conn,cursor): conn.commit() cursor.close() conn.close() @classmethod def fetch_one(cls,sql,args,cursor =pymysql.cursors.DictCursor): conn,cursor = cls.open(cursor) cursor.execute(sql, args) obj = cursor.fetchone() cls.close(conn,cursor) return obj @classmethod def fetch_all(cls,sql, args,cursor =pymysql.cursors.DictCursor): conn, cursor = cls.open(cursor) cursor.execute(sql, args) obj = cursor.fetchall() cls.close(conn, cursor) return obj
使用:
obj = SQLHelper.fetch_one("select id,name from users where name=%(user)s and pwd=%(pwd)s", form.data)
安裝:pip3 install wtforms
from flask import Flask, render_template, request, redirect from wtforms import Form from wtforms.fields import simple from wtforms import validators from wtforms import widgets app = Flask(__name__, template_folder='templates') app.debug = True class LoginForm(Form): # 字段(內部包含正則表達式) name = simple.StringField( label='用戶名', validators=[ validators.DataRequired(message='用戶名不能爲空.'), validators.Length(min=6, max=18, message='用戶名長度必須大於%(min)d且小於%(max)d') ], widget=widgets.TextInput(), # 頁面上顯示的插件 render_kw={'class': 'form-control'} ) # 字段(內部包含正則表達式) pwd = simple.PasswordField( label='密碼', validators=[ validators.DataRequired(message='密碼不能爲空.'), validators.Length(min=8, message='用戶名長度必須大於%(min)d'), validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}", message='密碼至少8個字符,至少1個大寫字母,1個小寫字母,1個數字和1個特殊字符') ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) @app.route('/login', methods=['GET', 'POST']) def login(): if request.method == 'GET': form = LoginForm() return render_template('login.html', form=form) else: form = LoginForm(formdata=request.form) if form.validate(): print('用戶提交數據經過格式驗證,提交的值爲:', form.data) else: print(form.errors) return render_template('login.html', form=form) if __name__ == '__main__': app.run()
login.html
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <h1>登陸</h1> <form method="post"> <p>{{form.name.label}} {{form.name}} {{form.name.errors[0] }}</p> <p>{{form.pwd.label}} {{form.pwd}} {{form.pwd.errors[0] }}</p> <input type="submit" value="提交"> </form> </body> </html>
from flask import Flask, render_template, request, redirect from wtforms import Form from wtforms.fields import core from wtforms.fields import html5 from wtforms.fields import simple from wtforms import validators from wtforms import widgets app = Flask(__name__, template_folder='templates') app.debug = True class RegisterForm(Form): name = simple.StringField( label='用戶名', validators=[ validators.DataRequired() ], widget=widgets.TextInput(), render_kw={'class': 'form-control'}, default='cxw' ) pwd = simple.PasswordField( label='密碼', validators=[ validators.DataRequired(message='密碼不能爲空.') ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) pwd_confirm = simple.PasswordField( label='重複密碼', validators=[ validators.DataRequired(message='重複密碼不能爲空.'), validators.EqualTo('pwd', message="兩次密碼輸入不一致") ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) email = html5.EmailField( label='郵箱', validators=[ validators.DataRequired(message='郵箱不能爲空.'), validators.Email(message='郵箱格式錯誤') ], widget=widgets.TextInput(input_type='email'), render_kw={'class': 'form-control'} ) gender = core.RadioField( label='性別', choices=( (1, '男'), (2, '女'), ), #這句話的意思是上面的choices元組的第一個值是int類型 #若是上上面爲(‘1’, '男'),(‘2’, '女'),則下面的coerce則不用寫 coerce=int # 「1」 「2」 ) #這裏是單選框 city = core.SelectField( label='城市', choices=( ('bj', '北京'), ('sh', '上海'), ) ) #這裏是多選框 hobby = core.SelectMultipleField( label='愛好', choices=( (1, '籃球'), (2, '足球'), ), coerce=int ) #這裏是多選的checkbox favor = core.SelectMultipleField( label='喜愛', choices=( (1, '籃球'), (2, '足球'), ), widget=widgets.ListWidget(prefix_label=False), option_widget=widgets.CheckboxInput(), coerce=int, default=[1, 2] ) #這裏能夠改值 def __init__(self, *args, **kwargs): super(RegisterForm, self).__init__(*args, **kwargs) self.favor.choices = ((1, '籃球'), (2, '足球'), (3, '羽毛球')) def validate_pwd_confirm(self, field): """ 自定義pwd_confirm字段規則,例:與pwd字段是否一致 :param field: :return: """ # 最開始初始化時,self.data中已經有全部的值 if field.data != self.data['pwd']: # raise validators.ValidationError("密碼不一致") # 繼續後續驗證 raise validators.StopValidation("密碼不一致") # 再也不繼續後續驗證 @app.route('/register', methods=['GET', 'POST']) def register(): if request.method == 'GET': #這裏能夠傳默認值 form = RegisterForm(data={'gender': 2,'hobby':[1,]}) # initial return render_template('register.html', form=form) else: form = RegisterForm(formdata=request.form) if form.validate(): print('用戶提交數據經過格式驗證,提交的值爲:', form.data) else: print(form.errors) return render_template('register.html', form=form) if __name__ == '__main__': app.run()
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <h1>用戶註冊</h1> <form method="post" novalidate style="padding:0 50px"> {% for field in form %} <p>{{field.label}}: {{field}} {{field.errors[0] }}</p> {% endfor %} <input type="submit" value="提交"> </form> </body> </html>
Flask框架中的信號基於blinker,其主要就是讓開發者但是在flask請求過程當中定製一些用戶行爲
安裝:pip3 install blinker
內置信號:
request_started = _signals.signal('request-started') # 請求到來前執行 request_finished = _signals.signal('request-finished') # 請求結束後執行 before_render_template = _signals.signal('before-render-template') # 模板渲染前執行 template_rendered = _signals.signal('template-rendered') # 模板渲染後執行 got_request_exception = _signals.signal('got-request-exception') # 請求執行出現異常時執行 request_tearing_down = _signals.signal('request-tearing-down') # 請求執行完畢後自動執行(不管成功與否) appcontext_tearing_down = _signals.signal('appcontext-tearing-down')# 應用上下文執行完畢後自動執行(不管成功與否) appcontext_pushed = _signals.signal('appcontext-pushed') # 應用上下文push時執行 appcontext_popped = _signals.signal('appcontext-popped') # 應用上下文pop時執行 message_flashed = _signals.signal('message-flashed') # 調用flask在其中添加數據時,自動觸發
使用信號:
from flask import Flask,signals,render_template app = Flask(__name__) # 往信號中註冊函數 def func(*args,**kwargs): print('觸發型號',args,kwargs) signals.request_started.connect(func) # 觸發信號: signals.request_started.send() @app.before_first_request def before_first1(*args,**kwargs): pass @app.before_first_request def before_first2(*args,**kwargs): pass @app.before_request def before_first3(*args,**kwargs): pass @app.route('/',methods=['GET',"POST"]) def index(): print('視圖') return render_template('index.html') if __name__ == '__main__': app.wsgi_app app.run()
一個流程中的信號觸發點(瞭解)
a. before_first_request b. 觸發 request_started 信號 c. before_request d. 模板渲染 渲染前的信號 before_render_template.send(app, template=template, context=context) rv = template.render(context) # 模板渲染 渲染後的信號 template_rendered.send(app, template=template, context=context) e. after_request f. session.save_session() g. 觸發 request_finished信號 若是上述過程出錯: 觸發錯誤處理信號 got_request_exception.send(self, exception=e) h. 觸發信號 request_tearing_down
自定義信號(瞭解):
from flask import Flask, current_app, flash, render_template from flask.signals import _signals app = Flask(import_name=__name__) # 自定義信號 xxxxx = _signals.signal('xxxxx') #必須有一個位置參數,去接收他的發送者, def func(sender, *args, **kwargs): print(sender) # 自定義信號中註冊函數 xxxxx.connect(func) @app.route("/x") def index(): # 觸發信號,這裏的第一是發送者,第二個參數可選的話,必須是鍵值對 xxxxx.send('123123', k1='v1') return 'Index' if __name__ == '__main__': app.run()
from werkzeug.wsgi import DispatcherMiddleware from werkzeug.serving import run_simple from flask import Flask, current_app app1 = Flask('app01') app2 = Flask('app02') @app1.route('/index') def index(): return "app01" @app2.route('/index2') def index2(): return "app2" # http://www.oldboyedu.com/index # http://www.oldboyedu.com/sec/index2 dm = DispatcherMiddleware(app1, { '/sec': app2, }) if __name__ == "__main__": run_simple('localhost', 5000, dm)
用於實現相似於django中 python3 manage.py runserver ..…相似的命令
安裝:pip3 install flask-script
from flask_script import Manager app = Flask(__name__) manager=Manager(app) ... if __name__ == '__main__': manager.run() #之後在執行,直接:python3 manage.py runserver #python3 manage.py runserver --help
@manager.command def custom(arg): """ 自定義命令 python manage.py custom 123 :param arg: :return: """ print(arg) @manager.option('-n', '--name', dest='name') #@manager.option('-u', '--url', dest='url') def cmd(name, url): """ 自定義命令(-n也能夠寫成--name) 執行: python manage.py cmd -n lqz -u http://www.oldboyedu.com 執行: python manage.py cmd --name lqz --url http://www.oldboyedu.com :param name: :param url: :return: """ print(name, url) #有什麼用? #把excel的數據導入數據庫,定製個命令,去執行
pip3 install flask_admin
from flask import Flask from flask_admin import Admin app = Flask(__name__) #將app註冊到adminzhong admin = Admin(app) if __name__=="mian": app.run() #訪問 #127.0.0.1:5000/admin端口,會獲得一個空白的頁面
#在將表註冊以前應該對app進行配置 SQLALCHEMY_DATABASE_URI = "mysql+pymysql://root:@127.0.0.1:3307/py9api?charset=utf8mb4" SQLALCHEMY_POOL_SIZE = 5 SQLALCHEMY_POOL_TIMEOUT = 30 SQLALCHEMY_POOL_RECYCLE = -1 #導入models文件的中的表模型 from flask_admin.contrib.sqla import ModelView from api.models import Stock,Product,Images,Category,Wxuser,Banner admin.add_view(ModelView(Stock, db.session)) admin.add_view(ModelView(Product, db.session)) admin.add_view(ModelView(Category, db.session))
#配置上傳文件的路徑 #導入from flask_admin.contrib.fileadmin import FileAdmin from flask_admin.contrib.fileadmin import FileAdmin,form file_path = op.join(op.dirname(__file__), 'static') admin = Admin(app) admin.add_view(FileAdmin(file_path, '/static/', name='文件')) #若是有個字段要是上傳文件重寫該方法的modleView類,假設imgae_url是文件圖片的字段 class ImagesView(ModelView): form_extra_fields = { 'image_url': form.ImageUploadField('Image', base_path=file_path, relative_path='uploadFile/' ) } admin.add_view(ImagesView(Images, db.session))