一、短小精悍、可擴展性強 的一個web框架html
注意:上下文管理機制前端
二、依賴wsgi:werkzurghtml5
from werkzeug.wrappers import Request, Response @Request.application def hello(request): return Response('Hello World!') if __name__ == '__main__': from werkzeug.serving import run_simple run_simple('localhost', 4000, hello)
注意__init__和__call__的區別:python
class Foo(): def __init__(self): print('init') def __call__(self, *args, **kwargs): print('call') a = Foo() a() #init #call
一個簡單的flask代碼mysql
from flask import Flask app = Flask(__name__)
@app.route('/index') def index(): return 'hello world' if __name__ == '__main__': app.run('local
一、template能夠相似django修改git
二、static有兩種方式,github
A: a、 static_folder="staticccc" b、將static文件夾命名爲 staticccc c、img引用的時候用staticccc/xxxxx
B: a、 static_url_path='/vvvvv' b、 img引用的時候用 /vvvvv/xxxxx
from flask import Flask,render_template,request,redirect,session # app = Flask(__name__,template_folder="templates",static_folder="staticccc",static_url_path='/vvvvv') app = Flask(__name__,template_folder="templates",static_folder="static") app.secret_key = 'asdfasdf' @app.route('/login',methods=["GET","POST"]) def login(): if request.method == 'GET': return render_template('login.html')
# request.args 獲取的是get的信息 user = request.form.get('user') #獲取的是post的信息 pwd = request.form.get('pwd') if user == 'oldboy' and pwd == '666': session['user'] = user return redirect('/index') return render_template('login.html',error='用戶名或密碼錯誤') # return render_template('login.html',**{"error":'用戶名或密碼錯誤'}) @app.route('/index') def index(): user = session.get('user') if not user: return redirect('/login') return render_template('index.html') if __name__ == '__main__': app.run()
index.htmlweb
<body> <h1>歡迎使用</h1> <img src="/static/111.png" alt=""> </body>
login.html正則表達式
<body> <h1>用戶登陸</h1> <form method="post"> <input type="text" name="user"> <input type="password" name="pwd"> <input type="submit" value="提交">{{error}} </form> </body>
1、配置文件 2、路由系統 3、視圖 4、請求相關 5、響應 6、模板渲染 7、session 8、閃現 9、中間件 10、藍圖(blueprint) 11、特殊裝飾器
django:是一個大而全的框架,內部提供了不少組件,如orm,admin,forms,分頁等不少方便的組件,只需在配置裏配置下就能夠了redis
flask: 是一個輕量級的框架,短小精悍,可擴展性很強,適用於開發小的網站,他提供了不少第三方的組件,只須要跟這些組件結合起來也能夠創造一個相似django的框架 ,可定製型強
就我我的而言,我比較偏向於。。。。。
request/session,
django須要導入,參數;flask直接調用
--模板+靜態文件 ,app=Flask(__name__,......)
--路由
@app.route('/index',methods=['GET'])
--請求
request.form
request.args
request.method
--響應
「」
render
redirect
--session
session['xx']=123
session.get('xx')
settings.py
class Foo(): DEBUG = True TEXT = True
test.py
import importlib path = 'settings.Foo' p,c = path.rsplit('.',maxsplit=1) m = importlib.import_module(p) cls = getattr(m,c) for key in dir(cls): if key.isupper(): print(key,getattr(cls,key))
#DEBUG True
#TEXT True
app.config.from_object('settings.Dev')
test.py
from flask import Flask app = Flask(__name__) print(app.config) app.config.from_object('settings.Dev') print(app.config) if __name__ == '__main__': app.run()
settings.py
class Base(object): #不管哪一種環境都須要的類 XX = 123 class Pro(Base): #生產環境 DEBUG = False class Dev(Base): #開發環境 DEBUG = True
實例:
from flask import Flask,url_for app = Flask(__name__) app.route('/index/<int:nid>',methods=['GET','POST']) def index(nid): print(url_for("index",nid=888))
知識點:
---endpoint,反向生成URL,默認函數名 ---url_for('endpoint') ---動態路由: /index/<int:nid> def index(nid): print(nid) return "index"
請求相關消息
# 請求相關信息
# request.method
# request.args
# request.form
# request.values
# request.cookies
# request.headers
# request.path
# request.full_path
# request.script_root
# request.url
# request.base_url
# request.url_root
# request.host_url
# request.host
# request.files
響應相關消息
A、響應體
return render_template('xxx.html') return redirect('/index') return 'xxx' return jsonify({'k1':'v1'})
B、響應頭
obj = make_response('asdf') obj.headers['xxxx'] = '123' obj.set_cookie('key','value') return obj
版本一幾乎不用,版本二在某些函數前作定製,版本在全局使用的時候用到
版本一: @app.route('/index') def index(): if not session.get('user'): return redirect(url_for('login')) return render_template('index.html',stu_dic=STUDENT_DICT) 版本二: import functools def auth(func): @functools.wraps(func) def inner(*args,**kwargs): if not session.get('user'): return redirect(url_for('login')) ret = func(*args,**kwargs) return ret return inner @app.route('/index') @auth def index(): return render_template('index.html',stu_dic=STUDENT_DICT) 應用場景:比較少的函數中須要額外添加功能。 版本三:before_request @app.before_request def xxxxxx(): if request.path == '/login': return None if session.get('user'): return None return redirect('/login')
app.py
from flask import Flask,render_template,request,redirect,session,url_for,jsonify,make_response,Markup,flash,get_flashed_messages app = Flask(__name__) app.config.from_object("settings.DevelopmentConfig") STUDENT_DICT = { 1:{'name':'王龍泰','age':38,'gender':'中'}, 2:{'name':'小東北','age':73,'gender':'男'}, 3:{'name':'田碩','age':84,'gender':'男'}, } @app.before_request def xxxxxx(): if request.path == '/login': return None if session.get('user'): return None return redirect('/login') @app.route('/login',methods=["GET","POST"]) def login(): print('login') if request.method == 'GET': return render_template('login.html') user = request.form.get('user') pwd = request.form.get('pwd') if user == 'oldboy' and pwd == '666': session['user'] = user return redirect('/index') return render_template('login.html',error='用戶名或密碼錯誤') @app.route('/index') def index(): print('index') return render_template('index.html',stu_dic=STUDENT_DICT) @app.route('/delete/<int:nid>') def delete(nid): del STUDENT_DICT[nid] return redirect(url_for('index')) @app.route('/detail/<int:nid>') def detail(nid): info = STUDENT_DICT[nid] return render_template('detail.html',info=info) if __name__ == '__main__': app.run()
login.html
<body> <h1>用戶登陸</h1> <form method="post"> <input type="text" name="user"> <input type="password" name="pwd"> <input type="submit" value="提交">{{error}} </form>
index.html
<body> <h1>學生列表</h1> <table border="1"> <thead> <tr> <th>ID</th> <th>姓名</th> <th>年齡</th> <th>性別</th> <th>選項</th> </tr> </thead> <tbody> {% for k,v in stu_dic.items() %} <tr> <td>{{k}}</td> <td>{{v.name }}</td> <td>{{v.age}}</td> <td>{{v.gender}}</td> <td> <a href="/detail/{{k}}">查看詳細</a> | <a href="/delete/{{k}}">刪除</a> </td> </tr> {% endfor %} </tbody> </table> </body>
detail.html
<body> <h1>學生詳細</h1> <ul> {% for item in info.values() %} <li>{{item}}</li> {% endfor %} </ul> </body>
6. 模板渲染 - 基本數據類型:能夠執行python語法,如:dict.get() list['xx'] - 傳入函數 - django,自動執行 - flask,不自動執行 - 全局定義函數 @app.template_global() def sb(a1, a2): # {{sb(1,9)}} return a1 + a2 @app.template_filter() def db(a1, a2, a3): # {{ 1|db(2,3) }} return a1 + a2 + a3 - 模板繼承 layout.html <!DOCTYPE html> <html lang="zh-CN"> <head> <meta charset="UTF-8"> <title>Title</title> <meta name="viewport" content="width=device-width, initial-scale=1"> </head> <body> <h1>模板</h1> {% block content %}{% endblock %} </body> </html> tpl.html {% extends "layout.html"%} {% block content %} {{users.0}} {% endblock %} - include {% include "form.html" %} form.html <form> asdfasdf asdfasdf asdf asdf </form> - 宏 {% macro ccccc(name, type='text', value='') %} <h1>宏</h1> <input type="{{ type }}" name="{{ name }}" value="{{ value }}"> <input type="submit" value="提交"> {% endmacro %} {{ ccccc('n1') }} {{ ccccc('n2') }} - 安全 - 前端: {{u|safe}} - 前端: MarkUp("asdf")
當請求剛到來:flask讀取cookie中session對應的值:eyJrMiI6NDU2LCJ1c2VyIjoib2xkYm95,將該值解密並反序列化成字典,放入內存以便視圖函數使用。 視圖函數: @app.route('/ses') def ses(): session['k1'] = 123 session['k2'] = 456 del session['k1'] return "Session" session['xxx'] = 123 session['xxx'] 當請求結束時,flask會讀取內存中字典的值,進行序列化+加密,寫入到用戶cookie中。
settings.py
from datetime import timedelta class Config(object): DEBUG = False TESTING = False SECRET_KEY = "asdfasdfas23" DATABASE_URI = 'sqlite://:memory:' SESSION_COOKIE_NAME = 'session' SESSION_COOKIE_DOMAIN = None SESSION_COOKIE_PATH = None SESSION_COOKIE_HTTPONLY = True SESSION_COOKIE_SECURE = False SESSION_REFRESH_EACH_REQUEST = True PERMANENT_SESSION_LIFETIME = timedelta(hours=1)
在session中存儲一個數據,讀取時經過pop將數據移除。 from flask import Flask,flash,get_flashed_messages @app.route('/page1') def page1(): flash('臨時數據存儲','error') flash('sdfsdf234234','error') flash('adasdfasdf','info') return "Session" @app.route('/page2') def page2(): print(get_flashed_messages(category_filter=['error'])) return "Session"
- call方法何時出發? - 用戶發起請求時,才執行。 - 任務:在執行call方法以前,作一個操做,call方法執行以後作一個操做。 class Middleware(object): def __init__(self,old): self.old = old def __call__(self, *args, **kwargs): ret = self.old(*args, **kwargs) return ret if __name__ == '__main__': app.wsgi_app = Middleware(app.wsgi_app) app.run()
1. before_request 2. after_request 示例: from flask import Flask app = Flask(__name__) @app.before_request def x1(): print('before:x1') return '滾' @app.before_request def xx1(): print('before:xx1') @app.after_request def x2(response): print('after:x2') return response @app.after_request def xx2(response): print('after:xx2') return response @app.route('/index') def index(): print('index') return "Index" @app.route('/order') def order(): print('order') return "order" if __name__ == '__main__': app.run() 3. before_first_request from flask import Flask app = Flask(__name__) @app.before_first_request def x1(): print('123123') @app.route('/index') def index(): print('index') return "Index" @app.route('/order') def order(): print('order') return "order" if __name__ == '__main__': app.run() 4. template_global 5. template_filter 6. errorhandler @app.errorhandler(404) def not_found(arg): print(arg) return "沒找到"
1. django和flask區別? 2.什麼是wsgi? web服務網關接口,wsgi是一個協議,實現該寫一個的模塊: - wsgiref - werkzeug 實現其協議的模塊本質上就是socket服務端用於接收用戶請求,並處理。 通常web框架基於wsgi實現,這樣實現關注點分離。 wsgiref示例: from wsgiref.simple_server import make_server def run_server(environ, start_response): start_response('200 OK', [('Content-Type', 'text/html')]) return [bytes('<h1>Hello, web!</h1>', encoding='utf-8'), ] if __name__ == '__main__': httpd = make_server('127.0.0.1', 8000, run_server) httpd.serve_forever() werkzeug示例: from werkzeug.wrappers import Response from werkzeug.serving import run_simple def run_server(environ, start_response): response = Response('hello') return response(environ, start_response) if __name__ == '__main__': run_simple('127.0.0.1', 8000, run_server) Flask源碼入口: from werkzeug.wrappers import Response from werkzeug.serving import run_simple class Flask(object): def __call__(self,environ, start_response): response = Response('hello') return response(environ, start_response) def run(self): run_simple('127.0.0.1', 8000, self) app = Flask() if __name__ == '__main__': app.run() 3. Flask提供功能 - 配置文件 - 全部配置都在app.config中 - app.config["xx"] = 123 - app.config.from_object("類的路徑") - 應用:importlib、getattr - django中間件 - rest framework全局配置 - session - 加密後放置在用戶瀏覽器的cookie中。 - 流程: - 請求到來 - 視圖函數 - 請求結束 - 配置文件 - 閃現 - 基於session實現 - 路由 - 裝飾器(帶參數) - 自定義裝飾器放下面 - 參數 - url_for - 視圖 - FBV - 請求和響應 - 請求:request - 響應: 4種 - 模板 - ... - 特殊裝飾器 - before_first_request - before_request - after_request - template_global() - template_filter() - errorhandler(404) - 中間件 今日內容: 1. 路由+視圖 2. session實現原理(源碼) 3. 藍圖 4. threading.local 5. 上下文管理(第一次)
a. 路由設置的兩種方式: @app.route('/xxx') def index(): return "index" def index(): return "index" app.add_url_rule("/xxx",None,index) 注意事項: - 不用讓endpoint重名 - 若是重名函數也必定要相同。
rule, URL規則 view_func, 視圖函數名稱 endpoint=None, 名稱,用於反向生成URL,即: url_for('名稱') methods=None, 容許的請求方式,如:["GET","POST"] strict_slashes=None, 對URL最後的 / 符號是否嚴格要求, redirect_to=None, 重定向到指定地址 defaults=None, 默認值,當URL中無參數,函數須要參數時,使用defaults={'k':'v'}爲函數提供參數 subdomain=None, 子域名訪問
實例
須要去hosts文件裏將 127.0.0.1 wupeiqi.com,將ip和域名配置下
from flask import Flask, views, url_for app = Flask(import_name=__name__) app.config['SERVER_NAME'] = 'wupeiqi.com:5000' """ 127.0.0.1 wupeiqi.com 127.0.0.1 web.wupeiqi.com 127.0.0.1 admin.wupeiqi.com """ # http://admin.wupeiqi.com:5000/ @app.route("/", subdomain="admin") def admin_index(): return "admin.your-domain.tld" # http://web.wupeiqi.com:5000/ @app.route("/", subdomain="web") def web_index(): return "web.your-domain.tld" # http://sdsdf.wupeiqi.com:5000/ # http://sdfsdf.wupeiqi.com:5000/ # http://asdf.wupeiqi.com:5000/ @app.route("/dynamic", subdomain="<username>") def username_index(username): """Dynamic subdomains are also supported Try going to user1.your-domain.tld/dynamic""" return username + ".your-domain.tld" if __name__ == '__main__': app.run()
import functools from flask import Flask,views app = Flask(__name__) def wrapper(func): @functools.wraps(func) def inner(*args,**kwargs): return func(*args,**kwargs) return inner class UserView(views.MethodView): methods = ['GET'] decorators = [wrapper,] def get(self,*args,**kwargs): return 'GET' def post(self,*args,**kwargs): return 'POST' app.add_url_rule('/user',None,UserView.as_view('uuuu')) if __name__ == '__main__': app.run()
from flask import Flask,url_for app = Flask(__name__) # 步驟一:定製類 from werkzeug.routing import BaseConverter class RegexConverter(BaseConverter): """ 自定義URL匹配正則表達式 """ def __init__(self, map, regex): super(RegexConverter, self).__init__(map) self.regex = regex def to_python(self, value): """ 路由匹配時,匹配成功後傳遞給視圖函數中參數的值 :param value: :return: """ return int(value) def to_url(self, value): """ 使用url_for反向生成URL時,傳遞的參數通過該方法處理,返回的值用於生成URL中的參數 :param value: :return: """ val = super(RegexConverter, self).to_url(value) return val # 步驟二:添加到轉換器 app.url_map.converters['reg'] = RegexConverter """ 1. 用戶發送請求 2. flask內部進行正則匹配 3. 調用to_python(正則匹配的結果)方法 4. to_python方法的返回值會交給視圖函數的參數 """ # 步驟三:使用自定義正則 @app.route('/index/<reg("\d+"):nid>') def index(nid): print(nid,type(nid)) print(url_for('index',nid=987)) return "index" if __name__ == '__main__': app.run()
session流程圖(加分項)
目標:給開發者提供目錄結構 其餘: - 自定義模板、靜態文件 - 某一類url添加前綴 -----用以加版本號如,v1.... - 給一類url添加before_request
A、小藍圖(template、views放一塊兒,大藍圖相似django分佈)目錄
B、manage.py
from crm import create_app app = create_app() if __name__ == '__main__': app.run()
init.py
from flask import Flask from .views.account import ac from .views.user import uc def create_app(): app = Flask(__name__) # @app.before_request # def x1(): # print('app.before_request') app.register_blueprint(ac) app.register_blueprint(uc,url_prefix='/api') return app
accout.py
from flask import Blueprint,render_template ac = Blueprint('ac',__name__) @ac.before_request def x1(): print('app.before_request') @ac.route('/login') def login(): return render_template('login.html') @ac.route('/logout') def logout(): return 'Logout'
user.py
from flask import Blueprint uc = Blueprint('uc',__name__) @uc.route('/list') def list(): return 'List' @uc.route('/detail') def detail(): return 'detail'
flask裏local()原碼
做用:爲每一個線程建立一個獨立的空間,使得線程對本身的空間中的數據進行操做(數據隔離)。 import threading from threading import local import time obj = local() def task(i): obj.xxxxx = i time.sleep(2) print(obj.xxxxx,i) for i in range(10): t = threading.Thread(target=task,args=(i,)) t.start() 問題: - 如何獲取一個線程的惟一標記? threading.get_ident() - 根據字典自定義一個相似於threading.local功能? import time import threading DIC = {} def task(i): ident = threading.get_ident() if ident in DIC: DIC[ident]['xxxxx'] = i else: DIC[ident] = {'xxxxx':i } time.sleep(2) print(DIC[ident]['xxxxx'],i) for i in range(10): t = threading.Thread(target=task,args=(i,)) t.start() - 根據字典自定義一個爲每一個協程開闢空間進行存取數據。 import time import threading import greenlet DIC = {} def task(i): # ident = threading.get_ident() ident = greenlet.getcurrent() if ident in DIC: DIC[ident]['xxxxx'] = i else: DIC[ident] = {'xxxxx':i } time.sleep(2) print(DIC[ident]['xxxxx'],i) for i in range(10): t = threading.Thread(target=task,args=(i,)) t.start() - 經過getattr/setattr 構造出來 threading.local的增強版(協程) import time import threading try: import greenlet get_ident = greenlet.getcurrent except Exception as e: get_ident = threading.get_ident class Local(object): DIC = {} def __getattr__(self, item): ident = get_ident() if ident in self.DIC: return self.DIC[ident].get(item) return None def __setattr__(self, key, value): ident = get_ident() if ident in self.DIC: self.DIC[ident][key] = value else: self.DIC[ident] = {key:value} obj = Local() def task(i): obj.xxxxx = i time.sleep(2) print(obj.xxxxx,i) for i in range(10): t = threading.Thread(target=task,args=(i,)) t.start()
請求到來時候: # ctx = RequestContext(self, environ) # self是app對象,environ請求相關的原始數據 # ctx.request = Request(environ) # ctx.session = None # 將包含了request/session的ctx對象放到「空調」 { 1232:{ctx:ctx對象} 1231:{ctx:ctx對象} 1211:{ctx:ctx對象} 1111:{ctx:ctx對象} 1261:{ctx:ctx對象} } 視圖函數: from flask import reuqest,session request.method 請求結束: 根據當前線程的惟一標記,將「空調」上的數據移除。
一、偏函數
# by luffycity.com import functools def index(a1,a2): return a1 + a2 # 原來的調用方式 # ret = index(1,23) # print(ret) # 偏函數,幫助開發者自動傳遞參數 new_func = functools.partial(index,666) ret = new_func(1) print(ret)
二、執行父類方法
- super和執行類的區別? """ class Base(object): def func(self): print('Base.func') class Foo(Base): def func(self): # 方式一:根據mro的順序執行方法 # super(Foo,self).func() # 方式二:主動執行Base類的方法 # Base.func(self) print('Foo.func') obj = Foo() obj.func() """ #################################### class Base(object): def func(self): super(Base, self).func() print('Base.func') class Bar(object): def func(self): print('Bar.func') class Foo(Base,Bar): pass # 示例一 # obj = Foo() # obj.func() # print(Foo.__mro__) # 示例二 # obj = Base() # obj.func()
三、面向對象中特殊方法 setattr/getattr注意事項:
class Foo(object): def __init__(self): # self.storage = {} object.__setattr__(self,'storage',{}) def __setattr__(self, key, value): print(key,value,self.storage) obj = Foo() obj.xx = 123
四、基於列表實現棧
class Stack(object): def __init__(self): self.data = [] def push(self,val): self.data.append(val) def pop(self): return self.data.pop() def top(self): return self.data[-1] _stack = Stack() _stack.push('佳俊') _stack.push('鹹魚') print(_stack.pop()) print(_stack.pop())
pip3 install flask-session 掌握: - 使用 # by luffycity.com import redis from flask import Flask,request,session from flask.sessions import SecureCookieSessionInterface from flask_session import Session app = Flask(__name__) # app.session_interface = SecureCookieSessionInterface() # app.session_interface = RedisSessionInterface() app.config['SESSION_TYPE'] = 'redis' app.config['SESSION_REDIS'] = redis.Redis(host='140.143.227.206',port=6379,password='1234') Session(app) @app.route('/login') def login(): session['user'] = 'alex' return 'asdfasfd' @app.route('/home') def index(): print(session.get('user')) return '...' if __name__ == '__main__': app.run() - 原理: - session數據保存到redis session:隨機字符串1:q23asifaksdfkajsdfasdf session:隨機字符串2:q23asifaksdfkajsdfasdf session:隨機字符串3:q23asifaksdfkajsdfasdf session:隨機字符串4:q23asifaksdfkajsdfasdf session:隨機字符串5:q23asifaksdfkajsdfasdf - 隨機字符串返回給用戶。 隨機字符串 源碼: from flask_session import RedisSessionInterface
def func(): pass class Foo(object): def func(self): pass # 執行方式一 # obj = Foo() # obj.func() # 方法 # 執行方式二 # Foo.func(123) # 函數 from types import FunctionType,MethodType # obj = Foo() # print(isinstance(obj.func,FunctionType)) # False # print(isinstance(obj.func,MethodType)) # True print(isinstance(Foo.func,FunctionType)) # True print(isinstance(Foo.func,MethodType)) # False
redis-desktop manager 0.9.3.817.exe
一、注意設計表的時候,自增和外鍵的id類型要一致
二、work
內容詳細: - 代碼統計 - 數據庫鏈接池: pip3 install DBUtils 注意: - 使用數據庫鏈接池 - 封裝SQLHelper 做業: 1. 功能完善 2. BootStrap 模板 3. 詳細頁面: http://127.0.0.1:5000/detail/1 -> 折線圖 4. 用戶列表: - 柱狀圖 - 表格 PS: select user_id,sum(line) from record group by user_id + 連表查詢到用戶姓名
第一部分:Flask 1. 談談你對django和flask的認識? 2. Flask基礎: - 配置文件:反射+importlib - 路由系統: - 裝飾器 @app.route() - 參數: - url - endpoint - methods - 加裝飾器 - endpoint默認是函數名 - functools.wraps(func) + functools.partial - 寫路由兩種方式: - 裝飾器 - add_url_rule - 自定義支持正則的URL - session - 藍圖 - 目錄結構劃分 - 前綴 - 特殊裝飾器 3. 上下文管理 - threading.local - 爲每一個線程開闢空間,使得線程之間進行數據隔離。 - 應用:DBUtils中爲每一個線程建立一個數據庫鏈接時使用。 - 面向對象特殊方法: - getattr - setattr - delattr - 偏函數 - 單例模式 - 請求上下文流程: - 班級示例: - 源碼流程: - __call__ - wsgi_app - ctx = RequestContext(): 封裝= 請求數據+空session - ctx.push() : 將ctx傳給LocalStack對象,LocalStack再將數據傳給Local存儲起來。 問題:Local中是如何存儲? __storage__ = { 1231:{} } 問題:LocalStack做用? __storage__ = { 1231:{stack:[ctx] } } - 視圖函數:再次去獲取 - 關閉 4. 第三方組件: 1. flask-session 2. DBUtils
1. 什麼是響應式佈局? @media屬性 2. MySQL數據庫 - 引擎: - innodb - 支持事務 - 鎖 - 行鎖 - 表鎖 - 示例: - 終端: begin; select xx from xx for update; commit; - pymysql cursor.execute('select * from xx for update') - django with trancation.automic(): models.User.objects.all().for_update() - mysaim - 不支持事務 - 鎖 - 表鎖 - 快
知識點
# by luffycity.com class Foo(object): def __str__(self): return 'asdf' def __getattr__(self, item): return "999" def __getitem__(self, item): return '87' def __add__(self, other): return other + 1 obj = Foo() print(obj) print(obj.x) print(obj['x1']) print(obj + 7)
上下文
1. 上下文管理:LocalProxy對象 2. 上下文管理: - 請求上下文:request/session - App上下文: app/g
from flask import Flask,request,session app = Flask(__name__) @app.route('/index') def index(): # 1. request是LocalProxy對象 # 2. 對象中有method、執行__getattr__ print(request.method) # request['method'] # request + 1 # 1. session是LocalProxy對象 # 2. LocalProxy對象的__setitem__ session['x'] = 123 return "Index" if __name__ == '__main__': app.run() # app.__call__ # app.wsgi_app """ 第一階段:請求到來 將request和Session相關數據封裝到ctx=RequestContext對象中。 再經過LocalStack將ctx添加到Local中。 __storage__ = { 1231:{'stack':[ctx(request,session)]} } 第二階段:視圖函數中獲取request或session 方式一:直接找LocalStack獲取 from flask.globals import _request_ctx_stack print(_request_ctx_stack.top.request.method) 方式二:經過代理LocalProxy(小東北)獲取 from flask import Flask,request print(request.method) """
詳細
容詳細: 1. 上下文管理:LocalProxy對象 2. 上下文管理: - 請求上下文(ctx=RequestContext()):request/session - App上下文(app_ctx=AppContext()): app/g - 程序啓動: 兩個Local: local1 = { } local2 = { } 兩個LocalStack: _request_ctx_stack _app_ctx_stack - 請求到來 對數據進行封裝: ctx = RequestContext(request,session) app_ctx = AppContext(app,g) 保存數據: 將包含了(app,g)數據的app_ctx對象,利用 _app_ctx_stack(貝貝,LocalStack())將app_ctx添加到Local中 storage = { 1231:{stack:[app_ctx(app,g),]} } 將包含了request,session數據的ctx對象,利用_request_ctx_stack(劉淞,LocalStack()),將ctx添加到Local中 storage = { 1231:{stack:[ctx(request,session),]} } - 視圖函數處理: from flask import Flask,request,session,current_app,g app = Flask(__name__) @app.route('/index') def index(): # 去請求上下文中獲取值 _request_ctx_stack request.method # 找小東北獲取值 session['xxx'] # 找龍泰獲取值 # 去app上下文中獲取值:_app_ctx_stack print(current_app) print(g) return "Index" if __name__ == '__main__': app.run() app.wsgi_app - 結束 _app_ctx_stack.pop() _request_ctx_stack.pop() 問題: 1. Flask中g的生命週期? 2. g和session同樣嗎? 3. g和全局變量同樣嗎?
基本使用
from flask import Flask,request,render_template,session,current_app,g,redirect
from wtforms import Form
from wtforms.fields import simple
from wtforms.fields import html5
from wtforms.fields import core
from wtforms import widgets
from wtforms import validators
app = Flask(__name__)
class LoginForm(Form):
name = simple.StringField(
validators=[
validators.DataRequired(message='用戶名不能爲空.'),
# validators.Length(min=6, max=18, message='用戶名長度必須大於%(min)d且小於%(max)d')
],
widget=widgets.TextInput(),
render_kw={'placeholder':'請輸入用戶名'}
)
pwd = simple.PasswordField(
validators=[
validators.DataRequired(message='密碼不能爲空.'),
# validators.Length(min=8, message='用戶名長度必須大於%(min)d'),
# validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}",
# message='密碼至少8個字符,至少1個大寫字母,1個小寫字母,1個數字和1個特殊字符')
],
render_kw={'placeholder':'請輸入密碼'}
)
@app.route('/login',methods=['GET','POST'])
def login():
if request.method == "GET":
form = LoginForm()
# print(form.name,type(form.name)) # form.name是StringField()對象, StringField().__str__
# print(form.pwd,type(form.pwd)) # form.pwd是PasswordField()對象,PasswordField().__str__
return render_template('login.html',form=form)
form = LoginForm(formdata=request.form)
if form.validate():
print(form.data)
return redirect('https://www.luffycity.com/home')
else:
# print(form.errors)
return render_template('login.html', form=form)
import helper
class UserForm(Form):
city = core.SelectField(
label='城市',
choices=(),
coerce=int
)
name = simple.StringField(label='姓名')
def __init__(self,*args,**kwargs):
super(UserForm,self).__init__(*args,**kwargs)
self.city.choices=helper.fetch_all('select id,name from tb1',[],type=None)
@app.route('/user')
def user():
if request.method == "GET":
#form = UserForm(data={'name':'alex','city':3}) #默認傳入值,在編輯用戶界面適合用a
form = UserForm()
return render_template('user.html',form=form)
if __name__ == '__main__':
app.run()
login.html
<!DOCTYPE html> <html lang="zh-CN"> <head> <meta charset="UTF-8"> <title>Title</title> <meta name="viewport" content="width=device-width, initial-scale=1"> </head> <body> <form method="post" novalidate> <p>用戶名:{{form.name}} {{form.name.errors[0]}}</p> <p>密碼:{{form.pwd}} {{form.pwd.errors[0]}} </p> <p><input type="submit" value="提交" ></p> </form> </body> </html>
出現數據庫未實時更新的緣由是以下,類的靜態字段在實例化時只會導入一次,以後不做處理
若想實時更新,須要將靜態字段放在__init__裏,每次實例化的時候都導入一次
class Foo(object): country = helper.fetch_all('select id,name from tb1',[],type=None) def __init__(self): self.name = '東北' print(Foo.country) obj = Foo() obj = Foo() obj = Foo() obj = Foo() obj = Foo() obj = Foo()
views.py
class RegisterForm(Form): name = simple.StringField( label='用戶名', validators=[ validators.DataRequired() ], widget=widgets.TextInput(), render_kw={'class': 'form-control'}, default='alex' ) pwd = simple.PasswordField( label='密碼', validators=[ validators.DataRequired(message='密碼不能爲空.') ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) pwd_confirm = simple.PasswordField( label='重複密碼', validators=[ validators.DataRequired(message='重複密碼不能爲空.'), validators.EqualTo('pwd', message="兩次密碼輸入不一致") ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) email = html5.EmailField( label='郵箱', validators=[ validators.DataRequired(message='郵箱不能爲空.'), validators.Email(message='郵箱格式錯誤') ], widget=widgets.TextInput(input_type='email'), render_kw={'class': 'form-control'} ) gender = core.RadioField( label='性別', choices=( (1, '男'), (2, '女'), ), coerce=int # int("1") ) city = core.SelectField( label='城市', choices=( ('bj', '北京'), ('sh', '上海'), ) ) hobby = core.SelectMultipleField( label='愛好', choices=( (1, '籃球'), (2, '足球'), ), coerce=int ) favor = core.SelectMultipleField( label='喜愛', choices=( (1, '籃球'), (2, '足球'), ), widget=widgets.ListWidget(prefix_label=False), option_widget=widgets.CheckboxInput(), coerce=int, default=[1, ] ) @app.route('/register', methods=['GET', 'POST']) def register(): if request.method == 'GET': form = RegisterForm() return render_template('register.html', form=form) form = RegisterForm(formdata=request.form) if form.validate(): print(form.data) return redirect('https://www.luffycity.com/home') return render_template('register.html', form=form) import helper class UserForm(Form): city = core.SelectField( label='城市', choices=(), coerce=int ) name = simple.StringField(label='姓名') def __init__(self,*args,**kwargs): super(UserForm,self).__init__(*args,**kwargs) #數據庫實時更新
self.city.choices=helper.fetch_all('select id,name from tb1',[],type=None) @app.route('/user') def user(): if request.method == "GET": #form = UserForm(data={'name':'alex','city':3}) form = UserForm() return render_template('user.html',form=form) if __name__ == '__main__': app.run()
register.html
<!DOCTYPE html> <html lang="zh-CN"> <head> <meta charset="UTF-8"> <title>Title</title> <meta name="viewport" content="width=device-width, initial-scale=1"> </head> <body> <form method="post" novalidate> {% for field in form %} <p>{{field.label}}: {{field}} {{field.errors[0]}}</p> {% endfor %} <input type="submit" value="提交"> </form> </body> </html>
第一部分:Flask 1. flask和django比較? 2. wsgi? 3. flask上下文理解? 兩類: 請求上下文管理 應用上下文管理 流程: 請求到來: 將請求和session相關封裝到ctx = RequestContext對象中。 將app和g封裝到app_ctx = AppContext對象中。 再經過LocalStack對象將ctx、app_ctx封裝到Local對象中。 問題: Local是什麼?做用? LocalStack是什麼?做用? 獲取數據 經過LocalProxy對象+偏函數,調用LocalStack去Local中獲取響應ctx、app_ctx中封裝的值。 問題: 爲何要把 ctx=request/session app_ctx = app/g ? 答:由於離線腳本須要使用app_ctx。 請求結束: 調用LocalStack的pop方法,將ctx和app_ctx移除。 4. threading.local 5. 偏函數 6. 單例模式 7. 問題: before_request的執行時機(源碼實現),在local以後,由於local須要request
1. 數據庫引擎 2. 數據庫受權 3. 表結構設計:代碼統計(教育機構,班級表結構設計) 4. SQL語句 https://www.cnblogs.com/wupeiqi/articles/5729934.html 5. 瞭解:https://www.cnblogs.com/wupeiqi/articles/5713323.html - 視圖 - 存儲過程 - 觸發器 - 函數 select max(id) from tb group by xid; 6. 索引 索引做用:加速查找+約束。 索引種類: - 主鍵索引:加速查找、不重複、非空 - 惟一索引:加速查找、不重複 - 普通索引:加速查找 - 聯合索引:加速查找 - 聯合惟一索引:加速查找、不重複 PS:聯合索引遵循最左前綴原則。 id name pwd email select * from tb where name='x' select * from tb where name='x' and pwd='123' select * from tb where name='x' and pwd='123' and email='xs' 名詞: - 覆蓋索引:在索引文件中就能夠把想要的數據獲得。 select name from tb1; - 索引合併:使用多個單列索引去查找數據。
一、對象能夠被for循環
- form對象爲何能夠被for循環? 答:變爲可迭代對象。 class Foo(object): # def __iter__(self): # return iter([11,22,33]) #iter()爲生成器 def __iter__(self): yield 1 yield 2 #生成器也是迭代器的一種 yield 3 obj = Foo() for item in obj: print(item)
二、 new方法的返回值決定對象究竟是什麼?
class Bar(object): pass class Foo(object): def __new__(cls, *args, **kwargs): # return super(Foo,cls).__new__(cls,*args, **kwargs) return Bar() obj = Foo() print(obj)
三、 metaclass
# 1. 類建立的兩種方式 # class Foo(object): # a1 = 123 # def func(self): # return 666 # Foo = type("Foo",(object,),{'a1':123,'func':lambda self:666}) # 2. 自定義type # class MyType(type): # pass # # class Foo(object,metaclass=MyType): # a1 = 123 # def func(self): # return 666 # # Foo = MyType("Foo",(object,),{'a1':123,'func':lambda self:666}) # 注意:metaclass做用是指定當前類由誰來建立。
分析
- 建立類時,先執行type的__init__。 - 類的實例化時,執行type的__call__,__call__方法的的返回值就是實例化的對象。 __call__內部調用: - 類.__new__,建立對象 - 類.__init__,對象的初始化 class MyType(type): def __init__(self,*args,**kwargs): super(MyType,self).__init__(*args,**kwargs) def __call__(cls, *args, **kwargs): obj = cls.__new__(cls) cls.__init__(obj,*args, **kwargs) return obj class Foo(object,metaclass=MyType): a1 = 123 def __init__(self): pass def __new__(cls, *args, **kwargs): return object.__new__(cls) def func(self): return 666 # Foo是類 # Foo是MyType的一個對象 obj = Foo()
SQLAlchemy,ORM框架。 做用:幫助咱們使用類和對象快速實現數據庫操做。 數據庫: - 原生: - MySQLdb:py2 - pymysql:py2/py3 http://www.cnblogs.com/wupeiqi/articles/5095821.html - ORM框架 - SQLAlchemy
使用
SQLAlchemy使用: 參考:https://www.cnblogs.com/wupeiqi/articles/8259356.html 1. 單表操做 表: from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column from sqlalchemy import Integer,String,Text,Date,DateTime from sqlalchemy import create_engine Base = declarative_base() class Users(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=False) def create_all(): engine = create_engine( "mysql+pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8", max_overflow=0, # 超過鏈接池大小外最多建立的鏈接 pool_size=5, # 鏈接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,不然報錯 pool_recycle=-1 # 多久以後對線程池中的線程進行一次鏈接的回收(重置) ) Base.metadata.create_all(engine) def drop_all(): engine = create_engine( "mysql+pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8", max_overflow=0, # 超過鏈接池大小外最多建立的鏈接 pool_size=5, # 鏈接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,不然報錯 pool_recycle=-1 # 多久以後對線程池中的線程進行一次鏈接的回收(重置) ) Base.metadata.drop_all(engine) if __name__ == '__main__': create_all() 行: 示例: from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from models import Users engine = create_engine( "mysql+pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8", max_overflow=0, # 超過鏈接池大小外最多建立的鏈接 pool_size=5, # 鏈接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,不然報錯 pool_recycle=-1 # 多久以後對線程池中的線程進行一次鏈接的回收(重置) ) SessionFactory = sessionmaker(bind=engine) # 根據Users類對users表進行增刪改查 session = SessionFactory() # 1. 增長 # obj = Users(name='alex') # session.add(obj) # session.commit() # session.add_all([ # Users(name='小東北'), # Users(name='龍泰') # ]) # session.commit() # 2. 查 # result = session.query(Users).all() # for row in result: # print(row.id,row.name) # result = session.query(Users).filter(Users.id >= 2) # for row in result: # print(row.id,row.name) # result = session.query(Users).filter(Users.id >= 2).first() # print(result) # 3.刪 # session.query(Users).filter(Users.id >= 2).delete() # session.commit() # 4.改 # session.query(Users).filter(Users.id == 4).update({Users.name:'東北'}) # session.query(Users).filter(Users.id == 4).update({'name':'小東北'}) # session.query(Users).filter(Users.id == 4).update({'name':Users.name+"DSB"},synchronize_session=False) # session.commit() session.close()
經常使用
# ############################## 其餘經常使用 ############################### # 1. 指定列 # select id,name as cname from users; # result = session.query(Users.id,Users.name.label('cname')).all() # for item in result: # print(item[0],item.id,item.cname) # 2. 默認條件and # session.query(Users).filter(Users.id > 1, Users.name == 'eric').all() # 3. between # session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all() # 4. in # session.query(Users).filter(Users.id.in_([1,3,4])).all() # session.query(Users).filter(~Users.id.in_([1,3,4])).all() # 5. 子查詢 # session.query(Users).filter(Users.id.in_(session.query(Users.id).filter(Users.name=='eric'))).all() # 6. and 和 or # from sqlalchemy import and_, or_ # session.query(Users).filter(Users.id > 3, Users.name == 'eric').all() # session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all() # session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all() # session.query(Users).filter( # or_( # Users.id < 2, # and_(Users.name == 'eric', Users.id > 3), # Users.extra != "" # )).all() # 7. filter_by # session.query(Users).filter_by(name='alex').all() # 8. 通配符 # ret = session.query(Users).filter(Users.name.like('e%')).all() # ret = session.query(Users).filter(~Users.name.like('e%')).all() # 9. 切片 # result = session.query(Users)[1:2] # 10.排序 # ret = session.query(Users).order_by(Users.name.desc()).all() # ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all() # 11. group by from sqlalchemy.sql import func # ret = session.query( # Users.depart_id, # func.count(Users.id), # ).group_by(Users.depart_id).all() # for item in ret: # print(item) # # from sqlalchemy.sql import func # # ret = session.query( # Users.depart_id, # func.count(Users.id), # ).group_by(Users.depart_id).having(func.count(Users.id) >= 2).all() # for item in ret: # print(item) # 12.union 和 union all """ select id,name from users UNION select id,name from users; """ # q1 = session.query(Users.name).filter(Users.id > 2) # q2 = session.query(Favor.caption).filter(Favor.nid < 2) # ret = q1.union(q2).all() # # q1 = session.query(Users.name).filter(Users.id > 2) # q2 = session.query(Favor.caption).filter(Favor.nid < 2) # ret = q1.union_all(q2).all()