flask框架基礎

一 web的一些框架介紹html

 Flask:短小精悍,內部沒有包含多少組件,可是第三方的組件是很是豐富的。html5

 Django:django是一個重武器,內部包含了很是多的組件:orm,form,modelForm,緩存,session等等node

 Tornado:牛逼之處就是異步非阻塞框架和node.jspython

二 Flask的快速入門mysql

 建立python虛擬環境:virtualenv 虛擬名web

 安裝:pip install flask正則表達式

 什麼是werkzeug:Werkzeug是一個WSGI工具包,他能夠做爲一個Web框架的底層庫。官方的介紹說是一個 WSGI 工具包,它能夠做爲一個 Web 框架的底層庫,由於它封裝好了不少 Web 框架的東西,例如 Request,Response 等等redis

from werkzeug.wrappers import Request, Response

@Request.application
def hello(request):
    return Response('Hello World!')

if __name__ == '__main__':
    from werkzeug.serving import run_simple
    run_simple('localhost', 4000, hello)
View Code

 基本使用:sql

from flask import Flask
app = Flask(__name__)
 
@app.route('/')
def hello_world():
    return 'Hello World!'
 
if __name__ == '__main__':
    app.run()
View Code

 wsgi:使用werkzeug模塊實現的,還可使用wsgiref實現。本質是導入socket實現的。數據庫

   一旦出現這個,監聽事件就開始了

 實例化Flask對象

  app.run():監聽用戶的請求,一旦有用戶的請求過來,就會直接執行用戶的__call__方法。

  flask的全部相關的組件全都存在flask文件下的,須要什麼導入什麼

from flask import Flask,render_template,request,redirect,session,url_for

 for_url:高級版的重定向

from flask import Flask,render_template,request,redirect,session,url_for

app = Flask(__name__)
app.debug=True
app.secret_key='fangshao'
USERS = {
    1:{'name':'張桂坤','age':18,'gender':'','text':"當眼淚掉下來的時候,是真的累了, 其實人生就是這樣: 你有你的煩,我有個人難,人人都有無聲的淚,人人都有難言的苦。 忘不了的昨天,忙不完的今天,想不到的明天,走不完的人生,過不完的坎坷,越不過的無奈,聽不完的謊話,看不透的人心放不下的牽掛,經歷不完的酸甜苦辣,這就是人生,這就是生活。"},
    2:{'name':'主城','age':28,'gender':'','text':"高中的時候有一個同窗家裏窮,每頓飯都是膜膜加點水,有時候吃點鹹菜,咱們六科老師天天下課都叫他去辦公室回答問題背誦課文,而後說太晚啦一塊兒吃個飯,後來他考上了人大,拿到通知書的時候給每一個老師磕了一個頭"},
    3:{'name':'服城','age':18,'gender':'','text':"高中的時候有一個同窗家裏窮,每頓飯都是膜膜加點水,有時候吃點鹹菜,咱們六科老師天天下課都叫他去辦公室回答問題背誦課文,而後說太晚啦一塊兒吃個飯,後來他考上了人大,拿到通知書的時候給每一個老師磕了一個頭"},
}
@app.route('/')
def hello_world():
    return 'Hello World!'

# @app.route('/index',methods=['GET'])
# def index():
#     user=session.get('user_info')
#     print(user)
#     if user:
#         return render_template('index.html',data=user,user_dict=USERS)
#     return redirect('/login')

@app.route('/index',methods=['GET'])
def index():
    user=session.get('user_info')
    if user:
        return render_template('index.html',data=user,user_dict=USERS)
    url=url_for('111')
    return redirect(url)


@app.route('/detail/<int:nid>',methods=['GET'])
def detail(nid):
    user=session.get('user_info')
    if not user:
        return redirect('/login')
    return render_template('detail.html',data=USERS[nid])

@app.route('/login',methods=['GET','POST'],endpoint='111')
def login():
    if request.method=='POST':
        user=request.form.get('user')
        pwd=request.form.get('pwd')
        if user=='fang' and pwd=='123':
            session['user_info']=user
            return redirect('/index')
        else:
            return render_template('login.html',msg='用戶名或密碼錯誤')

    return render_template('login.html')


if __name__ == '__main__':
    app.run()
View Code

三 配置文件

 開放封閉原則:對代碼的修改是封閉的,對配置文件的修改時開放的。

 app.debug=True:修改事後自動重啓項目

 app.secret_key='隨機設置字符串':全局設置session,Session, Cookies以及一些第三方擴展都會用到,

 app.config['debug']=True:修改事後自動重啓項目

 app.config:獲取當前app的全部配置

 app.config.from_object:導入文件的一個類,

  內部實現原理:導入時,將路徑由點分割

 配置文件的格式有:

flask中的配置文件是一個flask.config.Config對象(繼承字典),默認配置爲:
    {
        'DEBUG':                                get_debug_flag(default=False),  是否開啓Debug模式
        'TESTING':                              False,                          是否開啓測試模式
        'PROPAGATE_EXCEPTIONS':                 None,                          
        'PRESERVE_CONTEXT_ON_EXCEPTION':        None,
        'SECRET_KEY':                           None,
        'PERMANENT_SESSION_LIFETIME':           timedelta(days=31),
        'USE_X_SENDFILE':                       False,
        'LOGGER_NAME':                          None,
        'LOGGER_HANDLER_POLICY':               'always',
        'SERVER_NAME':                          None,
        'APPLICATION_ROOT':                     None,
        'SESSION_COOKIE_NAME':                  'session',
        'SESSION_COOKIE_DOMAIN':                None,
        'SESSION_COOKIE_PATH':                  None,
        'SESSION_COOKIE_HTTPONLY':              True,
        'SESSION_COOKIE_SECURE':                False,
        'SESSION_REFRESH_EACH_REQUEST':         True,
        'MAX_CONTENT_LENGTH':                   None,
        'SEND_FILE_MAX_AGE_DEFAULT':            timedelta(hours=12),
        'TRAP_BAD_REQUEST_ERRORS':              False,
        'TRAP_HTTP_EXCEPTIONS':                 False,
        'EXPLAIN_TEMPLATE_LOADING':             False,
        'PREFERRED_URL_SCHEME':                 'http',
        'JSON_AS_ASCII':                        True,
        'JSON_SORT_KEYS':                       True,
        'JSONIFY_PRETTYPRINT_REGULAR':          True,
        'JSONIFY_MIMETYPE':                     'application/json',
        'TEMPLATES_AUTO_RELOAD':                None,
    }
View Code

  格式一:

app.config['DEBUG'] = True
 
PS: 因爲Config對象本質上是字典,因此還可使用app.config.update(...)
View Code

  格式二:

    app.config.from_pyfile("python文件名稱")
        如:
            settings.py
                DEBUG = True
 
            app.config.from_pyfile("settings.py")
 
    app.config.from_envvar("環境變量名稱")
        環境變量的值爲python文件名稱名稱,內部調用from_pyfile方法
 
 
    app.config.from_json("json文件名稱")
        JSON文件名稱,必須是json格式,由於內部會執行json.loads
 
    app.config.from_mapping({'DEBUG':True})
View Code

  格式三:

    app.config.from_object("python類或類的路徑")
 
        app.config.from_object('pro_flask.settings.TestingConfig')
 
        settings.py
 
            class Config(object):
                DEBUG = False
                TESTING = False
                DATABASE_URI = 'sqlite://:memory:'
 
            class ProductionConfig(Config):
                DATABASE_URI = 'mysql://user@localhost/foo'
 
            class DevelopmentConfig(Config):
                DEBUG = True
 
            class TestingConfig(Config):
                TESTING = True
 
        PS: 從sys.path中已經存在路徑開始寫
View Code

 settings.py默認路徑要放在當前項目的第一級目錄下面

  PS: settings.py文件默認路徑要放在程序root_path目錄,若是instance_relative_config爲True,則就是instance_path目錄

四 路由系統

  • @app.route('/user/<username>')
  • @app.route('/post/<int:post_id>')
  • @app.route('/post/<float:post_id>')
  • @app.route('/post/<path:path>')
  • @app.route('/login', methods=['GET', 'POST'])

 路由比較特殊,:是基於裝飾器實現的,可是究其本質仍是有add_url_rule實現的。

 裝飾器能夠有多個,放在上面和下面是不一樣的,

from flask import Flask,render_template,request,redirect,session,url_for

app = Flask(__name__)
app.debug=True
app.secret_key='fangshao'
USERS = {
    1:{'name':'張桂坤','age':18,'gender':'','text':"當眼淚掉下來的時候,是真的累了, 其實人生就是這樣: 你有你的煩,我有個人難,人人都有無聲的淚,人人都有難言的苦。 忘不了的昨天,忙不完的今天,想不到的明天,走不完的人生,過不完的坎坷,越不過的無奈,聽不完的謊話,看不透的人心放不下的牽掛,經歷不完的酸甜苦辣,這就是人生,這就是生活。"},
    2:{'name':'主城','age':28,'gender':'','text':"高中的時候有一個同窗家裏窮,每頓飯都是膜膜加點水,有時候吃點鹹菜,咱們六科老師天天下課都叫他去辦公室回答問題背誦課文,而後說太晚啦一塊兒吃個飯,後來他考上了人大,拿到通知書的時候給每一個老師磕了一個頭"},
    3:{'name':'服城','age':18,'gender':'','text':"高中的時候有一個同窗家裏窮,每頓飯都是膜膜加點水,有時候吃點鹹菜,咱們六科老師天天下課都叫他去辦公室回答問題背誦課文,而後說太晚啦一塊兒吃個飯,後來他考上了人大,拿到通知書的時候給每一個老師磕了一個頭"},
}


def get_session(func):
    def inner(*args,**kwargs):
        user = session.get('user_info')
        if not user:
            return redirect('/login')
        return func(*args,**kwargs)
    return inner


@app.route('/',endpoint='1111')
@get_session
def hello_world():
    return 'Hello World!'



@app.route('/index',methods=['GET'],endpoint='n1')
@get_session
def index():
    # user=session.get('user_info')
    # print(user)
    # if user:
    return render_template('index.html',user_dict=USERS)
    # return redirect('/login')


# @app.route('/index',methods=['GET'])
# def index():
#     user=session.get('user_info')
#     if user:
#         return render_template('index.html',user_dict=USERS)
#     url=url_for('111')
#     return redirect(url)


@app.route('/detail/<int:nid>',methods=['GET'],endpoint='n2')
def detail(nid):
    user=session.get('user_info')
    if not user:
        return redirect('/login')
    return render_template('detail.html',data=USERS[nid])

