""" 거래소 API 키 영속 저장 (PostgreSQL + Fernet 대칭 암호화). 마스터 키는 ENCRYPTION_KEY 환경변수 (또는 /app/data/.encryption_key 자동 생성). 컨테이너 재기동 / 백업 시 마스터 키 분실하면 저장된 API 키들 복호화 불가하므로 .env 에 명시 보관 권장 (docker-compose 의 .env). 테이블: exchange_credentials id, exchange, label, api_key_enc, api_secret_enc, passphrase_enc, testnet, enabled, created_at, updated_at 자동매매 설정도 단일 row 로 자동 저장 (settings_db 와 분리: PostgreSQL 일원화). 테이블: automation_config (key, value) """ import os import threading from typing import List, Dict, Optional, Any try: import psycopg2 import psycopg2.extras HAS_PG = True except ImportError: HAS_PG = False try: from cryptography.fernet import Fernet, InvalidToken HAS_CRYPTO = True except ImportError: HAS_CRYPTO = False DATABASE_URL = os.environ.get("DATABASE_URL", "") KEY_FILE = "/app/data/.encryption_key" _lock = threading.RLock() _conn = None _init_done = False _fernet: Optional["Fernet"] = None def _enabled() -> bool: return HAS_PG and HAS_CRYPTO and bool(DATABASE_URL) def _get_fernet(): global _fernet if _fernet is not None: return _fernet if not HAS_CRYPTO: return None key = os.environ.get("ENCRYPTION_KEY", "").strip() if not key: if os.path.exists(KEY_FILE): with open(KEY_FILE, "rb") as f: key = f.read().strip().decode() else: os.makedirs(os.path.dirname(KEY_FILE), exist_ok=True) new_key = Fernet.generate_key() with open(KEY_FILE, "wb") as f: f.write(new_key) os.chmod(KEY_FILE, 0o600) key = new_key.decode() print(f"[exchange_keys] 마스터 키 자동 생성됨: {KEY_FILE}. .env 의 ENCRYPTION_KEY 로 옮겨 보관 권장.") _fernet = Fernet(key.encode() if isinstance(key, str) else key) return _fernet def _get_conn(): global _conn if not _enabled(): return None if _conn is not None: try: with _conn.cursor() as cur: cur.execute("SELECT 1") return _conn except Exception: try: _conn.close() except: pass _conn = None try: _conn = psycopg2.connect(DATABASE_URL, connect_timeout=5) _conn.autocommit = True except Exception as e: print(f"[exchange_keys] connect 실패: {e}") _conn = None return _conn def init_db(): global _init_done if _init_done or not _enabled(): return with _lock: conn = _get_conn() if conn is None: return try: with conn.cursor() as cur: cur.execute(""" CREATE TABLE IF NOT EXISTS exchange_credentials ( id BIGSERIAL PRIMARY KEY, exchange TEXT NOT NULL, label TEXT NOT NULL DEFAULT '', api_key_enc TEXT NOT NULL, api_secret_enc TEXT NOT NULL, passphrase_enc TEXT, testnet BOOLEAN NOT NULL DEFAULT FALSE, enabled BOOLEAN NOT NULL DEFAULT TRUE, created_at TIMESTAMPTZ NOT NULL DEFAULT now(), updated_at TIMESTAMPTZ NOT NULL DEFAULT now() ) """) # 사용자별 격리 — user_id 컬럼 추가 (없으면) cur.execute("ALTER TABLE exchange_credentials ADD COLUMN IF NOT EXISTS user_id BIGINT NOT NULL DEFAULT 1") cur.execute("CREATE INDEX IF NOT EXISTS idx_excred_user ON exchange_credentials(user_id)") cur.execute("CREATE INDEX IF NOT EXISTS idx_excred_exchange ON exchange_credentials(exchange)") cur.execute(""" CREATE TABLE IF NOT EXISTS automation_config ( key TEXT NOT NULL, value TEXT NOT NULL, updated_at TIMESTAMPTZ NOT NULL DEFAULT now() ) """) # automation_config 마이그레이션 — 옛 (key PRIMARY KEY) → (user_id, key) cur.execute("ALTER TABLE automation_config ADD COLUMN IF NOT EXISTS user_id BIGINT NOT NULL DEFAULT 1") # 옛 PRIMARY KEY 제거 후 (user_id, key) composite PK cur.execute(""" DO $$ BEGIN IF EXISTS (SELECT 1 FROM pg_constraint WHERE conname='automation_config_pkey') THEN ALTER TABLE automation_config DROP CONSTRAINT automation_config_pkey; END IF; IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname='automation_config_user_key') THEN ALTER TABLE automation_config ADD CONSTRAINT automation_config_user_key UNIQUE (user_id, key); END IF; END $$; """) _seed_automation_default() _init_done = True print("[exchange_keys] init OK") except Exception as e: print(f"[exchange_keys] init 실패: {e}") def _seed_automation_default(): """글로벌 default 시드 — 새 사용자가 처음 자동매매 페이지 열면 ensure_user_automation() 호출.""" pass AUTOMATION_DEFAULTS = { "enabled": "0", "dry_run": "1", "active_credential": "", "leverage": "10", "position_size_pct": "1.0", "max_open_trades": "3", "min_signal_score": "1", "allowed_directions": "long,short", "tp_pct": "0.0", } def ensure_user_automation(user_id: int): """사용자가 처음 자동매매 페이지 열 때 default 시드.""" if not _enabled() or not user_id: return with _lock: conn = _get_conn() if conn is None: return try: with conn.cursor() as cur: for k, v in AUTOMATION_DEFAULTS.items(): cur.execute( "INSERT INTO automation_config(user_id, key, value) VALUES (%s, %s, %s) " "ON CONFLICT (user_id, key) DO NOTHING", (user_id, k, v), ) except Exception as e: print(f"[exchange_keys] ensure_user_automation 실패: {e}") # ────────────────────────────────────────────── # Encryption helpers # ────────────────────────────────────────────── def _encrypt(plaintext: Optional[str]) -> Optional[str]: if plaintext is None or plaintext == "": return None f = _get_fernet() if f is None: return None return f.encrypt(plaintext.encode()).decode() def _decrypt(ciphertext: Optional[str]) -> Optional[str]: if not ciphertext: return None f = _get_fernet() if f is None: return None try: return f.decrypt(ciphertext.encode()).decode() except InvalidToken: return None def _mask(s: Optional[str]) -> str: if not s: return "" if len(s) <= 8: return "*" * len(s) return s[:4] + "…" + s[-4:] # ────────────────────────────────────────────── # Exchange credentials CRUD # ────────────────────────────────────────────── SUPPORTED_EXCHANGES = ["binance", "bybit", "okx", "bitget", "upbit", "bithumb"] def list_credentials(user_id: int) -> List[Dict[str, Any]]: if not _enabled() or not user_id: return [] with _lock: conn = _get_conn() if conn is None: return [] try: with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute( "SELECT id, exchange, label, testnet, enabled, created_at, updated_at, " "api_key_enc, api_secret_enc, passphrase_enc " "FROM exchange_credentials WHERE user_id=%s ORDER BY id DESC", (user_id,), ) rows = cur.fetchall() for r in rows: r["api_key_masked"] = _mask(_decrypt(r.pop("api_key_enc", None))) r["api_secret_masked"] = _mask(_decrypt(r.pop("api_secret_enc", None))) pp = _decrypt(r.pop("passphrase_enc", None)) r["passphrase_masked"] = _mask(pp) if pp else "" return rows except Exception as e: print(f"[exchange_keys] list_credentials 실패: {e}") return [] def get_credential(cred_id: int, user_id: int) -> Optional[Dict[str, Any]]: """user_id 소유 cred 만 반환 (다른 사용자 키 접근 차단).""" if not _enabled() or not user_id: return None with _lock: conn = _get_conn() if conn is None: return None try: with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute("SELECT * FROM exchange_credentials WHERE id=%s AND user_id=%s", (cred_id, user_id)) row = cur.fetchone() if row is None: return None row["api_key"] = _decrypt(row.pop("api_key_enc", None)) row["api_secret"] = _decrypt(row.pop("api_secret_enc", None)) row["passphrase"] = _decrypt(row.pop("passphrase_enc", None)) return row except Exception as e: print(f"[exchange_keys] get_credential 실패: {e}") return None def add_credential(user_id: int, exchange: str, label: str, api_key: str, api_secret: str, passphrase: Optional[str] = None, testnet: bool = False, enabled: bool = True) -> Optional[int]: if not _enabled() or not user_id: return None if not api_key or not api_secret: return None with _lock: conn = _get_conn() if conn is None: return None try: with conn.cursor() as cur: cur.execute( "INSERT INTO exchange_credentials(user_id, exchange, label, api_key_enc, api_secret_enc, " "passphrase_enc, testnet, enabled) VALUES (%s, %s, %s, %s, %s, %s, %s, %s) RETURNING id", ( user_id, exchange, label or "", _encrypt(api_key), _encrypt(api_secret), _encrypt(passphrase), bool(testnet), bool(enabled), ), ) return cur.fetchone()[0] except Exception as e: print(f"[exchange_keys] add_credential 실패: {e}") return None def update_credential(cred_id: int, user_id: int, **fields) -> bool: if not _enabled() or not user_id or not fields: return False set_parts = [] values = [] for k, v in fields.items(): if k in ("api_key", "api_secret", "passphrase"): set_parts.append(f"{k}_enc=%s") values.append(_encrypt(v) if v else None) elif k in ("exchange", "label"): set_parts.append(f"{k}=%s") values.append(v) elif k in ("testnet", "enabled"): set_parts.append(f"{k}=%s") values.append(bool(v)) if not set_parts: return False set_parts.append("updated_at=now()") values.extend([cred_id, user_id]) with _lock: conn = _get_conn() if conn is None: return False try: with conn.cursor() as cur: cur.execute( f"UPDATE exchange_credentials SET {', '.join(set_parts)} WHERE id=%s AND user_id=%s", tuple(values), ) return cur.rowcount > 0 except Exception as e: print(f"[exchange_keys] update_credential 실패: {e}") return False def delete_credential(cred_id: int, user_id: int) -> bool: if not _enabled() or not user_id: return False with _lock: conn = _get_conn() if conn is None: return False try: with conn.cursor() as cur: cur.execute("DELETE FROM exchange_credentials WHERE id=%s AND user_id=%s", (cred_id, user_id)) return cur.rowcount > 0 except Exception as e: print(f"[exchange_keys] delete_credential 실패: {e}") return False # ────────────────────────────────────────────── # Automation config # ────────────────────────────────────────────── def automation_get(key: str, user_id: int, default: str = "") -> str: if not _enabled() or not user_id: return AUTOMATION_DEFAULTS.get(key, default) with _lock: conn = _get_conn() if conn is None: return AUTOMATION_DEFAULTS.get(key, default) try: with conn.cursor() as cur: cur.execute("SELECT value FROM automation_config WHERE user_id=%s AND key=%s", (user_id, key)) row = cur.fetchone() if row is None: return AUTOMATION_DEFAULTS.get(key, default) return row[0] except Exception as e: print(f"[exchange_keys] automation_get 실패: {e}") return AUTOMATION_DEFAULTS.get(key, default) def automation_set(key: str, value: Any, user_id: int) -> bool: if not _enabled() or not user_id: return False with _lock: conn = _get_conn() if conn is None: return False try: with conn.cursor() as cur: cur.execute( "INSERT INTO automation_config(user_id, key, value) VALUES (%s, %s, %s) " "ON CONFLICT (user_id, key) DO UPDATE SET value=EXCLUDED.value, updated_at=now()", (user_id, key, str(value)), ) return True except Exception as e: print(f"[exchange_keys] automation_set 실패: {e}") return False def automation_all(user_id: int) -> Dict[str, str]: if not _enabled() or not user_id: return dict(AUTOMATION_DEFAULTS) ensure_user_automation(user_id) with _lock: conn = _get_conn() if conn is None: return dict(AUTOMATION_DEFAULTS) try: with conn.cursor() as cur: cur.execute("SELECT key, value FROM automation_config WHERE user_id=%s", (user_id,)) rows = cur.fetchall() d = dict(AUTOMATION_DEFAULTS) d.update(dict(rows)) return d except Exception as e: print(f"[exchange_keys] automation_all 실패: {e}") return dict(AUTOMATION_DEFAULTS) def list_users_with_auto_enabled() -> List[int]: """자동매매 활성 사용자 ID 리스트 (자동매매 워커용).""" if not _enabled(): return [] with _lock: conn = _get_conn() if conn is None: return [] try: with conn.cursor() as cur: cur.execute("SELECT user_id FROM automation_config WHERE key='enabled' AND value='1'") return [r[0] for r in cur.fetchall()] except Exception as e: print(f"[exchange_keys] list_users_with_auto_enabled 실패: {e}") return []