Flask是一個使用 Python 編寫的輕量級 Web 應用框架。其 WSGI 工具箱採用 Werkzeug ,模板引擎則使用 Jinja2 。Flask使用 BSD 受權。
Flask也被稱爲 「microframework」 ,由於它使用簡單的核心,用 extension 增長其餘功能。Flask沒有默認使用的數據庫、窗體驗證工具。html
Flask是一個基於Python開發而且依賴jinja2模板和Werkzeug WSGI服務的一個微型框架,對於Werkzeug本質是Socket服務端,其用於接收http請求並對請求進行預處理,而後觸發Flask框架,開發人員基於Flask框架提供的功能對請求進行相應的處理,並返回給用戶,若是要返回給用戶複雜的內容時,須要藉助jinja2模板來實現對模板的處理,即:將模板和數據進行渲染,將渲染後的字符串返回給用戶瀏覽器。前端
「微」(micro) 並不表示你須要把整個 Web 應用塞進單個 Python 文件(雖然確實能夠 ),也不意味着 Flask 在功能上有所欠缺。微框架中的「微」意味着 Flask 旨在保持核心簡單而易於擴展。Flask 不會替你作出太多決策——好比使用何種數據庫。而那些 Flask 所選擇的——好比使用何種模板引擎——則很容易替換。除此以外的一切都由可由你掌握。如此,Flask 能夠與您珠聯璧合。python
默認狀況下,Flask 不包含數據庫抽象層、表單驗證,或是其它任何已有多種庫能夠勝任的功能。然而,Flask 支持用擴展來給應用添加這些功能,如同是 Flask 自己實現的同樣。衆多的擴展提供了數據庫集成、表單驗證、上傳處理、各類各樣的開放認證技術等功能。Flask 也許是「微小」的,但它已準備好在需求繁雜的生產環境中投入使用。mysql
一句話歸納: flask 是一個輕量級且含有豐富組件的框架, 優勢: 短小精悍正則表達式
虛擬環境(virtualenv):sql
一個項目django1.7 一個項目django2.0 用一個django是不可能同時裝兩個版本。
解決方法:
建立一個虛擬環境,來讓咱們實現兩個版本的使用。數據庫
pip3 install virtualenv 安裝環境
virtualenv env1 建立一個虛擬環境env1
cd env1
cd Scripts
(env1) E:\flask\env1\Scripts>
(env1) E:\flask\env1\Scripts>deactivate.bat 退出env1
Scripts/activete 進入
Scripts/deactivete 退出django
pip3 install flask
1 from werkzeug.wrappers import Request,Response 2 3 @Request.application 4 def hello(request): 5 return Response("Hello world") 6 7 8 if __name__ == '__main__': 9 from werkzeug.serving import run_simple 10 run_simple("localhost",4000,hello)
from flask import Flask app = Flask(__name__) #實例化 @app.route('/index') #路由系統 def hello_world(): #視圖函數 return 'Hello world' if __name__ == '__main__': app.run() #啓動
反向生成url: url_for endpointflask
from flask import Flask,url_for app=Flask(__name__) @app.route("/index",methods=['GET','POST'],endpoint='aaa') def index(): v = url_for('aaa') print(v) return 'ok' if __name__ == '__main__': app.run()
路由傳參: /index/<int:nid>瀏覽器
from flask import Flask,url_for app=Flask(__name__) @app.route("/index/<int:nid>",methods=['GET','POST'],endpoint='aaa') def index(nid): v = url_for('aaa',nid=666) print(v) print(nid) return 'ok' if __name__ == '__main__': app.run()
默認值,當URL中無參數,函數須要參數時,使用defaults={'k':'v'}爲函數提供參數.
對URL最後的 / 符號是否嚴格要求. strict_slashes=None,
from flask import Flask,url_for app=Flask(__name__) @app.route("/index/<int:nid>",methods=['GET','POST'],endpoint='aaa',defaults={'nid':777},strict_slashes=False) def index(nid): # v = url_for('aaa',nid=666) # print(v) print(nid) return 'ok' if __name__ == '__main__': app.run()
重定向:redirect_to='路徑'
from flask import Flask,url_for,redirect app=Flask(__name__) @app.route('/new',redirect_to='/old') def new(): return 'new' @app.route('/old') def old(): return 'old' if __name__ == '__main__': app.run()
對於全部的域名都須要解析爲ip地址,然而瀏覽器並回去哪找?它會先去本地找。
兩種解決方法:
1.去租個域名 aaa.com
2. 去租個公網ip 97.25.22.11
域名解析:
aaa.com 97.25.22.11
把代碼放到 97.25.22.11公網的ip服務器上去運行。
而後,再去訪問aaa.com的時候,它會先找域名,把域名解析爲ip,
接着在瀏覽器對ip發起請求,服務器就會接收到請求並響應。
本地測試:
本身的本地文件:
C:\Windows\System32\drivers\etc\host裏面。添加對應的信息。
from flask import Flask,url_for,redirect app=Flask(__name__) app.config['SERVER_NAME'] = 'aaa.com:5000' #這句要記得加上 #只有訪問 admin.aaa.com:5000/index 纔有效 @app.route('/index', subdomain="admin") def admin_index(): return "admin.aaa.com" @app.route('/index',subdomain="www") def admin_index(): return "www.aaa.com" if __name__ == '__main__': app.run()
擴展url,自定義URL匹配正則表達式。
from flask import Flask,url_for app = Flask(__name__) # 定義轉換的類 from werkzeug.routing import BaseConverter class RegexConverter(BaseConverter): """ 自定義URL匹配正則表達式 """ def __init__(self, map, regex): super(RegexConverter, self).__init__(map) self.regex = regex def to_python(self, value): """ 路由匹配時,匹配成功後傳遞給視圖函數中參數的值 :param value: :return: """ return int(value) def to_url(self, value): """ 使用url_for反向生成URL時,傳遞的參數通過該方法處理,返回的值用於生成URL中的參數 :param value: :return: """ val = super(RegexConverter, self).to_url(value) return val # 添加到converts中 app.url_map.converters['xxx'] = RegexConverter # 進行使用 @app.route('/index/<xxx("\d+"):nid>',endpoint='xx') def index(nid): url_for('xx',nid=123) return "Index" if __name__ == '__main__': app.run()
Django:
FBV: /index/ func 對應函數
CBV: /index/ IndexClass.as_view() 對應類
Flask也支持CBV:
from flask import Flask,url_for,views app = Flask(__name__) def auth(func): def inner(*args, **kwargs): result = func(*args, **kwargs) return result return inner class IndexView(views.MethodView): # methods = ['POST'] decorators = [auth,] #裝飾器 def get(self): v = url_for('index') #反向生成url print(v) return "GET" def post(self): return "GET" app.add_url_rule('/index', view_func=IndexView.as_view(name='index')) if __name__ == '__main__': app.run()
1.模板繼承
#模板頁面:layout.html <!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <meta http-equiv="X-UA-Compatible" content="IE=edge"> <meta name="viewport" content="width=device-width, initial-scale=1"> <title>Title</title> </head> <body> <h1>模板</h1> {% block body %} {% endblock %} </body> </html> 繼承的頁面: index.html {% extends "layout.html" %} {% block body %} {{v1}} <ul> {% for item in v2%} <li> {{item}} </li> {% endfor %} {{v2.0}} {{v2.1}} {{v2.2}} </ul> <ul> {% for key,val in v3.items() %} <li>{{key}}:{{val}}</li> {% endfor%} {% for k in v3.items() %} <li>{{k.0,k.1}}</li> <li>{{k}}</li> {% endfor%} {% for k in v3 %} <li>{{k}}</li> {% endfor%} {% for k in v3.values() %} <li>{{k}}</li> {% endfor%} </ul> {{v3.name}}:{{v3.age}} {{v3.get('name')}} {{v3.get('age')}} {{v4}} {{v4|safe}} {{test(1,19)}} {{sb(50,50)}} {{50|db(100,100)}} {% macro xxxx(name, type='text', value='') %} <input type="{{ type }}" name="{{ name }}" value="{{ value }}"> <input type="{{ type }}" name="{{ name }}" value="{{ value }}"> <input type="{{ type }}" name="{{ name }}" value="{{ value }}"> <input type="{{ type }}" name="{{ name }}" value="{{ value }}"> {% endmacro %} {{ xxxx('n1') }} {% endblock %}
2.模板語言.py
from flask import Flask,render_template,Markup app = Flask(__name__) def test(a1,a2): return a1+a2 @app.template_global() #公共資源 前端的用法: {{sb(100,100)}} def sb(a1, a2): return a1 + a2 + 100 @app.template_filter() #公共資源 前端的用法: {{1|db(100,100)}} def db(a1, a2, a3): return a1 + a2 + a3 @app.route('/index2') def index(): v1 = "字符串" v2 = [11,22,33] v3 = {'name':'zbk','age':18} v4 = Markup("<input type='text' / >") return render_template('index2.html',v1=v1,v2=v2,v3=v3,v4=v4,test=test) if __name__ == '__main__': app.run()
前端:index2.html
{% extends "layout.html" %} {% block body %} {{v1}} <ul> {% for item in v2%} <li> {{item}} </li> {% endfor %} {{v2.0}} {{v2.1}} {{v2.2}} </ul> <ul> {% for key,val in v3.items() %} <li>{{key}}:{{val}}</li> {% endfor%} {% for k in v3.items() %} <li>{{k.0,k.1}}</li> <li>{{k}}</li> {% endfor%} {% for k in v3 %} <li>{{k}}</li> {% endfor%} {% for k in v3.values() %} <li>{{k}}</li> {% endfor%} </ul> {{v3.name}}:{{v3.age}} {{v3.get('name')}} {{v3.get('age')}} {{v4}} {{v4|safe}} {{test(1,19)}} {{sb(50,50)}} {{50|db(100,100)}}
# 宏 {% macro xxxx(name, type='text', value='') %} <input type="{{ type }}" name="{{ name }}" value="{{ value }}"> <input type="{{ type }}" name="{{ name }}" value="{{ value }}"> <input type="{{ type }}" name="{{ name }}" value="{{ value }}"> <input type="{{ type }}" name="{{ name }}" value="{{ value }}"> {% endmacro %} {{ xxxx('n1') }} {% endblock %}
from flask import Flask,session from werkzeug.local import LocalProxy app = Flask(__name__) app.secret_key = 'asdadas' app.config['SESSION_COOKIE_NAME'] = 'session_zbk' """ 'SESSION_COOKIE_NAME': 'session', 'SESSION_COOKIE_DOMAIN': None, 支持的域名 'SESSION_COOKIE_PATH': None, 'SESSION_COOKIE_HTTPONLY': True, 'SESSION_COOKIE_SECURE': False, 'SESSION_REFRESH_EACH_REQUEST': True, 'PERMANENT_SESSION_LIFETIME': timedelta(days=31) 是否每次都更新 """ @app.route('/index3') def index(): # print(type(session)) # session本質上操做的是字典,假設session保存在數據庫 # session['xxx'] = 123 # session['xx1'] = 123 # session['xx2'] = 123 # session['xx3'] = 123 # del session['xx2'] session['xx3'] = 123 return 'xxx'
它是基於session建立的,flash往裏面放個值,只要取一下就沒了,
說簡單點:就是session取值的時候就不是讀取了,而是POP了。
from flask import Flask,session,Session,flash,get_flashed_messages,redirect,render_template,request app = Flask(__name__) app.secret_key ='sdfsdfsdf' @app.route('/users') def users(): # msg = request.args.get('msg','') # msg = session.get('msg') # if msg: # del session['msg'] v = get_flashed_messages() print(v) msg = '' return render_template('users.html',msg=msg) @app.route('/useradd') def user_add(): # 在數據庫中添加一條數據 # 假設添加成功,在跳轉到列表頁面時,顯示添加成功 # return redirect('/users?msg=添加成功') # session['msg'] = '添加成功' flash('添加成功') return redirect('/users') if __name__ == '__main__': app.run()
from flask import Flask,session,Session,flash,get_flashed_messages,redirect,render_template,request app = Flask(__name__) app.secret_key ='sdfsdfsdf' @app.before_request def process_request1(): print('process_request1') @app.after_request def process_response1(response): print('process_response1') return response @app.before_request def process_request2(): print('process_request2') @app.after_request def process_response2(response): print('process_response2') return response @app.route('/index') def index(): print('index') return 'Index' @app.route('/order') def order(): print('order') return 'order' # @app.route('/test') # def test(): # print('test') # return 'test' if __name__ == '__main__': app.run()
1.文件參考
2.app.py:
from flask import Flask,session,current_app
# 建立配置, Config 從實例化的時候就開始有了。
app = Flask(__name__) app.secret_key ='sdfsdfsdf' # 方式一:缺點:都在一個文件下 # app.config['SESSION_COOKIE_NAME'] = 'session_zbk' # # 方式二:優勢; 分離開,不在文件下 # app.config.from_pyfile('settings.py') #settings下的.print(app.config['AAAA']) # 方式三:優勢:django也是這麼作的。誰讀取文件,能夠隔離開。 # import os # os.environ['FLAKS-SETTINGS'] = 'settings.py' # app.config.from_envvar('FLAKS-SETTINGS') # 方式四:經常使用 .若是是python2 是 string, import_string 若是是python3 是 encode. 推薦用第四種,優勢:不須要導入。 # app.config.from_object('settings.DevConfig') @app.route('/index',endpoint='xx') def index(): print(current_app.config) #current_app:無論views視圖函數在哪,均可以找到。 session['xx3'] = 123 return "xxx" if __name__ == '__main__': # app.__call__ app.run()
3.settings.py:
class BaseConfig(object):
AAAA=123
class TestConfig(BaseConfig): DB = '127.0.0.1' class DevConfig(BaseConfig): DB = '192.168.1.1' class ProConfig(BaseConfig): DB = '47.18.1.1'
做用:本來是一個文件,可是業務代碼太多,咱們就須要分類了。不一樣的東西放在不一樣的文件。
在flask中,凡是多py文件須要拆分的,都應該用藍圖來作。把目錄結構作一個調整。
action:
manage.py: 啓動文件
import fcrm if __name__ == '__main__': fcrm.app.run()
__init__.py : 內部關聯條件
from flask import Flask from .views import account from .views import order # __init__ 表示:導入這個模塊它就導入了,執行了 app = Flask(__name__) print(app.root_path) app.register_blueprint(account.account) app.register_blueprint(order.order)
account.py: 帳戶相關
from flask import Blueprint,render_template account = Blueprint('pap_account',__name__) @account.route('/login') def login(): return render_template('login.html')
order.py : 訂單相關
from flask import Blueprint order = Blueprint('pap_order',__name__) @order.route('/order') def login(): return 'Order'
小結: 經過藍圖,能夠調整咱們的目錄結構,把不一樣的東西都拆分,放在另外一個文件。
3個過程:
1.第一步,每次請求反覆建立數據庫鏈接。 缺點:鏈接數太多
解決方法: 把鏈接放到全局下。
from flask import Flask from db import POOL import pymysql app = Flask(__name__) app.secret_key ='sdfsdfsdf' conn = pymysql.connect() #放在全局下 @app.route('/index') def index(): # 第一步:缺點:每次請求反覆建立數據庫鏈接,鏈接數太多 cursor = conn.cursor() cursor.execute('select * from tb where id > %s',[5,]) result = cursor.fetchall() cursor.close() conn.close() print(result) return '執行成功' if __name__ == '__main__': # app.__call__ app.run()
2.第二步:若是是多線程的話,pymysql只知道同一時刻,只處理一個線程。由於,源代碼有個 theadsafety = 1 缺點:不能支持併發
解決方法:加把鎖,強制實現串行。 只有第一個線程執行完了,另外一個線程才能進來。因此這樣一來,也支持多線程。
from flask import Flask from db import POOL import pymysql app = Flask(__name__) app.secret_key ='sdfsdfsdf' @app.route('/index') def index(): # 第一步:缺點:每次請求反覆建立數據庫鏈接,鏈接數太多 # conn = pymysql.connect() # cursor = conn.cursor() # cursor.execute('select * from tb where id > %s',[5,]) # result = cursor.fetchall() # cursor.close() # conn.close() # print(result) # 第二步:缺點,不能支持併發 pymysql.threadsafety with LOCK: cursor = CONN.cursor() cursor.execute('select * from tb where id > %s', [5, ]) result = cursor.fetchall() cursor.close() print(result) return '執行成功' if __name__ == '__main__': # app.__call__ app.run()
3.第三步: 支持多線程,可是,不能併發操做。 以上兩部是兩個極端,因此咱們得折中一下。
python裏面並無解決方案,咱們須要引入第三方模塊。該模塊爲: pip3 install dbutils
from flask import Flask from db import POOL import pymysql app = Flask(__name__) app.secret_key ='sdfsdfsdf' @app.route('/index') def index(): # 第一步:缺點:每次請求反覆建立數據庫鏈接,鏈接數太多 # conn = pymysql.connect() # cursor = conn.cursor() # cursor.execute('select * from tb where id > %s',[5,]) # result = cursor.fetchall() # cursor.close() # conn.close() # print(result) # 第二步:缺點,不能支持併發 # pymysql.threadsafety # with LOCK: # cursor = CONN.cursor() # cursor.execute('select * from tb where id > %s', [5, ]) # result = cursor.fetchall() # cursor.close() # # print(result) # 第三步:基於DBUtils實現數據鏈接池 # - 爲沒個線程建立一個鏈接,該線程關閉時,不是真正關閉;本線程再次調用時,仍是使用的最開始建立的鏈接。直到線程終止,數據庫鏈接才關閉。 # - 建立一個鏈接池(10個鏈接),爲全部線程提供鏈接,使用時來進行獲取,使用完畢後,再次放回到鏈接池。 # PS: conn = POOL.connection() cursor = conn.cursor() cursor.execute('select * from tb1') result = cursor.fetchall() conn.close() return '執行成功' if __name__ == '__main__': # app.__call__ app.run()
模式一:爲每個線程建立一個鏈接。若是一個線程反覆去鏈接數據庫的時候,始終用的是屬於本身的那一個鏈接。 close並無真的關閉,是一個僞關閉。線程終止了纔會關閉。
""" 爲每一個線程建立一個鏈接,thread.local實現。 """ from DBUtils.PersistentDB import PersistentDB import pymysql POOL = PersistentDB( creator=pymysql, # 使用連接數據庫的模塊 maxusage=None, # 一個連接最多被重複使用的次數,None表示無限制 setsession=[], # 開始會話前執行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always closeable=False, # 若是爲False時, conn.close() 實際上被忽略,供下次使用,再線程關閉時,纔會自動關閉連接。若是爲True時, conn.close()則關閉連接,那麼再次調用pool.connection時就會報錯,由於已經真的關閉了鏈接(pool.steady_connection()能夠獲取一個新的連接) threadlocal=None, # 本線程獨享值得對象,用於保存連接對象,若是連接對象被重置 host='127.0.0.1', port=3306, user='root', password='123', database='pooldb', charset='utf8' ) def func(): # conn = SteadyDBConnection() conn = POOL.connection() cursor = conn.cursor() cursor.execute('select * from tb1') result = cursor.fetchall() cursor.close() conn.close() # 不是真的關閉,而是假的關閉。 conn = pymysql.connect() conn.close() conn = POOL.connection() cursor = conn.cursor() cursor.execute('select * from tb1') result = cursor.fetchall() cursor.close() conn.close() import threading for i in range(10): t = threading.Thread(target=func) t.start()
模式二:建立一個鏈接池,爲全部線程提供鏈接,線程使用完畢,就把鏈接從新放回到鏈接池裏。 鏈接池全部的鏈接都會被重複使用。theadsafety = 1
import time import pymysql import threading from DBUtils.PooledDB import PooledDB, SharedDBConnection POOL = PooledDB( creator=pymysql, # 使用連接數據庫的模塊 maxconnections=6, # 鏈接池容許的最大鏈接數,0和None表示不限制鏈接數 mincached=2, # 初始化時,連接池中至少建立的空閒的連接,0表示不建立 maxcached=5, # 連接池中最多閒置的連接,0和None不限制 maxshared=3, # 連接池中最多共享的連接數量,0和None表示所有共享。PS: 無用,由於pymysql和MySQLdb等模塊的 threadsafety都爲1,全部值不管設置爲多少,_maxcached永遠爲0,因此永遠是全部連接都共享。 blocking=True, # 鏈接池中若是沒有可用鏈接後,是否阻塞等待。True,等待;False,不等待而後報錯 maxusage=None, # 一個連接最多被重複使用的次數,None表示無限制 setsession=[], # 開始會話前執行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host='127.0.0.1', port=3306, user='root', password='123', database='pooldb', charset='utf8' ) def func(): # 檢測當前正在運行鏈接數的是否小於最大連接數,若是不小於則:等待或報raise TooManyConnections異常 # 不然 # 則優先去初始化時建立的連接中獲取連接 SteadyDBConnection。 # 而後將SteadyDBConnection對象封裝到PooledDedicatedDBConnection中並返回。 # 若是最開始建立的連接沒有連接,則去建立一個SteadyDBConnection對象,再封裝到PooledDedicatedDBConnection中並返回。 # 一旦關閉連接後,鏈接就返回到鏈接池讓後續線程繼續使用。 # PooledDedicatedDBConnection conn = POOL.connection() # print(th, '連接被拿走了', conn1._con) # print(th, '池子裏目前有', pool._idle_cache, '\r\n') cursor = conn.cursor() cursor.execute('select * from tb1') result = cursor.fetchall() conn.close() conn = POOL.connection() # print(th, '連接被拿走了', conn1._con) # print(th, '池子裏目前有', pool._idle_cache, '\r\n') cursor = conn.cursor() cursor.execute('select * from tb1') result = cursor.fetchall() conn.close() func()
兩個過程:
1.第一步:因爲線程執行的速度快,途中sleep2秒,全部每一個線程所打印的值都同樣,前面打印的值都被最後一個的覆蓋了。
import threading import time # 本地線程對象 # local_values = threading.local() class Foo(object): def __init__(self): self.name= None local_values = Foo() def func(num): """ # 第一個線程進來,本地線程對象會爲他建立一個 # 第二個線程進來,本地線程對象會爲他建立一個 { 線程1的惟一標識:{name:1}, 線程2的惟一標識:{name:2}, } :param num: :return: """ local_values.name = num # 4 # 線程停下來了 time.sleep(2) # 第二個線程: local_values.name,去local_values中根據本身的惟一標識做爲key,獲取value中name對應的值 print(local_values.name, threading.current_thread().name) for i in range(5): th = threading.Thread(target=func, args=(i,), name='線程%s' % i) th.start()
打印以下;
2.第二步: 本地線程能把咱們作一個: 線程與線程之間的數據的隔離。
import threading import time # 本地線程對象 local_values = threading.local() def func(num): """ # 第一個線程進來,本地線程對象會爲他建立一個 # 第二個線程進來,本地線程對象會爲他建立一個 { 線程1的惟一標識:{name:1}, 線程2的惟一標識:{name:2}, } :param num: :return: """ local_values.name = num # 4 # 線程停下來了 time.sleep(2) # 第二個線程: local_values.name,去local_values中根據本身的惟一標識做爲key,獲取value中name對應的值 print(local_values.name, threading.current_thread().name) for i in range(5): th = threading.Thread(target=func, args=(i,), name='線程%s' % i) th.start()
打印以下:
總結: 一下是實現了:即完成了併發,又防止無限制的鏈接
app.py:
from flask import Flask from db import POOL #導入 import pymysql app = Flask(__name__) app.secret_key ='sdfsdfsdf' @app.route('/index') def index(): conn = POOL.connection() #鏈接POOL ,即完成了併發,又防止無限制的鏈接 cursor = conn.cursor() cursor.execute('select * from tb1') result = cursor.fetchall() conn.close() return '執行成功' if __name__ == '__main__': # app.__call__ app.run()
db.py:
import time import pymysql import threading from DBUtils.PooledDB import PooledDB, SharedDBConnection POOL = PooledDB( creator=pymysql, # 使用連接數據庫的模塊 maxconnections=6, # 鏈接池容許的最大鏈接數,0和None表示不限制鏈接數 mincached=2, # 初始化時,連接池中至少建立的空閒的連接,0表示不建立 maxcached=5, # 連接池中最多閒置的連接,0和None不限制 maxshared=3, # 連接池中最多共享的連接數量,0和None表示所有共享。PS: 無用,由於pymysql和MySQLdb等模塊的 threadsafety都爲1,全部值不管設置爲多少,_maxcached永遠爲0,因此永遠是全部連接都共享。 blocking=True, # 鏈接池中若是沒有可用鏈接後,是否阻塞等待。True,等待;False,不等待而後報錯 maxusage=None, # 一個連接最多被重複使用的次數,None表示無限制 setsession=[], # 開始會話前執行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host='127.0.0.1', port=3306, user='root', password='123', database='pooldb', charset='utf8' )
本地線程:是flask裏面本身建立的一個線程。它是怎麼建立的?(本地線程只要用到可上下文處理的退一部份內容)
知識:
gevent 依賴於 greenlet 的協程。
from greenlet import getcurrent as get_ident
from _thread import get_ident # 獲取線程的惟一標識 get_ident()
ident = self.__ident_func__() # 獲取當前線程(協程)的惟一標識
storage[ident][name] = value # { 111:{'stack':[] },222:{'stack':[] } }
Local的功能跟本地線程相似: 若是有人建立一個對象,往這設置值,每一個線程裏都有一份。
可是,若是安裝 greenlet ,那麼它仍是線程的惟一標識嗎?它就不是線程的惟一標識了。而是 每個協程的惟一標識了,
-------------from greenlet import getcurrent as get_ident -----=-----------------
若是裝上它(greenlet),以前是一個線程建立一個鏈接,它是一個協程,微線程,它建立的更細了。
它就不是線程的惟一標識了。而是 每個協程的惟一標識了。每個線程裏有多個協程。
LocalStack裏面的push方法,就是往local裏面添加stack.(爲當前線程(協程)建立了一個stack和列表。)
def push(self, obj): """Pushes a new item to the stack""" rv = getattr(self._local, 'stack', None) if rv is None: """ self._local= { 惟一標示: {'stack':[]} } """ self._local.stack = rv = [] rv.append(obj) return rv
def pop(self): """Removes the topmost item from the stack, will return the old value or `None` if the stack was already empty. """ # [] stack = getattr(self._local, 'stack', None) #getattr把stack拿回來了,是一個列表 if stack is None: return None elif len(stack) == 1: #release_local(self._local) return stack[-1] #把列表的一個值拿回來。若是有一個就拿最後一個,多個就pop出來。 else: return stack.pop()
@property def top(self): """The topmost item on the stack. If the stack is empty, `None` is returned. """ try: return self._local.stack[-1] #只取一個,並無刪。 except (AttributeError, IndexError): return None
_request_ctx_stack = LocalStack() #實力化一個對象
小結:
class Local:這個類用來保存每個線程或者協程的值。
class LocalStack: 去列表裏面的取值。
當程序運行起來,就會在建立一個stack對象,localstack封裝一個本身的local對象。local對象就至關於一個字典。
利用flask源碼中的stack和local。
from functools import partial from flask.globals import LocalStack, LocalProxy _request_ctx_stack = LocalStack() class RequestContext(object): #請求相關 def __init__(self, environ): self.request = environ def _lookup_req_object(name): top = _request_ctx_stack.top if top is None: raise RuntimeError(_request_ctx_stack) return getattr(top, name) # 實例化了LocalProxy對象,_lookup_req_object參數傳遞 session = LocalProxy(partial(_lookup_req_object, 'session')) """ local = { 「標識」: {'stack': [RequestContext(),]} } """ _request_ctx_stack.push(RequestContext('c1')) # 當請求進來時,放入 print(session) # 獲取 RequestContext('c1'), top方法 print(session) # 獲取 RequestContext('c1'), top方法 _request_ctx_stack.pop() # 請求結束pop
小結: flask裏的request,session都是pop原理。至關於鏈接池。
補充 : 源代碼。
ctrl+request
單例模式: