Topics covered: Installation, Configuration, Password and Token Utilities, Token Endpoint, Current User Dependency.
Installation
# Quote the extras so shells that glob on square brackets (e.g. zsh) pass them to pip unchanged.
pip install "python-jose[cryptography]" "passlib[bcrypt]" python-multipart
Configuration
from datetime import datetime, timedelta
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
# Key used to sign and verify the JWTs created and decoded below.
# NOTE(review): this is a placeholder value — presumably it must be replaced
# with a real secret supplied from outside the source tree; confirm before deploying.
SECRET_KEY = "your-secret-key"
# Signing algorithm handed to jwt.encode / jwt.decode.
ALGORITHM = "HS256"
# Token lifetime, in minutes, that the token endpoint passes to create_access_token.
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# Passlib context configured for bcrypt; used by hash_password / verify_password.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Dependency that supplies the request's token string to get_current_user;
# tokenUrl is the path advertised as the place to obtain a token.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/token")
Password and Token Utilities
def hash_password(password: str) -> str:
    """Return the hash of ``password`` produced by the module-level ``pwd_context``."""
    hashed = pwd_context.hash(password)
    return hashed
def verify_password(plain: str, hashed: str) -> bool:
    """Report whether ``plain`` matches the stored hash ``hashed``.

    The comparison is delegated to the module-level ``pwd_context``.
    """
    is_match = pwd_context.verify(plain, hashed)
    return is_match
def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
    """Encode ``data`` plus an ``exp`` claim into a signed JWT.

    Args:
        data: Claims to embed in the token. The mapping is shallow-copied,
            so the caller's dict is not modified.
        expires_delta: Lifetime of the token. When ``None``, the token
            expires 15 minutes after creation.

    Returns:
        The encoded token produced by ``jwt.encode`` with ``SECRET_KEY``
        and ``ALGORITHM``.
    """
    from datetime import timezone

    # Test for None explicitly: ``timedelta(0)`` is falsy, so an ``or``
    # fallback would silently turn a zero lifetime into the default one.
    if expires_delta is None:
        expires_delta = timedelta(minutes=15)

    # Use a timezone-aware "now"; ``datetime.utcnow()`` is deprecated and
    # returns a naive value that is easy to misinterpret as local time.
    expire = datetime.now(timezone.utc) + expires_delta

    to_encode = data.copy()
    to_encode["exp"] = expire
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
Token Endpoint
@router.post("/token")
def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: Session = Depends(get_session),
):
    """Exchange a username and password for a bearer access token.

    Looks up the user by ``form_data.username`` and checks the submitted
    password against the stored ``hashed_password``.

    Returns:
        A dict with ``access_token`` (a JWT whose ``sub`` claim is the user's
        id as a string) and ``token_type`` set to ``"bearer"``.

    Raises:
        HTTPException: 401 when no user matches the username or the password
            does not verify. The same message is used for both cases.
    """
    user = db.exec(select(User).where(User.username == form_data.username)).first()
    if not user or not verify_password(form_data.password, user.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid credentials",
            # A 401 response is expected to carry a WWW-Authenticate challenge.
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token = create_access_token(
        data={"sub": str(user.id)},
        expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
    )
    return {"access_token": access_token, "token_type": "bearer"}
Current User Dependency
def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(get_session),
) -> User:
    """Resolve the bearer token of the current request to a ``User``.

    The token is decoded with ``SECRET_KEY`` / ``ALGORITHM``; its ``sub``
    claim is read as the integer primary key of the user to load.

    Returns:
        The ``User`` row whose primary key equals the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded, has no ``sub``
            claim, has a ``sub`` that is not an integer, or refers to a user
            that does not exist.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid token",
        # A 401 response is expected to carry a WWW-Authenticate challenge.
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError as exc:
        raise credentials_exception from exc

    user_id = payload.get("sub")
    if user_id is None:
        raise credentials_exception

    # A validly signed token may still carry a non-numeric subject; report it
    # as an invalid token instead of letting the conversion error escape.
    try:
        user_pk = int(user_id)
    except (TypeError, ValueError) as exc:
        raise credentials_exception from exc

    user = db.get(User, user_pk)
    if user is None:
        raise credentials_exception
    return user
@app.get("/tasks/")
def list_tasks(
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_session),
):
    """Return the tasks whose ``owner_id`` equals the authenticated user's id.

    Args:
        current_user: The user resolved from the request's bearer token.
        db: Database session, injected the same way as in the other handlers.

    Returns:
        A list of matching ``Task`` rows.
    """
    # The session must be a declared dependency: the function body has no
    # other binding for ``db``.
    statement = select(Task).where(Task.owner_id == current_user.id)
    return db.exec(statement).all()
💬 Comments (0)
No comments yet
Be the first to share your opinion about this article!