@app.route('/login',methods=['GET','POST'],endpoint='111')
def login():
    if request.method=='POST':
        user=request.form.get('user')
        pwd=request.form.get('pwd')
        if user=='fang' and pwd=='123':
            session['user_info']=user
            return redirect('/index')
        else:
            return render_template('login.html',msg='用戶名或密碼錯誤')

    return render_template('login.html')


if __name__ == '__main__':
    app.run()
View Code

 注意:這裏加上裝飾器,會重名,給他設置一個別名endpoint=‘別名’

 functools.wraps(函數):# 幫助咱們保存一些設置函數的元信息

from flask import Flask,render_template,request,redirect,session,url_for

app = Flask(__name__)
app.debug=True
app.secret_key='fangshao'
USERS = {
    1:{'name':'張桂坤','age':18,'gender':'','text':"當眼淚掉下來的時候,是真的累了, 其實人生就是這樣: 你有你的煩,我有個人難,人人都有無聲的淚,人人都有難言的苦。 忘不了的昨天,忙不完的今天,想不到的明天,走不完的人生,過不完的坎坷,越不過的無奈,聽不完的謊話,看不透的人心放不下的牽掛,經歷不完的酸甜苦辣,這就是人生,這就是生活。"},
    2:{'name':'主城','age':28,'gender':'','text':"高中的時候有一個同窗家裏窮,每頓飯都是膜膜加點水,有時候吃點鹹菜,咱們六科老師天天下課都叫他去辦公室回答問題背誦課文,而後說太晚啦一塊兒吃個飯,後來他考上了人大,拿到通知書的時候給每一個老師磕了一個頭"},
    3:{'name':'服城','age':18,'gender':'','text':"高中的時候有一個同窗家裏窮,每頓飯都是膜膜加點水,有時候吃點鹹菜,咱們六科老師天天下課都叫他去辦公室回答問題背誦課文,而後說太晚啦一塊兒吃個飯,後來他考上了人大,拿到通知書的時候給每一個老師磕了一個頭"},
}

import functools
def get_session(func):
    @functools.wraps(func)
    def inner(*args,**kwargs):
        user = session.get('user_info')
        if not user:
            return redirect('/login')
        return func(*args,**kwargs)
    return inner


@app.route('/',endpoint='1111')
@get_session
def hello_world():
    return 'Hello World!'



@app.route('/index',methods=['GET'])
@get_session
def index():
    # user=session.get('user_info')
    # print(user)
    # if user:
    return render_template('index.html',user_dict=USERS)
    # return redirect('/login')


# @app.route('/index',methods=['GET'])
# def index():
#     user=session.get('user_info')
#     if user:
#         return render_template('index.html',user_dict=USERS)
#     url=url_for('111')
#     return redirect(url)


@app.route('/detail/<int:nid>',methods=['GET'])
def detail(nid):
    user=session.get('user_info')
    if not user:
        return redirect('/login')
    return render_template('detail.html',data=USERS[nid])

@app.route('/login',methods=['GET','POST'])
def login():
    if request.method=='POST':
        user=request.form.get('user')
        pwd=request.form.get('pwd')
        if user=='fang' and pwd=='123':
            session['user_info']=user
            return redirect('/index')
        else:
            return render_template('login.html',msg='用戶名或密碼錯誤')

    return render_template('login.html')


if __name__ == '__main__':
    app.run()
View Code

 FBV的寫法:

  使用裝飾器,將括號裏面的內容添加到路由中@app.route()

DEFAULT_CONVERTERS = {
    'default':          UnicodeConverter,
    'string':           UnicodeConverter,
    'any':              AnyConverter,
    'path':             PathConverter,
    'int':              IntegerConverter,
    'float':            FloatConverter,
    'uuid':             UUIDConverter,
}
View Code

   methods=[]:裏面寫的是請求的方法,支持什麼方法都要寫上什麼方法。

      endpoint=別名:爲當前的url反向生成一個url,也就是起一個別名

      app.add_url_rule('/...'):添加路由的另外一種方法

        def auth(func):
            def inner(*args, **kwargs):
                print('before')
                result = func(*args, **kwargs)
                print('after')
                return result

        return inner

        @app.route('/index.html',methods=['GET','POST'],endpoint='index')
        @auth
        def index():
            return 'Index'def index():
            return "Index"

        self.add_url_rule(rule='/index.html', endpoint="index", view_func=index, methods=["GET","POST"])
        or
        app.add_url_rule(rule='/index.html', endpoint="index", view_func=index, methods=["GET","POST"])
        app.view_functions['index'] = index
View Code

 CBV的寫法:

  MethodView:API的簡單的一種實現方式,class建立的視圖類就須要繼承它。

class IndexView(views.MethodView):
    methods=['GET']
    decorators=[auth,]
    def get(self):
        return 'Index.GET'
    
    def post(self):
        return 'Index.POST'
