先後端分離項目中作用戶認證,用戶每次登陸成功後返回一個token,下次訪問時header帶上返回的token證實改用戶是登陸過的,不須要再次登陸,不然返回錯誤信息python
重寫裝飾器作登陸認證後端
def authenticated(func): """ 重寫tornado authenticated """ @functools.wraps(func) async def wrapper(self, *args, **kwargs): res_data = {} token = self.request.headers.get("token") if token: user = None try: jwt_data = jwt.decode( token, self.settings["secret_key"], leeway=self.settings["jwt_expires"], # 判斷過時時間 options={"verify_exp": True} # 是否驗證 ) user_id = jwt_data["id"] user = await self.application.objects.get(User, user_id=user_id) except Exception as e: self.set_status(401) res_data["content"] = "token error" if user: self._current_user = user result = await func(self, *args, **kwargs) return result else: self.set_status(401) res_data["content"] = "token error" else: self.set_status(401) res_data["content"] = "miss token" self.write(res_data) return wrapper
登陸成功後返回jwt_tokenapp
def get_jwt_token(self,user_id): """ fun : 使用jwt生成token :param user_id: :return: """ payload = { "id": user_id, "iat": int(time.time()), "exp": int(time.mktime((datetime.datetime.now() + datetime.timedelta(minutes=60)).timetuple())) } jwt_token = jwt.encode( payload, self.settings["secret_key"], # 進行加密簽名的密鑰 algorithm="HS256", headers={"alg": "HS256", "typ": "JWT"} ).decode("utf-8") return jwt_token