flask刷新token

咱們在作先後端分離的項目中,最經常使用的都是使用token認證。前端

登陸後將用戶信息,過時時間以及私鑰一塊兒加密生成token,可是比較頭疼的就是token過時刷新的問題,由於用戶在登陸後,若是在使用過程當中,忽然提示token過時了,須要從新登陸,會以爲很奇怪。python

我使用的方式,是在解析token的時候,若是token過時了,可是還在可刷新的時間範圍內,咱們應該自動刷新token,並將新的token返回給用戶。redis

可是若是前端採用異步請求,同時發來了多個接口的話,咱們不可能對每一個請求的token都進行刷新。json

個人解決方案是,將過時但還在刷新範圍的token存入redis,同時設置token的過時時間爲可刷新時間,過了可刷新時間,token就會被自動刪除後端

當前端多個請求過來時,會對請求帶來的token進行驗證,分三種狀況:api

  1)若是token已通過了刷新時間,則拋出異常。cookie

  2)若是token不在redis中,表示剛剛過時,尚未進行刷新token操做,須要刷新token。app

  3)若是token在redis中,則權限默認經過。前後端分離

下面上代碼:異步

  1)爲了給token加上可刷新時間,須要重寫TimedJSONWebSignatureSerializer 的make_header和loads方法

from itsdangerous import TimedJSONWebSignatureSerializer as Serializer_
import redis
r = redis.Redis(host="127.0.0.1", port=6379,db=0)
class Serializer(Serializer_):
    def __init__(self, secret_key, expires_in=None, **kwargs):
        self.expires_in = expires_in
        super(Serializer, self).__init__(secret_key, expires_in, **kwargs)

    def make_header(self, header_fields):
        header = JSONWebSignatureSerializer.make_header(self, header_fields)
        iat = self.now()
        exp = iat + self.expires_in
        refresh_exp = iat+current_app.config["REFRESH_TIME"]
        header["iat"] = iat
        header["exp"] = exp
        header["refresh_exp"] = refresh_exp
        return header

    def loads(self, s, salt=None, return_header=False):
        payload, header = JSONWebSignatureSerializer.loads(
            self, s, salt, return_header=True
        )

        if "exp" not in header:
            raise BadSignature("Missing expiry date", payload=payload)

        int_date_error = BadHeader("Expiry date is not an IntDate", payload=payload)
        try:
            header["exp"] = int(header["exp"])
        except ValueError:
            raise int_date_error
        if header["exp"] < 0:
            raise int_date_error
        now = self.now()
        if header["exp"] < now:
            if header["refresh_exp"]<now:
                # 已通過了可刷新時間,直接拋出異常
                raise SignatureExpired(
                    "Signature expired",
                    payload=payload,
                    date_signed=self.get_issue_date(header),
                )
            else:
                # TODO 增長判斷,看是否有存儲在redis中,若是有存儲過,表示token已經被刷新過了,直接放行便可。
                if r.get(s):
                    return payload
                pxt = header["refresh_exp"] - now
                if pxt>0:
                    r.set(s, header["exp"], px = pxt)
                # 還在可刷新時間內
                # 生成新的token返回給前端
                serializer = Serializer(current_app.config["SECRET_KEY"], expires_in=self.expires_in)
                # 調用serializer的dumps方法將uid和type寫入生成token
                token = serializer.dumps(payload)
                res = make_response()
                res.headers["Authorization"] = token
                res.set_cookie("authorization",token.decode("ascii"))
                return payload, token
        if return_header:
            return payload, header
        return payload 

  2)認證權限

auth = HTTPBasicAuth()
user = namedtuple("User",["uid","type","scope"])

@auth.verify_password
def check_authorization(token, pwd):
    user_info = check_auth_token(token)
    if not user_info:
        return False
    else:
        if isinstance(user_info, tuple):
            user_info_ = user_info[0]
            token = user_info[1]
        else:
            user_info_ = user_info
        g.user = user_info_
        return True if not token else token


def check_auth_token(token):
    serialzer = Serializer(current_app.config["SECRET_KEY"])
    try:
        s = serialzer.loads(token)
    except BadSignature:
        raise AuthFailed(msg="token is invalid", error_code=1004)
    except SignatureExpired:
        raise AuthFailed(msg="token is expired", error_code=1004)
    token = ""
    if isinstance(s, tuple):
        u_info = s[0]
        token = s[1]
    else:
        u_info = s
    uid = u_info["uid"]
    type = u_info["type"]
    scope = u_info["scope"]
    return user(uid, type, scope), token

  3)生成新的token後,將新的token放入response的header中,前端人員從response header中去取authorization

@api.router("/get/<int:uid>")
@auth.login_required
def get_user(uid, token=None):
    user = User.query.get_or_404(uid)
    res_json = jsonify(user)
    res = make_response(res_json)
    res.headers["Authorization"] = token
    return res
相關文章
相關標籤/搜索