View Code

  app.add_url_rule('/路徑',view_func='類名'.as_view(name=返回的那個函數)

app.add_url_rule('/index',view_func=IndexView.as_view(name='index'))

  對於CBV來講:雖然傳進去的是類名,可是最後返回的仍是一個函數。

        def auth(func):
            def inner(*args, **kwargs):
                print('before')
                result = func(*args, **kwargs)
                print('after')
                return result

        return inner

        class IndexView(views.View):
            methods = ['GET']
            decorators = [auth, ]

            def dispatch_request(self):
                print('Index')
                return 'Index!'

        app.add_url_rule('/index', view_func=IndexView.as_view(name='index'))  # name=endpoint
class IndexView(views.MethodView):
            methods = ['GET']
            decorators = [auth, ]

            def get(self):
                return 'Index.GET'

            def post(self):
                return 'Index.POST'


        app.add_url_rule('/index', view_func=IndexView.as_view(name='index'))  # name=endpoint




        @app.route和app.add_url_rule參數:
            rule,                       URL規則
            view_func,                  視圖函數名稱
            defaults=None,              默認值,當URL中無參數,函數須要參數時,使用defaults={'k':'v'}爲函數提供參數
            endpoint=None,              名稱,用於反向生成URL,即: url_for('名稱')
            methods=None,               容許的請求方式,如:["GET","POST"]
            

            strict_slashes=None,        對URL最後的 / 符號是否嚴格要求,
                                        如:
                                            @app.route('/index',strict_slashes=False),
                                                訪問 http://www.xx.com/index/ 或 http://www.xx.com/index都可
                                            @app.route('/index',strict_slashes=True)
                                                僅訪問 http://www.xx.com/index 
            redirect_to=None,           重定向到指定地址
                                        如:
                                            @app.route('/index/<int:nid>', redirect_to='/home/<nid>')
                                            或
                                            def func(adapter, nid):
                                                return "/home/888"
                                            @app.route('/index/<int:nid>', redirect_to=func)
            subdomain=None,             子域名訪問
                                                from flask import Flask, views, url_for

                                                app = Flask(import_name=__name__)
                                                app.config['SERVER_NAME'] = 'wupeiqi.com:5000'


                                                @app.route("/", subdomain="admin")
                                                def static_index():
                                                    """Flask supports static subdomains
                                                    This is available at static.your-domain.tld"""
                                                    return "static.your-domain.tld"


                                                @app.route("/dynamic", subdomain="<username>")
                                                def username_index(username):
                                                    """Dynamic subdomains are also supported
                                                    Try going to user1.your-domain.tld/dynamic"""
                                                    return username + ".your-domain.tld"


                                                if __name__ == '__main__':
                                                    app.run()
        
View Code

 default:傳入函數的參數。就是url後面的那個參數

 subdomain={}:建立子域名

 window設置域名:hosts而文件下面直接就能夠設置了C:\Windows\System32\driver\etc\hosts

 mcs系統設置域名:/ect/hosts文件下面就能夠設置域名了  

          from flask import Flask, views, url_for
            from werkzeug.routing import BaseConverter

            app = Flask(import_name=__name__)


            class RegexConverter(BaseConverter):
                """
                自定義URL匹配正則表達式
                """
                def __init__(self, map, regex):
                    super(RegexConverter, self).__init__(map)
                    self.regex = regex

                def to_python(self, value):
                    """
                    路由匹配時,匹配成功後傳遞給視圖函數中參數的值
                    :param value: 
                    :return: 
                    """
                    return int(value)

                def to_url(self, value):
                    """
                    使用url_for反向生成URL時,傳遞的參數通過該方法處理,返回的值用於生成URL中的參數
                    :param value: 
                    :return: 
                    """
                    val = super(RegexConverter, self).to_url(value)
                    return val

            # 添加到flask中
            app.url_map.converters['regex'] = RegexConverter


            @app.route('/index/<regex("\d+"):nid>')
            def index(nid):
                print(url_for('index', nid='888'))
                return 'Index'


            if __name__ == '__main__':
                app.run()
View Code

五 模板

 一、模板的使用:Flask使用的是Jinja2模板,因此其語法和Django無差異

 二、自定義模板方法:Flask中自定義模板方法的方式和Bottle類似,建立一個函數並經過參數的形式傳入

 for循環,並取到索引

  {% for k,v in 對象.items()%}

    字典的取值方法:v.字段名  v['字段名']  v.get('字段名')

  {% endfor %}

 函數渲染:不只要加上括號,還能夠加上參數

  {{函數名(參數)}}  加上|safe:防止xss攻擊

<!DOCTYPE html>
<html>
<head lang="en">
    <meta charset="UTF-8">
    <title></title>
</head>
<body>
    <h1>自定義函數</h1>
    {{ww()|safe}}

</body>
</html>
View Code

  在後臺若是使用

from flask import Flask,render_template
app = Flask(__name__)
 
 
def wupeiqi():
    return '<h1>Wupeiqi</h1>'
 
@app.route('/login', methods=['GET', 'POST'])
def login():
    return render_template('login.html', ww=wupeiqi)
 
app.run()
View Code

 Markup:後臺設置xss攻擊

 宏定義:就是定義一塊html,定義的這一塊就是就是一個函數

  {% macre 函數名(參數)%}  {% endmacre %}

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>


    {% macro input(name, type='text', value='') %}
        <input type="{{ type }}" name="{{ name }}" value="{{ value }}">
    {% endmacro %}

    {{ input('n1') }}

    {% include 'tp.html' %}

    <h1>asdf{{ v.k1}}</h1>
</body>
</html>
View Code

六 請求和響應

 請求和響應都是從flask爲念中導入的

 request:請求

 response:響應

  jsonify:響應的數據類型不是字符串類型,就是用這個將響應的內容轉成字符串。

from flask import Flask
    from flask import request
    from flask import render_template
    from flask import redirect
    from flask import make_response

    app = Flask(__name__)


    @app.route('/login.html', methods=['GET', "POST"])
    def login():

        # 請求相關信息
        # request.method
        # request.args
        # request.form
        # request.values
        # request.cookies
        # request.headers
        # request.path
        # request.full_path
        # request.script_root
        # request.url
        # request.base_url
        # request.url_root
        # request.host_url
        # request.host
        # request.files
        # obj = request.files['the_file_name']
        # obj.save('/var/www/uploads/' + secure_filename(f.filename))

        # 響應相關信息
        # return "字符串"
        # return render_template('html模板路徑',**{})
        # return redirect('/index.html')

        # response = make_response(render_template('index.html'))
        # response是flask.wrappers.Response類型
        # response.delete_cookie('key')
        # response.set_cookie('key', 'value')
        # response.headers['X-Something'] = 'A value'
        # return response


        return "內容"

    if __name__ == '__main__':
        app.run()
View Code

七 Session和Cookie

 session在使用前必需要有app.sceret_key加密,就至關於加鹽

 session['名']=字段:設置session

 應用demo:實例,使用裝飾器寫一個用戶認證

  思路:裝飾器,一個函數能夠加上多個裝飾器,反向查找的名稱不容許重名:endpoint

  session.pop:刪除一個session

 咱們這裏使用的session是flask內置的使用加密的cookie來保存數據的。

 基本使用:

from flask import Flask, session, redirect, url_for, escape, request
 
app = Flask(__name__)
 
@app.route('/')
def index():
    if 'username' in session:
        return 'Logged in as %s' % escape(session['username'])
    return 'You are not logged in'
 
@app.route('/login', methods=['GET', 'POST'])
def login():
    if request.method == 'POST':
        session['username'] = request.form['username']
        return redirect(url_for('index'))
    return '''
        <form action="" method="post">
            <p><input type=text name=username>
            <p><input type=submit value=Login>
        </form>
    '''
 
@app.route('/logout')
def logout():
    # remove the username from the session if it's there
    session.pop('username', None)
    return redirect(url_for('index'))
 
# set the secret key.  keep this really secret:
app.secret_key = 'A0Zr98j/3yX R~XHH!jmN]LWX/,?RT'

基本使用
View Code

 自定義session 

pip3 install Flask-Session
        
        run.py
            from flask import Flask
            from flask import session
            from pro_flask.utils.session import MySessionInterface
            app = Flask(__name__)

            app.secret_key = 'A0Zr98j/3yX R~XHH!jmN]LWX/,?RT'
            app.session_interface = MySessionInterface()

            @app.route('/login.html', methods=['GET', "POST"])
            def login():
                print(session)
                session['user1'] = 'alex'
                session['user2'] = 'alex'
                del session['user2']

                return "內容"

            if __name__ == '__main__':
                app.run()

        session.py
            #!/usr/bin/env python
            # -*- coding:utf-8 -*-
            import uuid
            import json
            from flask.sessions import SessionInterface
            from flask.sessions import SessionMixin
            from itsdangerous import Signer, BadSignature, want_bytes


            class MySession(dict, SessionMixin):
                def __init__(self, initial=None, sid=None):
                    self.sid = sid
                    self.initial = initial
                    super(MySession, self).__init__(initial or ())


                def __setitem__(self, key, value):
                    super(MySession, self).__setitem__(key, value)

                def __getitem__(self, item):
                    return super(MySession, self).__getitem__(item)

                def __delitem__(self, key):
                    super(MySession, self).__delitem__(key)



            class MySessionInterface(SessionInterface):
                session_class = MySession
                container = {}

                def __init__(self):
                    import redis
                    self.redis = redis.Redis()

                def _generate_sid(self):
                    return str(uuid.uuid4())

                def _get_signer(self, app):
                    if not app.secret_key:
                        return None
                    return Signer(app.secret_key, salt='flask-session',
                                  key_derivation='hmac')

                def open_session(self, app, request):
                    """
                    程序剛啓動時執行,須要返回一個session對象
                    """
                    sid = request.cookies.get(app.session_cookie_name)
                    if not sid:
                        sid = self._generate_sid()
                        return self.session_class(sid=sid)

                    signer = self._get_signer(app)
                    try:
                        sid_as_bytes = signer.unsign(sid)
                        sid = sid_as_bytes.decode()
                    except BadSignature:
                        sid = self._generate_sid()
                        return self.session_class(sid=sid)

                    # session保存在redis中
                    # val = self.redis.get(sid)
                    # session保存在內存中
                    val = self.container.get(sid)

                    if val is not None:
                        try:
                            data = json.loads(val)
                            return self.session_class(data, sid=sid)
                        except:
                            return self.session_class(sid=sid)
                    return self.session_class(sid=sid)

                def save_session(self, app, session, response):
                    """
                    程序結束前執行,能夠保存session中全部的值
                    如:
                        保存到resit
                        寫入到用戶cookie
                    """
                    domain = self.get_cookie_domain(app)
                    path = self.get_cookie_path(app)
                    httponly = self.get_cookie_httponly(app)
                    secure = self.get_cookie_secure(app)
                    expires = self.get_expiration_time(app, session)

                    val = json.dumps(dict(session))

                    # session保存在redis中
                    # self.redis.setex(name=session.sid, value=val, time=app.permanent_session_lifetime)
                    # session保存在內存中
                    self.container.setdefault(session.sid, val)

                    session_id = self._get_signer(app).sign(want_bytes(session.sid))

                    response.set_cookie(app.session_cookie_name, session_id,
                                        expires=expires, httponly=httponly,
                                        domain=domain, path=path, secure=secure)

自定義Session
View Code

 第三方session

from flask import Flask, session, redirect
from flask.ext.session import Session


app = Flask(__name__)
app.debug = True
app.secret_key = 'asdfasdfasd'


app.config['SESSION_TYPE'] = 'redis'
from redis import Redis
app.config['SESSION_REDIS'] = Redis(host='192.168.0.94',port='6379')
Session(app)


@app.route('/login')
def login():
    session['username'] = 'alex'
    return redirect('/index')


@app.route('/index')
def index():
    name = session['username']
    return name


if __name__ == '__main__':
    app.run()

第三方session
View Code

八 閃現

 flash:向某一個地方設置一個值

  category:設置值的分類

 get_flashed_messages:從某一個地方獲取多個值,而且清除

  他們也是基於app.secret_key實現的

from flask import Flask,flash,get_flashed_messages

app = Flask(__name__)
app.secret_key = 'asdfasdf'
@app.route('/get')
def get():
    # 從某個地方獲取設置過的全部值,並清除。
    data = get_flashed_messages()
    print(data)
    return 'Hello World!'


@app.route('/set')
def set():
    # 向某個地方設置一個值
    flash('阿斯蒂芬')

    return 'Hello World!'


if __name__ == '__main__':
    app.run()
View Code

  category_filter:只取一類的值

from flask import Flask,flash,get_flashed_messages,request,redirect

app = Flask(__name__)
app.secret_key = 'asdfasdf'


@app.route('/index')
def index():
    # 從某個地方獲取設置過的全部值,並清除。
    val = request.args.get('v')
    if val == 'oldboy':
        return 'Hello World!'
    flash('超時錯誤',category="x1")
    return "ssdsdsdfsd"
    # return redirect('/error')


@app.route('/error')
def error():
    """
    展現錯誤信息
    :return:
    """
    data = get_flashed_messages(category_filter=['x1'])
    if data:
        msg = data[0]
    else:
        msg = "..."
    return "錯誤信息:%s" %(msg,)


if __name__ == '__main__':
    app.run()
View Code

    request.query_string.get('字段'):獲取到全部的url

 request.args.get('字段'):獲取get請求後面的值

 什麼是閃現:設置無論多少次的值,是基於session實現的,只要一次取到所有的值,而且清除

 閃現的用法:應用於臨時數據的操做,好比:顯示錯誤信息等等

from flask import Flask,session,Session,flash,get_flashed_messages,redirect,render_template,request
app = Flask(__name__)
app.secret_key ='sdfsdfsdf'

@app.route('/users')
def users():
    # 方式一
    # msg = request.args.get('msg','')
    # 方式二
    # msg = session.get('msg')
    # if msg:
    #     del session['msg']
    # 方式三
    v = get_flashed_messages()
    print(v)
    msg = ''
    return render_template('users.html',msg=msg)

@app.route('/useradd')
def user_add():
    # 在數據庫中添加一條數據
    # 假設添加成功,在跳轉到列表頁面時,顯示添加成功
    # 方式一
    # return redirect('/users?msg=添加成功')
    # 方式二
    # session['msg'] = '添加成功'
    # 方式三
    flash('添加成功')
    return redirect('/users')


if __name__ == '__main__':
    app.run(debug=True)
View Code

九 藍圖(blueprint)

 什麼是藍圖:功能分類,還能夠定義本身的模板和路由分發。就是爲了構造目錄的。

 藍圖的特性:不只能將不一樣功能的app進行分來,而且還能夠定義本身的模板路徑能夠實現路由的分發。能夠實現每個程序構造本身的app。

 Blueprint:建立藍圖  ('藍圖名','__name__')

  url_prefix:爲下面全部函數的使用機上一個前綴,能夠爲統一的某一類加上前綴

  template_folder:定義本身的模板路徑(html文件勒頸)

 app.register_blueprint:將藍圖註冊到app

 小型應用程序:示例

 大型應用程序:示例

小中型:

manage.py

import fcrm if __name__ == '__main__': fcrm.app.run()

__init__.py(只要一導入fcrm就會執行__init__.py文件)

複製代碼
from flask import Flask #導入accout 和order from fcrm.views import accout from fcrm.views import order app = Flask(__name__) print(app.root_path) #根目錄  app.register_blueprint(accout.accout) #吧藍圖註冊到app裏面,accout.accout是建立的藍圖對象 app.register_blueprint(order.order)
複製代碼

accout.py

複製代碼
from flask import Blueprint,render_template accout = Blueprint("accout",__name__) @accout.route('/accout') def xx(): return "accout" @accout.route("/login") def login(): return render_template("login.html")
複製代碼

order.py

from flask import Blueprint order = Blueprint("order",__name__) @order.route('/order') def register(): #注意視圖函數的名字不能和藍圖對象的名字同樣 return "order

使用藍圖時須要注意的

大型:

 

十 請求的擴展

 相似於django的中間件

 @app.defore_request:定製請求函數,每次請求都會執行有這個裝飾器的函數,每次都是從上到下執行的

 request.url:拿到正要運行的url

 @app.after_request:定製響應的函數,每次響應執行這個裝飾器的函數,每次都是從下到上執行的

 請求哈響應能夠有多個,若是請求給攔截了,可是全部的響應都會執行

 @app.errorhandler:定製錯誤的信息。

 @ app.template_global:爲模板定製函數,也就是自定義模板

 @app.before_first_request:只有第一次請求來了才執行這個函數

from flask import Flask,render_template,request,redirect,session,url_for
app = Flask(__name__)
app.debug = True
app.secret_key = 'siuljskdjfs'


@app.before_request
def process_request1(*args,**kwargs):
    print('process_request1 進來了')

@app.before_request
def process_request2(*args,**kwargs):
    print('process_request2 進來了')


@app.after_request
def process_response1(response):
    print('process_response1 走了')
    return response

@app.after_request
def process_response2(response):
    print('process_response2 走了')
    return response



@app.errorhandler(404)
def error_404(arg):
    return "404錯誤了"


@app.before_first_request
def first(*args,**kwargs):
    pass

@app.route('/index',methods=['GET'])
def index():
    print('index函數')
    return "Index"





if __name__ == '__main__':
    app.run()
View Code

 基於中間件實現用戶的認證登錄

                        @app.before_request
            def process_request(*args,**kwargs):
                if request.path == '/login':
                    return None
                user = session.get('user_info')
                if user:
                    return None
                return redirect('/login')
View Code

 模板中定製方法:

            @app.template_global()
            def sb(a1, a2):
                return a1 + a2
            {{sb(1,2)}}  

            
            @app.template_filter()
            def db(a1, a2, a3):
                return a1 + a2 + a3
            {{ 1|db(2,3)}}
View Code

十一 中間件

 中間件:在這裏就是一個請求的入口

 每次請求進來都會執行app的call方法。

from flask import Flask

app = Flask(__name__)

@app.route('/')
def index():
    return 'Hello World!'

class Md(object):
    def __init__(self,old_wsgi_app):
        self.old_wsgi_app = old_wsgi_app

    def __call__(self,  environ, start_response):
        print('開始以前')
        ret = self.old_wsgi_app(environ, start_response)
        print('結束以後')
        return ret

if __name__ == '__main__':
    app.wsgi_app = Md(app.wsgi_app)
    app.run()
View Code

十二 上下文處理

 threading.local:爲每個線程開闢一個單獨的內存空間來保存他本身的值

import threading

# class Foo():
#     def __init__(self):
#         self.name=0
#
# local_value=Foo()

local_value=threading.local()
def func(num):
    local_value.name=num
    import time
    time.sleep(1)
    print(local_value.name,threading.current_thread().name)

for i in range(20):
    th=threading.Thread(target=func,args=(i,),name='線程%s'%i)
    th.start()
View Code

 request:

  狀況一:單線程和單進程的狀況下不會有問題,應爲本身使用爲完畢事後,就會自動的清空request。

  狀況二:單進程和多線程,threading.local對象

  狀況三:但進程和單線程多個協程,theading.local對象就會出問題。

 解決方法:

  之後不支持協程:就可使用內置的threading.local對象

  支持協程:自定義相似threading.local對象的功能支持協程

 _thread.get_ident:獲取線程的一個惟一標識

 greenlet.getcurrent:獲取協程額惟一標識

 自定義支持協程:

"""
{
   1368:{}
}
"""

import threading
try:
    from greenlet import getcurrent as get_ident  # 協程
except ImportError:
    try:
        from thread import get_ident
    except ImportError:
       from _thread import get_ident  # 線程

class Local():
    def __init__(self):
        self.storage={}
        self.get_ident=get_ident

    def set(self,k,v):
        ident=self.get_ident()
        origin=self.storage.get(ident)
        if not origin:
            origin={k:v}
        else:
            origin[k]=v
        self.storage[ident]=origin

    def get(self,k):
        ident=self.get_ident()
        origin=self.storage.get(ident)
        if not origin:
            return None
        return origin.get(k,None)

local_values=Local()

def task(num):
    local_values.set('name',num)
    import time
    time.sleep(1)
    print(local_values.get('name'),threading.current_thread().name)

for i in range(20):
    th=threading.Thread(target=task,args=(i,),name='線程%s'%i)
    th.start()
View Code

 反射實例:

class Foo(object):
    def __init__(self):
        object.__setattr__(self,'storage',{})

    def __setattr__(self, key, value):
        self.storage={'k1:v1'}
        print(key,value)

    def __getattr__(self, item):
        print(item)
        return 'df'

obj=Foo()
# obj.x = 123

print(obj)
View Code

 自定義支持協程的flask:使用反射實現

"""
{
   1368:{}
}
"""
import flask.globals
import threading
try:
    from greenlet import getcurrent as get_ident  # 協程
except ImportError:
    try:
        from thread import get_ident
    except ImportError:
       from _thread import get_ident  # 線程

class Local():
    def __init__(self):
        object.__setattr__(self,'__storage__',{})
        object.__setattr__(self,'__ident_func__',get_ident)

    def __setattr__(self, name, value):
        ident=self.__ident_func__()
        storage=self.__storage__
        try:
            storage[ident][name]=value
        except KeyError:
            storage[ident]={name:value}
    def __getattr__(self, name):
        try:
            return self.__storage__[self.__ident_func__()][name]
        except KeyError:
            raise AttributeError(name)


    def __delattr__(self, name):
        try:
            del self.__storage__[self.__ident_finc__()][name]
        except KeyError:
            raise AttributeError(name)

local_values=Local()

def task(num):
    local_values.name=num
    import time
    time.sleep(1)
    print(local_values.name,threading.current_thread().name)
for i in range(20):
    th=threading.Thread(target=task,args=(i,),name='線程%s'%i)
    th.start()
View Code

  補充:
  偏函數:functools模塊

   functools.partial:建立一個新的函數,主要是爲了給原函數傳入參數

# import functools
#
# def func(a1,a2):
#     print(a1+a2)
#
#
# new_func=functools.partial(func,12)  #
# new_func(21)
View Code

  面向對象的__add__方法補充:當把面向對象中的全部__函數__實現時,對象作任何操做時,都會執行其中對應的方法。

   __add__:誰在前面就會調用誰的__add__方法

# class Foo():
#     def __init__(self,num):
#         self.num=num
#     def __add__(self,other):
#         return Foo(self.num+other.num)
# obj1=Foo(11)
# obj2=Foo(22)
# v=obj1+obj2
# print(v)
View Code

  拼接列表中的值:

   itertools.chain:傳入函數,產生一個新的函數對象的列表,主要幫助咱們作列表元素的拼接

    實例1:

from itertools import chain

# def a1(x):
#     return x+1
#
# func_list=[a1,lambda x:x+1]
#
# def a2(y):
#     return y+10
#
# func_list_1=chain([a2],func_list)
# for func in func_list_1:
#     print(func)
View Code

    實例2:

# v1=[1,2,3]
# v2=[4,5,6]
#
# for l in chain(v1,v2):  # chain 主要是作列表的拼接
#     print(l)
View Code

  上下文源碼流程:https://www.processon.com/diagraming/5ab8c9f0e4b0d165d5b83fbb

 談談flask的上下文管理:

        - 與django相比是兩種不一樣的實現方式。
            - django/tornado是經過傳參數形式
            - flask是經過上下文管理
            兩種均可以實現,只不過試下方式不同。
            - 上下文管理:
                - threading.local/Local類,其中建立了一個字典{greelet作惟一標識:存數據} 保證數據隔離
                - 請求進來:
                    - 請求相關全部數據封裝到了RequestContext中。
                    - 再講RequestContext對象添加到Local中(經過LocalStack將對象添加到Local對象中)
                - 使用,調用request
                    - 調用此類方法 request.method、print(request)、request+xxx 會執行LocalProxy中對應的方法
                    - 函數
                    - 經過LocalStack去Local中獲取值。
                - 請求終止
                    - 經過LocalStack的pop方法 Local中將值異常。
View Code

 補充:

    再將對象封裝到Local中
    Flask能夠傳入任何的字符串參數
View Code

 請求上下文:請求上下文封裝的就是RequestContext對象

  request:是LocalProxy對象

   這個對象實例化事傳入了一個函數,還有一個request參數
   之後執行偏函數partail(_Lookup_req_object,'request')時,自動傳遞request參數
   目標:去Local中獲取ctx,而後再在ctx中獲取request
   ctx.push:裏面作的事,將ctx經過LocalStack添加到Local中

  session:通過push以後,session裏面已經有值了,seif.session幫助我獲取session信息

 應用上下文:應用上下文封裝的是AppContext

  AppContext裏面封裝了app對象
  app.context:建立了app對象
  app.app_ctx_global_class:至關於一個全局變量

  app和g

  g:每一個請求週期都會建立一個用於在請求週期中傳遞值的一個容器。只有一次生命週期可用,由於一次請求週期後第二次就會從新建立
print(g):執行g的實例對象

 請求到來 ,有人來訪問:

                # 將請求相關的數據environ封裝到了RequestContext對象中
                # 再講對象封裝到local中(每一個線程/每一個協程獨立空間存儲)
                # ctx.app # 當前APP的名稱
                # ctx.request # Request對象(封裝請求相關東西)
                # ctx.session # 空
                _request_ctx_stack.local = {
                    惟一標識:{
                        "stack":[ctx, ]
                    },
                    惟一標識:{
                        "stack":[ctx, ]
                    },
                }
                
                
                # app_ctx = AppContext對象
                # app_ctx.app
                # app_ctx.g
                    
                _app_ctx_stack.local = {
                    惟一標識:{
                        "stack":[app_ctx, ]
                    },
                    惟一標識:{
                        "stack":[app_ctx, ]
                    },
                }
View Code

 使用:print打印request,session.g,current_app的時候,都會執行他們相對應的對象的__str__
他們之間不一樣的是偏函數不同

                    from flask import request,session,g,current_app
                    
                    print(request,session,g,current_app)
                    
                    都會執行相應LocalProxy對象的 __str__
                    
                    current_app = LocalProxy(_find_app)
                        request = LocalProxy(partial(_lookup_req_object, 'request'))
                        session = LocalProxy(partial(_lookup_req_object, 'session'))
                        
                        current_app = LocalProxy(_find_app)
                        g = LocalProxy(partial(_lookup_app_object, 'g'))
                    
View Code

 終止,所有pop

 問題:

  若是他是多線程的時候是如何實現的。

            請求到來的時候就是兩個Local,可是沒有值,一共只有兩個Local,進來一個線程建立本身的惟一標識。不過進來多少的線程,都只用這兩個Local
View Code

  flask的local中保存數據時,使用列表建立出來的棧。爲何用棧?

                   - 若是寫web程序,web運行環境;棧中永遠保存1條數據(能夠不用棧)。
                   - 寫腳本獲取app信息時,可能存在app上下文嵌套關係。
                        from flask import Flask,current_app,globals,_app_ctx_stack

                        app1 = Flask('app01')
                        app1.debug = False # 用戶/密碼/郵箱
                        # app_ctx = AppContext(self):
                        # app_ctx.app
                        # app_ctx.g

                        app2 = Flask('app02')
                        app2.debug = True # 用戶/密碼/郵箱
                        # app_ctx = AppContext(self):
                        # app_ctx.app
                        # app_ctx.g



                        with app1.app_context():# __enter__方法 -> push -> app_ctx添加到_app_ctx_stack.local
                            # {<greenlet.greenlet object at 0x00000000036E2340>: {'stack': [<flask.ctx.AppContext object at 0x00000000037CA438>]}}
                            print(_app_ctx_stack._local.__storage__)
                            print(current_app.config['DEBUG'])

                            with app2.app_context():
                                # {<greenlet.greenlet object at 0x00000000036E2340>: {'stack': [<flask.ctx.AppContext object at 0x00000000037CA438> ]}}
                                print(_app_ctx_stack._local.__storage__)
                                print(current_app.config['DEBUG'])

                            print(current_app.config['DEBUG'])
View Code

 多app應用:不只可使用藍圖進行分發,還可使用app進行分發
  DispatchMiddleware:能夠進行路由分發。
  在這裏面沒有app.run,直接能夠run_simple啓動

            from werkzeug.wsgi import DispatcherMiddleware
            from werkzeug.serving import run_simple
            from flask import Flask, current_app

            app1 = Flask('app01')

            app2 = Flask('app02')



            @app1.route('/index')
            def index():
                return "app01"


            @app2.route('/index2')
            def index2():
                return "app2"

            # http://www.oldboyedu.com/index
            # http://www.oldboyedu.com/sec/index2
            dm = DispatcherMiddleware(app1, {
                '/sec': app2,
            })

            if __name__ == "__main__":
                run_simple('localhost', 5000, dm)
View Code

 問題:web訪問多app,上線問管理是如何實現的

  請求進來,爲棧添加的仍是一個值

 問題:爲何使用棧,離線腳本

  若是寫web程序,或者web運行環境:棧中永遠保存一條數據(這個能夠不使用棧)
  寫腳本獲取app信息的時候,可能會存在上下文嵌套關係。這時有可能要用到棧

 問題:問題:Web訪問多app應用時,上下文管理是如何實現?

 補充:

 遇到with就會執行__enter__方法,這個方法返回值,as後面那個值就是返回值
 當指執行完畢以後 ,自動調用類的__exit__方法
from flask import Flask,current_app,globals,_app_ctx_stack

app1 = Flask('app01')
app1.debug = False # 用戶/密碼/郵箱
# app_ctx = AppContext(self):
# app_ctx.app
# app_ctx.g

app2 = Flask('app02')
app2.debug = True # 用戶/密碼/郵箱
# app_ctx = AppContext(self):
# app_ctx.app
# app_ctx.g

with app1.app_context():  # # __enter__方法 -> push -> app_ctx添加到_app_ctx_stack.local
    print(_app_ctx_stack._local.__storage__)
    print(current_app.config['DEBUG'])
    with app1.app_context():
        print(_app_ctx_stack._local.__storage__)
        print(current_app.config['DEBUG'])
    print(current_app.config['DEBUG'])


with app1.app_context():
    print(_app_ctx_stack._local.__storage__)
    print(current_app.config['DEBUG'])
with app1.app_context():
    print(_app_ctx_stack._local.__storage__)
    print(current_app.config['DEBUG'])
View Code

 補充:永遠兩個Local對象

 實現細節:
  ResquestContext對象經過LocalStark添加到Local中
  導入的request是一個LocalProxy對象,而後在經過偏函數調用了LocakStack,在調用Local
  RequestContext的auto_pop,在執行LocalStack,再到Local中移除

十三 數據庫的鏈接池

 前夕:

  django的經常使用數據庫:

   ORM:django默認的數據庫

   pymysql模塊:導入mysql數據庫,python2和python3版本都有這個模塊

   MySQLdb:同樣,也是導入mysql數據庫,這個模塊只有python2版本纔有

  flask/其餘:

   pymysql:導mysql數據庫

   MySQLdb:導入mysql數據庫

  SQLAchemy:

   是ORM的一款框架,能夠導入mysql數據庫(pymysql / MySQLdb)

 原生SQL:

from flask import Flask

app = Flask(__name__)

@app.route("/")
def hello():
    import pymysql
    CONN = pymysql.connect(host='127.0.0.1',
                           port=3306,
                           user='root',
                           password='123',
                           database='pooldb',
                           charset='utf8')

    cursor = CONN.cursor()
    cursor.execute('select * from tb1')
    result = cursor.fetchall()
    cursor.close()

    print(result)

    return "Hello World"

if __name__ == '__main__':
    app.run()
View Code
from flask import Flask

app = Flask(__name__)
import pymysql
CONN = pymysql.connect(host='127.0.0.1',
                           port=3306,
                           user='root',
                           password='123',
                           database='pooldb',
                           charset='utf8')

@app.route("/")
def hello():
    cursor = CONN.cursor()
    cursor.execute('select * from tb1')
    result = cursor.fetchall()
    cursor.close()
    print(result)
    return "Hello World"

if __name__ == '__main__':
    app.run()




######加鎖#########################
from flask import Flask
import threading
app = Flask(__name__)
import pymysql
CONN = pymysql.connect(host='127.0.0.1',
                           port=3306,
                           user='root',
                           password='123',
                           database='pooldb',
                           charset='utf8')

@app.route("/")
def hello():
    with threading.Lock():
        cursor = CONN.cursor()
        cursor.execute('select * from tb1')
        result = cursor.fetchall()
        cursor.close()

        print(result)

    return "Hello World"

if __name__ == '__main__':
    app.run()
View Code

 問題:
 解決:

  不能爲每個用戶建立一個連接

  建立必定數量的鏈接池,只要鏈接池有空的,纔會進來,否則就會等着。

 使用UBDtils模塊:

  下載網站:https://pypi.python.org/pypi/DBUtils

   pip install UBDtils

  若是安裝到虛擬環境下面,須要先切換到虛擬環境下面

  使用:

   模式一:

    爲每個線程建立一個連接,即便縣城調用close方法,也不會關閉掉

    佔用鏈接池不放,浪費了鏈接池

import pymysql
from DBUtils.PersistentDB import PersistentDB

POOL = PersistentDB(
    creator=pymysql,  # 使用連接數據庫的模塊
    maxusage=None,  # 一個連接最多被重複使用的次數,None表示無限制
    setsession=[],  # 開始會話前執行的命令列表。如:["set datestyle to ...", "set time zone ..."]
    ping=0, # ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
    closeable=False,
    # 若是爲False時, conn.close() 實際上被忽略,供下次使用,再線程關閉時,纔會自動關閉連接。若是爲True時, conn.close()則關閉連接,那麼再次調用pool.connection時就會報錯,由於已經真的關閉了鏈接(pool.steady_connection()能夠獲取一個新的連接)
    threadlocal=None,  # 本線程獨享值得對象,用於保存連接對象,若是連接對象被重置
    host='127.0.0.1',
    port=3306,
    user='root',
    password='123',
    database='pooldb',
    charset='utf8'
)

def func():
    conn = POOL.connection(shareable=False)
    cursor = conn.cursor()
    cursor.execute('select * from tb1')
    result = cursor.fetchall()
    cursor.close()
    conn.close()

func()
View Code

   模式二:
    設置最大限制建立鏈接池的數量,進來一個線程,就建立一個鏈接池

    若是超過鏈接池的限制數量,就會在那裏等着有空的鏈接池釋放出來纔可以下一個線程連接進去

import time
import pymysql
import threading
from DBUtils.PooledDB import PooledDB, SharedDBConnection
POOL = PooledDB(
    creator=pymysql,  # 使用連接數據庫的模塊
    maxconnections=6,  # 鏈接池容許的最大鏈接數,0和None表示不限制鏈接數
    mincached=2,  # 初始化時,連接池中至少建立的空閒的連接,0表示不建立
    maxcached=5,  # 連接池中最多閒置的連接,0和None不限制
    maxshared=3,  # 連接池中最多共享的連接數量,0和None表示所有共享。PS: 無用,由於pymysql和MySQLdb等模塊的 threadsafety都爲1,全部值不管設置爲多少,_maxcached永遠爲0,因此永遠是全部連接都共享。
    blocking=True,  # 鏈接池中若是沒有可用鏈接後,是否阻塞等待。True,等待;False,不等待而後報錯
    maxusage=None,  # 一個連接最多被重複使用的次數,None表示無限制
    setsession=[],  # 開始會話前執行的命令列表。如:["set datestyle to ...", "set time zone ..."]
    ping=0,
    # ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
    host='127.0.0.1',
    port=3306,
    user='root',
    password='123',
    database='pooldb',
    charset='utf8'
)


def func():
    # 檢測當前正在運行鏈接數的是否小於最大連接數,若是不小於則:等待或報raise TooManyConnections異常
    # 不然
    # 則優先去初始化時建立的連接中獲取連接 SteadyDBConnection。
    # 而後將SteadyDBConnection對象封裝到PooledDedicatedDBConnection中並返回。
    # 若是最開始建立的連接沒有連接,則去建立一個SteadyDBConnection對象,再封裝到PooledDedicatedDBConnection中並返回。
    # 一旦關閉連接後,鏈接就返回到鏈接池讓後續線程繼續使用。
    conn = POOL.connection()

    # print(th, '連接被拿走了', conn1._con)
    # print(th, '池子裏目前有', pool._idle_cache, '\r\n')

    cursor = conn.cursor()
    cursor.execute('select * from tb1')
    result = cursor.fetchall()
    conn.close()


func()
View Code

  工做實用:

                         import pymysql
                from DBUtils.PooledDB import PooledDB
                POOL = PooledDB(
                    creator=pymysql,  # 使用連接數據庫的模塊
                    maxconnections=6,  # 鏈接池容許的最大鏈接數,0和None表示不限制鏈接數
                    mincached=2,  # 初始化時,連接池中至少建立的空閒的連接,0表示不建立
                    maxcached=5,  # 連接池中最多閒置的連接,0和None不限制
                    maxshared=3,  # 連接池中最多共享的連接數量,0和None表示所有共享。PS: 無用,由於pymysql和MySQLdb等模塊的 threadsafety都爲1,全部值不管設置爲多少,_maxcached永遠爲0,因此永遠是全部連接都共享。
                    blocking=True,  # 鏈接池中若是沒有可用鏈接後,是否阻塞等待。True,等待;False,不等待而後報錯
                    maxusage=None,  # 一個連接最多被重複使用的次數,None表示無限制
                    setsession=[],  # 開始會話前執行的命令列表。如:["set datestyle to ...", "set time zone ..."]
                    ping=0,
                    # ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
                    host='127.0.0.1',
                    port=3306,
                    user='root',
                    password='123',
                    database='pooldb',
                    charset='utf8'
                )

                """
                class SQLHelper(object):
                    
                    @staticmethod
                    def fetch_one(sql,args):
                        conn = POOL.connection()
                        cursor = conn.cursor()
                        cursor.execute(sql, args)
                        result = cursor.fetchone()
                        conn.close()
                        return result

                    @staticmethod
                    def fetch_all(self,sql,args):
                        conn = POOL.connection()
                        cursor = conn.cursor()
                        cursor.execute(sql, args)
                        result = cursor.fetchone()
                        conn.close()
                        return result
                    
                # 之後使用:
                result = SQLHelper.fetch_one('select * from xxx',[])
                print(result)
                """
View Code

 十四 信號

 什麼是信號:signal 一種處理異步時間的方法。信號是POSIX系統的信號,由硬件或軟件觸發,再有操做系統內核發給應用程序的中斷形式。POSIX由一系列的信號集。

 什麼是信號量:semaphore 一種進程同步的機制。信號量是POSIX進程間通訊的工具,在它上面定義了一系列操做原語,簡單地講它能夠在進程間進行通訊。

 flask本身沒有信號,要依賴blinker這個模塊 安裝 pip install blinker
 內置信號:

request_started = _signals.signal('request-started')                # 請求到來前執行
request_finished = _signals.signal('request-finished')              # 請求結束後執行
 
before_render_template = _signals.signal('before-render-template')  # 模板渲染前執行
template_rendered = _signals.signal('template-rendered')            # 模板渲染後執行
 
got_request_exception = _signals.signal('got-request-exception')    # 請求執行出現異常時執行
 
request_tearing_down = _signals.signal('request-tearing-down')      # 請求執行完畢後自動執行(不管成功與否)
appcontext_tearing_down = _signals.signal('appcontext-tearing-down')# 請求上下文執行完畢後自動執行(不管成功與否)
 
appcontext_pushed = _signals.signal('appcontext-pushed')            # 請求上下文push時執行
appcontext_popped = _signals.signal('appcontext-popped')            # 請求上下文pop時執行
message_flashed = _signals.signal('message-flashed')                # 調用flask在其中添加數據時,自動觸發
View Code

 信號放哪裏了:放在了signals文件裏面
 signals.request_started.conncet(函數名) :註冊函數
 等待請求的到來
 在視圖函數執行以前觸法信號的
 是怎麼觸發信號的?
  signals.request_started.send:方法觸發信號
  send是在full_dispatch_request方法中執行的

 信號的源碼流程:

 a. before_first_request
 b. 觸發 request_started 信號
 c. before_request
 d. 模板渲染
   渲染前的信號 before_render_template.send(app, template=template, context=context)
    rv = template.render(context) # 模板渲染
   渲染後的信號 template_rendered.send(app, template=template, context=context)
 e. after_request
 f. session.save_session()
 g. 觸發 request_finished信號

 若是上述過程出錯:
   觸發錯誤處理信號 got_request_exception.send(self, exception=e)
   
 h. 觸發信號 request_tearing_down
      
 由信號引起的源碼流程:找擴展點
View Code

 十五 flask-session插件

 Flask中的session處理機制(內置:將session保存在加密cookie中實現)

 內置的session:

  請求剛到來:獲取隨機字符串,存在則去「數據庫」中獲取原來的我的數據,不然建立一個空容器。 --> 內存:對象(隨機字符串,{放置數據的容器})

# 1. obj = 建立SecureCookieSessionInterface()
# 2. obj = open_session(self.request) = SecureCookieSession()
# self.session = SecureCookieSession()對象。
self.session = self.app.open_session(self.request)
View Code

  視圖:操做內存中 對象(隨機字符串,{放置數據的容器})
   響應:內存對象(隨機字符串,{放置數據的容器})
  將數據保存到「數據庫」
  把隨機字符串寫在用戶cookie中。

 自定義:

  請求剛到來:

# 建立特殊字典,並添加到Local中。
# 調用關係:
# self.session_interface.open_session(self, request)
# 因爲默認app中的session_interface=SecureCookieSessionInterface()
# SecureCookieSessionInterface().open_session(self, request)
# 因爲默認app中的session_interface=MySessionInterFace()
# MySessionInterFace().open_session(self, request)
self.session = self.app.open_session(self.request)
View Code

  調用:

from flask import Flask,session


app = Flask(__name__)
app.secret_key = 'suijksdfsd'


import json
class MySessionInterFace(object):
    def open_session(self,app,request):
        return {}

    def save_session(self, app, session, response):
        response.set_cookie('session_idfsdfsdfsdf',json.dumps(session))

    def is_null_session(self, obj):
        """Checks if a given object is a null session.  Null sessions are
        not asked to be saved.

        This checks if the object is an instance of :attr:`null_session_class`
        by default.
        """
        return False

app.session_interface = MySessionInterFace()

@app.route('/')
def index():
    # 特殊空字典
    # 在local的ctx中找到session
    # 在空字典中寫值
    # 在空字典中獲取值
    session['xxx'] = 123


    return 'Index'

# # 一旦請求到來
# app.__call__
# app.wsgi_app
# app.session_interface
# app.open_session


if __name__ == '__main__':

    app.run()
View Code

session -> LocalProxy -> 偏函數 -> LocalStack -> Local
  請求終止:

# 因爲默認app中的session_interface=SecureCookieSessionInterface()
# SecureCookieSessionInterface().save_session(self, app, session, response)
# 因爲默認app中的session_interface=MySessionInterFace()
# MySessionInterFace().save_session(self, app, session, response)
View Code

 flask-session組件
  使用:

from flask import Flask,session
from flask_session import RedisSessionInterface

app = Flask(__name__)
app.secret_key = 'suijksdfsd'
View Code

   方式一

from redis import Redis
conn = Redis()
app.session_interface = RedisSessionInterface(conn,key_prefix='__',use_signer=False)
View Code

   方式二

from redis import Redis
from flask.ext.session import Session
app.config['SESSION_TYPE'] = 'redis'
app.config['SESSION_REDIS'] = Redis(host='192.168.0.94',port='6379')
Session(app)
View Code
@app.route('/')
def index():
session['xxx'] = 123
return 'Index'


if __name__ == '__main__':

app.run()
View Code

  源碼:
      流程
 問題:設置cookie時,如何設定關閉瀏覽器則cookie失效。
  response.set_cookie('k','v',exipre=None)

十六 wtforms組建

 安裝:pip3 install wtforms

 使用:

from flask import Flask, render_template, request, redirect
from wtforms import Form
from wtforms.fields import core
from wtforms.fields import html5
from wtforms.fields import simple
from wtforms import validators
from wtforms import widgets

app = Flask(__name__, template_folder='templates')
app.debug = True



class RegisterForm(Form):
    name = simple.StringField(
        label='用戶名',
        validators=[
            validators.DataRequired()
        ],
        widget=widgets.TextInput(),
        render_kw={'class': 'form-control'},
        default='alex'
    )

    pwd = simple.PasswordField(
        label='密碼',
        validators=[
            validators.DataRequired(message='密碼不能爲空.')
        ],
        widget=widgets.PasswordInput(),
        render_kw={'class': 'form-control'}
    )

    pwd_confirm = simple.PasswordField(
        label='重複密碼',
        validators=[
            validators.DataRequired(message='重複密碼不能爲空.'),
            validators.EqualTo('pwd', message="兩次密碼輸入不一致")
        ],
        widget=widgets.PasswordInput(),
        render_kw={'class': 'form-control'}
    )

    email = html5.EmailField(
        label='郵箱',
        validators=[
            validators.DataRequired(message='郵箱不能爲空.'),
            validators.Email(message='郵箱格式錯誤')
        ],
        widget=widgets.TextInput(input_type='email'),
        render_kw={'class': 'form-control'}
    )

    gender = core.RadioField(
        label='性別',
        choices=(
            (1, ''),
            (2, ''),
        ),
        coerce=int # 「1」 「2」
     )
    city = core.SelectField(
        label='城市',
        choices=(
            ('bj', '北京'),
            ('sh', '上海'),
        )
    )

    hobby = core.SelectMultipleField(
        label='愛好',
        choices=(
            (1, '籃球'),
            (2, '足球'),
        ),
        coerce=int
    )

    favor = core.SelectMultipleField(
        label='喜愛',
        choices=(
            (1, '籃球'),
            (2, '足球'),
        ),
        widget=widgets.ListWidget(prefix_label=False),
        option_widget=widgets.CheckboxInput(),
        coerce=int,
        default=[1, 2]
    )

    def __init__(self, *args, **kwargs):
        super(RegisterForm, self).__init__(*args, **kwargs)
        self.favor.choices = ((1, '籃球'), (2, '足球'), (3, '羽毛球'))

    def validate_pwd_confirm(self, field):
        """
        自定義pwd_confirm字段規則,例:與pwd字段是否一致
        :param field:
        :return:
        """
        # 最開始初始化時,self.data中已經有全部的值

        if field.data != self.data['pwd']:
            # raise validators.ValidationError("密碼不一致") # 繼續後續驗證
            raise validators.StopValidation("密碼不一致")  # 再也不繼續後續驗證


@app.route('/register', methods=['GET', 'POST'])
def register():
    if request.method == 'GET':
        form = RegisterForm(data={'gender': 2,'hobby':[1,]}) # initial
        return render_template('register.html', form=form)
    else:
        form = RegisterForm(formdata=request.form)
        if form.validate():
            print('用戶提交數據經過格式驗證,提交的值爲:', form.data)
        else:
            print(form.errors)
        return render_template('register.html', form=form)



if __name__ == '__main__':
    app.run()
View Code

 源碼流程:

  實現方式:

                        1. 自動生成HTML
                class LoginForm(Form):
                    # 字段(內部包含正則表達式)
                    name = simple.StringField(
                        label='用戶名',
                        validators=[
                            validators.DataRequired(message='用戶名不能爲空.'),
                            validators.Length(min=6, max=18, message='用戶名長度必須大於%(min)d且小於%(max)d')
                        ],
                        widget=widgets.TextInput(), # 頁面上顯示的插件
                        render_kw={'class': 'form-control'}

                    )
                    # 字段(內部包含正則表達式)
                    pwd = simple.PasswordField(
                        label='密碼',
                        validators=[
                            validators.DataRequired(message='密碼不能爲空.'),
                            validators.Length(min=8, message='用戶名長度必須大於%(min)d'),
                            validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}",
                                              message='密碼至少8個字符,至少1個大寫字母,1個小寫字母,1個數字和1個特殊字符')

                        ],
                        widget=widgets.PasswordInput(),
                        render_kw={'class': 'form-control'}
                    )
                    
                    def __iter__(self):
                        return iter([self.name,self.pwd])
                # 方式一:
                    obj = LoginForm()
                    print(obj.name) # 調用字段的__str__
                # 方式二:
                    obj = LoginForm()
                    for item in obj:
                        print(item) # 調用字段的__str__
            
