"""JWT 인증.""" import os from datetime import datetime, timedelta from typing import Optional from fastapi import HTTPException, status, Depends from fastapi.security import OAuth2PasswordBearer from jose import jwt, JWTError import users_db JWT_SECRET = os.environ.get("JWT_SECRET", "change-me-in-production-please") JWT_ALG = "HS256" JWT_EXP_HOURS = 24 * 7 oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login", auto_error=False) def create_token(user_id: int, username: str, role: str) -> str: payload = { "sub": username, "uid": int(user_id), "role": role, "exp": datetime.utcnow() + timedelta(hours=JWT_EXP_HOURS), "iat": datetime.utcnow(), } return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALG) def get_uid(payload: dict) -> int: """JWT payload 에서 user_id 추출. 옛 토큰 호환 — uid 없으면 username 으로 lookup.""" if "uid" in payload: return int(payload["uid"]) # 옛 토큰 fallback try: import users_db username = payload.get("sub") for u in users_db.list_users(): if u.get("username") == username: return int(u["id"]) except Exception: pass return 0 def decode_token(token: str) -> Optional[dict]: try: return jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALG]) except JWTError: return None def require_user(token: Optional[str] = Depends(oauth2_scheme)) -> dict: if not token: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="not authenticated") payload = decode_token(token) if not payload: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="invalid token") return payload def require_admin(payload: dict = Depends(require_user)) -> dict: if payload.get("role") != "admin": raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="admin only") return payload