Flask 藍圖,數據庫連接

藍圖

使用場景

若是代碼很是多,要進行歸類。不一樣的功能放在不一樣的文件,把相關的視圖函數也放進去。html

藍圖也就是對flask的目錄結構進行分配(應用於小,中型的程序)python

固然對於大型項目也能夠經過   url_prefix 加前綴的方式實現mysql

使用方法

# __init__.py
from .views.account import ac
from .views.user import us

app.register_blueprint(ac)
app.register_blueprint(us)



# account.py
from flask import Blueprint,render_template
ac = Blueprint("ac" ,__name__,template_folder="xxxx",static_url_path="xxxx")

# template_folder 優先在 templates 文件夾找。找不到再去 xxxx 裏找
# static_url_path 優先在 static 文件夾找。找不到再去 xxxx 裏找
# url_prefix="/xx" 爲當前藍圖的url里加前綴 

目錄結構

crm 
    crm
        view
            account.py
            user.py
        static
        templates
            login.html
        __init__.py
    manage.py

 __init__.py

只要一導入crm就會執行__init__.py文件sql

在此文件實現app 對象的生成,以及全部藍圖的註冊功能數據庫

from flask import Flask
from .views.account import ac
from .views.user import us
def create_app():
    app = Flack(__name__)
    
    
    @app.before_request   # 對全局的視圖有效 
    def xx():
        print("app.before_request")
    
    app.register_blueprint(ac)
    app.register_blueprint(us)
    
    return app 

account.py

各自的視圖文件,建立藍圖對象flask

本身視圖的使用爲本身的藍圖對象session

注意: 視圖函數的名字不能和藍圖對象重名多線程

from flask import Blueprint,render_template
ac = Blueprint("ac" ,__name__,template_folder="xxxx",static_url_path="xxxx")
# template_folder 優先在 templates 文件夾找。找不到再去 xxxx 裏找
# static_url_path 優先在 static 文件夾找。找不到再去 xxxx 裏找
# url_prefix="/xx" 爲當前藍圖的url里加前綴 
@ac.route("/login")
def login():
    return render_template("login.html")

user.py

from flask import Blueprint    
us = Blueprint("us" ,__name__)

@us.before_request   # 僅對當前的視圖有效
def xx():
    print("us.before_request")

@us.route("/user")
def user():
    return "user"

 

pymysql

方式一  數據庫連接放在視圖中

  每次視圖的執行進行數據庫鏈接查詢關閉。併發

  反覆建立數據庫連接,屢次連接數據庫會很是耗時 app

解決辦法:放在全局,單例模式

#!usr/bin/env python
# -*- coding:utf-8 -*-
import pymysql
from  flask import Flask

app = Flask(__name__)

@app.route('/index')
def index():
    # 連接數據庫
    conn = pymysql.connect(host="127.0.0.1",port=3306,user='root',password='123', database='pooldb',charset='utf8')
    cursor = conn.cursor()
    cursor.execute("select * from td where id=%s", [5, ])
    result = cursor.fetchall()  # 獲取數據
    cursor.close()
    conn.close()  # 關閉連接
    print(result)
    return  "執行成功"

if __name__ == '__main__':
    app.run(debug=True)

方式二  放在全局

  不在頻繁連接數據庫。

  若是是單線程,這樣沒什麼問題,

  可是若是是多線程,就得加把鎖。這樣就成串行的了

  爲了支持併發,此方法依舊不可取

#!usr/bin/env python
# -*- coding:utf-8 -*-
import pymysql
from  flask import Flask
from threading import RLock

app = Flask(__name__)
CONN = pymysql.connect(host="127.0.0.1",port=3306,user='root',password='123', database='pooldb',charset='utf8')

@app.route('/index')
def index():
    with RLock:
        cursor = CONN.cursor()
        cursor.execute("select * from td where id=%s", [5, ])
        result = cursor.fetchall()  # 獲取數據
        cursor.close()
        print(result)
        return  "執行成功"
if __name__ == '__main__':
    app.run(debug=True)

