flask知識梳理

Flask

0.Flask簡介

Flask是一個基於Python開發而且依賴jinja2模板和Werkzeug WSGI服務的一個微型框架,對於Werkzeug本質是Socket服務端,其用於接收http請求並對請求進行預處理,而後觸發Flask框架,開發人員基於Flask框架提供的功能對請求進行相應的處理,並返回給用戶,若是要返回給用戶複雜的內容時,須要藉助jinja2模板來實現對模板的處理,即:將模板和數據進行渲染,將渲染後的字符串返回給用戶瀏覽器。html

「微」(micro) 並不表示你須要把整個 Web 應用塞進單個 Python 文件(雖然確實能夠 ),也不意味着 Flask 在功能上有所欠缺。微框架中的「微」意味着 Flask 旨在保持核心簡單而易於擴展。Flask 不會替你作出太多決策——好比使用何種數據庫。而那些 Flask 所選擇的——好比使用何種模板引擎——則很容易替換。除此以外的一切都由可由你掌握。如此,Flask 能夠與您珠聯璧合。html5

默認狀況下,Flask 不包含數據庫抽象層、表單驗證,或是其它任何已有多種庫能夠勝任的功能。然而,Flask 支持用擴展來給應用添加這些功能,如同是 Flask 自己實現的同樣。衆多的擴展提供了數據庫集成、表單驗證、上傳處理、各類各樣的開放認證技術等功能。Flask 也許是「微小」的,但它已準備好在需求繁雜的生產環境中投入使用python

1.安裝

pip3 install flaskmysql

2.werkzeug簡介

Werkzeug是一個WSGI工具包,他能夠做爲一個Web框架的底層庫。這裏稍微說一下, werkzeug 不是一個web服務器,也不是一個web框架,而是一個工具包,官方的介紹說是一個 WSGI 工具包,它能夠做爲一個 Web 框架的底層庫,由於它封裝好了不少 Web 框架的東西,例如 Request,Response 等等 web

代碼示例:正則表達式

from werkzeug.wrappers import Request, Response

@Request.application
def hello(request):
   return Response('Hello World!')

if __name__ == '__main__':
   from werkzeug.serving import run_simple
   run_simple('localhost', 4000, hello)

3.flask快速使用


from flask import Flask
# 實例化產生一個Flask對象
app = Flask(__name__)
# 將 '/'和視圖函數hello_workd的對應關係添加到路由中
@app.route('/') # 1. v=app.route('/') 2. v(hello_world)
def hello_world():
   return 'Hello World!'

if __name__ == '__main__':
   app.run() # 最終調用了run_simple()

案例:登陸,顯示用戶信息

main.pyredis


from flask import Flask,render_template,request,redirect,session,url_for
app = Flask(__name__)
app.debug = True
app.secret_key = 'sdfsdfsdfsdf'

USERS = {
   1:{'name':'張三','age':18,'gender':'男','text':"道路千萬條"},
   2:{'name':'李四','age':28,'gender':'男','text':"安全第一條"},
   3:{'name':'王五','age':18,'gender':'女','text':"行車不規範"},
}

@app.route('/detail/<int:nid>',methods=['GET'])
def detail(nid):
   user = session.get('user_info')
   if not user:
       return redirect('/login')

   info = USERS.get(nid)
   return render_template('detail.html',info=info)


@app.route('/index',methods=['GET'])
def index():
   user = session.get('user_info')
   if not user:
       # return redirect('/login')
       url = url_for('l1')
       return redirect(url)
   return render_template('index.html',user_dict=USERS)


@app.route('/login',methods=['GET','POST'],endpoint='l1')
def login():
   if request.method == "GET":
       return render_template('login.html')
   else:
       # request.query_string
       user = request.form.get('user')
       pwd = request.form.get('pwd')
       if user == 'lqz' and pwd == '123':
           session['user_info'] = user
           return redirect('http://www.baidu.com')
       return render_template('login.html',error='用戶名或密碼錯誤')

if __name__ == '__main__':
   app.run()

detail.htmlsql


<!DOCTYPE html>
<html lang="en">
<head>
   <meta charset="UTF-8">
   <title>Title</title>
</head>
<body>
   <h1>詳細信息 {{info.name}}</h1>
   <div>
      {{info.text}}
   </div>
</body>
</html>

index.html數據庫


<!DOCTYPE html>
<html lang="en">
<head>
   <meta charset="UTF-8">
   <title>Title</title>
</head>
<body>
   <h1>用戶列表</h1>
   <table>
      {% for k,v in user_dict.items() %}
       <tr>
           <td>{{k}}</td>
           <td>{{v.name}}</td>
           <td>{{v['name']}}</td>
           <td>{{v.get('name')}}</td>
           <td><a href="/detail/{{k}}">查看詳細</a></td>
       </tr>
      {% endfor %}
   </table>
</body>
</html>

login.htmldjango


<!DOCTYPE html>
<html lang="en">
<head>
   <meta charset="UTF-8">
   <title>Title</title>
</head>
<body>
   <h1>用戶登陸</h1>
   <form method="post">
       <input type="text" name="user">
       <input type="text" name="pwd">
       <input type="submit" value="登陸">{{error}}
   </form>
</body>
</html>

4.配置文件

flask中的配置文件是一個flask.config.Config對象(繼承字典),默認配置爲:


{
       'DEBUG':                                get_debug_flag(default=False),  是否開啓Debug模式
       'TESTING':                              False,                          是否開啓測試模式
       'PROPAGATE_EXCEPTIONS':                 None,                          
       'PRESERVE_CONTEXT_ON_EXCEPTION':        None,
       'SECRET_KEY':                           None,
       'PERMANENT_SESSION_LIFETIME':           timedelta(days=31),
       'USE_X_SENDFILE':                       False,
       'LOGGER_NAME':                          None,
       'LOGGER_HANDLER_POLICY':               'always',
       'SERVER_NAME':                          None,
       'APPLICATION_ROOT':                     None,
       'SESSION_COOKIE_NAME':                  'session',
       'SESSION_COOKIE_DOMAIN':                None,
       'SESSION_COOKIE_PATH':                  None,
       'SESSION_COOKIE_HTTPONLY':              True,
       'SESSION_COOKIE_SECURE':                False,
       'SESSION_REFRESH_EACH_REQUEST':         True,
       'MAX_CONTENT_LENGTH':                   None,
       'SEND_FILE_MAX_AGE_DEFAULT':            timedelta(hours=12),
       'TRAP_BAD_REQUEST_ERRORS':              False,
       'TRAP_HTTP_EXCEPTIONS':                 False,
       'EXPLAIN_TEMPLATE_LOADING':             False,
       'PREFERRED_URL_SCHEME':                 'http',
       'JSON_AS_ASCII':                        True,
       'JSON_SORT_KEYS':                       True,
       'JSONIFY_PRETTYPRINT_REGULAR':          True,
       'JSONIFY_MIMETYPE':                     'application/json',
       'TEMPLATES_AUTO_RELOAD':                None,
  }

方式一


  app.config['DEBUG'] = True
  PS: 因爲Config對象本質上是字典,因此還可使用app.config.update(...)

方式二


#經過py文件配置
app.config.from_pyfile("python文件名稱")
如:
settings.py
DEBUG = True

app.config.from_pyfile("settings.py")
#經過環境變量配置
app.config.from_envvar("環境變量名稱")
#app.config.from_pyfile(os.environ['YOURAPPLICATION_SETTINGS'])
環境變量的值爲python文件名稱名稱,內部調用from_pyfile方法

app.config.from_json("json文件名稱")
JSON文件名稱,必須是json格式,由於內部會執行json.loads

app.config.from_mapping({'DEBUG': True})
字典格式

app.config.from_object("python類或類的路徑")

app.config.from_object('pro_flask.settings.TestingConfig')

settings.py


class Config(object):
   DEBUG = False
   TESTING = False
   DATABASE_URI = 'sqlite://:memory:'


class ProductionConfig(Config):
   DATABASE_URI = 'mysql://user@localhost/foo'


class DevelopmentConfig(Config):
   DEBUG = True


class TestingConfig(Config):
   TESTING = True


PS: 從sys.path中已經存在路徑開始寫

PS: settings.py文件默認路徑要放在程序root_path目錄,若是instance_relative_config爲True,則就是instance_path目錄(Flask對象init方法的參數)

5.路由系統

典型寫法


@app.route('/detail/<int:nid>',methods=['GET'],endpoint='detail')

默認轉換器


DEFAULT_CONVERTERS = {
   'default':          UnicodeConverter,
   'string':           UnicodeConverter,
   'any':              AnyConverter,
   'path':             PathConverter,
   'int':              IntegerConverter,
   'float':            FloatConverter,
   'uuid':             UUIDConverter,
}

路由系統本質


"""
1. decorator = app.route('/',methods=['GET','POST'],endpoint='n1')
  def route(self, rule, **options):
      # app對象
      # rule= /
      # options = {methods=['GET','POST'],endpoint='n1'}
      def decorator(f):
          endpoint = options.pop('endpoint', None)
          self.add_url_rule(rule, endpoint, f, **options)
          return f
      return decorator
2. @decorator
  decorator(index)
"""
#同理
def login():
   return '登陸'
app.add_url_rule('/login', 'n2', login, methods=['GET',"POST"])
#與django路由相似
#django與flask路由:flask路由基於裝飾器,本質是基於:add_url_rule
#add_url_rule 源碼中,endpoint若是爲空,endpoint = _endpoint_from_view_func(view_func),最終取view_func.__name__(函數名)

CBV(源碼分析)


def auth(func):
   def inner(*args, **kwargs):
       print('before')
       result = func(*args, **kwargs)
       print('after')
       return result

   return inner

class IndexView(views.View):
   methods = ['GET']
   decorators = [auth, ]

   def dispatch_request(self):
       print('Index')
       return 'Index!'

app.add_url_rule('/index', view_func=IndexView.as_view(name='index'))  # name=endpoint
#或者,一般用此方式
 class IndexView(views.MethodView):
           methods = ['GET']
           decorators = [auth, ]

           def get(self):
               return 'Index.GET'

           def post(self):
               return 'Index.POST'
       app.add_url_rule('/index', view_func=IndexView.as_view(name='index'))  # name=endpoint

app.add_url_rule參數


@app.route和app.add_url_rule參數:
rule, URL規則
view_func, 視圖函數名稱
defaults = None, 默認值, 當URL中無參數,函數須要參數時,使用defaults = {'k': 'v'}
爲函數提供參數
endpoint = None, 名稱,用於反向生成URL,即: url_for('名稱')
methods = None, 容許的請求方式,如:["GET", "POST"]
#對URL最後的 / 符號是否嚴格要求
strict_slashes = None
   '''
      @app.route('/index', strict_slashes=False)
      #訪問http://www.xx.com/index/ 或http://www.xx.com/index都可
      @app.route('/index', strict_slashes=True)
      #僅訪問http://www.xx.com/index
  '''
#重定向到指定地址
redirect_to = None,
   '''
      @app.route('/index/<int:nid>', redirect_to='/home/<nid>')
  '''

#子域名訪問
subdomain = None,
   '''
  #C:\Windows\System32\drivers\etc\hosts
  127.0.0.1       www.liuqingzheng.com
127.0.0.1       admin.liuqingzheng.com
127.0.0.1       buy.liuqingzheng.com
   
  from flask import Flask, views, url_for
  app = Flask(import_name=__name__)
  app.config['SERVER_NAME'] = 'liuqingzheng.com:5000'
  @app.route("/", subdomain="admin")
  def static_index():
      """Flask supports static subdomains
      This is available at static.your-domain.tld"""
      return "static.your-domain.tld"
  #能夠傳入任意的字符串,如傳入的字符串爲aa,顯示爲 aa.liuqingzheng.com
  @app.route("/dynamic", subdomain="<username>")
  def username_index(username):
      """Dynamic subdomains are also supported
      Try going to user1.your-domain.tld/dynamic"""
      return username + ".your-domain.tld"
  if __name__ == '__main__':
      app.run()
       
  訪問:
  http://www.liuqingzheng.com:5000/dynamic
  http://admin.liuqingzheng.com:5000/dynamic
  http://buy.liuqingzheng.com:5000/dynamic
  '''

支持正則


#1 寫類,繼承BaseConverter
#2 註冊:app.url_map.converters['regex'] = RegexConverter
# 3 使用:@app.route('/index/<regex("\d+"):nid>') 正則表達式會看成第二個參數傳遞到類中
from flask import Flask, views, url_for
from werkzeug.routing import BaseConverter

app = Flask(import_name=__name__)

class RegexConverter(BaseConverter):
   """
  自定義URL匹配正則表達式
  """
   def __init__(self, map, regex):
       super(RegexConverter, self).__init__(map)
       self.regex = regex

   def to_python(self, value):
       """
      路由匹配時,匹配成功後傳遞給視圖函數中參數的值
      """
       return int(value)

   def to_url(self, value):
       """
      使用url_for反向生成URL時,傳遞的參數通過該方法處理,返回的值用於生成URL中的參數
      """
       val = super(RegexConverter, self).to_url(value)
       return val
# 添加到flask中
app.url_map.converters['regex'] = RegexConverter
@app.route('/index/<regex("\d+"):nid>')
def index(nid):
   print(url_for('index', nid='888'))
   return 'Index'

if __name__ == '__main__':
   app.run()

6.模版

比django中多能夠加括號,執行函數,傳參數


from flask import Flask,render_template,Markup,jsonify,make_response
app = Flask(__name__)

def func1(arg):
   return Markup("<input type='text' value='%s' />" %(arg,))
@app.route('/')
def index():
   return render_template('index.html',ff = func1)

if __name__ == '__main__':
   app.run()

index.html


<!DOCTYPE html>
<html lang="en">
<head>
   <meta charset="UTF-8">
   <title>Title</title>
</head>
<body>

  {{ff('六五')}}
{{ff('六五')|safe}}

</body>
</html>

注意:

1.Markup等價django的mark_safe ,

2.extends,include如出一轍

7.請求響應


  from flask import Flask
   from flask import request
   from flask import render_template
   from flask import redirect
   from flask import make_response

   app = Flask(__name__)


   @app.route('/login.html', methods=['GET', "POST"])
   def login():

       # 請求相關信息
       # request.method
       # request.args #至關於django中的request.GET
       # request.form #至關於django中的request.POST
       # request.values
       # request.cookies
       # request.headers
       # request.path
       # request.full_path
       # request.script_root
       # request.url
       # request.base_url
       # request.url_root
       # request.host_url
       # request.host
       # request.files
       # obj = request.files['the_file_name']
       # obj.save('/var/www/uploads/' + secure_filename(f.filename))

       # 響應相關信息
       # return "字符串"
       # return render_template('html模板路徑',**{})
       # return redirect('/index.html')
       #return jsonify({'k1':'v1'})

       # response = make_response(render_template('index.html'))
       # response是flask.wrappers.Response類型
       # response.delete_cookie('key')
       # response.set_cookie('key', 'value')
       # response.headers['X-Something'] = 'A value'
       # return response
       return "內容"

   if __name__ == '__main__':
       app.run()

8.session

除請求對象以外,還有一個 session 對象。它容許你在不一樣請求間存儲特定用戶的信息。它是在 Cookies 的基礎上實現的,而且對 Cookies 進行密鑰簽名要使用會話,須要設置一個密鑰。 (app.session_interface對象)

app.secret_key='xxxxx'
設置:session['username'] 'xxx'
取值:username=session['username']
刪除:session.pop('username', None)
   del session['username']

9.閃現(message)


-設置:flash('aaa')
-取值:get_flashed_message()
-假設在a頁面操做出錯,跳轉到b頁面,在b頁面顯示a頁面的錯誤信息

示例:


from flask import Flask,flash,get_flashed_messages,request,redirect

app = Flask(__name__)
app.secret_key = 'asdfasdf'


@app.route('/index')
def index():
   # 從某個地方獲取設置過的全部值,並清除。
   val = request.args.get('v')
   if val == 'oldboy':
       return 'Hello World!'
   flash('超時錯誤',category="x1")
   return "ssdsdsdfsd"
   # return redirect('/error')


@app.route('/error')
def error():
   """
  展現錯誤信息
  :return:
  """
   data = get_flashed_messages(category_filter=['x1'])
   if data:
       msg = data[0]
   else:
       msg = "..."
   return "錯誤信息:%s" %(msg,)

if __name__ == '__main__':
   app.run()

10.請求擴展

1 before_request

類比django中間件中的process_request,在請求收到以前綁定一個函數作一些事情


#基於它作用戶登陸認證
@app.before_request
def process_request(*args,**kwargs):
   if not request.path == '/login':
     user = session.get('user_info')
     if not user:
       return redirect('/login')

2 after_request

類比django中間件中的process_response,每個請求以後綁定一個函數,若是請求沒有異常


@app.after_request
def process_response1(response):
   print('process_response1 走了')
   return response

3 before_first_request

第一次請求時,跟瀏覽器無關


@app.before_first_request
def first():
   pass

4 teardown_request

每個請求以後綁定一個函數,即便遇到了異常(每一次請求不管如何都會通過此函數)


@app.teardown_request
def ter(e):
   pass

5 errorhandler

路徑不存在時404,服務器內部錯誤500

@app.errorhandler(404)
def error_404(err):
  print(err)
  return "404錯誤了"

6 template_global

標籤


@app.template_global()
def add(a1, a2):
   return a1 + a2
#{{add(1,2)}}

7 template_filter

過濾器


@app.template_filter()
def add(a1, a2, a3):
   return a1 + a2 + a3
#{{ 1|add(2,3)}}

總結:

1 重點掌握before_request和after_request,

2 注意有多個的狀況,執行順序

3 before_request請求攔截後(也就是有return值),response全部都執行

11 中間件


from flask import Flask

app = Flask(__name__)

@app.route('/')
def index():
   return 'Hello World!'
# 模擬中間件
class MyMiddle(object):
   def __init__(self,old_wsgi_app):
       self.old_wsgi_app = old_wsgi_app

   def __call__(self,  environ, start_response):
       print('開始以前')
       ret = self.old_wsgi_app(environ, start_response)
       print('結束以後')
       return ret

if __name__ == '__main__':
   #把原來的wsgi_app替換爲自定義的
   app.wsgi_app = MyMiddle(app.wsgi_app)
   app.run()

12.藍圖

對程序進行目錄結構劃分

不使用藍圖,本身分文件

目錄結構:


-templates
-views
-__init__.py
   -user.py
   -order.py
-app.py

app.py


from views import app
if __name__ == '__main__':
   app.run()

init.py


from flask import Flask,request
app = Flask(__name__)
#不導入這個不行
from . import account
from . import order
from . import user

user.py


from . import app
@app.route('/user')
def user():
   return 'user'

order.py


from . import app
@app.route('/order')
def order():
   return 'order'

使用藍圖之中小型系統

詳見代碼:pro_flask_簡單應用程序目錄示例.zip

目錄結構:


-flask_pro
-flask_test
  -__init__.py
  -static
       -templates
       -views
      -order.py
           -user.py
    -manage.py
       

_init.py


from flask import  Flask
app=Flask(__name__)
from flask_test.views import user
from flask_test.views import order
#註冊藍圖
app.register_blueprint(user.us)
app.register_blueprint(order.ord)

manage.py


from flask_test import  app
if __name__ == '__main__':
   app.run(port=8008)

user.py


from flask import Blueprint
us=Blueprint('user',__name__)

@us.route('/login')
def login():
   return 'login'

order.py


from flask import Blueprint
ord=Blueprint('order',__name__)

@ord.route('/test')
def test():
   return 'order test'

使用藍圖之大型系統

詳見代碼:pro_flask_大型應用目錄示例.zip

總結:

1 xxx = Blueprint('account', name,url_prefix='/xxx') :藍圖URL前綴,表示url的前綴,在該藍圖下全部url都加前綴

2 xxx = Blueprint('account', name,url_prefix='/xxx',template_folder='tpls'):給當前藍圖單獨使用templates,向上查找,當前找不到,會找總templates

3 藍圖的befort_request,對當前藍圖有效

4 大型項目,能夠模擬出相似於django中app的概念

13.請求上下文源碼分析


第一階段:將ctx(request,session)放到Local對象上
 
第二階段:視圖函數導入:request/session
request.method
-LocalProxy對象.method,執行getattr方法,getattr(self._get_current_object(), name)
-self._get_current_object()返回return self.__local(),self.__local(),在LocakProxy實例化的時候,object.__setattr__(self, '_LocalProxy__local', local),此處local就是:partial(_lookup_req_object, 'request')

-def _lookup_req_object(name):
top = _request_ctx_stack.top #_request_ctx_stack 就是LocalStack()對象,top方法把ctx取出來
if top is None:
raise RuntimeError(_request_ctx_err_msg)
return getattr(top, name)#獲取ctx中的request或session對象

第三階段:請求處理完畢
- 獲取session並保存到cookie
- 將ctx刪除

程序運行,兩個LocalStack()對象,一個裏面放request和session,另外一個放g和current_app

14.g對象

專門用來存儲用戶信息的g對象,g的全稱的爲global

g對象在一次請求中的全部的代碼的地方,都是可使用的

g對象和session的區別


session對象是能夠跨request的,只要session還未失效,不一樣的request的請求會獲取到同一個session,可是g對象不是,g對象不須要管過時時間,請求一次就g對象就改變了一次,或者從新賦值了一次

15.flask-session

做用:將默認保存的簽名cookie中的值 保存到 redis/memcached/file/Mongodb/SQLAlchemy

安裝:pip3 install flask-session

使用1:

from flask import Flask,session
from flask_session import RedisSessionInterface
import redis
app = Flask(__name__)
conn=redis.Redis(host='127.0.0.1',port=6379)
#use_signer是否對key簽名
app.session_interface=RedisSessionInterface(conn,key_prefix='lqz')
@app.route('/')
def hello_world():
   session['name']='lqz'
   return 'Hello World!'

if __name__ == '__main__':
   app.run()

使用2:


from redis import Redis
from flask.ext.session import Session
app.config['SESSION_TYPE'] = 'redis'
app.config['SESSION_REDIS'] = Redis(host='192.168.0.94',port='6379')
Session(app)

問題:設置cookie時,如何設定關閉瀏覽器則cookie失效。


response.set_cookie('k','v',exipre=None)#這樣設置便可
#在session中設置
app.session_interface=RedisSessionInterface(conn,key_prefix='lqz',permanent=False)
#通常不用,咱們通常都設置超時時間,多長時間後失效

問題:cookie默認超時時間是多少?如何設置超時時間


#源碼expires = self.get_expiration_time(app, session)
'PERMANENT_SESSION_LIFETIME':           timedelta(days=31),#這個配置文件控制

 

 

16.數據庫鏈接池

pymsql連接數據庫


import pymysql

conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123456', db='s8day127db')
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
# cursor.execute("select id,name from users where name=%s and pwd=%s",['lqz','123',])
cursor.execute("select id,name from users where name=%(user)s and pwd=%(pwd)s",{'user':'lqz','pwd':'123'})
obj = cursor.fetchone()
conn.commit()
cursor.close()
conn.close()

print(obj)

數據庫鏈接池版

setting.py


from datetime import timedelta
from redis import Redis
import pymysql
from DBUtils.PooledDB import PooledDB, SharedDBConnection

class Config(object):
   DEBUG = True
   SECRET_KEY = "umsuldfsdflskjdf"
   PERMANENT_SESSION_LIFETIME = timedelta(minutes=20)
   SESSION_REFRESH_EACH_REQUEST= True
   SESSION_TYPE = "redis"
   PYMYSQL_POOL = PooledDB(
       creator=pymysql,  # 使用連接數據庫的模塊
       maxconnections=6,  # 鏈接池容許的最大鏈接數,0和None表示不限制鏈接數
       mincached=2,  # 初始化時,連接池中至少建立的空閒的連接,0表示不建立
       maxcached=5,  # 連接池中最多閒置的連接,0和None不限制
       maxshared=3,
       # 連接池中最多共享的連接數量,0和None表示所有共享。PS: 無用,由於pymysql和MySQLdb等模塊的 threadsafety都爲1,全部值不管設置爲多少,_maxcached永遠爲0,因此永遠是全部連接都共享。
       blocking=True,  # 鏈接池中若是沒有可用鏈接後,是否阻塞等待。True,等待;False,不等待而後報錯
       maxusage=None,  # 一個連接最多被重複使用的次數,None表示無限制
       setsession=[],  # 開始會話前執行的命令列表。如:["set datestyle to ...", "set time zone ..."]
       ping=0,
       # ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
       host='127.0.0.1',
       port=3306,
       user='root',
       password='123456',
       database='s8day127db',
       charset='utf8'
  )

class ProductionConfig(Config):
   SESSION_REDIS = Redis(host='192.168.0.94', port='6379')



class DevelopmentConfig(Config):
   SESSION_REDIS = Redis(host='127.0.0.1', port='6379')


class TestingConfig(Config):
   pass

utils/sql.py


import pymysql
from settings import Config
class SQLHelper(object):

   @staticmethod
   def open(cursor):
       POOL = Config.PYMYSQL_POOL
       conn = POOL.connection()
       cursor = conn.cursor(cursor=cursor)
       return conn,cursor

   @staticmethod
   def close(conn,cursor):
       conn.commit()
       cursor.close()
       conn.close()

   @classmethod
   def fetch_one(cls,sql,args,cursor =pymysql.cursors.DictCursor):
       conn,cursor = cls.open(cursor)
       cursor.execute(sql, args)
       obj = cursor.fetchone()
       cls.close(conn,cursor)
       return obj

   @classmethod
   def fetch_all(cls,sql, args,cursor =pymysql.cursors.DictCursor):
       conn, cursor = cls.open(cursor)
       cursor.execute(sql, args)
       obj = cursor.fetchall()
       cls.close(conn, cursor)
       return obj

使用:


obj = SQLHelper.fetch_one("select id,name from users where name=%(user)s and pwd=%(pwd)s", form.data)

 

17.wtforms

安裝:pip3 install wtforms

使用1:


from flask import Flask, render_template, request, redirect
from wtforms import Form
from wtforms.fields import simple
from wtforms import validators
from wtforms import widgets

app = Flask(__name__, template_folder='templates')

app.debug = True


class LoginForm(Form):
   # 字段(內部包含正則表達式)
   name = simple.StringField(
       label='用戶名',
       validators=[
           validators.DataRequired(message='用戶名不能爲空.'),
           validators.Length(min=6, max=18, message='用戶名長度必須大於%(min)d且小於%(max)d')
      ],
       widget=widgets.TextInput(), # 頁面上顯示的插件
       render_kw={'class': 'form-control'}

  )
   # 字段(內部包含正則表達式)
   pwd = simple.PasswordField(
       label='密碼',
       validators=[
           validators.DataRequired(message='密碼不能爲空.'),
           validators.Length(min=8, message='用戶名長度必須大於%(min)d'),
           validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}",
                             message='密碼至少8個字符,至少1個大寫字母,1個小寫字母,1個數字和1個特殊字符')

      ],
       widget=widgets.PasswordInput(),
       render_kw={'class': 'form-control'}
  )



@app.route('/login', methods=['GET', 'POST'])
def login():
   if request.method == 'GET':
       form = LoginForm()
       return render_template('login.html', form=form)
   else:
       form = LoginForm(formdata=request.form)
       if form.validate():
           print('用戶提交數據經過格式驗證,提交的值爲:', form.data)
       else:
           print(form.errors)
       return render_template('login.html', form=form)

if __name__ == '__main__':
   app.run()

login.html


<!DOCTYPE html>
<html lang="en">
<head>
   <meta charset="UTF-8">
   <title>Title</title>
</head>
<body>
<h1>登陸</h1>
<form method="post">
   <p>{{form.name.label}} {{form.name}} {{form.name.errors[0] }}</p>

   <p>{{form.pwd.label}} {{form.pwd}} {{form.pwd.errors[0] }}</p>
   <input type="submit" value="提交">
</form>
</body>
</html>

使用2:


from flask import Flask, render_template, request, redirect
from wtforms import Form
from wtforms.fields import core
from wtforms.fields import html5
from wtforms.fields import simple
from wtforms import validators
from wtforms import widgets

app = Flask(__name__, template_folder='templates')
app.debug = True



class RegisterForm(Form):
   name = simple.StringField(
       label='用戶名',
       validators=[
           validators.DataRequired()
      ],
       widget=widgets.TextInput(),
       render_kw={'class': 'form-control'},
       default='alex'
  )

   pwd = simple.PasswordField(
       label='密碼',
       validators=[
           validators.DataRequired(message='密碼不能爲空.')
      ],
       widget=widgets.PasswordInput(),
       render_kw={'class': 'form-control'}
  )

   pwd_confirm = simple.PasswordField(
       label='重複密碼',
       validators=[
           validators.DataRequired(message='重複密碼不能爲空.'),
           validators.EqualTo('pwd', message="兩次密碼輸入不一致")
      ],
       widget=widgets.PasswordInput(),
       render_kw={'class': 'form-control'}
  )

   email = html5.EmailField(
       label='郵箱',
       validators=[
           validators.DataRequired(message='郵箱不能爲空.'),
           validators.Email(message='郵箱格式錯誤')
      ],
       widget=widgets.TextInput(input_type='email'),
       render_kw={'class': 'form-control'}
  )

   gender = core.RadioField(
       label='性別',
       choices=(
          (1, '男'),
          (2, '女'),
      ),
       coerce=int # 「1」 「2」
    )
   city = core.SelectField(
       label='城市',
       choices=(
          ('bj', '北京'),
          ('sh', '上海'),
      )
  )

   hobby = core.SelectMultipleField(
       label='愛好',
       choices=(
          (1, '籃球'),
          (2, '足球'),
      ),
       coerce=int
  )

   favor = core.SelectMultipleField(
       label='喜愛',
       choices=(
          (1, '籃球'),
          (2, '足球'),
      ),
       widget=widgets.ListWidget(prefix_label=False),
       option_widget=widgets.CheckboxInput(),
       coerce=int,
       default=[1, 2]
  )

   def __init__(self, *args, **kwargs):
       super(RegisterForm, self).__init__(*args, **kwargs)
       self.favor.choices = ((1, '籃球'), (2, '足球'), (3, '羽毛球'))

   def validate_pwd_confirm(self, field):
       """
      自定義pwd_confirm字段規則,例:與pwd字段是否一致
      :param field:
      :return:
      """
       # 最開始初始化時,self.data中已經有全部的值

       if field.data != self.data['pwd']:
           # raise validators.ValidationError("密碼不一致") # 繼續後續驗證
           raise validators.StopValidation("密碼不一致")  # 再也不繼續後續驗證


@app.route('/register', methods=['GET', 'POST'])
def register():
   if request.method == 'GET':
       form = RegisterForm(data={'gender': 2,'hobby':[1,]}) # initial
       return render_template('register.html', form=form)
   else:
       form = RegisterForm(formdata=request.form)
       if form.validate():
           print('用戶提交數據經過格式驗證,提交的值爲:', form.data)
       else:
           print(form.errors)
       return render_template('register.html', form=form)



if __name__ == '__main__':
   app.run()

<!DOCTYPE html>
<html lang="en">
<head>
   <meta charset="UTF-8">
   <title>Title</title>
</head>
<body>
<h1>用戶註冊</h1>
<form method="post" novalidate style="padding:0 50px">
  {% for field in form %}
   <p>{{field.label}}: {{field}} {{field.errors[0] }}</p>
  {% endfor %}
   <input type="submit" value="提交">
</form>
</body>
</html>

 

18.信號

Flask框架中的信號基於blinker,其主要就是讓開發者但是在flask請求過程當中定製一些用戶行爲

