Files
chpark c4e6aab7b2 React + FastAPI 풀 마이그레이션 — Streamlit 제거
- backend/ — FastAPI + JWT + 모든 REST 엔드포인트
- frontend/ — Next.js 14 + Tailwind + 7페이지 (대시보드/트레이드/거래소/자동매매/설정/내정보/로그인)
- core_logic.py — 신호계산/알림 로직 분리 (기존 app_streamlit.py 에서 추출)
- users_db.py + bcrypt 인증, exchange_keys.py + Fernet 암호화
- trades_db.py — 진입/청산 lifecycle 추적, signal_events raw 로그
- settings_db.py — 모든 운영 파라미터 DB 영속 저장 (RSI/거래량/펀딩비 임계값 포함)
- docker-compose: frontend / backend / postgres + Traefik 라우팅
- assets/logo.svg — JUNGGOMOA 그라디언트 로고

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-06 17:27:11 +09:00

253 lines
11 KiB
Python

"""
백엔드 통합 테스트 — 컨테이너 안에서 실행 (DATABASE_URL / ENCRYPTION_KEY 환경변수 필요).
실행:
docker compose exec app python -m pytest tests/ -v
또는:
docker compose exec app python tests/test_backend.py
각 테스트는 독립적이고 실제 DB / Binance 와 통신.
"""
import os
import sys
import time
import unittest
from datetime import datetime, timedelta
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import settings_db
import trades_db
import exchange_keys
import exchange_adapters
import alert_state
# ──────────────────────────────────────────────
# 1. settings_db
# ──────────────────────────────────────────────
class SettingsDBTest(unittest.TestCase):
@classmethod
def setUpClass(cls):
settings_db.init_db_with_env_defaults()
def test_defaults_seeded(self):
v = settings_db.get("alert_symbol")
self.assertTrue(v in ("BTCUSDT", "ETHUSDT", "SOLUSDT", "BNBUSDT") or v.endswith("USDT"))
def test_set_and_get(self):
settings_db.set_value("__test_key", "hello")
self.assertEqual(settings_db.get("__test_key"), "hello")
def test_get_int(self):
settings_db.set_value("__test_int", "42")
self.assertEqual(settings_db.get_int("__test_int"), 42)
self.assertEqual(settings_db.get_int("__no_such_key", 7), 7)
def test_get_float(self):
settings_db.set_value("__test_float", "0.0075")
self.assertAlmostEqual(settings_db.get_float("__test_float"), 0.0075)
def test_get_bool(self):
settings_db.set_value("__test_bool", "1")
self.assertTrue(settings_db.get_bool("__test_bool"))
settings_db.set_value("__test_bool", "0")
self.assertFalse(settings_db.get_bool("__test_bool"))
def test_get_list(self):
settings_db.set_value("__test_list", "5m,15m,30m")
self.assertEqual(settings_db.get_list("__test_list"), ["5m", "15m", "30m"])
def test_threshold_defaults_present(self):
for k in ["long_rsi_max", "short_rsi_min", "body_pct_min",
"vol_exhaustion_mult", "fr_short_extreme",
"candle_limit_desktop", "polling_interval_sec"]:
v = settings_db.get(k)
self.assertIsNotNone(v, f"{k} missing")
self.assertNotEqual(v, "", f"{k} empty")
# ──────────────────────────────────────────────
# 2. trades_db (PostgreSQL)
# ──────────────────────────────────────────────
class TradesDBTest(unittest.TestCase):
@classmethod
def setUpClass(cls):
if not trades_db._enabled():
raise unittest.SkipTest("DATABASE_URL not set")
trades_db.init_db()
def test_record_entry_and_exit_long(self):
ct = datetime.utcnow().replace(microsecond=0)
trades_db.record_entry(
"TESTUSDT", "5m", "long",
["strong_long_signal"], ct, 100.0, 99.25,
)
trades_db.record_exit("TESTUSDT", "5m", "long", ct, 110.0, "stop_loss")
rows = trades_db.fetch_trades(limit=10)
match = [r for r in rows if r["symbol"] == "TESTUSDT" and r["candle_time"] == ct]
self.assertTrue(match, "trade not found after record_entry+record_exit")
r = match[0]
self.assertEqual(r["status"], "stop_loss")
self.assertAlmostEqual(r["pnl_pct"], (110.0 - 100.0) / 100.0 * 100, places=2)
def test_record_entry_and_exit_short(self):
ct = datetime.utcnow().replace(microsecond=0) - timedelta(seconds=1)
trades_db.record_entry(
"TESTUSDT", "5m", "short",
["short_signal"], ct, 100.0, 100.75,
)
trades_db.record_exit("TESTUSDT", "5m", "short", ct, 90.0, "reversal")
rows = trades_db.fetch_trades(limit=10)
r = next(r for r in rows if r["candle_time"] == ct and r["direction"] == "short")
self.assertEqual(r["status"], "reversal")
self.assertAlmostEqual(r["pnl_pct"], (100.0 - 90.0) / 100.0 * 100, places=2)
def test_duplicate_entry_ignored(self):
ct = datetime.utcnow().replace(microsecond=0) - timedelta(seconds=2)
trades_db.record_entry("DUPUSDT", "5m", "long", ["x"], ct, 100.0, 99.0)
trades_db.record_entry("DUPUSDT", "5m", "long", ["y"], ct, 100.0, 99.0)
rows = [r for r in trades_db.fetch_trades(limit=20)
if r["symbol"] == "DUPUSDT" and r["candle_time"] == ct]
self.assertEqual(len(rows), 1)
def test_log_signal_events(self):
ct = datetime.utcnow().replace(microsecond=0)
trades_db.log_signal_events(
"TESTUSDT", "5m",
[{"sig": "long_signal", "direction": "long", "candle_time": ct,
"row": {"open": 100.0}}],
)
events = trades_db.fetch_signal_events(limit=100)
self.assertTrue(any(e["symbol"] == "TESTUSDT" for e in events))
# ──────────────────────────────────────────────
# 3. exchange_keys (Fernet 암호화 + automation_config)
# ──────────────────────────────────────────────
class ExchangeKeysTest(unittest.TestCase):
@classmethod
def setUpClass(cls):
if not exchange_keys._enabled():
raise unittest.SkipTest("DATABASE_URL or cryptography not set")
exchange_keys.init_db()
def test_add_and_decrypt(self):
cid = exchange_keys.add_credential(
exchange="binance", label="__test_main",
api_key="AK_TEST_12345", api_secret="SK_TEST_67890",
passphrase=None, testnet=True, enabled=True,
)
self.assertIsNotNone(cid)
cred = exchange_keys.get_credential(cid)
self.assertEqual(cred["api_key"], "AK_TEST_12345")
self.assertEqual(cred["api_secret"], "SK_TEST_67890")
self.assertTrue(cred["testnet"])
# 마스킹 확인
creds = exchange_keys.list_credentials()
match = [c for c in creds if c["id"] == cid][0]
self.assertNotEqual(match["api_key_masked"], "AK_TEST_12345") # 마스킹됨
self.assertIn("", match["api_key_masked"])
# 정리
exchange_keys.delete_credential(cid)
def test_update_credential(self):
cid = exchange_keys.add_credential(
"okx", "__test_okx", "k1", "s1", "p1", False, True,
)
self.assertTrue(exchange_keys.update_credential(cid, label="renamed", enabled=False))
cred = exchange_keys.get_credential(cid)
self.assertEqual(cred["label"], "renamed")
self.assertFalse(cred["enabled"])
exchange_keys.delete_credential(cid)
def test_automation_get_set(self):
exchange_keys.automation_set("__test_auto", "yes")
self.assertEqual(exchange_keys.automation_get("__test_auto"), "yes")
def test_automation_defaults(self):
cfg = exchange_keys.automation_all()
for k in ["enabled", "dry_run", "leverage", "position_size_pct",
"max_open_trades", "allowed_directions"]:
self.assertIn(k, cfg)
# ──────────────────────────────────────────────
# 4. exchange_adapters (DRY-RUN)
# ──────────────────────────────────────────────
class ExchangeAdaptersTest(unittest.TestCase):
def test_dry_run_no_credential(self):
adapter = exchange_adapters.make_adapter(None, dry_run=True)
self.assertIsInstance(adapter, exchange_adapters.DryRunAdapter)
self.assertEqual(adapter.get_balance("USDT"), 1000.0)
result = adapter.place_market_order("BTCUSDT", "long", 0.01)
self.assertTrue(result.ok)
self.assertEqual(result.filled_qty, 0.01)
def test_dry_run_with_credential_dict(self):
cred = {"exchange": "binance", "api_key": "K", "api_secret": "S",
"passphrase": None, "testnet": True}
adapter = exchange_adapters.make_adapter(cred, dry_run=True)
self.assertEqual(adapter.exchange, "binance")
self.assertTrue(adapter.testnet)
close = adapter.close_position("BTCUSDT")
self.assertTrue(close.ok)
# ──────────────────────────────────────────────
# 5. signal computation (compute_signals 가 새 임계값을 읽고 NameError 안 나는지)
# ──────────────────────────────────────────────
class SignalComputationTest(unittest.TestCase):
def test_compute_signals_no_nameerror(self):
# 백엔드만 import — Streamlit UI 코드는 실행 안 됨.
import importlib
# Streamlit 의 set_page_config 가 import 시점에 호출되어 컨테이너 밖에서 실행 시
# 에러가 날 수 있어, 격리된 작은 DataFrame 으로 compute_signals 만 호출.
import pandas as pd
import numpy as np
# 200개 더미 캔들 (지표 계산에 충분)
n = 200
rng = np.random.default_rng(seed=42)
close = 100 + np.cumsum(rng.normal(0, 0.5, n))
df = pd.DataFrame({
"open_time": pd.date_range("2026-01-01", periods=n, freq="5min"),
"open": close + rng.normal(0, 0.1, n),
"high": close + np.abs(rng.normal(0, 0.3, n)),
"low": close - np.abs(rng.normal(0, 0.3, n)),
"close": close,
"volume": rng.uniform(1000, 5000, n),
"taker_buy_vol": rng.uniform(500, 2500, n),
})
df["taker_sell_vol"] = df["volume"] - df["taker_buy_vol"]
# 지표 계산
import ta
df["MA7"] = df["close"].rolling(7).mean()
df["MA25"] = df["close"].rolling(25).mean()
df["BB_mid"] = df["close"].rolling(20).mean()
df["BB_std"] = df["close"].rolling(20).std()
df["BB_upper"] = df["BB_mid"] + 2 * df["BB_std"]
df["BB_lower"] = df["BB_mid"] - 2 * df["BB_std"]
df["RSI"] = ta.momentum.RSIIndicator(df["close"], window=14).rsi()
macd = ta.trend.MACD(df["close"])
df["MACD"] = macd.macd()
df["MACD_signal"] = macd.macd_signal()
df["MACD_hist"] = macd.macd_diff()
from app_streamlit import compute_signals
result = compute_signals(df, "5m")
# 결과 검증 — 컬럼 존재 + boolean cast 가능한지 (dtype 자체는 object 가
# 되더라도 동작엔 무방, fillna 거치며 mixed type 될 수 있음).
for col in ["long_signal", "short_signal", "strong_long_signal", "strong_short_signal",
"vol_long_signal", "vol_short_signal", "reversal_long_signal",
"reversal_short_signal", "exhaustion_long", "exhaustion_short"]:
self.assertIn(col, result.columns, f"{col} missing")
# boolean cast 가 NaN 없이 성공해야 함
casted = result[col].fillna(False).astype(bool)
self.assertEqual(len(casted), len(result), f"{col} length mismatch after cast")
if __name__ == "__main__":
unittest.main(verbosity=2)