咱們在作先後端分離的項目中,最經常使用的都是使用token認證。前端
登陸後將用戶信息,過時時間以及私鑰一塊兒加密生成token,可是比較頭疼的就是token過時刷新的問題,由於用戶在登陸後,若是在使用過程當中,忽然提示token過時了,須要從新登陸,會以爲很奇怪。python
我使用的方式,是在解析token的時候,若是token過時了,可是還在可刷新的時間範圍內,咱們應該自動刷新token,並將新的token返回給用戶。redis
可是若是前端採用異步請求,同時發來了多個接口的話,咱們不可能對每一個請求的token都進行刷新。json
個人解決方案是,將過時但還在刷新範圍的token存入redis,同時設置token的過時時間爲可刷新時間,過了可刷新時間,token就會被自動刪除後端
當前端多個請求過來時,會對請求帶來的token進行驗證,分三種狀況:api
1)若是token已通過了刷新時間,則拋出異常。cookie
2)若是token不在redis中,表示剛剛過時,尚未進行刷新token操做,須要刷新token。app
3)若是token在redis中,則權限默認經過。前後端分離
下面上代碼:異步
1)爲了給token加上可刷新時間,須要重寫TimedJSONWebSignatureSerializer 的make_header和loads方法
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer_
import redis
r = redis.Redis(host="127.0.0.1", port=6379,db=0)
class Serializer(Serializer_): def __init__(self, secret_key, expires_in=None, **kwargs): self.expires_in = expires_in super(Serializer, self).__init__(secret_key, expires_in, **kwargs) def make_header(self, header_fields): header = JSONWebSignatureSerializer.make_header(self, header_fields) iat = self.now() exp = iat + self.expires_in refresh_exp = iat+current_app.config["REFRESH_TIME"] header["iat"] = iat header["exp"] = exp header["refresh_exp"] = refresh_exp return header def loads(self, s, salt=None, return_header=False): payload, header = JSONWebSignatureSerializer.loads( self, s, salt, return_header=True ) if "exp" not in header: raise BadSignature("Missing expiry date", payload=payload) int_date_error = BadHeader("Expiry date is not an IntDate", payload=payload) try: header["exp"] = int(header["exp"]) except ValueError: raise int_date_error if header["exp"] < 0: raise int_date_error now = self.now() if header["exp"] < now: if header["refresh_exp"]<now: # 已通過了可刷新時間,直接拋出異常 raise SignatureExpired( "Signature expired", payload=payload, date_signed=self.get_issue_date(header), ) else: # TODO 增長判斷,看是否有存儲在redis中,若是有存儲過,表示token已經被刷新過了,直接放行便可。 if r.get(s): return payload pxt = header["refresh_exp"] - now if pxt>0: r.set(s, header["exp"], px = pxt) # 還在可刷新時間內 # 生成新的token返回給前端 serializer = Serializer(current_app.config["SECRET_KEY"], expires_in=self.expires_in) # 調用serializer的dumps方法將uid和type寫入生成token token = serializer.dumps(payload) res = make_response() res.headers["Authorization"] = token res.set_cookie("authorization",token.decode("ascii")) return payload, token if return_header: return payload, header return payload
2)認證權限
auth = HTTPBasicAuth() user = namedtuple("User",["uid","type","scope"]) @auth.verify_password def check_authorization(token, pwd): user_info = check_auth_token(token) if not user_info: return False else: if isinstance(user_info, tuple): user_info_ = user_info[0] token = user_info[1] else: user_info_ = user_info g.user = user_info_ return True if not token else token def check_auth_token(token): serialzer = Serializer(current_app.config["SECRET_KEY"]) try: s = serialzer.loads(token) except BadSignature: raise AuthFailed(msg="token is invalid", error_code=1004) except SignatureExpired: raise AuthFailed(msg="token is expired", error_code=1004) token = "" if isinstance(s, tuple): u_info = s[0] token = s[1] else: u_info = s uid = u_info["uid"] type = u_info["type"] scope = u_info["scope"] return user(uid, type, scope), token
3)生成新的token後,將新的token放入response的header中,前端人員從response header中去取authorization
@api.router("/get/<int:uid>") @auth.login_required def get_user(uid, token=None): user = User.query.get_or_404(uid) res_json = jsonify(user) res = make_response(res_json) res.headers["Authorization"] = token return res