安裝:pip3 install blinker

內置信號:


request_started = _signals.signal('request-started')                # 請求到來前執行
request_finished = _signals.signal('request-finished')              # 請求結束後執行

before_render_template = _signals.signal('before-render-template')  # 模板渲染前執行
template_rendered = _signals.signal('template-rendered')            # 模板渲染後執行

got_request_exception = _signals.signal('got-request-exception')    # 請求執行出現異常時執行

request_tearing_down = _signals.signal('request-tearing-down')      # 請求執行完畢後自動執行(不管成功與否)
appcontext_tearing_down = _signals.signal('appcontext-tearing-down')# 應用上下文執行完畢後自動執行(不管成功與否)

appcontext_pushed = _signals.signal('appcontext-pushed')            # 應用上下文push時執行
appcontext_popped = _signals.signal('appcontext-popped')            # 應用上下文pop時執行
message_flashed = _signals.signal('message-flashed')                # 調用flask在其中添加數據時,自動觸發

使用信號:


from flask import Flask,signals,render_template

app = Flask(__name__)

# 往信號中註冊函數
def func(*args,**kwargs):
   print('觸發型號',args,kwargs)
signals.request_started.connect(func)

# 觸發信號: signals.request_started.send()
@app.before_first_request
def before_first1(*args,**kwargs):
   pass
@app.before_first_request
def before_first2(*args,**kwargs):
   pass

@app.before_request
def before_first3(*args,**kwargs):
   pass

@app.route('/',methods=['GET',"POST"])
def index():
   print('視圖')
   return render_template('index.html')


if __name__ == '__main__':
   app.wsgi_app
   app.run()

一個流程中的信號觸發點(瞭解)


a. before_first_request
b. 觸發 request_started 信號
c. before_request
d. 模板渲染
渲染前的信號 before_render_template.send(app, template=template, context=context)
rv = template.render(context) # 模板渲染
渲染後的信號 template_rendered.send(app, template=template, context=context)
e. after_request
f. session.save_session()
g. 觸發 request_finished信號
若是上述過程出錯:
觸發錯誤處理信號 got_request_exception.send(self, exception=e)

h. 觸發信號 request_tearing_down

自定義信號(瞭解):


from flask import Flask, current_app, flash, render_template
from flask.signals import _signals
app = Flask(import_name=__name__)

# 自定義信號
xxxxx = _signals.signal('xxxxx')

def func(sender, *args, **kwargs):
   print(sender)
# 自定義信號中註冊函數
xxxxx.connect(func)
@app.route("/x")
def index():
   # 觸發信號
   xxxxx.send('123123', k1='v1')
   return 'Index'

if __name__ == '__main__':
   app.run()

 

19.多app應用


from werkzeug.wsgi import DispatcherMiddleware
from werkzeug.serving import run_simple
from flask import Flask, current_app
app1 = Flask('app01')
app2 = Flask('app02')

@app1.route('/index')
def index():
   return "app01"

@app2.route('/index2')
def index2():
   return "app2"

# http://www.oldboyedu.com/index
# http://www.oldboyedu.com/sec/index2
dm = DispatcherMiddleware(app1, {
   '/sec': app2,
})

if __name__ == "__main__":
   run_simple('localhost', 5000, dm)

20.flask-script

用於實現相似於django中 python3 manage.py runserver ...相似的命令

安裝:pip3 install flask-script

使用


from flask_script import Manager
app = Flask(__name__)
manager=Manager(app)
...
if __name__ == '__main__':
   manager.run()
#之後在執行,直接:python3 manage.py runserver
#python3 manage.py runserver --help

自定製命令

@manager.command
def custom(arg):
   """
  自定義命令
  python manage.py custom 123
  :param arg:
  :return:
  """
   print(arg)


@manager.option('-n', '--name', dest='name')
#@manager.option('-u', '--url', dest='url')
def cmd(name, url):
   """
  自定義命令(-n也能夠寫成--name)
  執行: python manage.py cmd -n lqz -u http://www.oldboyedu.com
  執行: python manage.py cmd --name lqz --url http://www.oldboyedu.com
  :param name:
  :param url:
  :return:
  """
   print(name, url)
#有什麼用?
#把excel的數據導入數據庫,定製個命令,去執行

 

SQLAlchemy

1.介紹

SQLAlchemy是一個基於Python實現的ORM框架。該框架創建在 DB API之上,使用關係對象映射進行數據庫操做,簡言之即是:將類和對象轉換成SQL,而後使用數據API執行SQL並獲取執行結果。

pip3 install sqlalchemy

 

 

組成部分:


Engine,框架的引擎
Connection Pooling ,數據庫鏈接池
Dialect,選擇鏈接數據庫的DB API種類
Schema/Types,架構和類型
SQL Exprression Language,SQL表達式語言

SQLAlchemy自己沒法操做數據庫,其必須以來pymsql等第三方插件,Dialect用於和數據API進行交流,根據配置文件的不一樣調用不一樣的數據庫API,從而實現對數據庫的操做,如:


MySQL-Python
   mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
   
pymysql
   mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
   
MySQL-Connector
   mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
   
cx_Oracle
   oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
   
更多:http://docs.sqlalchemy.org/en/latest/dialects/index.html

2.簡單使用(能建立表,刪除表,不能修改表)

修改表:在數據庫添加字段,類對應上

1執行原生sql(不經常使用)

import time
import threading
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engine

engine = create_engine(
   "mysql+pymysql://root:123456@127.0.0.1:3306/test?charset=utf8",
   max_overflow=0,  # 超過鏈接池大小外最多建立的鏈接
   pool_size=5,  # 鏈接池大小
   pool_timeout=30,  # 池中沒有線程最多等待的時間,不然報錯
   pool_recycle=-1  # 多久以後對線程池中的線程進行一次鏈接的回收(重置)
)
def task(arg):
   conn = engine.raw_connection()
   cursor = conn.cursor()
   cursor.execute(
       "select * from app01_book"
  )
   result = cursor.fetchall()
   print(result)
   cursor.close()
   conn.close()

for i in range(20):
   t = threading.Thread(target=task, args=(i,))
   t.start()

2 orm使用

models.py


import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
Base = declarative_base()

