Skip to content
On this page

驗證與加密(第一篇)

安全性驗證對前後端取用資料非常重要,第一篇會先導讀官方的方式。

環境需求

  • python-jose 加密編碼
  • passlib
cmd
pip install python-jose passlib

先前預備

在加密前需要先取得一組密碼,官方使用openssl來亂數編譯一串hex 32格式的密碼。

cmd
openssl rand -hex 32

會產出下方一串亂數。

cmd
> 09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7

開始編譯

首先宣告一串假的帳號及密碼,通常這些是放在資料庫中。

py
# main.py
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    }
}

在來將剛剛得到的一串亂數宣告,採取JWT HS256格式編譯。

py
# main.py
from jose import JWTError, jwt

SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

接著採BaseModel要驗證輸入的格式是否符合。

py
# main.py
from pydantic import BaseModel

class Token(BaseModel):
    access_token: str
    token_type: str

class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None

官方在此處使用OAutha2來處理加密。

py
# main.py
from passlib.context import CryptContext
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

驗證完格式正確後,接續宣告一個函式來與內部帳密(fake_users_db)確認,此時會以編譯的方式進行。

py
# main.py
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user


async def get_current_active_user(
    current_user: Annotated[User, Depends(get_current_user)]
):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user

最後當然要放到路由上。

py
# main.py
from typing import Annotated

@app.get("/users/me/", response_model=User)
async def read_users_me(
    current_user: Annotated[User, Depends(get_current_active_user)]
):
    return current_user

看完官方文件後,我有點頭暈了orz,下篇我使用網路教學的影片,來製作另一個比較容易理解的方式。


參考: