Files
pipeline/backend-node/src/services/collector/pythonHookRunner.ts
T
chpark 4c1dc4082e
Build and Push Images / build-and-push (push) Has been cancelled
feat: Fleet/Collector/엣지 배포 관련 누적 작업 일괄 커밋
이전 세션들에서 작업된 아래 범위를 모두 포함:

Fleet 서브시스템 (src/fleet/)
- fleetDeviceService / fleetCommandService / fleetDeploymentService / fleetReleaseService
- fleetMetricsService, fleetScriptService, fleetEdgeConfigService
- Edge 디바이스 관리, 커맨드 발행, 배포/릴리스, 스크립트 동기화

Collector 확장
- centralMqttForwarder / centralForwarderConfigService
- equipmentStateService, pythonHookRunner, scriptCache
- Modbus/OPC-UA/S7/XGT 프로토콜 클라이언트
- targetDbIntrospection (저장 DB 조회)

Routes / API
- automationDashboardRoutes, centralForwarderRoutes, equipmentStateRoutes

DB
- importEdgeConfig (Python cached config → Pipeline DB)
- seedDataSources (external_db_connections 초기 시드)

엣지 배포 리소스
- docker/edge/Dockerfile.backend.prod, Dockerfile.frontend.prod
- docker/edge/docker-compose.edge.yml

프론트엔드
- admin/automaticMng (centralForwarder, dashboard, equipmentState)
- admin/fleet (commands, devices, deployments, releases, scripts, alerts)
- admin/pipeline-device 개선 (저장 DB 드롭다운, 태그 매핑 등)
- ExternalDbConnectionModal, ScriptsManagerDialog 등 신규 컴포넌트
- lib/api: automationDashboard, centralForwarder, equipmentState, fleet

docs/
- EDGE_SERVER_STRUCTURE, FLEET_COMPLETE, FLEET_EDGE_INTEGRATION, FLEET_HOOK_INTEGRATION

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-23 20:00:06 +09:00

228 lines
6.6 KiB
TypeScript

/**
* Python Hook Runner
*
* Pipeline(Node.js)이 사용자 작성 Python 훅을 **자식 프로세스**로 실행.
* 엣지 Python data-collector를 대체하기 위한 핵심 컴포넌트.
*
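* Contract per hook type (훅 타입별 계약):
* transform(tag_name, raw_value, context) → transformed value
* filter(tag_name, value, context) → truthy lets the value through, falsy drops it
* derived_tags(device_data, context) → { new_tag_name: value, ... }
* alarm(tag_name, value, context) → None / {level, message}
* aggregator(tag_name, value, context) → aggregated value
* pre_send(device_data, context) → value handed back to the caller as-is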
*
* 보안/안전:
* - python3 자식 프로세스로 격리
* - timeout 초과 시 SIGKILL
* - stdout 용량 제한 (1MB)
* - OS-level이므로 Node 이벤트 루프 블록 안 함
*/
import { spawn } from "child_process";
import { logger } from "../../utils/logger";
/**
 * Names of the hooks the Python runner knows how to dispatch.
 *
 * The name doubles as the name of the Python function the user code must
 * define; the runner looks the function up by this exact string.
 */
export type HookType =
  /** Called as `transform(tag_name, raw_value, context)`; return value is reported as `value`. */
  | "transform"
  /** Called as `filter(tag_name, value, context)`; a falsy return is reported as `skip: true`. */
  | "filter"
  /** Called as `aggregator(tag_name, value, context)`; return value is reported as `value`. */
  | "aggregator"
  /** Called as `alarm(tag_name, value, context)`; return value is reported as `alarm`. */
  | "alarm"
  /** Called as `derived_tags(device_data, context)`; return value is reported as `derived`. */
  | "derived_tags"
  /** Called as `pre_send(device_data, context)`; return value is reported as `value`. */
  | "pre_send";
/**
 * Description of a single hook invocation.
 *
 * `executeHook` JSON-serializes this object and writes it to the stdin of the
 * Python runner, so every field must be JSON-serializable.
 */
export interface HookInput {
  /** Hook to run; also the name of the Python function `code` has to define. */
  hook_type: HookType;
  /** Python source that is exec'd by the runner before the hook function is looked up. */
  code: string;

  /** Tag name passed as first argument to transform / filter / alarm / aggregator. */
  tag_name?: string;
  /** Unprocessed value; only the `transform` hook receives it. */
  raw_value?: unknown;
  /** Current value; passed to filter / alarm / aggregator. */
  value?: unknown;
  /** Whole-device snapshot; passed to derived_tags / pre_send (the runner substitutes `{}` when missing). */
  device_data?: Record<string, unknown>;

  /** Free-form context handed to every hook as its last argument (the runner substitutes `{}` when missing). */
  context?: Record<string, unknown>;
  /** Time budget for the child process in milliseconds; `executeHook` falls back to 1500 when omitted. */
  timeout_ms?: number;
}
/**
 * Outcome of one hook invocation as reported back to the caller.
 *
 * Which of the optional payload fields is populated depends on the hook type
 * that was executed; on failure only `error` (and `duration_ms`) is meaningful.
 */
export interface HookResult {
  /** `true` when the hook ran and its result could be serialized. */
  success: boolean;
  /** Human-readable failure description; set when `success` is `false`. */
  error?: string;
  /** Wall-clock time spent in `executeHook`, in milliseconds. */
  duration_ms?: number;

  /** Return value of transform / aggregator / pre_send hooks. */
  value?: unknown;
  /** Set by filter hooks: `true` means the hook returned a falsy value. */
  skip?: boolean;
  /** Return value of alarm hooks. */
  alarm?: { level: string; message: string } | null;
  /** Return value of derived_tags hooks. */
  derived?: Record<string, unknown>;
}
/**
 * Python source of the hook runner, handed to `python3 -c` (built once, reused for every call).
 *
 * Protocol:
 *  - stdin : one JSON object with `hook_type`, `code` and the hook arguments.
 *  - stdout: the result as a single-line JSON object, always printed last.
 *
 * Maintenance notes:
 *  - The leading whitespace inside the template literal is Python block structure;
 *    do not re-indent or auto-format it.
 *  - `\\n` is deliberate: the template literal turns it into `\n`, which Python then
 *    interprets inside the f-string.
 *  - The script must not contain backticks or a dollar sign followed by `{`.
 *
 * SECURITY NOTE: the user code is exec'd with the full set of builtins. The separate
 * process, the address-space limit and the caller's timeout are the only containment
 * visible here; this is not a sandbox, so only trusted code should be run through it.
 */
const PYTHON_RUNNER_SCRIPT = `
import sys, json, traceback

# Memory limit (128MB). The resource module is Unix-only, so it is imported inside
# the try block: where it is missing the runner still works, just without the cap.
try:
    import resource
    resource.setrlimit(resource.RLIMIT_AS, (128*1024*1024, 128*1024*1024))
except Exception:
    pass


def main():
    raw = sys.stdin.read()
    try:
        payload = json.loads(raw)
    except Exception as e:
        print(json.dumps({"success": False, "error": f"JSON parse error: {e}"}))
        return

    hook_type = payload.get("hook_type")
    code = payload.get("code", "")
    context = payload.get("context") or {}

    # exec the user code; only the function it defines under the hook name is used
    user_globals = {"__builtins__": __builtins__}
    try:
        exec(code, user_globals)
    except Exception as e:
        print(json.dumps({"success": False, "error": f"Compile error: {e}\\n{traceback.format_exc()}"}))
        return

    fn = user_globals.get(hook_type)
    if not callable(fn):
        print(json.dumps({"success": False, "error": f"function '{hook_type}' not defined"}))
        return

    try:
        if hook_type == "transform":
            value = fn(payload.get("tag_name"), payload.get("raw_value"), context)
            out = {"success": True, "value": value}
        elif hook_type == "filter":
            keep = fn(payload.get("tag_name"), payload.get("value"), context)
            out = {"success": True, "skip": not bool(keep)}
        elif hook_type == "alarm":
            alarm = fn(payload.get("tag_name"), payload.get("value"), context)
            out = {"success": True, "alarm": alarm}
        elif hook_type == "derived_tags":
            derived = fn(payload.get("device_data") or {}, context) or {}
            out = {"success": True, "derived": derived}
        elif hook_type == "aggregator":
            value = fn(payload.get("tag_name"), payload.get("value"), context)
            out = {"success": True, "value": value}
        elif hook_type == "pre_send":
            value = fn(payload.get("device_data") or {}, context)
            out = {"success": True, "value": value}
        else:
            out = {"success": False, "error": f"unknown hook_type {hook_type}"}
    except Exception as e:
        out = {"success": False, "error": f"Runtime error: {e}\\n{traceback.format_exc()}"}

    # allow_nan=False: NaN/Infinity would otherwise be written as bare tokens that
    # JSON.parse on the Node side rejects; fail here with a clear message instead.
    try:
        print(json.dumps(out, default=str, allow_nan=False))
    except Exception as e:
        print(json.dumps({"success": False, "error": f"serialize error: {e}"}))


if __name__ == "__main__":
    main()
`.trim();
/**
 * Runs one user hook in a short-lived `python3` child process.
 *
 * The input is JSON-serialized, written to the child's stdin, and the last line
 * the child prints on stdout is parsed as the JSON result.
 *
 * Safeguards applied here:
 *  - the child is SIGKILLed when `timeout_ms` (default 1500) elapses;
 *  - the child is SIGKILLed when it writes more than 1MB to stdout;
 *  - stderr is only retained up to a fixed size (it is used for diagnostics only).
 *
 * @param input Hook description; see `HookInput`.
 * @returns A promise that always resolves. Every failure (spawn error, timeout,
 *          non-zero exit, unparsable output) is reported as `success: false`
 *          with an `error` message. `duration_ms` is always set.
 */
export async function executeHook(input: HookInput): Promise<HookResult> {
  const timeoutMs = input.timeout_ms ?? 1500;
  const start = Date.now();
  const MAX_STDOUT = 1024 * 1024; // 1MB
  const MAX_STDERR = 64 * 1024; // diagnostics only; the reported error is cut to 2000 chars anyway

  // Serialize before spawning: an unserializable input (BigInt, circular reference)
  // would otherwise leave the child blocked on stdin until the timeout kills it.
  let payload: string;
  try {
    payload = JSON.stringify(input);
  } catch (err) {
    return {
      success: false,
      error: `input serialize fail: ${(err as Error).message}`,
      duration_ms: Date.now() - start,
    };
  }

  return new Promise<HookResult>((resolve) => {
    const child = spawn("python3", ["-c", PYTHON_RUNNER_SCRIPT], {
      stdio: ["pipe", "pipe", "pipe"],
    });

    // Raw chunks are kept and decoded once at the end, so a multi-byte UTF-8
    // character split across two chunks is not corrupted.
    const stdoutChunks: Buffer[] = [];
    const stderrChunks: Buffer[] = [];
    let stdoutBytes = 0;
    let stderrBytes = 0;
    let killed = false;
    let settled = false;

    const forceKill = (): void => {
      killed = true;
      try {
        child.kill("SIGKILL");
      } catch {
        /* noop */
      }
    };
    const killTimer = setTimeout(forceKill, timeoutMs);

    // 'error' and 'close' can both fire for the same child; only the first one wins.
    const finish = (result: HookResult): void => {
      if (settled) return;
      settled = true;
      clearTimeout(killTimer);
      resolve(result);
    };

    child.stdout.on("data", (chunk: Buffer) => {
      stdoutBytes += chunk.length;
      if (stdoutBytes > MAX_STDOUT) {
        forceKill();
        return;
      }
      stdoutChunks.push(chunk);
    });

    child.stderr.on("data", (chunk: Buffer) => {
      const room = MAX_STDERR - stderrBytes;
      if (room <= 0) return; // cap reached: drop the rest
      const kept = chunk.length > room ? chunk.subarray(0, room) : chunk;
      stderrChunks.push(kept);
      stderrBytes += kept.length;
    });

    child.on("error", (err) => {
      finish({
        success: false,
        error: `spawn error: ${err.message}`,
        duration_ms: Date.now() - start,
      });
    });

    child.on("close", (code, signal) => {
      const duration = Date.now() - start;
      if (killed) {
        return finish({
          success: false,
          error: `timeout ${timeoutMs}ms 초과 또는 stdout 한계 초과`,
          duration_ms: duration,
        });
      }

      const stdout = Buffer.concat(stdoutChunks).toString("utf8");
      if (code !== 0) {
        const stderr = Buffer.concat(stderrChunks).toString("utf8");
        // code is null when the child was terminated by a signal we did not send.
        return finish({
          success: false,
          error: `python exit ${code ?? signal}: ${stderr || stdout}`.slice(0, 2000),
          duration_ms: duration,
        });
      }

      try {
        // User code may print too; the runner always prints its JSON result last.
        const parsed: unknown = JSON.parse(stdout.trim().split("\n").pop() || "{}");
        const hasVerdict =
          typeof parsed === "object" &&
          parsed !== null &&
          !Array.isArray(parsed) &&
          typeof (parsed as { success?: unknown }).success === "boolean";
        if (!hasVerdict) {
          // Empty stdout or a stray JSON scalar must not masquerade as a HookResult.
          return finish({
            success: false,
            error: `result parse fail: missing boolean "success" — raw=${stdout.slice(0, 500)}`,
            duration_ms: duration,
          });
        }
        finish({ ...(parsed as HookResult), duration_ms: duration });
      } catch (err) {
        finish({
          success: false,
          error: `result parse fail: ${(err as Error).message} — raw=${stdout.slice(0, 500)}`,
          duration_ms: duration,
        });
      }
    });

    // Write failures (e.g. EPIPE when the child exits before reading) surface
    // asynchronously as an 'error' event on the stream; without a listener that
    // event would be thrown as an uncaught exception. The outcome itself is
    // still reported through 'close' / 'error' on the child.
    child.stdin.on("error", (err: Error) => {
      logger.warn(`[PyHook] stdin 쓰기 실패: ${err.message}`);
    });
    try {
      child.stdin.write(payload);
      child.stdin.end();
    } catch (err) {
      logger.warn(`[PyHook] stdin 쓰기 실패: ${(err as Error).message}`);
    }
  });
}
/**
 * Checks whether a `python3` executable can be started (intended as a one-off check at boot).
 *
 * Runs `python3 --version` and waits at most 3 seconds for it to finish.
 *
 * @returns `true` only when the process exits with code 0; `false` when it cannot
 *          be spawned, exits with another code, or does not finish within 3 seconds.
 *          The promise never rejects.
 */
export async function checkPython3Available(): Promise<boolean> {
  return new Promise<boolean>((resolve) => {
    const child = spawn("python3", ["--version"], { stdio: "pipe" });

    let settled = false;
    const settle = (available: boolean): void => {
      if (settled) return;
      settled = true;
      // Clear the watchdog so it neither keeps the event loop alive for the
      // remaining seconds nor fires kill() on a probe that already finished.
      clearTimeout(watchdog);
      resolve(available);
    };

    const watchdog = setTimeout(() => {
      try {
        child.kill();
      } catch {
        /* noop */
      }
      settle(false);
    }, 3000);

    child.on("error", () => settle(false));
    child.on("close", (code) => settle(code === 0));
  });
}