class Users(Base):
   __tablename__ = 'users'  # 數據庫表名稱
   id = Column(Integer, primary_key=True)  # id 主鍵
   name = Column(String(32), index=True, nullable=False)  # name列,索引,不可爲空
   # email = Column(String(32), unique=True)
   #datetime.datetime.now不能加括號,加了括號,之後永遠是當前時間
   # ctime = Column(DateTime, default=datetime.datetime.now)
   # extra = Column(Text, nullable=True)

   __table_args__ = (
       # UniqueConstraint('id', 'name', name='uix_id_name'), #聯合惟一
       # Index('ix_id_name', 'name', 'email'), #索引
  )

def init_db():
   """
  根據類建立數據庫表
  :return:
  """
   engine = create_engine(
       "mysql+pymysql://root:123456@127.0.0.1:3306/aaa?charset=utf8",
       max_overflow=0,  # 超過鏈接池大小外最多建立的鏈接
       pool_size=5,  # 鏈接池大小
       pool_timeout=30,  # 池中沒有線程最多等待的時間,不然報錯
       pool_recycle=-1  # 多久以後對線程池中的線程進行一次鏈接的回收(重置)
  )

   Base.metadata.create_all(engine)

def drop_db():
   """
  根據類刪除數據庫表
  :return:
  """
   engine = create_engine(
       "mysql+pymysql://root:123456@127.0.0.1:3306/aaa?charset=utf8",
       max_overflow=0,  # 超過鏈接池大小外最多建立的鏈接
       pool_size=5,  # 鏈接池大小
       pool_timeout=30,  # 池中沒有線程最多等待的時間,不然報錯
       pool_recycle=-1  # 多久以後對線程池中的線程進行一次鏈接的回收(重置)
  )

   Base.metadata.drop_all(engine)

if __name__ == '__main__':
   # drop_db()
   init_db()

app.py


from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Users
#"mysql+pymysql://root@127.0.0.1:3306/aaa"
engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/aaa", max_overflow=0, pool_size=5)
Connection = sessionmaker(bind=engine)

# 每次執行數據庫操做時,都須要建立一個Connection
con = Connection()

# ############# 執行ORM操做 #############
obj1 = Users(name="lqz")
con.add(obj1)
# 提交事務
con.commit()

# 關閉session,實際上是將鏈接放回鏈接池
con.close()

3.一對多關係


class Hobby(Base):
   __tablename__ = 'hobby'
   id = Column(Integer, primary_key=True)
   caption = Column(String(50), default='籃球')


class Person(Base):
   __tablename__ = 'person'
   nid = Column(Integer, primary_key=True)
   name = Column(String(32), index=True, nullable=True)
   # hobby指的是tablename而不是類名
   hobby_id = Column(Integer, ForeignKey("hobby.id"))
   
   # 跟數據庫無關,不會新增字段,只用於快速鏈表操做
   # 類名,backref用於反向查詢
   hobby=relationship('Hobby',backref='pers')

4.多對多關係


class Boy2Girl(Base):
   __tablename__ = 'boy2girl'
   id = Column(Integer, primary_key=True, autoincrement=True)
   girl_id = Column(Integer, ForeignKey('girl.id'))
   boy_id = Column(Integer, ForeignKey('boy.id'))


class Girl(Base):
   __tablename__ = 'girl'
   id = Column(Integer, primary_key=True)
   name = Column(String(64), unique=True, nullable=False)


class Boy(Base):
   __tablename__ = 'boy'

   id = Column(Integer, primary_key=True, autoincrement=True)
   hostname = Column(String(64), unique=True, nullable=False)
   
   # 與生成表結構無關,僅用於查詢方便,放在哪一個單表中均可以
   servers = relationship('Girl', secondary='boy2girl', backref='boys')

5.操做數據表


from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Users
 
engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/aaa", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
 
# 每次執行數據庫操做時,都須要建立一個session
session = Session()
 
# ############# 執行ORM操做 #############
obj1 = Users(name="lqz")
session.add(obj1)
 
# 提交事務
session.commit()
# 關閉session
session.close()

6.基於scoped_session實現線程安全


from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session
from models import Users

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)

"""
# 線程安全,基於本地線程實現每一個線程用同一個session
# 特殊的:scoped_session中有原來方法的Session中的一下方法:

public_methods = (
  '__contains__', '__iter__', 'add', 'add_all', 'begin', 'begin_nested',
  'close', 'commit', 'connection', 'delete', 'execute', 'expire',
  'expire_all', 'expunge', 'expunge_all', 'flush', 'get_bind',
  'is_modified', 'bulk_save_objects', 'bulk_insert_mappings',
  'bulk_update_mappings',
  'merge', 'query', 'refresh', 'rollback',
  'scalar'
)
"""
#scoped_session類並無繼承Session,可是卻又它的全部方法
session = scoped_session(Session)
# ############# 執行ORM操做 #############
obj1 = Users(name="alex1")
session.add(obj1)

# 提交事務
session.commit()
# 關閉session
session.close()

7.基本增刪查改


import time
import threading

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import text

from db import Users, Hosts

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)

session = Session()

# ################ 添加 ################
"""
obj1 = Users(name="wupeiqi")
session.add(obj1)

session.add_all([
  Users(name="lqz"),
  Users(name="egon"),
  Hosts(name="c1.com"),
])
session.commit()
"""

# ################ 刪除 ################
"""
session.query(Users).filter(Users.id > 2).delete()
session.commit()
"""
# ################ 修改 ################
"""
#傳字典
session.query(Users).filter(Users.id > 0).update({"name" : "lqz"})
#相似於django的F查詢
session.query(Users).filter(Users.id > 0).update({Users.name: Users.name + "099"}, synchronize_session=False)
session.query(Users).filter(Users.id > 0).update({"age": Users.age + 1}, synchronize_session="evaluate")
session.commit()
"""
# ################ 查詢 ################
"""
r1 = session.query(Users).all()
#只取age列,把name重命名爲xx
r2 = session.query(Users.name.label('xx'), Users.age).all()
#filter傳的是表達式,filter_by傳的是參數
r3 = session.query(Users).filter(Users.name == "lqz").all()
r4 = session.query(Users).filter_by(name='lqz').all()
r5 = session.query(Users).filter_by(name='lqz').first()
#:value 和:name 至關於佔位符,用params傳參數
r6 = session.query(Users).filter(text("id<:value and name=:name")).params(value=224, name='fred').order_by(Users.id).all()
#自定義查詢sql
r7 = session.query(Users).from_statement(text("SELECT * FROM users where name=:name")).params(name='ed').all()
"""

#增,刪,改都要commit()
session.close()

8.經常使用操做


