""" 사용자 인증 (PostgreSQL + bcrypt). - users 테이블: id / username (unique) / password_hash / role / created_at / last_login_at - 초기 기동 시 admin 계정 자동 시드 (없는 경우만). 비밀번호는 환경변수 ADMIN_INIT_PASSWORD 로 override 가능. 미설정 시 'Adm!n2026!' 디폴트 (사용자가 첫 로그인 후 변경 권장). """ import os import threading from datetime import datetime from typing import Optional, Dict, Any try: import psycopg2 import psycopg2.extras HAS_PG = True except ImportError: HAS_PG = False try: import bcrypt HAS_BCRYPT = True except ImportError: HAS_BCRYPT = False DATABASE_URL = os.environ.get("DATABASE_URL", "") DEFAULT_ADMIN_USERNAME = "admin" DEFAULT_ADMIN_PASSWORD = os.environ.get("ADMIN_INIT_PASSWORD", "Adm!n2026!") _lock = threading.RLock() _conn = None _init_done = False def _enabled() -> bool: return HAS_PG and HAS_BCRYPT and bool(DATABASE_URL) def _get_conn(): global _conn if not _enabled(): return None if _conn is not None: try: with _conn.cursor() as cur: cur.execute("SELECT 1") return _conn except Exception: try: _conn.close() except: pass _conn = None try: _conn = psycopg2.connect(DATABASE_URL, connect_timeout=5) _conn.autocommit = True except Exception as e: print(f"[users_db] connect 실패: {e}") _conn = None return _conn def _hash(password: str) -> str: return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt(rounds=10)).decode("utf-8") def _check(password: str, hashed: str) -> bool: try: return bcrypt.checkpw(password.encode("utf-8"), hashed.encode("utf-8")) except Exception: return False def init_db(): global _init_done if _init_done or not _enabled(): return with _lock: conn = _get_conn() if conn is None: return try: with conn.cursor() as cur: cur.execute(""" CREATE TABLE IF NOT EXISTS users ( id BIGSERIAL PRIMARY KEY, username TEXT NOT NULL UNIQUE, password_hash TEXT NOT NULL, role TEXT NOT NULL DEFAULT 'user', created_at TIMESTAMPTZ NOT NULL DEFAULT now(), last_login_at TIMESTAMPTZ ) """) cur.execute("SELECT count(*) FROM users") count = cur.fetchone()[0] if count == 0: cur.execute( "INSERT INTO users(username, password_hash, role) VALUES (%s, %s, 'admin')", (DEFAULT_ADMIN_USERNAME, _hash(DEFAULT_ADMIN_PASSWORD)), ) print(f"[users_db] 초기 admin 시드 → username={DEFAULT_ADMIN_USERNAME} password={DEFAULT_ADMIN_PASSWORD}") _init_done = True print("[users_db] init OK") except Exception as e: print(f"[users_db] init 실패: {e}") def authenticate(username: str, password: str) -> Optional[Dict[str, Any]]: """성공 시 user dict (password_hash 제외) 반환, 실패 시 None.""" if not _enabled() or not username or not password: return None with _lock: conn = _get_conn() if conn is None: return None try: with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute("SELECT * FROM users WHERE username=%s", (username,)) row = cur.fetchone() if row is None: return None if not _check(password, row["password_hash"]): return None cur.execute("UPDATE users SET last_login_at=now() WHERE id=%s", (row["id"],)) row.pop("password_hash", None) return dict(row) except Exception as e: print(f"[users_db] authenticate 실패: {e}") return None def change_password(username: str, old_password: str, new_password: str) -> bool: if not _enabled() or len(new_password) < 6: return False user = authenticate(username, old_password) if user is None: return False with _lock: conn = _get_conn() if conn is None: return False try: with conn.cursor() as cur: cur.execute( "UPDATE users SET password_hash=%s WHERE username=%s", (_hash(new_password), username), ) return cur.rowcount > 0 except Exception as e: print(f"[users_db] change_password 실패: {e}") return False def create_user(username: str, password: str, role: str = "user") -> Optional[int]: if not _enabled() or not username or not password: return None with _lock: conn = _get_conn() if conn is None: return None try: with conn.cursor() as cur: cur.execute( "INSERT INTO users(username, password_hash, role) VALUES (%s, %s, %s) RETURNING id", (username, _hash(password), role), ) return cur.fetchone()[0] except Exception as e: print(f"[users_db] create_user 실패: {e}") return None def list_users(): if not _enabled(): return [] with _lock: conn = _get_conn() if conn is None: return [] try: with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute("SELECT id, username, role, created_at, last_login_at FROM users ORDER BY id") return cur.fetchall() except Exception as e: print(f"[users_db] list_users 실패: {e}") return []