Files
tradeing/exchange_keys.py
chpark d16456cb92 사용자별 격리 시스템 + 사용자 관리 + 라이브 PnL%
# 사용자별 격리
- JWT 토큰에 uid 추가 (auth.get_uid 헬퍼)
- PostgreSQL — exchange_credentials/automation_config/trades/signal_events 에 user_id BIGINT
- SQLite user_settings 테이블 신설 (글로벌 settings 는 옛 호환)
- 모든 DB 함수 시그니처에 user_id 인자 추가 — 다른 사용자 데이터 절대 접근 불가
- alert_state — 모든 dict key 가 (user_id, ...) tuple 로 계층화
- core_logic alert_loop — 활성 사용자 순회 + 각자 settings/symbol/텔레그램 적용
- ensure_user_defaults() / ensure_user_automation() — 첫 사용 시 자동 시드

# 사용자 관리 (admin only)
- users_db: delete_user / admin_reset_password / set_role
- /api/users POST DELETE PUT password PUT role (본인 강등 / 마지막 admin 보호)
- /admin/users 페이지 — 등록/삭제/role 토글/비번 reset 모달
- 사이드바 adminOnly 필터 — admin role 만 메뉴 노출

# 대시보드 개선
- 모바일 / 범례 토글 (모바일 60 캔들, 데스크톱 200)
- 트레이드 이력: open 트레이드 실시간 PnL% (Binance ticker 호출 + 방향별 계산)
- 메트릭 카드 분리 (실거래 vs 실시간 open)

# 안정성
- api.ts: error.detail array/object 안전 처리 ([object Object] 방지)
- Chart.tsx: Plotly yaxis title 객체 형태 + 모바일 height 동적 조정

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 12:14:23 +09:00

426 lines
16 KiB
Python

"""
거래소 API 키 영속 저장 (PostgreSQL + Fernet 대칭 암호화).
마스터 키는 ENCRYPTION_KEY 환경변수 (또는 /app/data/.encryption_key 자동 생성).
컨테이너 재기동 / 백업 시 마스터 키 분실하면 저장된 API 키들 복호화 불가하므로
.env 에 명시 보관 권장 (docker-compose 의 .env).
테이블: exchange_credentials
id, exchange, label, api_key_enc, api_secret_enc, passphrase_enc,
testnet, enabled, created_at, updated_at
자동매매 설정도 단일 row 로 자동 저장 (settings_db 와 분리: PostgreSQL 일원화).
테이블: automation_config (key, value)
"""
import os
import threading
from typing import List, Dict, Optional, Any
try:
import psycopg2
import psycopg2.extras
HAS_PG = True
except ImportError:
HAS_PG = False
try:
from cryptography.fernet import Fernet, InvalidToken
HAS_CRYPTO = True
except ImportError:
HAS_CRYPTO = False
# PostgreSQL connection string, read once at import time; empty string when unset.
DATABASE_URL = os.environ.get("DATABASE_URL", "")
# Fallback location of the Fernet master key, used when ENCRYPTION_KEY is not set.
KEY_FILE = "/app/data/.encryption_key"
# Re-entrant lock that the DB-facing functions in this module hold while using the connection.
_lock = threading.RLock()
# Cached psycopg2 connection, created lazily by _get_conn().
_conn = None
# Flipped to True by init_db() once the schema setup has completed.
_init_done = False
# Cached Fernet instance, created lazily by _get_fernet().
_fernet: Optional["Fernet"] = None
def _enabled() -> bool:
    """Return True only when psycopg2, cryptography and DATABASE_URL are all available."""
    if not (HAS_PG and HAS_CRYPTO):
        return False
    return bool(DATABASE_URL)
def _load_or_create_key_file() -> str:
    """Return the master key stored in KEY_FILE, generating the file if it is missing.

    The file is created with O_EXCL and mode 0o600 so that it is never visible
    with wider permissions and so that a concurrently created key file is
    never overwritten (in that case the existing file is read instead).
    """
    if not os.path.exists(KEY_FILE):
        os.makedirs(os.path.dirname(KEY_FILE), exist_ok=True)
        new_key = Fernet.generate_key()
        try:
            fd = os.open(KEY_FILE, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600)
        except FileExistsError:
            # Someone else created the file between the exists() check and
            # os.open(); fall through and use their key.
            pass
        else:
            with os.fdopen(fd, "wb") as f:
                f.write(new_key)
            print(f"[exchange_keys] 마스터 키 자동 생성됨: {KEY_FILE}. .env 의 ENCRYPTION_KEY 로 옮겨 보관 권장.")
            return new_key.decode()
    with open(KEY_FILE, "rb") as f:
        return f.read().strip().decode()


def _get_fernet():
    """Return the process-wide Fernet instance, building it on first use.

    Key lookup order: the ENCRYPTION_KEY environment variable, then KEY_FILE
    (which is generated automatically when absent).

    Returns:
        The cached Fernet object, or None when the cryptography package is
        not installed.
    """
    global _fernet
    if _fernet is not None:
        return _fernet
    if not HAS_CRYPTO:
        return None
    # Serialise first-time initialisation: without the lock two threads could
    # each generate a different master key and overwrite one another's file.
    with _lock:
        if _fernet is not None:
            return _fernet
        key = os.environ.get("ENCRYPTION_KEY", "").strip()
        if not key:
            key = _load_or_create_key_file()
        _fernet = Fernet(key.encode() if isinstance(key, str) else key)
        return _fernet
def _get_conn():
    """Return a live, autocommit psycopg2 connection, reconnecting if needed.

    A cached connection is probed with ``SELECT 1``; if the probe fails the
    connection is closed and a new one is opened.

    Returns:
        The connection, or None when the module is disabled or the connect
        attempt fails (the failure is printed, not raised).
    """
    global _conn
    if not _enabled():
        return None
    if _conn is not None:
        try:
            with _conn.cursor() as cur:
                cur.execute("SELECT 1")
            return _conn
        except Exception:
            # Stale connection: close it best-effort and fall through to
            # reconnect. Only Exception is caught so KeyboardInterrupt and
            # SystemExit are not swallowed.
            try:
                _conn.close()
            except Exception:
                pass
            _conn = None
    try:
        _conn = psycopg2.connect(DATABASE_URL, connect_timeout=5)
        _conn.autocommit = True
    except Exception as e:
        print(f"[exchange_keys] connect 실패: {e}")
        _conn = None
    return _conn
def init_db():
    """Create and migrate the exchange_credentials and automation_config tables.

    Does nothing when the module is disabled or initialisation has already
    succeeded. Failures are printed rather than raised, leaving the
    initialised flag unset.
    """
    global _init_done
    if not _enabled() or _init_done:
        return

    # Executed strictly in this order; every statement is written to be
    # re-runnable (IF NOT EXISTS / guarded DO block).
    ddl_steps = (
        """
        CREATE TABLE IF NOT EXISTS exchange_credentials (
        id BIGSERIAL PRIMARY KEY,
        exchange TEXT NOT NULL,
        label TEXT NOT NULL DEFAULT '',
        api_key_enc TEXT NOT NULL,
        api_secret_enc TEXT NOT NULL,
        passphrase_enc TEXT,
        testnet BOOLEAN NOT NULL DEFAULT FALSE,
        enabled BOOLEAN NOT NULL DEFAULT TRUE,
        created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
        updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
        )
        """,
        # Per-user isolation: add the user_id column when it is missing.
        "ALTER TABLE exchange_credentials ADD COLUMN IF NOT EXISTS user_id BIGINT NOT NULL DEFAULT 1",
        "CREATE INDEX IF NOT EXISTS idx_excred_user ON exchange_credentials(user_id)",
        "CREATE INDEX IF NOT EXISTS idx_excred_exchange ON exchange_credentials(exchange)",
        """
        CREATE TABLE IF NOT EXISTS automation_config (
        key TEXT NOT NULL,
        value TEXT NOT NULL,
        updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
        )
        """,
        # automation_config migration: old (key PRIMARY KEY) -> (user_id, key).
        "ALTER TABLE automation_config ADD COLUMN IF NOT EXISTS user_id BIGINT NOT NULL DEFAULT 1",
        # Drop the old primary key, then enforce uniqueness on (user_id, key).
        """
        DO $$ BEGIN
        IF EXISTS (SELECT 1 FROM pg_constraint WHERE conname='automation_config_pkey') THEN
        ALTER TABLE automation_config DROP CONSTRAINT automation_config_pkey;
        END IF;
        IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname='automation_config_user_key') THEN
        ALTER TABLE automation_config ADD CONSTRAINT automation_config_user_key UNIQUE (user_id, key);
        END IF;
        END $$;
        """,
    )

    with _lock:
        connection = _get_conn()
        if connection is None:
            return
        try:
            with connection.cursor() as cursor:
                for statement in ddl_steps:
                    cursor.execute(statement)
            _seed_automation_default()
            _init_done = True
            print("[exchange_keys] init OK")
        except Exception as e:
            print(f"[exchange_keys] init 실패: {e}")
def _seed_automation_default():
"""글로벌 default 시드 — 새 사용자가 처음 자동매매 페이지 열면 ensure_user_automation() 호출."""
pass
# Default automation settings, stored as strings because automation_config.value is TEXT.
# Used both to seed a user's rows and as the fallback when a key is missing.
AUTOMATION_DEFAULTS = {
    # "1" marks a user as automation-enabled (see list_users_with_auto_enabled).
    "enabled": "0",
    # NOTE(review): presumably "1" means simulate orders only — confirm with the worker.
    "dry_run": "1",
    # NOTE(review): presumably the id of the exchange credential to trade with — confirm.
    "active_credential": "",
    "leverage": "10",
    "position_size_pct": "1.0",
    "max_open_trades": "3",
    "min_signal_score": "1",
    # Comma-separated list.
    "allowed_directions": "long,short",
    # NOTE(review): "0.0" presumably disables take-profit — confirm with the worker.
    "tp_pct": "0.0",
}
def ensure_user_automation(user_id: int):
    """Insert any missing AUTOMATION_DEFAULTS rows for ``user_id``.

    Existing rows are left untouched (ON CONFLICT DO NOTHING). Errors are
    printed, not raised.
    """
    if not user_id or not _enabled():
        return
    insert_sql = (
        "INSERT INTO automation_config(user_id, key, value) VALUES (%s, %s, %s) "
        "ON CONFLICT (user_id, key) DO NOTHING"
    )
    with _lock:
        connection = _get_conn()
        if connection is None:
            return
        try:
            with connection.cursor() as cursor:
                for name in AUTOMATION_DEFAULTS:
                    cursor.execute(insert_sql, (user_id, name, AUTOMATION_DEFAULTS[name]))
        except Exception as e:
            print(f"[exchange_keys] ensure_user_automation 실패: {e}")
# ──────────────────────────────────────────────
# Encryption helpers
# ──────────────────────────────────────────────
def _encrypt(plaintext: Optional[str]) -> Optional[str]:
if plaintext is None or plaintext == "":
return None
f = _get_fernet()
if f is None:
return None
return f.encrypt(plaintext.encode()).decode()
def _decrypt(ciphertext: Optional[str]) -> Optional[str]:
if not ciphertext:
return None
f = _get_fernet()
if f is None:
return None
try:
return f.decrypt(ciphertext.encode()).decode()
except InvalidToken:
return None
def _mask(s: Optional[str]) -> str:
if not s:
return ""
if len(s) <= 8:
return "*" * len(s)
return s[:4] + "" + s[-4:]
# ──────────────────────────────────────────────
# Exchange credentials CRUD
# ──────────────────────────────────────────────
# Exchange identifiers. Not referenced elsewhere in the visible part of this file;
# presumably consumed by callers for validation — verify against the API layer.
SUPPORTED_EXCHANGES = ["binance", "bybit", "okx", "bitget", "upbit", "bithumb"]
def list_credentials(user_id: int) -> List[Dict[str, Any]]:
    """List the credentials owned by ``user_id``, newest id first.

    Encrypted columns are removed from each row and replaced by
    ``*_masked`` display strings. Returns an empty list when the module is
    disabled, no connection is available, or the query fails.
    """
    if not (_enabled() and user_id):
        return []
    query = (
        "SELECT id, exchange, label, testnet, enabled, created_at, updated_at, "
        "api_key_enc, api_secret_enc, passphrase_enc "
        "FROM exchange_credentials WHERE user_id=%s ORDER BY id DESC"
    )
    with _lock:
        connection = _get_conn()
        if connection is None:
            return []
        try:
            with connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor:
                cursor.execute(query, (user_id,))
                records = cursor.fetchall()
            for record in records:
                for field in ("api_key", "api_secret"):
                    secret = _decrypt(record.pop(f"{field}_enc", None))
                    record[f"{field}_masked"] = _mask(secret)
                passphrase = _decrypt(record.pop("passphrase_enc", None))
                if passphrase:
                    record["passphrase_masked"] = _mask(passphrase)
                else:
                    record["passphrase_masked"] = ""
            return records
        except Exception as e:
            print(f"[exchange_keys] list_credentials 실패: {e}")
            return []
def get_credential(cred_id: int, user_id: int) -> Optional[Dict[str, Any]]:
    """Fetch one credential with decrypted secrets, only if ``user_id`` owns it.

    The ``*_enc`` columns are replaced by ``api_key``, ``api_secret`` and
    ``passphrase``. Returns None when the row is not found, the module is
    disabled, or the query fails.
    """
    if not (_enabled() and user_id):
        return None
    with _lock:
        connection = _get_conn()
        if connection is None:
            return None
        try:
            with connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor:
                cursor.execute(
                    "SELECT * FROM exchange_credentials WHERE id=%s AND user_id=%s",
                    (cred_id, user_id),
                )
                record = cursor.fetchone()
            if record is not None:
                for field in ("api_key", "api_secret", "passphrase"):
                    record[field] = _decrypt(record.pop(f"{field}_enc", None))
            return record
        except Exception as e:
            print(f"[exchange_keys] get_credential 실패: {e}")
            return None
def add_credential(user_id: int, exchange: str, label: str, api_key: str, api_secret: str,
                   passphrase: Optional[str] = None, testnet: bool = False, enabled: bool = True) -> Optional[int]:
    """Store a new encrypted credential for ``user_id``.

    Returns the new row id, or None when the module is disabled, a required
    argument is empty, no connection is available, or the insert fails.
    """
    if not (_enabled() and user_id):
        return None
    if not (api_key and api_secret):
        return None
    insert_sql = (
        "INSERT INTO exchange_credentials(user_id, exchange, label, api_key_enc, api_secret_enc, "
        "passphrase_enc, testnet, enabled) VALUES (%s, %s, %s, %s, %s, %s, %s, %s) RETURNING id"
    )
    with _lock:
        connection = _get_conn()
        if connection is None:
            return None
        try:
            with connection.cursor() as cursor:
                params = (
                    user_id,
                    exchange,
                    label or "",
                    _encrypt(api_key),
                    _encrypt(api_secret),
                    _encrypt(passphrase),
                    bool(testnet),
                    bool(enabled),
                )
                cursor.execute(insert_sql, params)
                new_row = cursor.fetchone()
                return new_row[0]
        except Exception as e:
            print(f"[exchange_keys] add_credential 실패: {e}")
            return None
def update_credential(cred_id: int, user_id: int, **fields) -> bool:
    """Update selected columns of a credential owned by ``user_id``.

    Recognised keyword fields: ``api_key``, ``api_secret``, ``passphrase``
    (encrypted before storage), ``exchange``, ``label``, ``testnet`` and
    ``enabled``. Unrecognised names are ignored.

    A blank ``api_key`` or ``api_secret`` is skipped, leaving the stored
    value unchanged: those columns are NOT NULL, so writing NULL would make
    the whole UPDATE fail, including any other fields sent with it. A blank
    ``passphrase`` clears the stored passphrase.

    Returns:
        True when a row was updated; False when nothing applicable was
        supplied, the row does not belong to the user, or the update failed.
    """
    if not _enabled() or not user_id or not fields:
        return False
    set_parts = []
    values = []
    for k, v in fields.items():
        if k in ("api_key", "api_secret"):
            if not v:
                # Blank means "keep the existing secret".
                continue
            set_parts.append(f"{k}_enc=%s")
            values.append(_encrypt(v))
        elif k == "passphrase":
            # passphrase_enc is nullable, so a blank value clears it.
            set_parts.append("passphrase_enc=%s")
            values.append(_encrypt(v) if v else None)
        elif k in ("exchange", "label"):
            set_parts.append(f"{k}=%s")
            values.append(v)
        elif k in ("testnet", "enabled"):
            set_parts.append(f"{k}=%s")
            values.append(bool(v))
    if not set_parts:
        return False
    set_parts.append("updated_at=now()")
    values.extend([cred_id, user_id])
    with _lock:
        conn = _get_conn()
        if conn is None:
            return False
        try:
            with conn.cursor() as cur:
                # Column names come only from the fixed whitelist above, so
                # interpolating them into the statement is safe.
                cur.execute(
                    f"UPDATE exchange_credentials SET {', '.join(set_parts)} WHERE id=%s AND user_id=%s",
                    tuple(values),
                )
                return cur.rowcount > 0
        except Exception as e:
            print(f"[exchange_keys] update_credential 실패: {e}")
            return False
