Skip to content
On this page

驗證與加密(第三篇)

cmd
pip install python_jwt
py
from typing import List
import fastapi as _fastapi
import fastapi.security as _security
 
import sqlalchemy.orm as _orm
import services as _services, schemas as _schemas
 
app = _fastapi.FastAPI()
 
@app.post("/api/users")
async def create_user(
    user: _schemas.UserCreate, db: _orm.Session = _fastapi.Depends(_services.get_db)
):
    db_user = await _services.get_user_by_email(user.email, db)
    if db_user:
        raise _fastapi.HTTPException(status_code=400, detail="Email already in use")
        user = await _services.create_user(user, db)
    return await _services.create_token(user)
 
@app.post("/api/token")
async def generate_token(
    form_data: _security.OAuth2PasswordRequestForm = _fastapi.Depends(),
    db: _orm.Session = _fastapi.Depends(_services.get_db),
):
    user = await _services.authenticate_user(form_data.username, form_data.password, db)
 
    if not user:
        raise _fastapi.HTTPException(status_code=401, detail="Invalid Credentials")
 
    return await _services.create_token(user)
 
@app.get("/api/users/myprofile", response_model=_schemas.User)
async def get_user(user: _schemas.User = _fastapi.Depends(_services.get_current_user)):
    return user
py
# 安全性與資料庫連接
import fastapi as _fastapi
import fastapi.security as _security
import jwt as _jwt
import datetime as _dt
import sqlalchemy.orm as _orm
import passlib.hash as _hash
# 引入資料庫、資料表、驗證設定
import database as _database, models as _models, schemas as _schemas

oauth2schema = _security.OAuth2PasswordBearer(tokenUrl="/api/token")
# 預設字串,會放在密碼後面
JWT_SECRET = "evancodeaddoutside"
 
def create_database():
    return _database.Base.metadata.create_all(bind=_database.engine)
 
def get_db():
    db = _database.SessionLocal()
    try:
        yield db
    finally:
        db.close()
 
async def get_user_by_email(email: str, db: _orm.Session):
    return db.query(_models.User).filter(_models.User.email == email).first()
 
async def create_user(user: _schemas.UserCreate, db: _orm.Session):
    user_obj = _models.User(
        email=user.email, hashed_password=_hash.bcrypt.hash(user.hashed_password)
    )
    db.add(user_obj)
    db.commit()
    db.refresh(user_obj)
    return user_obj
 
async def authenticate_user(email: str, password: str, db: _orm.Session):
    user = await get_user_by_email(db=db, email=email)
 
    if not user:
        return False
    if not user.verify_password(password):
        return False
    return user
 
 
async def create_token(user: _models.User):
    user_obj = _schemas.User.from_orm(user)
    token = _jwt.encode(user_obj.dict(), JWT_SECRET)
    return dict(access_token=token, token_type="bearer")
 
async def get_current_user(
    db: _orm.Session = _fastapi.Depends(get_db),
    token: str = _fastapi.Depends(oauth2schema),
):
    try:
        payload = _jwt.decode(token, JWT_SECRET, algorithms=["HS256"])
        user = db.query(_models.User).get(payload["id"])
    except:
        raise _fastapi.HTTPException(
            status_code=401, detail="Invalid Email or Password"
        ) 
    return _schemas.User.from_orm(user)
py
# 資料庫連線
import sqlalchemy as _sql
import sqlalchemy.ext.declarative as _declarative
import sqlalchemy.orm as _orm
 
DATABASE_URL = "sqlite:///./database.db"
 
engine = _sql.create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
 
SessionLocal = _orm.sessionmaker(autocommit=False, autoflush=False, bind=engine)
 
Base = _declarative.declarative_base()
py
# 資料表模型
import datetime as _dt
import sqlalchemy as _sql
import sqlalchemy.orm as _orm
import passlib.hash as _hash
 
import database as _database
 
class User(_database.Base):
    __tablename__ = "users"
    id = _sql.Column(_sql.Integer, primary_key=True, index=True)
    email = _sql.Column(_sql.String, unique=True, index=True)
    hashed_password = _sql.Column(_sql.String)
 
    def verify_password(self, password: str):
        return _hash.bcrypt.verify(password, self.hashed_password)
py
# 資料驗證設定
import datetime as _dt
 
import pydantic as _pydantic
 
class _UserBase(_pydantic.BaseModel):
    email: str
 
class UserCreate(_UserBase):
    hashed_password: str
 
    class Config:
        orm_mode = True
 
class User(_UserBase):
    id: int
 
    class Config:
        orm_mode = True

參考