""" 백엔드 통합 테스트 — 컨테이너 안에서 실행 (DATABASE_URL / ENCRYPTION_KEY 환경변수 필요). 실행: docker compose exec app python -m pytest tests/ -v 또는: docker compose exec app python tests/test_backend.py 각 테스트는 독립적이고 실제 DB / Binance 와 통신. """ import os import sys import time import unittest from datetime import datetime, timedelta sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) import settings_db import trades_db import exchange_keys import exchange_adapters import alert_state # ────────────────────────────────────────────── # 1. settings_db # ────────────────────────────────────────────── class SettingsDBTest(unittest.TestCase): @classmethod def setUpClass(cls): settings_db.init_db_with_env_defaults() def test_defaults_seeded(self): v = settings_db.get("alert_symbol") self.assertTrue(v in ("BTCUSDT", "ETHUSDT", "SOLUSDT", "BNBUSDT") or v.endswith("USDT")) def test_set_and_get(self): settings_db.set_value("__test_key", "hello") self.assertEqual(settings_db.get("__test_key"), "hello") def test_get_int(self): settings_db.set_value("__test_int", "42") self.assertEqual(settings_db.get_int("__test_int"), 42) self.assertEqual(settings_db.get_int("__no_such_key", 7), 7) def test_get_float(self): settings_db.set_value("__test_float", "0.0075") self.assertAlmostEqual(settings_db.get_float("__test_float"), 0.0075) def test_get_bool(self): settings_db.set_value("__test_bool", "1") self.assertTrue(settings_db.get_bool("__test_bool")) settings_db.set_value("__test_bool", "0") self.assertFalse(settings_db.get_bool("__test_bool")) def test_get_list(self): settings_db.set_value("__test_list", "5m,15m,30m") self.assertEqual(settings_db.get_list("__test_list"), ["5m", "15m", "30m"]) def test_threshold_defaults_present(self): for k in ["long_rsi_max", "short_rsi_min", "body_pct_min", "vol_exhaustion_mult", "fr_short_extreme", "candle_limit_desktop", "polling_interval_sec"]: v = settings_db.get(k) self.assertIsNotNone(v, f"{k} missing") self.assertNotEqual(v, "", f"{k} empty") # ────────────────────────────────────────────── # 2. trades_db (PostgreSQL) # ────────────────────────────────────────────── class TradesDBTest(unittest.TestCase): @classmethod def setUpClass(cls): if not trades_db._enabled(): raise unittest.SkipTest("DATABASE_URL not set") trades_db.init_db() def test_record_entry_and_exit_long(self): ct = datetime.utcnow().replace(microsecond=0) trades_db.record_entry( "TESTUSDT", "5m", "long", ["strong_long_signal"], ct, 100.0, 99.25, ) trades_db.record_exit("TESTUSDT", "5m", "long", ct, 110.0, "stop_loss") rows = trades_db.fetch_trades(limit=10) match = [r for r in rows if r["symbol"] == "TESTUSDT" and r["candle_time"] == ct] self.assertTrue(match, "trade not found after record_entry+record_exit") r = match[0] self.assertEqual(r["status"], "stop_loss") self.assertAlmostEqual(r["pnl_pct"], (110.0 - 100.0) / 100.0 * 100, places=2) def test_record_entry_and_exit_short(self): ct = datetime.utcnow().replace(microsecond=0) - timedelta(seconds=1) trades_db.record_entry( "TESTUSDT", "5m", "short", ["short_signal"], ct, 100.0, 100.75, ) trades_db.record_exit("TESTUSDT", "5m", "short", ct, 90.0, "reversal") rows = trades_db.fetch_trades(limit=10) r = next(r for r in rows if r["candle_time"] == ct and r["direction"] == "short") self.assertEqual(r["status"], "reversal") self.assertAlmostEqual(r["pnl_pct"], (100.0 - 90.0) / 100.0 * 100, places=2) def test_duplicate_entry_ignored(self): ct = datetime.utcnow().replace(microsecond=0) - timedelta(seconds=2) trades_db.record_entry("DUPUSDT", "5m", "long", ["x"], ct, 100.0, 99.0) trades_db.record_entry("DUPUSDT", "5m", "long", ["y"], ct, 100.0, 99.0) rows = [r for r in trades_db.fetch_trades(limit=20) if r["symbol"] == "DUPUSDT" and r["candle_time"] == ct] self.assertEqual(len(rows), 1) def test_log_signal_events(self): ct = datetime.utcnow().replace(microsecond=0) trades_db.log_signal_events( "TESTUSDT", "5m", [{"sig": "long_signal", "direction": "long", "candle_time": ct, "row": {"open": 100.0}}], ) events = trades_db.fetch_signal_events(limit=100) self.assertTrue(any(e["symbol"] == "TESTUSDT" for e in events)) # ────────────────────────────────────────────── # 3. exchange_keys (Fernet 암호화 + automation_config) # ────────────────────────────────────────────── class ExchangeKeysTest(unittest.TestCase): @classmethod def setUpClass(cls): if not exchange_keys._enabled(): raise unittest.SkipTest("DATABASE_URL or cryptography not set") exchange_keys.init_db() def test_add_and_decrypt(self): cid = exchange_keys.add_credential( exchange="binance", label="__test_main", api_key="AK_TEST_12345", api_secret="SK_TEST_67890", passphrase=None, testnet=True, enabled=True, ) self.assertIsNotNone(cid) cred = exchange_keys.get_credential(cid) self.assertEqual(cred["api_key"], "AK_TEST_12345") self.assertEqual(cred["api_secret"], "SK_TEST_67890") self.assertTrue(cred["testnet"]) # 마스킹 확인 creds = exchange_keys.list_credentials() match = [c for c in creds if c["id"] == cid][0] self.assertNotEqual(match["api_key_masked"], "AK_TEST_12345") # 마스킹됨 self.assertIn("…", match["api_key_masked"]) # 정리 exchange_keys.delete_credential(cid) def test_update_credential(self): cid = exchange_keys.add_credential( "okx", "__test_okx", "k1", "s1", "p1", False, True, ) self.assertTrue(exchange_keys.update_credential(cid, label="renamed", enabled=False)) cred = exchange_keys.get_credential(cid) self.assertEqual(cred["label"], "renamed") self.assertFalse(cred["enabled"]) exchange_keys.delete_credential(cid) def test_automation_get_set(self): exchange_keys.automation_set("__test_auto", "yes") self.assertEqual(exchange_keys.automation_get("__test_auto"), "yes") def test_automation_defaults(self): cfg = exchange_keys.automation_all() for k in ["enabled", "dry_run", "leverage", "position_size_pct", "max_open_trades", "allowed_directions"]: self.assertIn(k, cfg) # ────────────────────────────────────────────── # 4. exchange_adapters (DRY-RUN) # ────────────────────────────────────────────── class ExchangeAdaptersTest(unittest.TestCase): def test_dry_run_no_credential(self): adapter = exchange_adapters.make_adapter(None, dry_run=True) self.assertIsInstance(adapter, exchange_adapters.DryRunAdapter) self.assertEqual(adapter.get_balance("USDT"), 1000.0) result = adapter.place_market_order("BTCUSDT", "long", 0.01) self.assertTrue(result.ok) self.assertEqual(result.filled_qty, 0.01) def test_dry_run_with_credential_dict(self): cred = {"exchange": "binance", "api_key": "K", "api_secret": "S", "passphrase": None, "testnet": True} adapter = exchange_adapters.make_adapter(cred, dry_run=True) self.assertEqual(adapter.exchange, "binance") self.assertTrue(adapter.testnet) close = adapter.close_position("BTCUSDT") self.assertTrue(close.ok) # ────────────────────────────────────────────── # 5. signal computation (compute_signals 가 새 임계값을 읽고 NameError 안 나는지) # ────────────────────────────────────────────── class SignalComputationTest(unittest.TestCase): def test_compute_signals_no_nameerror(self): # 백엔드만 import — Streamlit UI 코드는 실행 안 됨. import importlib # Streamlit 의 set_page_config 가 import 시점에 호출되어 컨테이너 밖에서 실행 시 # 에러가 날 수 있어, 격리된 작은 DataFrame 으로 compute_signals 만 호출. import pandas as pd import numpy as np # 200개 더미 캔들 (지표 계산에 충분) n = 200 rng = np.random.default_rng(seed=42) close = 100 + np.cumsum(rng.normal(0, 0.5, n)) df = pd.DataFrame({ "open_time": pd.date_range("2026-01-01", periods=n, freq="5min"), "open": close + rng.normal(0, 0.1, n), "high": close + np.abs(rng.normal(0, 0.3, n)), "low": close - np.abs(rng.normal(0, 0.3, n)), "close": close, "volume": rng.uniform(1000, 5000, n), "taker_buy_vol": rng.uniform(500, 2500, n), }) df["taker_sell_vol"] = df["volume"] - df["taker_buy_vol"] # 지표 계산 import ta df["MA7"] = df["close"].rolling(7).mean() df["MA25"] = df["close"].rolling(25).mean() df["BB_mid"] = df["close"].rolling(20).mean() df["BB_std"] = df["close"].rolling(20).std() df["BB_upper"] = df["BB_mid"] + 2 * df["BB_std"] df["BB_lower"] = df["BB_mid"] - 2 * df["BB_std"] df["RSI"] = ta.momentum.RSIIndicator(df["close"], window=14).rsi() macd = ta.trend.MACD(df["close"]) df["MACD"] = macd.macd() df["MACD_signal"] = macd.macd_signal() df["MACD_hist"] = macd.macd_diff() from app_streamlit import compute_signals result = compute_signals(df, "5m") # 결과 검증 — 컬럼 존재 + boolean cast 가능한지 (dtype 자체는 object 가 # 되더라도 동작엔 무방, fillna 거치며 mixed type 될 수 있음). for col in ["long_signal", "short_signal", "strong_long_signal", "strong_short_signal", "vol_long_signal", "vol_short_signal", "reversal_long_signal", "reversal_short_signal", "exhaustion_long", "exhaustion_short"]: self.assertIn(col, result.columns, f"{col} missing") # boolean cast 가 NaN 없이 성공해야 함 casted = result[col].fillna(False).astype(bool) self.assertEqual(len(casted), len(result), f"{col} length mismatch after cast") if __name__ == "__main__": unittest.main(verbosity=2)