View Code

  校驗:

a. 後臺定義好正則
b. 用戶發來數據
c. 對數據進行校驗
View Code

  源碼實現:自動生成HTML文件
     解釋:metaclass

  Metaclass做用:用來指定當前類是誰來建立的,若是不指定默認是type建立的
  類繼承:只要這個指定了Metaclass,那麼這個類以及他的子類都指定同一個metaclass
- MetaClass做用:用來指定當前類由誰來建立(默認type建立)。
        - 使用metaclass
            class Foo(metaclass=type):
                pass 
            
            class Foo(object):
                __metaclass__ = type
        - 類繼承
            
            class MyType(type):
                def __init__(self,*args,**kwargs):
                    print('init')
                    super(MyType,self).__init__(*args,**kwargs)

                def __call__(self, *args, **kwargs):
                    print('call本質:調用類的__new__,再調用類的__init__')
                    return super(MyType,self).__call__( *args, **kwargs)


            class Foo(metaclass=MyType):
                pass

            class Bar(Foo):
                pass

            obj = Bar()
        - 問題:
            1. 什麼意思?
                # 類由type來建立
                class Foo(metaclass=type)
                # 繼承Type
                class Foo(type)
            2. Flask多線程:服務端開多線程
View Code

   其餘方式使用:

class MyType(type):
    def __init__(self,*args,**kwargs):
        print('init')
        super(MyType,self).__init__(*args,**kwargs)

    def __call__(self, *args, **kwargs):
        print('call')
        return super(MyType,self).__call__(*args,**kwargs)