爲此。爲了解決方式一二的問題,實現不頻繁操做且能夠並行的數據庫連接,咱們須要用到 DBUtils 

方式三 DBUtils + thread.local

爲每個線程建立一個連接(是基於本地線程來實現的。thread.local),

每一個線程獨立使用本身的數據庫連接,該線程關閉不是真正的關閉,本線程再次調用時,仍是使用的最開始建立的連接,直到線程終止,數據庫連接才關閉

#!usr/bin/env python
# -*- coding:utf-8 -*-
from flask import Flask
app = Flask(__name__)
from DBUtils.PersistentDB import PersistentDB
import pymysql
POOL = PersistentDB(
    creator=pymysql,  # 使用連接數據庫的模塊
    maxusage=None,  # 一個連接最多被重複使用的次數,None表示無限制
    setsession=[],  # 開始會話前執行的命令列表。如:["set datestyle to ...", "set time zone ..."]
    ping=0,
    # ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
    closeable=False,
    # 若是爲False時, conn.close() 實際上被忽略,供下次使用,再線程關閉時,纔會自動關閉連接。若是爲True時, conn.close()則關閉連接,那麼再次調用pool.connection時就會報錯,由於已經真的關閉了鏈接(pool.steady_connection()能夠獲取一個新的連接)
    threadlocal=None,  # 本線程獨享值得對象,用於保存連接對象,若是連接對象被重置
    host='127.0.0.1',
    port=3306,
    user='root',
    password='123',
    database='pooldb',
    charset='utf8'
)

@app.route('/func')
def func():
  conn = POOL.connection()
  cursor = conn.cursor()
  cursor.execute('select * from tb1')
  result = cursor.fetchall()
  cursor.close()
  conn.close() # 不是真的關閉,而是假的關閉。 conn = pymysql.connect()   conn.close()

  conn = POOL.connection()
  cursor = conn.cursor()
  cursor.execute('select * from tb1')
  result = cursor.fetchall()
  cursor.close()
  conn.close()
if __name__ == '__main__': app.run(debug=True)

方式四 DBUtils + 連接池

建立一個連接池,爲全部線程提供鏈接,使用時來進行獲取,使用完畢後在放回到鏈接池。

PS:

  假設最大連接數有10個,其實也就是一個列表,當你pop一個,人家會在append一個,連接池的全部的連接都是按照排隊的這樣的方式來連接的。

  連接池裏全部的連接都能重複使用,共享的, 即實現了併發,又防止了連接次數太多

#!usr/bin/env python
# -*- coding:utf-8 -*-
from flask import Flask
app = Flask(__name__)
from DBUtils.PersistentDB import PersistentDB
import pymysql

POOL = PooledDB(
    creator=pymysql,  # 使用連接數據庫的模塊
    maxconnections=6,  # 鏈接池容許的最大鏈接數,0和None表示不限制鏈接數
    mincached=2,  # 初始化時,連接池中至少建立的空閒的連接,0表示不建立
    maxcached=5,  # 連接池中最多閒置的連接,0和None不限制
    maxshared=3,  # 連接池中最多共享的連接數量,0和None表示所有共享。
    # PS: 無用,由於pymysql和MySQLdb等模塊的 threadsafety都爲1,全部值不管設置爲多少,_maxcached永遠爲0,因此永遠是全部連接都共享。
    blocking=True,  # 鏈接池中若是沒有可用鏈接後,是否阻塞等待。True,等待;False,不等待而後報錯
    maxusage=None,  # 一個連接最多被重複使用的次數,None表示無限制
    setsession=[],  # 開始會話前執行的命令列    AQ 表。如:["set datestyle to ...", "set time zone ..."]
    threadlocal=None,  # 本線程獨享值得對象,用於保存連接對象,若是連接對象被重置
    ping=0,
    # ping MySQL服務端,檢查是否服務可用。
        # 取值:
        # 0 = None = never, 
        # 1 = default = whenever it is requested, 
        # 2 = when a cursor is created, 
        # 4 = when a query is executed, 
        # 7 = always
    host='127.0.0.1',
    port=3306,
    user='root',
    password='',
    database='core_master',
    charset='utf8'
)
@app.route('/func')
def func():
  conn = POOL.connection()
  cursor = conn.cursor()
  cursor.execute('select * from tb1')
  result = cursor.fetchall()
  cursor.close()
  conn.close() # 不是真的關閉,而是假的關閉。 conn = pymysql.connect()   conn.close()

  conn = POOL.connection()
  cursor = conn.cursor()
  cursor.execute('select * from tb1')
  result = cursor.fetchall()
  cursor.close()
  conn.close()