# 條件
ret = session.query(Users).filter_by(name='lqz').all()
#表達式,and條件鏈接
ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric').all()
ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all()
#注意下劃線
ret = session.query(Users).filter(Users.id.in_([1,3,4])).all()
#~非,除。。外
ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all()
#二次篩選
ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all()
from sqlalchemy import and_, or_
#or_包裹的都是or條件,and_包裹的都是and條件
ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all()
ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all()
ret = session.query(Users).filter(
   or_(
       Users.id < 2,
       and_(Users.name == 'eric', Users.id > 3),
       Users.extra != ""
  )).all()


# 通配符,以e開頭,不以e開頭
ret = session.query(Users).filter(Users.name.like('e%')).all()
ret = session.query(Users).filter(~Users.name.like('e%')).all()

# 限制,用於分頁,區間
ret = session.query(Users)[1:2]

# 排序,根據name降序排列(從大到小)
ret = session.query(Users).order_by(Users.name.desc()).all()
#第一個條件重複後,再按第二個條件升序排
ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()

# 分組
from sqlalchemy.sql import func

ret = session.query(Users).group_by(Users.extra).all()
#分組以後取最大id,id之和,最小id
ret = session.query(
   func.max(Users.id),
   func.sum(Users.id),
   func.min(Users.id)).group_by(Users.name).all()
#haviing篩選
ret = session.query(
   func.max(Users.id),
   func.sum(Users.id),
   func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all()

# 連表(默認用forinkey關聯)

ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all()
#join表,默認是inner join
ret = session.query(Person).join(Favor).all()
#isouter=True 外連,表示Person left join Favor,沒有右鏈接,反過來便可
ret = session.query(Person).join(Favor, isouter=True).all()
#打印原生sql
aa=session.query(Person).join(Favor, isouter=True)
print(aa)
# 本身指定on條件(連表條件),第二個參數,支持on多個條件,用and_,同上
ret = session.query(Person).join(Favor,Person.id==Favor.id, isouter=True).all()
# 組合(瞭解)UNION 操做符用於合併兩個或多個 SELECT 語句的結果集
#union和union all的區別?
q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union(q2).all()

q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union_all(q2).all()

9.執行原生sql


import time
import threading

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from sqlalchemy.engine.result import ResultProxy
from db import Users, Hosts

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)

session = Session()

# 查詢
# cursor = session.execute('select * from users')
# result = cursor.fetchall()

# 添加
cursor = session.execute('insert into users(name) values(:value)',params={"value":'lqz'})
session.commit()
print(cursor.lastrowid)

session.close()

10.一對多


import time
import threading

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from sqlalchemy.engine.result import ResultProxy
from db import Users, Hosts, Hobby, Person

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
session = Session()
# 添加
"""
session.add_all([
  Hobby(caption='乒乓球'),
  Hobby(caption='羽毛球'),
  Person(name='張三', hobby_id=3),
  Person(name='李四', hobby_id=4),
])

person = Person(name='張九', hobby=Hobby(caption='姑娘'))
session.add(person)
#添加二
hb = Hobby(caption='人妖')
hb.pers = [Person(name='文飛'), Person(name='博雅')]
session.add(hb)

session.commit()
"""

# 使用relationship正向查詢
"""
v = session.query(Person).first()
print(v.name)
print(v.hobby.caption)
"""

# 使用relationship反向查詢
"""
v = session.query(Hobby).first()
print(v.caption)
print(v.pers)
"""
#方式一,本身鏈表
# person_list=session.query(models.Person.name,models.Hobby.caption).join(models.Hobby,isouter=True).all()
person_list=session.query(models.Person,models.Hobby).join(models.Hobby,isouter=True).all()
for row in person_list:
   # print(row.name,row.caption)
   print(row[0].name,row[1].caption)

#方式二:經過relationship

person_list=session.query(models.Person).all()
for row in person_list:
   print(row.name,row.hobby.caption)
#查詢喜歡姑娘的全部人
obj=session.query(models.Hobby).filter(models.Hobby.id==1).first()
persons=obj.pers
print(persons)
session.close()

11.多對多


import time
import threading

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from sqlalchemy.engine.result import ResultProxy
from db import Users, Hosts, Hobby, Person, Group, Server, Server2Group

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
session = Session()
# 添加
"""
session.add_all([
  Server(hostname='c1.com'),
  Server(hostname='c2.com'),
  Group(name='A組'),
  Group(name='B組'),
])
session.commit()

s2g = Server2Group(server_id=1, group_id=1)
session.add(s2g)
session.commit()


gp = Group(name='C組')
gp.servers = [Server(hostname='c3.com'),Server(hostname='c4.com')]
session.add(gp)
session.commit()


ser = Server(hostname='c6.com')
ser.groups = [Group(name='F組'),Group(name='G組')]
session.add(ser)
session.commit()
"""


# 使用relationship正向查詢
"""
v = session.query(Group).first()
print(v.name)
print(v.servers)
"""

# 使用relationship反向查詢
"""
v = session.query(Server).first()
print(v.hostname)
print(v.groups)
"""


session.close()

12.其它


import time
import threading

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import text, func
from sqlalchemy.engine.result import ResultProxy
from db import Users, Hosts, Hobby, Person, Group, Server, Server2Group

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
session = Session()

# 關聯子查詢:correlate(Group)表示跟Group表作關聯,as_scalar至關於對該sql加括號,用於放在後面當子查詢
subqry = session.query(func.count(Server.id).label("sid")).filter(Server.id == Group.id).correlate(Group).as_scalar()
result = session.query(Group.name, subqry)
"""
SELECT `group`.name AS group_name, (SELECT count(server.id) AS sid
FROM server
WHERE server.id = `group`.id) AS anon_1
FROM `group`
"""
'''

select * from tb where id in [select id from xxx];

select id,
name,
#必須保證這次查詢只有一個值
(select max(id) from xxx) as mid
from tb

例如,第三個字段只能有一個值
id name mid
1 lqz   1,2 不合理
2 egon   2


'''


# 原生SQL
"""
# 查詢
cursor = session.execute('select * from users')
result = cursor.fetchall()

# 添加
cursor = session.execute('insert into users(name) values(:value)',params={"value":'wupeiqi'})
session.commit()
print(cursor.lastrowid)
"""

session.close()

13.Flask-SQLAlchemy

flask和SQLAchemy的管理者,經過他把他們作鏈接


db = SQLAlchemy()
- 包含配置
- 包含ORM基類
- 包含create_all
- engine
- 建立鏈接

離線腳本,建立表

詳見代碼

相關文章
相關標籤/搜索