# Base=MyType('Base',(object,),{})  是有MyType建立; metaclass=MyType
#
# class Foo(Base):
#     pass
#
# Foo()
# 1. type能夠建立類metaclass=type;MyType也能夠建立類metaclass=MyType
# 2. Base = MyType('Base', (object,), {}) -->
# class Base(metaclass=MyType):
#     pass
# class Foo(Base):
#     pass

class Foo(MyType("Base",(object,),{})):
    # 第一個是類名,第二個就是他的父類,第三個就是屬性
    pass

Foo()
View Code
class MyType(type):
    def __init__(self,*args,**kwargs):
        print('init')
        super(MyType,self).__init__(*args,**kwargs)

    def __call__(self, *args, **kwargs):
        print('call')
        return super(MyType,self).__call__(*args,**kwargs)

def with_metaclass(obj):
    return MyType('XX',(obj,),{})

class Foo(with_metaclass(object)):
    pass


Foo()
View Code
"""
1. 什麼意思?

# 類由type來建立
class Foo(metaclass=type)
# 繼承Type
class Foo(type)

"""


class Foo(object):
    pass
obj = Foo()
# 對象是由類建立



# 一切皆對象,類由type建立
class Foo(object):
    pass

Foo = type('Foo',(object,),{})



# 一切皆對象,類由MyType建立
class MyType(type):
    pass
