點擊python編程從入門到實踐,置頂 公衆號重磅 python入門資料,第一時間送達python
讀完須要 18分鐘算法
速讀僅需 6 分鐘數據庫
/ python 生產實戰 60 秒系統安全認證明戰 /編程
上節主要講解了目前主流的認證規範/協議以及對 JWT 進行了深刻的研究和分析並在最後給出了在生產環境中如何去生成一個有效的 Token,基於 Python 語言那在生產環境中是如何進行有效的安全認證的呢?上節咱們也基於 JWT 的 Token 的認證過程進行了登錄認證、請求認證的理論分析以及用圖示的方式給出了數據的流向,本節咱們再帶你們從代碼層面走一次流程,一方面加深你們對上節理論部分的理解,另外一方面也是給你們在作工程的過程當中提供一套"模版"快速應用在項目中後端
1api
準備工具安全
1.1微信
密碼安全閉包
爲了數據安全,咱們利用 PassLib 對入庫的用戶密碼進行加密處理,推薦的加密算法是"Bcrypt"。咱們須要安裝依賴包:併發
pip install passlibpip install bcrypt
簡單的介紹一下這兩個庫:
1.passlib 是 python2&3 的密碼散列庫,它提供超過 30 種密碼散列算法的跨平臺實現,以及做爲管理現有密碼哈希的框架。它被設計成有用的 對於範圍普遍的任務,從驗證/etc/shadow 中找到的散列到 爲多用戶應用程序提供全強度密碼哈希。2.bcrypt 模塊是一個用於在 Python 中生成強哈希值的庫。
注意:若對於以上兩個庫的詳細內容比較感興趣的小夥伴能夠自行到 python 官網查看相關資料,咱們本節的核心是作整個流程的實現這個具體的庫內容就先不作詳細介紹了。
2
用戶登錄流程
用戶經過終端發送 username 和 password 到後端。後端收到數據後,進行一下操做:1.用戶信息校驗:查詢當前系統是否存在該用戶,以及密碼是否正確
2.若是用戶存在,則生成 JWT token 並返回,JWT payload 中能夠攜帶自定義數據
# -*- encoding: utf-8 -*-from datetime import datetime, timedeltafrom typing import Optionalfrom fastapi import Depends, FastAPI, HTTPException, statusfrom fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestFormimport jwtfrom pydantic import BaseModelfrom passlib.context import CryptContext# to get a string like this run:# openssl rand -hex 32SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM = "HS256"ACCESS_TOKEN_EXPIRE_MINUTES = 25# 模擬數據庫數據fake_users_db = { "hiashiniu": { "username": "hiashiniu", "full_name": "HaiShiNiu", "email": "haishiniu@bbs.com", "hashed_password": "$2b$12$uG4n5CK4.kCid/MfQ3ns9.uRslWuWr9EL5FR4HYNg6SNo6wEn8OPC", "disabled": False, }}class Token(BaseModel): access_token: str token_type: strclass User(BaseModel): username: str email: Optional[str] = None full_name: Optional[str] = None disabled: Optional[bool] = Noneclass UserInDB(User): hashed_password: strpwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")app = FastAPI()# 校驗密碼def verify_password(plain_password, hashed_password): return pwd_context.verify(plain_password, hashed_password)# 密碼哈希def get_password_hash(password): # pwd_context.hash('123456789') # '$2b$12$uG4n5CK4.kCid/MfQ3ns9.uRslWuWr9EL5FR4HYNg6SNo6wEn8OPC' return pwd_context.hash(password)# 模擬從數據庫讀取用戶信息def get_user(db, username: str): user_dict = db.get(username,None) if user_dict: return UserInDB(**user_dict)# 用戶信息校驗:username和password分別校驗def authenticate_user(fake_db, username: str, password: str): user = get_user(fake_db, username) print(user) if not user: return False if not verify_password(password, user.hashed_password): return False return user# 生成token,帶有過時時間def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): to_encode = data.copy() # 有一個實效性 默認的時間25分鐘 if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt@app.post("/token", response_model=Token)async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()): # 首先校驗用戶信息 print(form_data.password) print(form_data.username) user = authenticate_user(fake_users_db, form_data.username, form_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) # 生成並返回token信息 access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": user.username}, expires_delta=access_token_expires ) return {"access_token": access_token, "token_type": "bearer"}
3
請求數據流
終端獲取到 token 信息後,必須在後續請求的 Authorization 頭信息中帶有 Bearer token,才能被容許訪問。咱們添加一個校驗函數,對請求的合法性進行校驗,讀取 token 內容解析並進行驗證。
# -*- encoding: utf-8 -*-async def get_current_user(token: str = Depends(oauth2_scheme)): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise credentials_exception except jwt.PyJWTError as ex: print(ex) raise credentials_exception user = get_user(fake_users_db, username=username) if user is None: raise credentials_exception return user@app.get("/users/me/", response_model=User)async def read_users_me(current_user: User = Depends(get_current_user)): return current_user
4
效果展現
本小節咱們整合上述代碼進行一下效果展現
# -*- encoding: utf-8 -*-from datetime import datetime, timedeltafrom typing import Optionalfrom fastapi import Depends, FastAPI, HTTPException, statusfrom fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestFormimport jwtfrom pydantic import BaseModelfrom passlib.context import CryptContext# to get a string like this run:# openssl rand -hex 32SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM = "HS256"ACCESS_TOKEN_EXPIRE_MINUTES = 25# 模擬數據庫數據fake_users_db = { "hiashiniu": { "username": "hiashiniu", "full_name": "HaiShiNiu", "email": "haishiniu@bbs.com", "hashed_password": "$2b$12$uG4n5CK4.kCid/MfQ3ns9.uRslWuWr9EL5FR4HYNg6SNo6wEn8OPC", "disabled": False, }}class Token(BaseModel): access_token: str token_type: strclass User(BaseModel): username: str email: Optional[str] = None full_name: Optional[str] = None disabled: Optional[bool] = Noneclass UserInDB(User): hashed_password: strpwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")app = FastAPI()# 校驗密碼def verify_password(plain_password, hashed_password): return pwd_context.verify(plain_password, hashed_password)# 密碼哈希def get_password_hash(password): # pwd_context.hash('123456789') # '$2b$12$uG4n5CK4.kCid/MfQ3ns9.uRslWuWr9EL5FR4HYNg6SNo6wEn8OPC' return pwd_context.hash(password)# 模擬從數據庫讀取用戶信息def get_user(db, username: str): user_dict = db.get(username,None) if user_dict: return UserInDB(**user_dict)# 用戶信息校驗:username和password分別校驗def authenticate_user(fake_db, username: str, password: str): user = get_user(fake_db, username) if not user: return False if not verify_password(password, user.hashed_password): return False return user# 生成token,帶有過時時間def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): to_encode = data.copy() # 有一個實效性 默認的時間25分鐘 if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt@app.post("/token", response_model=Token)async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()): # 首先校驗用戶信息 print(form_data.password) print(form_data.username) user = authenticate_user(fake_users_db, form_data.username, form_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) # 生成並返回token信息 access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": user.username}, expires_delta=access_token_expires ) return {"access_token": access_token, "token_type": "bearer"}async def get_current_user(token: str = Depends(oauth2_scheme)): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise credentials_exception except jwt.PyJWTError as ex: print(ex) raise credentials_exception user = get_user(fake_users_db, username=username) if user is None: raise credentials_exception return user@app.get("/users/me/", response_model=User)async def read_users_me(current_user: User = Depends(get_current_user)): return current_user
1.使用命令啓動項目:
uvicorn main:app --reload
2.使用 Postman 模擬登錄獲取驗證 Token
3.使用 Postman 模擬攜帶 Token 獲取正常邏輯信息
5
總結
本節核心:從代碼層面實戰了 登錄認證、請求認證的數據流轉
原創不易,只願能幫助那些須要這些內容的同行或剛入行的小夥伴,你的每次 點贊、分享 都是我繼續創做下去的動力,我但願能在推廣 python 技術的道路上盡我一份力量,歡迎在評論區向我提問,我都會一一解答,記得一鍵三連支持一下哦!
加入python學習交流微信羣,請後臺回覆「入羣」
往期推薦
大型fastapi項目實戰 靠 python 中間件解決方案漲薪了
大型fastapi項目實戰 高併發請求神器之aiohttp(下)
大型fastapi項目實戰 高併發請求神器之aiohttp(上) [建議收藏]
本文分享自微信公衆號 - python編程軍火庫(PythonCoder1024)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。