JWT在Flask中的應用

python中實現jwt的有兩個經常使用的庫。前端

  • itsdangerouspython

    • JSONWebSignatureSerializer數據庫

    • TimedJSONWebSignatureSerializer (可設置有效期)flask

  • pyjwt安全

    pyjwt.readthedocs.io/en/latest/bash

    下面介紹pyjwt的使用。restful

    安裝

    $ pip install pyjwt複製代碼

    用例

    >>> import jwt
    
      >>> encoded_jwt = jwt.encode({'some': 'payload'}, 'secret', algorithm='HS256')
      >>> encoded_jwt
      'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzb21lIjoicGF5bG9hZCJ9.4twFt5NiznN84AWoo1d7KO1T_yoc0Z6XOpOVswacPZg'
    
      >>> jwt.decode(encoded_jwt, 'secret', algorithms=['HS256'])
      {'some': 'payload'}複製代碼

項目封裝

因爲項目中多個業務場景都須要用到jwt,下面先封裝到項目的utils包中,下面介紹在Flask項目中的一種形式,先封裝jwt的生成utils/jwt_util.pyapp

import jwt
from flask import current_app

def generate_jwt(payload, expiry, secret=None):
    """ :param payload: dict 載荷 :param expiry: datetime 有效期 :param secret: 密鑰 :return: 生成jwt """
    _payload = {'exp': expiry}
    _payload.update(payload)

    if not secret:
        secret = current_app.config['JWT_SECRET'] # 須要在配置文件配置JWT_SECRET

    token = jwt.encode(_payload, secret, algorithm='HS256')
    return token.decode()複製代碼

下面是封裝jwt的校驗。post

def verify_jwt(token, secret=None):
    """ 校驗jwt :param token: jwt :param secret: 密鑰 :return: dict: payload """
    if not secret:
        secret = current_app.config['JWT_SECRET']

    try:
        payload = jwt.decode(token, secret, algorithm=['HS256'])
    except jwt.PyJWTError:
        payload = None

    return payload複製代碼

其中algorithm='HS256'表示使用sha256計算簽名的時候,使用一個secret祕鑰字符串參與運算,使用sha256計算驗籤的時候,使用相同的secret 祕鑰字符串參與運算。ui

登陸方案

爲了考慮安全性,用戶的初始token 的有效期不宜過長,能夠初始設置一個短時間的有效期,而後在過時的時候,在用戶無感知狀況下,讓客戶端從新發一個請求去獲取一個refresh_token,這個可以很好的防止抓包,提高了安全性。

下面是一種使用場景。短時間token和長期token可根據需求設置。

from datetime import datetime, timedelta
from flask_restful import Resource
from utils.jwt_util import generate_jwt
from flask import current_app, g


class AuthorizationResource(Resource):
    """ 登陸認證 """
    def _generate_tokens(self, user_id, with_refresh_token=True):
        """ 生成token 和refresh_token :param user_id: 用戶id :return: token, refresh_token """
         # 頒發JWT
        now = datetime.utcnow()
        expiry = now + timedelta(hours=current_app.config['JWT_EXPIRY_HOURS'])# 短時間token
        token = generate_jwt({'user_id': user_id, 'refresh': False}, expiry)
        refresh_token = None
        if with_refresh_token:
            refresh_expiry = now + timedelta(days=current_app.config['JWT_REFRESH_DAYS']) # 長期token
            refresh_token = generate_jwt({'user_id': user_id, 'refresh': True}, refresh_expiry)
        return token, refresh_token

    def post(self):
        """ 用戶登陸建立token """
 	# ...根據業務不一樣,此處省略參數校驗和數據庫保存業務場景

        token, refresh_token = self._generate_tokens(user.id) 

        return {'token': token, 'refresh_token': refresh_token}, 201複製代碼

爲了使用jwt對用戶進行受權驗證,能夠在請求鉤子before_request中驗證用戶身份。下面封裝一箇中間件utils/middlewares.py,而後在建立應用中調用。

from flask import request, g
from .jwt_util import verify_jwt

def jwt_authentication():
    """ 根據jwt驗證用戶身份 """
    g.user_id = None
    g.is_refresh_token = False
    authorization = request.headers.get('Authorization')
    if authorization and authorization.startswith('Bearer '): # 讓前端請求頭攜帶Authorization,值以'Bearer '開頭
        token = authorization.strip()[7:]
        payload = verify_jwt(token)
        if payload:
            g.user_id = payload.get('user_id')
            g.is_refresh_token = payload.get('refresh')複製代碼

項目中添加請求鉤子,保證每次請求以前都可以訪問。

# 添加請求鉤子
from common.utils.middlewares import jwt_authentication
app.before_request(jwt_authentication)複製代碼

下面提供更新token的接口。

class AuthorizationResource(Resource):
    """ 認證 """
    ......

    # 補充put方式 更新token接口
    def put(self):
        """ 刷新token """
        user_id = g.user_id
        if user_id and g.is_refresh_token:
            token, refresh_token = self._generate_tokens(user_id, with_refresh_token=False)
            return {'token': token}, 201
        else:
            return {'message': 'Wrong refresh token.'}, 403複製代碼

有些業務場景必需要求用戶進行登陸才能訪問,下面封裝/utils/decorators.py.

def login_required(func):
    """ 用戶必須登陸裝飾器 使用方法:放在method_decorators中 """
 @wraps(func)
    def wrapper(*args, **kwargs):
        if not g.user_id:
            return {'message': 'User must be authorized.'}, 401
        elif g.is_refresh_token:
            return {'message': 'Do not use refresh token.'}, 403
        else:
            return func(*args, **kwargs)

    return wrapper複製代碼
相關文章
相關標籤/搜索