SON Web令牌以緊湊的形式由三部分組成,這些部分由點(.)分隔,分別是:python
即爲: xxxx.yyyy.zzzz
git
Header一般由兩部分組成:令牌的類型(即JWT)和所使用的簽名算法(例如HMAC SHA256或RSA)。 例如:github
{
"alg": "HS256",
"typ": "JWT"
}
複製代碼
Header會被Base64Url編碼爲JWT的第一部分。即爲:redis
$ echo -n '{"alg":"HS256","typ":"JWT"}'|base64
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9
複製代碼
Payload是有關實體(一般是用戶)和其餘數據的聲明,它包含三部分:算法
這些是一組預約義的權利要求,不是強制性的,而是建議使用的,以提供一組有用的可互操做的權利要求。其中一些是: iss(JWT的簽發者), exp(expires,到期時間), sub(主題), aud(JWT接收者),iat(issued at,簽發時間)等。數據庫
注意:聲明名稱都是三個字符json
公共的聲明能夠添加任何的信息,通常添加用戶的相關信息或其餘業務須要的必要信息.但不建議添加敏感信息,由於該部分在客戶端可解密。flask
私有聲明是提供者和消費者所共同定義的聲明,通常不建議存放敏感信息,由於base64是對稱解密的,意味着該部分信息能夠歸類爲明文信息。跨域
例子:瀏覽器
{ "iat": 1593955943,
"exp": 1593955973,
"uid": 10,
"username": "test",
"scopes": [ "admin", "user" ]
}
複製代碼
Payload會被Base64Url編碼爲JWT的第二部分。即爲:
$ echo -n '{"iat":1593955943,"exp":1593955973,"uid":10,"username":"test","scopes":["admin","user"]}'|base64
eyJ1c2VybmFtZSI6InRlc3QiLCJpYXQiOjE1OTM5NTU5NDMsInVpZCI6MTAsImV4cCI6MTU5Mzk1NTk3Mywic2NvcGVzIjpbImFkbWluIiwidXNlciJdfQ
複製代碼
注意:對於已簽名的令牌,此信息儘管能夠防止篡改,但任何人均可以讀取。除非將其加密,不然請勿將機密信息放入JWT的有效負載或報頭元素中。
Signature部分的生成須要base64編碼以後的Header,base64編碼以後的Payload,密鑰(secret),Header須要指定簽字的算法。
例如,若是要使用HMAC SHA256算法,則將經過如下方式建立簽名:
HMACSHA256(
base64UrlEncode(header) + "." +
base64UrlEncode(payload),
secret)
複製代碼
輸出是三個由點分隔的Base64-URL字符串,能夠在HTML和HTTP環境中輕鬆傳遞這些字符串,與基於XML的標準(例如SAML)相比,它更緊湊。
"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VybmFtZSI6InRlc3QiLCJpYXQiOjE1OTM5NTU5NDMsInVpZCI6MTAsImV4cCI6MTU5Mzk1NTk3Mywic2NvcGVzIjpbImFkbWluIiwidXNlciJdfQ.VHpxmxKVKpsn2Iytqc_6Z1U1NtiX3EgVki4PmA-J3Pg"
複製代碼
JWT是無狀態受權機制,服務器的受保護路由將Header中檢查有效的token,若是存在,則將容許用戶訪問受保護的資源。若是JWT包含必要的數據,則能夠減小查詢數據庫中某些操做的需求。
根據下面的這張流程圖來分析一下JWT的工做過程
import time
from functools import wraps
from flask import Flask, request, jsonify
import jwt
from jwt import ExpiredSignatureError
app = Flask(__name__)
max_time = 60
refresh_max_time = 120
token_secret = "This is a secret"
def verify_token(func):
@wraps(func)
def decorator(*args, **kwargs):
try:
token = request.headers["token"]
print(token)
data = jwt.decode(token, token_secret, algorithms=['HS256'])
now = int(time.time())
time_interval = now - data['time']
if time_interval >= max_time:
# create new token
token, refresh_token = creat_token()
return jsonify({"token": token, "refresh_token": refresh_token})
except ExpiredSignatureError:
return "Token expired"
except Exception as ex:
print(ex)
return "Log in again"
return func(*args, **kwargs)
return decorator
def creat_token(uid):
now = int(time.time())
payload = {'uid': uid, 'time': now, 'exp': now + max_time}
refresh_payload = {'uid': uid, 'time': now, 'exp': now + refresh_max_time}
token = jwt.encode(payload, token_secret, algorithm='HS256')
refresh_token = jwt.encode(refresh_payload, token_secret, algorithm='HS256')
return token, refresh_token
@app.route('/login', methods=["POST"])
def login():
user_name = request.values.get('user_name')
password = request.values.get('password')
# @TODO 根據user_name和password 獲取惟一的uid
uid = 10
token, refresh_token = creat_token(uid=uid)
return jsonify({"token": token, "refresh_token": refresh_token})
@app.route('/test', methods=['GET'])
@verify_token
def test():
return 'hello world'
if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000)
複製代碼
itsdangerous支持JSON Web 簽名 (JWS),內部默認使用了HMAC和SHA1來簽名,其中類JSONWebSignatureSerializer
內部與JWT一致,也分紅三部分(header,payload,signature),查看源碼可知:
def dumps(self, obj, salt=None, header_fields=None):
"""Like :meth:`.Serializer.dumps` but creates a JSON Web Signature. It also allows for specifying additional fields to be included in the JWS header. """
header = self.make_header(header_fields)
signer = self.make_signer(salt, self.algorithm)
return signer.sign(self.dump_payload(header, obj))
複製代碼
def dump_payload(self, header, obj):
base64d_header = base64_encode(
self.serializer.dumps(header, **self.serializer_kwargs)
)
base64d_payload = base64_encode(
self.serializer.dumps(obj, **self.serializer_kwargs)
)
return base64d_header + b"." + base64d_payload
複製代碼
.
拼接__init__
中用戶定義的secret來生成新的token感興趣的朋友能夠直接參看github源碼,這裏再也不展開贅述。
import time
from functools import wraps
from flask import Flask, request, jsonify
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer, SignatureExpired
app = Flask(__name__)
max_time = 60
refresh_max_time = 120
token_secret = "This is a secret"
def verify_token(func):
@wraps(func)
def decorator(*args, **kwargs):
try:
token = request.headers["token"]
print(token)
s = Serializer(token_secret)
data = s.loads(token)
now = int(time.time())
time_interval = now - data['time']
if time_interval >= max_time:
# create new token
token, refresh_token = creat_token()
return jsonify({"token": token, "refresh_token": refresh_token})
except SignatureExpired:
return "Token expired"
except Exception as ex:
print(ex)
return "Log in again"
return func(*args, **kwargs)
return decorator
def creat_token(uid):
now = int(time.time())
s = Serializer(token_secret, expires_in=max_time)
token = s.dumps({"uid": uid, "time": now}).decode("ascii")
refresh_s = Serializer(token_secret, expires_in=refresh_max_time)
refresh_token = refresh_s.dumps({"uid": uid, "time": now}).decode("ascii")
return token, refresh_token
@app.route('/token', methods=["POST"])
def token():
user_name = request.values.get('user_name')
password = request.values.get('password')
# @TODO 根據user_name和password 獲取惟一的uid
uid = 10
token, refresh_token = creat_token(uid=uid)
return jsonify({"token": token, "refresh_token": refresh_token})
@app.route('/test', methods=['GET'])
@verify_token
def test():
return 'hello world'
if __name__ == "__main__":
app.run(host="0.0.0.0")
複製代碼
TimedJSONWebSignatureSerializer
相比JSONWebSignatureSerializer
在header中贈加了過時時間,若是過時會拋出SignatureExpired
異常。
JWT是無狀態的,用戶登出設置token無效就已經違背了JWT的設計原則,可是在實際應用場景中,這種功能是須要的,那該如何實現呢?提供幾種思路:
爲了保持數據的一致性,每一次認證都須要從redis中取出對應的token,每一次都以redis中的token爲準。
複製代碼
請思考這樣一種場景:
很明顯,這種狀況是不該該出現的,說一下本身的想法: