驗證與加密(第一篇)
安全性驗證對前後端取用資料非常重要,第一篇會先導讀官方的方式。
環境需求
- python-jose 加密編碼
- passlib
cmd
pip install python-jose passlib
先前預備
在加密前需要先取得一組密碼,官方使用openssl來亂數編譯一串hex 32格式的密碼。
cmd
openssl rand -hex 32
會產出下方一串亂數。
cmd
> 09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7
開始編譯
首先宣告一串假的帳號及密碼,通常這些是放在資料庫中。
py
# main.py
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
"disabled": False,
}
}
在來將剛剛得到的一串亂數宣告,採取JWT HS256格式編譯。
py
# main.py
from jose import JWTError, jwt
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
接著採BaseModel
要驗證輸入的格式是否符合。
py
# main.py
from pydantic import BaseModel
class Token(BaseModel):
access_token: str
token_type: str
class User(BaseModel):
username: str
email: str | None = None
full_name: str | None = None
disabled: bool | None = None
官方在此處使用OAutha2來處理加密。
py
# main.py
from passlib.context import CryptContext
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
驗證完格式正確後,接續宣告一個函式來與內部帳密(fake_users_db)確認,此時會以編譯的方式進行。
py
# main.py
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
return user
async def get_current_active_user(
current_user: Annotated[User, Depends(get_current_user)]
):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
最後當然要放到路由上。
py
# main.py
from typing import Annotated
@app.get("/users/me/", response_model=User)
async def read_users_me(
current_user: Annotated[User, Depends(get_current_active_user)]
):
return current_user
看完官方文件後,我有點頭暈了orz,下篇我使用網路教學的影片,來製作另一個比較容易理解的方式。