驗證與加密(第三篇)
cmd
pip install pyjwt
py
from typing import List
import fastapi as _fastapi
import fastapi.security as _security
import sqlalchemy.orm as _orm
import services as _services, schemas as _schemas
# FastAPI application instance; the route decorators below register their handlers on it.
app = _fastapi.FastAPI()
@app.post("/api/users")
async def create_user(
    user: _schemas.UserCreate,
    db: _orm.Session = _fastapi.Depends(_services.get_db),
):
    """Register a new account and return a token response for it.

    Raises:
        fastapi.HTTPException: 400 when a user with the same e-mail already
            exists.
    """
    existing = await _services.get_user_by_email(user.email, db)
    if existing:
        raise _fastapi.HTTPException(status_code=400, detail="Email already in use")

    created = await _services.create_user(user, db)
    token_response = await _services.create_token(created)
    return token_response
@app.post("/api/token")
async def generate_token(
    form_data: _security.OAuth2PasswordRequestForm = _fastapi.Depends(),
    db: _orm.Session = _fastapi.Depends(_services.get_db),
):
    """Exchange the submitted form credentials for a token response.

    Raises:
        fastapi.HTTPException: 401 when authentication fails.
    """
    user = await _services.authenticate_user(
        form_data.username, form_data.password, db
    )
    if user:
        return await _services.create_token(user)
    raise _fastapi.HTTPException(status_code=401, detail="Invalid Credentials")
@app.get("/api/users/myprofile", response_model=_schemas.User)
async def get_user(
    user: _schemas.User = _fastapi.Depends(_services.get_current_user),
):
    """Return the user resolved by the ``get_current_user`` dependency."""
    return user
py
# 安全性與資料庫連接
import fastapi as _fastapi
import fastapi.security as _security
import jwt as _jwt
import datetime as _dt
import sqlalchemy.orm as _orm
import passlib.hash as _hash
# 引入資料庫、資料表、驗證設定
import database as _database, models as _models, schemas as _schemas
# Bearer-token dependency; tokenUrl names the endpoint that issues tokens.
oauth2schema = _security.OAuth2PasswordBearer(tokenUrl="/api/token")
# Secret key passed to jwt.encode / jwt.decode in this module to sign and verify tokens.
# NOTE(review): hard-coded secret — presumably fine for a tutorial; load it from configuration in real deployments.
JWT_SECRET = "evancodeaddoutside"
def create_database():
    """Create the tables registered on the declarative base, using the module engine."""
    metadata = _database.Base.metadata
    return metadata.create_all(bind=_database.engine)
def get_db():
    """Yield a new database session and close it when the consumer is done."""
    session = _database.SessionLocal()
    try:
        yield session
    finally:
        # Runs even when the consumer raises, so the session is always closed.
        session.close()
async def get_user_by_email(email: str, db: _orm.Session):
    """Return the first user whose e-mail equals ``email`` (``None`` if no row matches)."""
    matching = db.query(_models.User).filter(_models.User.email == email)
    return matching.first()
async def create_user(user: _schemas.UserCreate, db: _orm.Session):
    """Store a new user with a bcrypt hash of the supplied password and return the row."""
    # The schema field is named ``hashed_password`` but its value is hashed
    # here, so it presumably carries the raw password — confirm with callers.
    password_hash = _hash.bcrypt.hash(user.hashed_password)
    record = _models.User(email=user.email, hashed_password=password_hash)

    db.add(record)
    db.commit()
    # Reload the instance from the database after the commit.
    db.refresh(record)
    return record
async def authenticate_user(email: str, password: str, db: _orm.Session):
    """Return the user when ``email`` exists and ``password`` verifies, else ``False``."""
    candidate = await get_user_by_email(email=email, db=db)
    if candidate and candidate.verify_password(password):
        return candidate
    return False
async def create_token(user: _models.User):
    """Return a bearer-token response whose JWT payload is the user's schema fields."""
    claims = _schemas.User.from_orm(user).dict()
    signed = _jwt.encode(claims, JWT_SECRET)
    return {"access_token": signed, "token_type": "bearer"}
async def get_current_user(
    db: _orm.Session = _fastapi.Depends(get_db),
    token: str = _fastapi.Depends(oauth2schema),
):
    """Resolve the bearer token to the user it identifies.

    Args:
        db: Database session supplied by the ``get_db`` dependency.
        token: Encoded JWT supplied by the ``oauth2schema`` dependency.

    Returns:
        The matching user converted to the ``User`` schema.

    Raises:
        fastapi.HTTPException: 401 when the token cannot be decoded or
            verified, has no ``id`` claim, or refers to a user that does
            not exist.
    """
    unauthorized = _fastapi.HTTPException(
        status_code=401, detail="Invalid Email or Password"
    )

    # Only token problems are turned into 401; unrelated failures (for
    # example database errors) propagate instead of being masked.
    try:
        payload = _jwt.decode(token, JWT_SECRET, algorithms=["HS256"])
        user_id = payload["id"]
    except (_jwt.PyJWTError, KeyError) as err:
        raise unauthorized from err

    user = db.query(_models.User).get(user_id)
    if user is None:
        # A valid token whose user row is gone must not reach from_orm(None).
        raise unauthorized
    return _schemas.User.from_orm(user)
py
# 資料庫連線
import sqlalchemy as _sql
import sqlalchemy.ext.declarative as _declarative
import sqlalchemy.orm as _orm
# SQLite database stored in the file ./database.db (relative path).
DATABASE_URL = "sqlite:///./database.db"
# check_same_thread=False is forwarded to the sqlite3 driver and disables its
# check that a connection is only used by the thread that created it.
engine = _sql.create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
# Session factory bound to the engine, with autocommit and autoflush turned off.
SessionLocal = _orm.sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Declarative base class for ORM models.
Base = _declarative.declarative_base()
py
# 資料表模型
import datetime as _dt
import sqlalchemy as _sql
import sqlalchemy.orm as _orm
import passlib.hash as _hash
import database as _database
class User(_database.Base):
    """ORM model mapped to the ``users`` table."""

    __tablename__ = "users"

    # Integer primary key.
    id = _sql.Column(_sql.Integer, primary_key=True, index=True)
    # Unique, indexed e-mail address.
    email = _sql.Column(_sql.String, unique=True, index=True)
    # Password hash checked by verify_password.
    hashed_password = _sql.Column(_sql.String)

    def verify_password(self, password: str):
        """Return the result of verifying ``password`` against the stored bcrypt hash."""
        stored_hash = self.hashed_password
        return _hash.bcrypt.verify(password, stored_hash)
py
# 資料驗證設定
import datetime as _dt
import pydantic as _pydantic
class _UserBase(_pydantic.BaseModel):
    """Base pydantic model holding the ``email`` field."""

    email: str
class UserCreate(_UserBase):
    """User schema that adds a ``hashed_password`` field to the base fields."""

    hashed_password: str

    class Config:
        # Enables pydantic's ORM mode for this model.
        orm_mode = True
class User(_UserBase):
    """User schema that adds an integer ``id`` to the base fields."""

    id: int

    class Config:
        # Enables pydantic's ORM mode for this model.
        orm_mode = True