Foo = MyType('Foo',(object,),{})


class Foo(object,metaclass=MyType):
    pass



# 一切皆對象,類由MyType建立
class MyType(type):
    def __init__(self, *args, **kwargs):
        print('init')
        super(MyType, self).__init__(*args, **kwargs)

    def __call__(cls, *args, **kwargs):
        print('call')
        return super(MyType, cls).__call__(*args, **kwargs)

Foo = MyType('Foo',(object,),{})


class Foo(object,metaclass=MyType):
    pass

Foo()
View Code

  實例:form = LoginForm()

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from flask import Flask, render_template, request, redirect
from wtforms import Form
from wtforms.fields import simple
from wtforms import validators
from wtforms import widgets


app = Flask(__name__, template_folder='templates')

app.debug = True

class MyForm(Form):
    '''
    建立字段,內部包含正則表達式
    '''
    name=simple.StringField(label='用戶名',
                            validators=[
                                validators.DataRequired(message='用戶名不能爲空'),
                                validators.Length(min=6,max=18,message='用戶名的長度不能小於%(min)d不能大於%(max)d')
                            ],
                            widget=widgets.TextInput(),
                            render_kw={'class': 'form-control'}
                            )
    password=simple.PasswordField(label='密碼',
                                  validators=[
                                      validators.DataRequired(message='密碼不能爲空'),
                                      validators.Length(max=18,message='用戶名不能大於%(max)'),
                                    validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}",
                              message='密碼至少8個字符,至少1個大寫字母,1個小寫字母,1個數字和1個特殊字符')
                                  ],
                                  widget=widgets.PasswordInput(),
                                  render_kw={'class': 'form-control'}
                                  )