if __name__ == '__main__': app.run(debug=True)

其餘

  pymysql 的操做非常繁瑣,大量的重複代碼,能夠進一部封裝

# 建立 連接池的操做封裝 
import pymysql

from settings import Config

def connect():
    conn = Config.POOL.connection()
    cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
    return conn,cursor


def connect_close(conn,cursor):
    cursor.close()
    conn.close()

def fetch_all(sql,args):
    conn,cursor = connect()

    cursor.execute(sql, args)
    record_list = cursor.fetchall()
    connect_close(conn,cursor)

    return record_list


def fetch_one(sql, args):
    conn, cursor = connect()
    cursor.execute(sql, args)
    result = cursor.fetchone()
    connect_close(conn, cursor)

    return result


def insert(sql, args):
    conn, cursor = connect()
    row = cursor.execute(sql, args)
    conn.commit()
    connect_close(conn, cursor)
    return row
封裝代碼

DBUtils 內部原理

在 DBUtils 中爲每一個線程建立一個數據庫鏈接的時候,

對每一個線程單首創建內存空間來保存數據,實現數據的空間分離

初始的多線程

import threading
from threading import local
import time

def task(i):
    global v
    time.sleep(1)
    print(v)

for i in range(10):
    t = threading.Thread(target=task,args=(i,))
    t.start()
#  9 9 9 9 9 9 9 9 9 

 

實現數據隔離的多線程

import threading
from threading import local
import time

obj = local()  # 爲每一個線程建立一個獨立的空間。數據空間隔離


def task(i):
    obj.xxxxx = i
    time.sleep(2)
    print(obj.xxxxx,i)

for i in range(10):
    t = threading.Thread(target=task,args=(i,))
    t.start()
# 0-9 打亂順序 

 

獲取線程的惟一標記示例

import threading
from threading import local

def task(i):
    print(threading.get_ident(),i)  # 獲取線程的惟一標記 

for i in range(10):
    t = threading.Thread(target=task,args=(i,))
    t.start()

 

threading local 的內部實現原理

import time
import threading
import greenlet

DIC = {}    # 用線程的惟一標識做爲 key 來建立一個大字典分別保存每一個線程的數據

def task(i):

    # ident = threading.get_ident()  # 獲取進程的 惟一id
    ident = greenlet.getcurrent()    # 獲取協程的 惟一id
    if ident in DIC:
        DIC[ident]['xxxxx'] = i
    else:
        DIC[ident] = {'xxxxx':i }
    time.sleep(2)

    print(DIC[ident]['xxxxx'],i)

for i in range(10):
    t = threading.Thread(target=task,args=(i,))
    t.start()

 

最終完整版

import time
import threading
try:
    import greenlet
    get_ident =  greenlet.getcurrent
except Exception as e:
    get_ident = threading.get_ident

class Local(object):
    DIC = {}

    def __getattr__(self, item):
        ident = get_ident()
        if ident in self.DIC:
            return self.DIC[ident].get(item)
        return None

    def __setattr__(self, key, value):
        ident = get_ident()
        if ident in self.DIC:
            self.DIC[ident][key] = value
        else:
            self.DIC[ident] = {key:value}


obj = Local()

def task(i):
    obj.xxxxx = i
    time.sleep(2)
    print(obj.xxxxx,i)

for i in range(10):
    t = threading.Thread(target=task,args=(i,))
    t.start()
相關文章
相關標籤/搜索