def delete_credential(cred_id: int, user_id: int) -> bool:
    """Delete a credential if ``user_id`` owns it.

    Returns True when a row was removed; False otherwise, including when the
    module is disabled or the delete fails.
    """
    if not (_enabled() and user_id):
        return False
    with _lock:
        connection = _get_conn()
        if connection is None:
            return False
        try:
            with connection.cursor() as cursor:
                cursor.execute(
                    "DELETE FROM exchange_credentials WHERE id=%s AND user_id=%s",
                    (cred_id, user_id),
                )
                removed = cursor.rowcount
                return removed > 0
        except Exception as e:
            print(f"[exchange_keys] delete_credential 실패: {e}")
            return False
# ──────────────────────────────────────────────
# Automation config
# ──────────────────────────────────────────────
def automation_get(key: str, user_id: int, default: str = "") -> str:
    """Read one automation setting for ``user_id``.

    Falls back to AUTOMATION_DEFAULTS[key], then to ``default``, whenever the
    module is disabled, there is no connection, the row is missing, or the
    query fails.
    """
    fallback = AUTOMATION_DEFAULTS.get(key, default)
    if not (_enabled() and user_id):
        return fallback
    with _lock:
        connection = _get_conn()
        if connection is None:
            return fallback
        try:
            with connection.cursor() as cursor:
                cursor.execute(
                    "SELECT value FROM automation_config WHERE user_id=%s AND key=%s",
                    (user_id, key),
                )
                found = cursor.fetchone()
            return fallback if found is None else found[0]
        except Exception as e:
            print(f"[exchange_keys] automation_get 실패: {e}")
            return fallback
def automation_set(key: str, value: Any, user_id: int) -> bool:
    """Upsert one automation setting for ``user_id``; the value is stored as ``str(value)``.

    Returns True on success, False when the module is disabled, there is no
    connection, or the statement fails.
    """
    if not (_enabled() and user_id):
        return False
    upsert_sql = (
        "INSERT INTO automation_config(user_id, key, value) VALUES (%s, %s, %s) "
        "ON CONFLICT (user_id, key) DO UPDATE SET value=EXCLUDED.value, updated_at=now()"
    )
    with _lock:
        connection = _get_conn()
        if connection is None:
            return False
        try:
            with connection.cursor() as cursor:
                cursor.execute(upsert_sql, (user_id, key, str(value)))
        except Exception as e:
            print(f"[exchange_keys] automation_set 실패: {e}")
            return False
        return True
def automation_all(user_id: int) -> Dict[str, str]:
    """Return every automation setting for ``user_id`` layered over the defaults.

    Seeds missing default rows first. A copy of AUTOMATION_DEFAULTS is
    returned when the module is disabled, there is no connection, or the
    query fails.
    """
    if not (_enabled() and user_id):
        return dict(AUTOMATION_DEFAULTS)
    ensure_user_automation(user_id)
    with _lock:
        connection = _get_conn()
        if connection is None:
            return dict(AUTOMATION_DEFAULTS)
        try:
            with connection.cursor() as cursor:
                cursor.execute(
                    "SELECT key, value FROM automation_config WHERE user_id=%s",
                    (user_id,),
                )
                stored = cursor.fetchall()
            # Stored rows take precedence over the built-in defaults.
            return {**AUTOMATION_DEFAULTS, **dict(stored)}
        except Exception as e:
            print(f"[exchange_keys] automation_all 실패: {e}")
            return dict(AUTOMATION_DEFAULTS)
def list_users_with_auto_enabled() -> List[int]:
    """Return the ids of users whose ``enabled`` automation setting is ``'1'``.

    Returns an empty list when the module is disabled, there is no
    connection, or the query fails.
    """
    if not _enabled():
        return []
    with _lock:
        connection = _get_conn()
        if connection is None:
            return []
        try:
            with connection.cursor() as cursor:
                cursor.execute("SELECT user_id FROM automation_config WHERE key='enabled' AND value='1'")
                user_ids = []
                for record in cursor.fetchall():
                    user_ids.append(record[0])
                return user_ids
        except Exception as e:
            print(f"[exchange_keys] list_users_with_auto_enabled 실패: {e}")
            return []