Files
pipeline/backend-node/src/services/collector/edgeStatusReporter.ts
T
chpark 156dd1ddb1 fix(pipeline): XGT 프로토콜/이벤트 리포터/라우트 순서 수정
XGT FEnet 클라이언트
- 응답 data_length offset [12] → [16] (요청 시에만 [12,16] 둘 다 씀; 응답은 PLC가 [12:14]를 CPU 정보로 덮음)
- socket.setTimeout idle 타임아웃 제거 → connect 전용 수동 타이머 + setKeepAlive(10s). 폴링 간격(5s)마다 재연결되던 문제 해결

Edge 이벤트 리포터
- edgeStatusReporter.ts 추가: 60초 간격 edge_status_1 하트비트 + edge_events_1 이벤트 기록
- 기동/종료 이벤트 + PLC 상태 전이(connected/disconnected/error) 자동 기록
- PIPELINE_EDGE_* env 로 edge_id/company_id/UUID/table/interval 주입

edge_telemetry metadata 포맷 교정
- migrated_at → forwarded_at, _pipeline 추가 블록 제거
- 프로덕션 원본 스키마와 완전 호환: {priority, device_id(UUID), forwarded_at}

라우트 순서 버그 수정
- pipelineDeviceConnectionRoutes.ts: /target-databases* 가 /:id 뒤에 있어 /:id 가 먼저 매칭됨 → UI 저장 DB 드롭다운 비어있던 문제
- 정적 경로를 /:id 위로 이동

프론트 API URL 해석 일반화
- NEXT_PUBLIC_API_URL 가 localhost인데 브라우저는 원격이면 env 무시하고 현재 origin의 :8080 사용 → 엣지 원격 접속 시 API 연결 보장

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-22 17:38:53 +09:00

162 lines
5.1 KiB
TypeScript

