Files
tradeing/trades_db.py
T
chpark c4e6aab7b2 React + FastAPI 풀 마이그레이션 — Streamlit 제거
- backend/ — FastAPI + JWT + 모든 REST 엔드포인트
- frontend/ — Next.js 14 + Tailwind + 7페이지 (대시보드/트레이드/거래소/자동매매/설정/내정보/로그인)
- core_logic.py — 신호계산/알림 로직 분리 (기존 app_streamlit.py 에서 추출)
- users_db.py + bcrypt 인증, exchange_keys.py + Fernet 암호화
- trades_db.py — 진입/청산 lifecycle 추적, signal_events raw 로그
- settings_db.py — 모든 운영 파라미터 DB 영속 저장 (RSI/거래량/펀딩비 임계값 포함)
- docker-compose: frontend / backend / postgres + Traefik 라우팅
- assets/logo.svg — JUNGGOMOA 그라디언트 로고

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-06 17:27:11 +09:00

245 lines
9.1 KiB
Python

"""
PostgreSQL 기반 트레이드 이력 저장.
DATABASE_URL 환경변수 미설정 시 전체 silent no-op (로컬 개발에서 streamlit run
직접 실행해도 에러 안 남). 도커 환경에선 docker-compose 가 자동 주입.
- trades: 진입 → 청산 lifecycle. status = 'open' / 'stopped' / 'reversal' / 'cancelled'
- signal_events: 발사된 모든 알림 raw log (디버그/통계용)
스레드 안전: 모든 ops 가 _lock 으로 보호. 알림 스레드 + Streamlit UI 동시 접근.
"""
import os
import threading
from datetime import datetime, timezone
from typing import Optional, List, Dict, Any
try:
import psycopg2
import psycopg2.extras
HAS_PG = True
except ImportError:
HAS_PG = False
# Connection string, read once at import time; an empty value disables the module.
DATABASE_URL = os.environ.get("DATABASE_URL", "")
# Re-entrant lock that every public function takes around its database work.
_lock = threading.RLock()
# Cached psycopg2 connection, created lazily by _get_conn(); None while disconnected.
_conn = None
# Flipped to True once init_db() has created the tables and indexes.
_init_done = False
def _enabled() -> bool:
    """Tell whether persistence is active.

    True only when psycopg2 was importable and DATABASE_URL is non-empty.
    """
    if not HAS_PG:
        return False
    return bool(DATABASE_URL)
def _get_conn():
    """Return a live cached connection, reconnecting when needed.

    Returns None when the module is disabled or the connection attempt fails.
    Callers in this module invoke it while holding ``_lock``.
    """
    global _conn
    if not _enabled():
        return None

    if _conn is not None:
        # Probe the cached connection with a trivial query before reusing it.
        try:
            with _conn.cursor() as probe:
                probe.execute("SELECT 1")
        except Exception:
            # The cached connection is unusable: close it best-effort and drop it.
            try:
                _conn.close()
            except Exception:
                pass
            _conn = None
        else:
            return _conn

    try:
        fresh = psycopg2.connect(DATABASE_URL, connect_timeout=5)
        fresh.autocommit = True
    except Exception as exc:
        print(f"[trades_db] connect 실패: {exc}")
        _conn = None
        return None

    _conn = fresh
    return _conn
def init_db():
    """Run once at application start-up; create the tables and indexes if missing.

    Does nothing when initialisation already succeeded, when the module is
    disabled, or when no connection can be obtained. Failures are printed,
    not raised.
    """
    global _init_done
    if _init_done or not _enabled():
        return

    # DDL in execution order: trades table, its indexes, signal_events table, its index.
    ddl_statements = (
        """
                    CREATE TABLE IF NOT EXISTS trades (
                        id BIGSERIAL PRIMARY KEY,
                        symbol TEXT NOT NULL,
                        interval TEXT NOT NULL,
                        direction TEXT NOT NULL,
                        signal_types TEXT NOT NULL,
                        candle_time TIMESTAMP NOT NULL,
                        entry_time TIMESTAMPTZ NOT NULL DEFAULT now(),
                        entry_price DOUBLE PRECISION NOT NULL,
                        stop_price DOUBLE PRECISION NOT NULL,
                        status TEXT NOT NULL DEFAULT 'open',
                        exit_time TIMESTAMPTZ,
                        exit_price DOUBLE PRECISION,
                        exit_reason TEXT,
                        pnl_pct DOUBLE PRECISION
                    )
                """,
        "CREATE INDEX IF NOT EXISTS idx_trades_status ON trades(status)",
        "CREATE INDEX IF NOT EXISTS idx_trades_entry_time ON trades(entry_time DESC)",
        "CREATE INDEX IF NOT EXISTS idx_trades_lookup ON trades(symbol, interval, direction, candle_time)",
        """
                    CREATE TABLE IF NOT EXISTS signal_events (
                        id BIGSERIAL PRIMARY KEY,
                        symbol TEXT NOT NULL,
                        interval TEXT NOT NULL,
                        signal_type TEXT NOT NULL,
                        direction TEXT NOT NULL,
                        candle_time TIMESTAMP NOT NULL,
                        fired_at TIMESTAMPTZ NOT NULL DEFAULT now(),
                        price DOUBLE PRECISION
                    )
                """,
        "CREATE INDEX IF NOT EXISTS idx_signal_events_fired ON signal_events(fired_at DESC)",
    )

    with _lock:
        db = _get_conn()
        if db is None:
            return
        try:
            with db.cursor() as cursor:
                for statement in ddl_statements:
                    cursor.execute(statement)
            _init_done = True
            print("[trades_db] init OK")
        except Exception as exc:
            print(f"[trades_db] init 실패: {exc}")
def log_signal_events(symbol: str, interval: str, group: List[Dict[str, Any]]):
    """Record every signal in ``group`` in the signal_events table.

    Meant to be called right before the alert is sent. Each event must carry
    the keys "sig", "direction" and "candle_time"; when it also has a non-None
    "row", that row's "open" value is stored as the price. Failures are
    printed, not raised.
    """
    if not _enabled():
        return

    insert_sql = (
        "INSERT INTO signal_events(symbol, interval, signal_type, direction, candle_time, price) "
        "VALUES (%s, %s, %s, %s, %s, %s)"
    )

    with _lock:
        db = _get_conn()
        if db is None:
            return
        try:
            with db.cursor() as cursor:
                for event in group:
                    signal_type = event["sig"]
                    side = event["direction"]
                    candle = _to_naive(event["candle_time"])
                    row = event["row"] if "row" in event else None
                    price = None if row is None else float(row["open"])
                    cursor.execute(
                        insert_sql,
                        (symbol, interval, signal_type, side, candle, price),
                    )
        except Exception as exc:
            print(f"[trades_db] log_signal_events 실패: {exc}")
def record_entry(symbol: str, interval: str, direction: str, signal_types: List[str],
                 candle_time, entry_price: float, stop_price: float):
    """Insert a new 'open' trade when an entry signal fires.

    If an open trade with the same (symbol, interval, direction, candle_time)
    already exists, the call is ignored so duplicates are not created.
    ``signal_types`` is stored as a comma-joined string. Failures are printed,
    not raised.
    """
    if not _enabled():
        return

    duplicate_sql = (
        "SELECT id FROM trades WHERE symbol=%s AND interval=%s AND direction=%s "
        "AND candle_time=%s AND status='open' LIMIT 1"
    )
    insert_sql = (
        "INSERT INTO trades(symbol, interval, direction, signal_types, candle_time, "
        "entry_price, stop_price, status) "
        "VALUES (%s, %s, %s, %s, %s, %s, %s, 'open')"
    )

    with _lock:
        db = _get_conn()
        if db is None:
            return
        try:
            with db.cursor() as cursor:
                candle = _to_naive(candle_time)
                cursor.execute(duplicate_sql, (symbol, interval, direction, candle))
                if cursor.fetchone() is not None:
                    # An open trade for this candle already exists.
                    return
                joined_types = ",".join(signal_types)
                cursor.execute(
                    insert_sql,
                    (
                        symbol, interval, direction, joined_types,
                        candle, float(entry_price), float(stop_price),
                    ),
                )
        except Exception as exc:
            print(f"[trades_db] record_entry 실패: {exc}")
def _pnl_pct(direction: str, entry_price: float, exit_price: float) -> Optional[float]:
    """Return the signed move from entry to exit as a percentage of the entry price.

    A "long" profits when the price rises; any other direction is treated as
    the opposite side. Returns None when the entry price is zero, because the
    ratio is undefined there.
    """
    entry = float(entry_price)
    exit_ = float(exit_price)
    if entry == 0.0:
        return None
    move = exit_ - entry if direction == "long" else entry - exit_
    return move / entry * 100.0


def record_exit(symbol: str, interval: str, direction: str, candle_time,
                exit_price: float, exit_reason: str):
    """Close the open trade that was entered on ``candle_time``; do nothing if none matches.

    The most recent matching open trade (highest id) is updated: ``status`` and
    ``exit_reason`` are both set to ``exit_reason``, ``exit_time`` to now(),
    and ``pnl_pct`` to the percentage move relative to the entry price.

    A stored entry price of zero used to raise ZeroDivisionError, which the
    blanket handler swallowed and left the trade open forever. Such a trade is
    now closed with a NULL ``pnl_pct`` instead. Other failures are printed,
    not raised.
    """
    if not _enabled():
        return
    with _lock:
        conn = _get_conn()
        if conn is None:
            return
        try:
            with conn.cursor() as cur:
                cur.execute(
                    "SELECT id, entry_price FROM trades WHERE symbol=%s AND interval=%s AND direction=%s "
                    "AND candle_time=%s AND status='open' ORDER BY id DESC LIMIT 1",
                    (symbol, interval, direction, _to_naive(candle_time)),
                )
                row = cur.fetchone()
                if not row:
                    return
                trade_id, entry_price = row
                pnl = _pnl_pct(direction, entry_price, exit_price)
                cur.execute(
                    "UPDATE trades SET status=%s, exit_time=now(), exit_price=%s, exit_reason=%s, pnl_pct=%s "
                    "WHERE id=%s",
                    # exit_reason doubles as the new status value.
                    (exit_reason, float(exit_price), exit_reason, pnl, trade_id),
                )
        except Exception as e:
            print(f"[trades_db] record_exit 실패: {e}")
def fetch_trades(limit: int = 500, status: Optional[str] = None) -> list:
    """Return up to ``limit`` trades, newest entry first, as dict-like rows.

    When ``status`` is truthy only trades with that status are returned.
    An empty list is returned when the module is disabled, no connection is
    available, or the query fails.
    """
    if not _enabled():
        return []

    with _lock:
        db = _get_conn()
        if db is None:
            return []
        try:
            with db.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor:
                if status:
                    sql = "SELECT * FROM trades WHERE status=%s ORDER BY entry_time DESC LIMIT %s"
                    args = (status, limit)
                else:
                    sql = "SELECT * FROM trades ORDER BY entry_time DESC LIMIT %s"
                    args = (limit,)
                cursor.execute(sql, args)
                return cursor.fetchall()
        except Exception as exc:
            print(f"[trades_db] fetch_trades 실패: {exc}")
            return []
def fetch_signal_events(limit: int = 1000) -> list:
    """Return up to ``limit`` signal events, most recently fired first, as dict-like rows.

    An empty list is returned when the module is disabled, no connection is
    available, or the query fails.
    """
    if not _enabled():
        return []

    query = "SELECT * FROM signal_events ORDER BY fired_at DESC LIMIT %s"

    with _lock:
        db = _get_conn()
        if db is None:
            return []
        try:
            with db.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor:
                cursor.execute(query, (limit,))
                return cursor.fetchall()
        except Exception as exc:
            print(f"[trades_db] fetch_signal_events 실패: {exc}")
            return []
def _to_naive(ts):
    """Normalise a timestamp for storage in a timezone-less column.

    None passes through. Objects exposing ``to_pydatetime()`` are converted
    with it first. A timezone-aware datetime has its tzinfo removed without
    shifting the wall-clock value; anything else is returned unchanged.
    """
    if ts is None:
        return None

    value = ts.to_pydatetime() if hasattr(ts, "to_pydatetime") else ts

    if not isinstance(value, datetime) or value.tzinfo is None:
        return value
    return value.replace(tzinfo=None)