@app.route('/login', methods=['GET', 'POST'])
def login():
    if request.method=='GET':
        form=MyForm()
        return render_template('login.html',form=form)
    else:
        form = MyForm(formdata=request.form)
        if form.validate():
            print('用戶提交數據經過格式驗證,提交的值爲:', form.data)
        else:
            return render_template('login.html', form=form)
if __name__ == '__main__':
    app.run()
View Code

  驗證:form.validate()

import time
import threading
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engine
View Code

 補充:

  爲何要在類裏面定義方法:主要是爲對象的數據進行二次加工

  類的對象不能直接被for循環,若想被循環,在類裏面加上__iter__方法

十七 SQLAlchmey ORM框架

 目標:類/對象操做 -> SQL -> pymysql、MySQLdb -> 再在數據庫中執行。

 基本使用:這個不常見,

class MyForm(Form):
    '''
    建立字段,內部包含正則表達式
    '''
    name=simple.StringField(label='用戶名',
                            validators=[
                                validators.DataRequired(message='用戶名不能爲空'),
                                validators.Length(min=6,max=18,message='用戶名的長度不能小於%(min)d不能大於%(max)d')
                            ],
                            widget=widgets.TextInput(),
                            render_kw={'class': 'form-control'}
                            )
    password=simple.PasswordField(label='密碼',
                                  validators=[
                                      validators.DataRequired(message='密碼不能爲空'),
                                      validators.Length(max=18,message='用戶名不能大於%(max)'),
                                    validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}",
                              message='密碼至少8個字符,至少1個大寫字母,1個小寫字母,1個數字和1個特殊字符')
                                  ],
                                  widget=widgets.PasswordInput(),
                                  render_kw={'class': 'form-control'}
                                  )
View Code

  方式一:

                    engine = create_engine(
                        "mysql+pymysql://root:123@127.0.0.1:3306/t1?charset=utf8",
                        max_overflow=2,  # 超過鏈接池大小外最多建立的鏈接
                        pool_size=5,  # 鏈接池大小
                        pool_timeout=30,  # 池中沒有線程最多等待的時間,不然報錯
                        pool_recycle=-1  # 多久以後對線程池中的線程進行一次鏈接的回收(重置)
                    )
                    
                    conn = engine.raw_connection()
                    cursor = conn.cursor()
                    cursor.execute(
                        "select * from t1"
                    )
                    result = cursor.fetchall()
                    cursor.close()
                    conn.close()
View Code

  方式二:

                    engine = create_engine(
                        "mysql+pymysql://root:123@127.0.0.1:3306/t1?charset=utf8",
                        max_overflow=0,  # 超過鏈接池大小外最多建立的鏈接
                        pool_size=5,  # 鏈接池大小
                        pool_timeout=30,  # 池中沒有線程最多等待的時間,不然報錯
                        pool_recycle=-1  # 多久以後對線程池中的線程進行一次鏈接的回收(重置)
                    )
                    
                    def task(arg):
                        conn = engine.raw_connection()
                        cursor = conn.cursor()
                        cursor.execute(
                            #"select * from t1"
                            "select sleep(2)"
                        )
                        result = cursor.fetchall()
                        cursor.close()
                        conn.close()
                     
                     
                    for i in range(20):
                        t = threading.Thread(target=task, args=(i,))
                        t.start()
                
View Code

 ORM:

                models.py 
                    #!/usr/bin/env python
                    # -*- coding:utf-8 -*-
                    import datetime
                    from sqlalchemy import create_engine
                    from sqlalchemy.ext.declarative import declarative_base
                    from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index

                    Base = declarative_base()

                    class Users(Base):
                        __tablename__ = 'users' # 數據庫表名稱
                        id = Column(Integer, primary_key=True) # id 主鍵
                        name = Column(String(32), index=True, nullable=False) # name列,
                        
                    def init_db():
                        """
                        根據類建立數據庫表
                        :return: 
                        """
                        engine = create_engine(
                            "mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",
                            max_overflow=0,  # 超過鏈接池大小外最多建立的鏈接
                            pool_size=5,  # 鏈接池大小
                            pool_timeout=30,  # 池中沒有線程最多等待的時間,不然報錯
                            pool_recycle=-1  # 多久以後對線程池中的線程進行一次鏈接的回收(重置)
                        )

                        Base.metadata.create_all(engine)


                    def drop_db():
                        """
                        根據類刪除數據庫表
                        :return: 
                        """
                        engine = create_engine(
                            "mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",
                            max_overflow=0,  # 超過鏈接池大小外最多建立的鏈接
                            pool_size=5,  # 鏈接池大小
                            pool_timeout=30,  # 池中沒有線程最多等待的時間,不然報錯
                            pool_recycle=-1  # 多久以後對線程池中的線程進行一次鏈接的回收(重置)
                        )

                        Base.metadata.drop_all(engine)


                    if __name__ == '__main__':
                        #drop_db()
                        #init_db()
            
            
                app.py 
                    #!/usr/bin/env python
                    # -*- coding:utf-8 -*-
                    from sqlalchemy.orm import sessionmaker
                    from sqlalchemy import create_engine
                    from models import Users
                      
                     
                    engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
                    Connection = sessionmaker(bind=engine)
                      
                    # 每次執行數據庫操做時,都須要建立一個Connection
                    con = Connection()
                      
                      
                    # ############# 執行ORM操做 #############
                    obj1 = Users(name="alex1")
                    con.add(obj1)
                    # 提交事務
                    con.commit()
                    
                    
                    
                    # 關閉session
                    con.close()
                    
            