/**
* Edge Status / Event Reporter
*
* IDC TimescaleDB 의 edge_status / edge_events 테이블에 직접 INSERT.
* (기존: data-collector → Kafka → kafka-to-central-mqtt → MQTT → IDC consumer → edge_status)
* (교체: pipeline-backend → IDC TimescaleDB 직접)
*
* 환경변수:
* PIPELINE_EDGE_REPORTER=true 기동 시 시작
* PIPELINE_EDGE_TARGET_DB_ID=3 external_db_connections.id
* PIPELINE_EDGE_ID=edge-0f4d04ed
* PIPELINE_EDGE_COMPANY_ID=spifox edge_status.company_id (varchar)
* PIPELINE_EDGE_COMPANY_UUID=... edge_events.company_id (uuid)
* PIPELINE_EDGE_STATUS_TABLE=edge_status_1
* PIPELINE_EDGE_EVENTS_TABLE=edge_events_1
* PIPELINE_EDGE_STATUS_INTERVAL_SEC=60
*/
import os from "os";
import { logger } from "../../utils/logger";
import { executeExternalQuery } from "../externalDbHelper";
interface ReporterConfig {
targetDbId: number;
edgeId: string;
companyId: string;
companyUuid: string | null;
statusTable: string;
eventsTable: string;
intervalSec: number;
}
let cfg: ReporterConfig | null = null;
let statusTimer: NodeJS.Timeout | null = null;
function loadConfig(): ReporterConfig | null {
if (process.env.PIPELINE_EDGE_REPORTER !== "true") return null;
const targetDbId = Number(process.env.PIPELINE_EDGE_TARGET_DB_ID);
const edgeId = process.env.PIPELINE_EDGE_ID;
const companyId = process.env.PIPELINE_EDGE_COMPANY_ID;
if (!Number.isFinite(targetDbId) || !edgeId || !companyId) {
logger.warn("[EdgeReporter] 환경변수 누락 (TARGET_DB_ID/EDGE_ID/COMPANY_ID) — 비활성");
return null;
}
return {
targetDbId,
edgeId,
companyId,
companyUuid: process.env.PIPELINE_EDGE_COMPANY_UUID || null,
statusTable: process.env.PIPELINE_EDGE_STATUS_TABLE || "edge_status_1",
eventsTable: process.env.PIPELINE_EDGE_EVENTS_TABLE || "edge_events_1",
intervalSec: Number(process.env.PIPELINE_EDGE_STATUS_INTERVAL_SEC) || 60,
};
}
function primaryIpAddress(): string | null {
const ifaces = os.networkInterfaces();
for (const name of Object.keys(ifaces)) {
for (const it of ifaces[name] || []) {
if (it.family === "IPv4" && !it.internal) return it.address;
}
}
return null;
}
async function writeStatusRow(status: "online" | "offline"): Promise<void> {
if (!cfg) return;
try {
await executeExternalQuery(
cfg.targetDbId,
`INSERT INTO ${cfg.statusTable} (time, company_id, edge_id, status, ip_address, firmware_version, metadata)
VALUES (NOW(), $1, $2, $3, $4, $5, $6::jsonb)`,
[
cfg.companyId,
cfg.edgeId,
status,
primaryIpAddress(),
process.env.PIPELINE_FIRMWARE_VERSION || null,
JSON.stringify({ source: "pipeline-backend", host: os.hostname() }),
],
);
} catch (err) {
logger.warn(`[EdgeReporter] status INSERT 실패: ${(err as Error).message}`);
}
}
export async function recordEdgeEvent(
eventType: string,
eventCode: string,
message: string,
severity = 3,
metadata: Record<string, unknown> = {},
): Promise<void> {
if (!cfg) return;
if (!cfg.companyUuid) {
// edge_events.company_id 는 UUID 타입이라 UUID 없으면 INSERT 불가 → 경고만
logger.debug("[EdgeReporter] event 기록 스킵: PIPELINE_EDGE_COMPANY_UUID 미설정");
return;
}
try {
await executeExternalQuery(
cfg.targetDbId,
`INSERT INTO ${cfg.eventsTable} (time, company_id, edge_id, event_type, event_code, message, severity, acknowledged, metadata)
VALUES (NOW(), $1::uuid, $2, $3, $4, $5, $6, false, $7::jsonb)`,
[
cfg.companyUuid,
cfg.edgeId,
eventType,
eventCode,
message,
severity,
JSON.stringify({ source: "pipeline-backend", ...metadata }),
],
);
logger.info(`[EdgeReporter] 이벤트 기록: ${eventType}/${eventCode}${message}`);
} catch (err) {
logger.warn(`[EdgeReporter] event INSERT 실패: ${(err as Error).message}`);
}
}
export async function startEdgeReporter(): Promise<void> {
cfg = loadConfig();
if (!cfg) {
logger.info("[EdgeReporter] 비활성 (PIPELINE_EDGE_REPORTER!=true)");
return;
}
logger.info(
`[EdgeReporter] 시작 target_db=${cfg.targetDbId} edge=${cfg.edgeId} ` +
`status_tbl=${cfg.statusTable} events_tbl=${cfg.eventsTable} interval=${cfg.intervalSec}s`,
);
// 기동 이벤트 + 첫 하트비트
await recordEdgeEvent(
"pipeline",
"startup",
"pipeline-backend 기동 — Edge Reporter 활성",
3,
);
await writeStatusRow("online");
statusTimer = setInterval(() => {
writeStatusRow("online").catch(() => undefined);
}, cfg.intervalSec * 1000);
}
export async function stopEdgeReporter(): Promise<void> {
if (statusTimer) {
clearInterval(statusTimer);
statusTimer = null;
}
if (cfg) {
await writeStatusRow("offline").catch(() => undefined);
await recordEdgeEvent("pipeline", "shutdown", "pipeline-backend 정지", 3).catch(
() => undefined,
);
}
cfg = null;
}
export function isEdgeReporterActive(): boolean {
return cfg !== null;
}