View Code

  詳細信息:

  建立表:

import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index

Base = declarative_base()


class Users(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=False)
    email = Column(String(32), unique=True)
    ctime = Column(DateTime, default=datetime.datetime.now)  # 3期師兄
    extra = Column(Text, nullable=True)

    __table_args__ = (
        # UniqueConstraint('id', 'name', name='uix_id_name'),
        # Index('ix_id_name', 'name', 'email'),
    )

class Hobby(Base):
    __tablename__ = 'hobby'
    id = Column(Integer, primary_key=True)
    caption = Column(String(50), default='籃球')


class Person(Base):
    __tablename__ = 'person'
    nid = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=True)
    hobby_id = Column(Integer, ForeignKey("hobby.id"))


class b2g(Base):
    __tablename__ = 'b2g'
    id = Column(Integer, primary_key=True, autoincrement=True)
    girl_id = Column(Integer, ForeignKey('girl.id'))
    boy_id = Column(Integer, ForeignKey('boy.id'))


class Girl(Base):
    __tablename__ = 'girl'
    id = Column(Integer, primary_key=True)
    name = Column(String(64), unique=True, nullable=False)


class Boy(Base):
    __tablename__ = 'boy'

    id = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(64), unique=True, nullable=False)



engine = create_engine(
    "mysql+pymysql://root:0410@127.0.0.1:3306/sqlachemy?charset=utf8",
    max_overflow=0,  # 超過鏈接池大小外最多建立的鏈接
    pool_size=5,  # 鏈接池大小
    pool_timeout=30,  # 池中沒有線程最多等待的時間,不然報錯
    pool_recycle=-1  # 多久以後對線程池中的線程進行一次鏈接的回收(重置)
)

Base.metadata.create_all(engine)
# Base.metadata.drop_all(engine)
View Code

  對於表的增刪改查:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import text

from db import Users, Hosts

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)

session = Session()

# ################ 添加 ################
"""
obj1 = Users(name="wupeiqi")
session.add(obj1)

session.add_all([
    Users(name="wupeiqi"),
    Users(name="alex"),
    Hosts(name="c1.com"),
])
session.commit()
"""

# ################ 刪除 ################
"""
session.query(Users).filter(Users.id > 2).delete()
session.commit()
"""
# ################ 修改 ################
"""
session.query(Users).filter(Users.id > 0).update({"name" : "099"})
session.query(Users).filter(Users.id > 0).update({Users.name: Users.name + "099"}, synchronize_session=False)
session.query(Users).filter(Users.id > 0).update({"age": Users.age + 1}, synchronize_session="evaluate")
session.commit()
"""
# ################ 查詢 ################
"""
r1 = session.query(Users).all()
r2 = session.query(Users.name.label('xx'), Users.age).all()
r3 = session.query(Users).filter(Users.name == "alex").all()
r4 = session.query(Users).filter_by(name='alex').all()
r5 = session.query(Users).filter_by(name='alex').first()
r6 = session.query(Users).filter(text("id<:value and name=:name")).params(value=224, name='fred').order_by(Users.id).all()
r7 = session.query(Users).from_statement(text("SELECT * FROM users where name=:name")).params(name='ed').all()
"""


session.close()
View Code

  查看的其餘操做:

# 條件
ret = session.query(Users).filter_by(name='alex').all()
ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric').all()
ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all()
ret = session.query(Users).filter(Users.id.in_([1,3,4])).all()
ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all()
ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all()
from sqlalchemy import and_, or_
ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all()
ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all()
ret = session.query(Users).filter(
    or_(
        Users.id < 2,
        and_(Users.name == 'eric', Users.id > 3),
        Users.extra != ""
    )).all()


# 通配符
ret = session.query(Users).filter(Users.name.like('e%')).all()
ret = session.query(Users).filter(~Users.name.like('e%')).all()

# 限制
ret = session.query(Users)[1:2]

# 排序
ret = session.query(Users).order_by(Users.name.desc()).all()
ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()

# 分組
from sqlalchemy.sql import func

ret = session.query(Users).group_by(Users.extra).all()
ret = session.query(
    func.max(Users.id),
    func.sum(Users.id),
    func.min(Users.id)).group_by(Users.name).all()

ret = session.query(
    func.max(Users.id),
    func.sum(Users.id),
    func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all()

# 連表

ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all()

ret = session.query(Person).join(Favor).all()

ret = session.query(Person).join(Favor, isouter=True).all()


# 組合
q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union(q2).all()

q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union_all(q2).all()
View Code

  原生的sql語句:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from sqlalchemy.engine.result import ResultProxy
from db import Users, Hosts

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)

session = Session()

# 查詢
# cursor = session.execute('select * from users')
# result = cursor.fetchall()

# 添加
cursor = session.execute('insert into users(name) values(:value)',params={"value":'wupeiqi'})
session.commit()
print(cursor.lastrowid)

session.close()
View Code

  多表操做:

一對多:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from sqlalchemy.engine.result import ResultProxy
from db import Users, Hosts, Hobby, Person

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
session = Session()
# 添加
"""
session.add_all([
    Hobby(caption='乒乓球'),
    Hobby(caption='羽毛球'),
    Person(name='張三', hobby_id=3),
    Person(name='李四', hobby_id=4),
])

person = Person(name='張九', hobby=Hobby(caption='姑娘'))
session.add(person)

hb = Hobby(caption='人妖')
hb.pers = [Person(name='文飛'), Person(name='博雅')]
session.add(hb)

session.commit()
"""

# 使用relationship正向查詢
"""
v = session.query(Person).first()
print(v.name)
print(v.hobby.caption)
"""

# 使用relationship反向查詢
"""
v = session.query(Hobby).first()
print(v.caption)
print(v.pers)
"""

session.close()


多對多:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from sqlalchemy.engine.result import ResultProxy
from db import Users, Hosts, Hobby, Person, Group, Server, Server2Group

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
session = Session()
# 添加
"""
session.add_all([
    Server(hostname='c1.com'),
    Server(hostname='c2.com'),
    Group(name='A組'),
    Group(name='B組'),
])
session.commit()

s2g = Server2Group(server_id=1, group_id=1)
session.add(s2g)
session.commit()


gp = Group(name='C組')
gp.servers = [Server(hostname='c3.com'),Server(hostname='c4.com')]
session.add(gp)
session.commit()


ser = Server(hostname='c6.com')
ser.groups = [Group(name='F組'),Group(name='G組')]
session.add(ser)
session.commit()
"""


# 使用relationship正向查詢
"""
v = session.query(Group).first()
print(v.name)
print(v.servers)
"""

# 使用relationship反向查詢
"""
v = session.query(Server).first()
print(v.hostname)
print(v.groups)
"""


session.close()
View Code

  其餘操做:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import text, func
from sqlalchemy.engine.result import ResultProxy
from db import Users, Hosts, Hobby, Person, Group, Server, Server2Group

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
session = Session()

# 關聯子查詢
subqry = session.query(func.count(Server.id).label("sid")).filter(Server.id == Group.id).correlate(Group).as_scalar()
result = session.query(Group.name, subqry)
"""
SELECT `group`.name AS group_name, (SELECT count(server.id) AS sid 
FROM server 
WHERE server.id = `group`.id) AS anon_1 
FROM `group`
"""


# 原生SQL
"""
# 查詢
cursor = session.execute('select * from users')
result = cursor.fetchall()

# 添加
cursor = session.execute('insert into users(name) values(:value)',params={"value":'wupeiqi'})
session.commit()
print(cursor.lastrowid)
"""

session.close()

其餘
View Code

  基於scoped_session實現線程安全:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session
from models import Users

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)

"""
# 線程安全,基於本地線程實現每一個線程用同一個session
# 特殊的:scoped_session中有原來方法的Session中的一下方法:

public_methods = (
    '__contains__', '__iter__', 'add', 'add_all', 'begin', 'begin_nested',
    'close', 'commit', 'connection', 'delete', 'execute', 'expire',
    'expire_all', 'expunge', 'expunge_all', 'flush', 'get_bind',
    'is_modified', 'bulk_save_objects', 'bulk_insert_mappings',
    'bulk_update_mappings',
    'merge', 'query', 'refresh', 'rollback',
    'scalar'
)
"""
session = scoped_session(Session)


# ############# 執行ORM操做 #############
obj1 = Users(name="alex1")
session.add(obj1)



# 提交事務
session.commit()
# 關閉session
session.close()
View Code

  多線程執行實例:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from db import Users

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)


def task(arg):
    session = Session()

    obj1 = Users(name="alex1")
    session.add(obj1)

    session.commit()


for i in range(10):
    t = threading.Thread(target=task, args=(i,))
    t.start()

多線程執行示例
View Code

 flask_sqlalchemy:Flask-SQLAlchemy(文件和目錄的管理)

  Flask和SQLAlchemy的管理
 - db = SQLAlchemy()
            - 包含配置
            - 包含ORM基類
            - 包含create_all
            - engine
            - 建立鏈接
                
        # 目錄結構保存好
        
View Code

 flask_sqlalchemy實例:https://pan.baidu.com/s/1IL68-68tBDluDtqsB1NS1g

 補充:

3. pipreqs
    
    pip3 install pipreqs
        
    pipreqs ./
        
View Code

 十八 flask_scriptflask_migrate

 flask_script:功能至關於django中的manage文件,用於啓動flask項目使用的,python manage.py runserver

 flask_migrate:至關於django中的數據庫遷移,makemigrations/migrate -> migrate/upgrade

  批量導入:xlrd/xlwt

#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""
生成依賴文件:
    pipreqs ./

"""
from sansa import create_app,db
from flask_script import Manager
from flask_migrate import Migrate,MigrateCommand

app = create_app()
manager = Manager(app)
migrate = Migrate(app, db)


"""
# 數據庫遷移命名
    python manage.py db init
    python manage.py db migrate
    python manage.py db upgrade
"""
manager.add_command('db', MigrateCommand)


@manager.command
def custom(arg):
    """
    自定義命令
    python manage.py custom 123
    :param arg:
    :return:
    """
    print(arg)


@manager.option('-n', '--name', dest='name')
@manager.option('-u', '--url', dest='url')
def cmd(name, url):
    """
    自定義命令
    執行: python manage.py  cmd -n wupeiqi -u http://www.oldboyedu.com
    :param name:
    :param url:
    :return:
    """
    print(name, url)



if __name__ == '__main__':
    manager.run()

"""
1. 運行程序時:
    python manage.py runserver 
    

"""
View Code

 補充:  

 flask和django的其餘導入靜態文件

  

相關文章
相關標籤/搜索