feat(edge): 디지털 트윈용 장비 메타(IP/Protocol) IDC TimescaleDB 적재 + 송신 페이로드 보강
Build and Push Images / build-and-push (push) Has been cancelled

- edgeDeviceConfigReporter 신규: pipeline_device_connections + pipeline_equipment 조인해
  IDC TimescaleDB의 edge_device_config_1 에 5분 주기로 적재 (DISTINCT ON 으로 최신값 조회용)
- app.ts: 부팅 시 startEdgeDeviceConfigReporter, SIGTERM/SIGINT 시 graceful stop 추가
- CollectedData 에 host/port 필드 추가, collectDevice 에서 device row 값 채움
- centralMqttForwarder.buildPayload 에 protocol/host/port 포함 (IDC 컨슈머가 활용)
- 312 마이그레이션(fleet_edge_raw_data host/port 컬럼) 등록 (sql 파일은 .gitignore 로 미동봉, OPS 절차로 적용)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
chpark
2026-05-07 15:38:38 +09:00
parent 080cfe9585
commit 77d35220b1
6 changed files with 246 additions and 0 deletions
+16
View File
@@ -50,6 +50,10 @@ process.on("SIGTERM", async () => {
const { stopEdgeReporter } = require("./services/collector/edgeStatusReporter");
await stopEdgeReporter();
} catch {}
try {
const { stopEdgeDeviceConfigReporter } = require("./services/collector/edgeDeviceConfigReporter");
await stopEdgeDeviceConfigReporter();
} catch {}
process.exit(0);
});
@@ -64,6 +68,10 @@ process.on("SIGINT", async () => {
const { stopEdgeReporter } = require("./services/collector/edgeStatusReporter");
await stopEdgeReporter();
} catch {}
try {
const { stopEdgeDeviceConfigReporter } = require("./services/collector/edgeDeviceConfigReporter");
await stopEdgeDeviceConfigReporter();
} catch {}
process.exit(0);
});
@@ -513,6 +521,7 @@ async function initializeServices() {
runProtocolConstraintMigration,
runDataTargetMigration,
runEdgeDeviceIdentifierMigration,
runFleetEdgeRawDeviceEndpointMigration,
} = await import("./database/runMigration");
await runDashboardMigration();
@@ -532,6 +541,7 @@ async function initializeServices() {
await runProtocolConstraintMigration();
await runDataTargetMigration();
await runEdgeDeviceIdentifierMigration();
await runFleetEdgeRawDeviceEndpointMigration();
// 기본 데이터 소스 연결 시드 (IDC 엣지 관련 연결)
const { seedDefaultDataSources } = await import(
@@ -583,6 +593,12 @@ async function initializeServices() {
);
await startEdgeReporter();
// Edge Device Config Reporter (edge_device_config_1 — 장비별 IP/Protocol 메타)
const { startEdgeDeviceConfigReporter } = await import(
"./services/collector/edgeDeviceConfigReporter"
);
await startEdgeDeviceConfigReporter();
// Target DB Retry Worker (IDC 일시 다운 시 INSERT 실패분 재시도)
const { startRetryWorker } = await import(
"./services/collector/deviceCollectorService"
+15
View File
@@ -384,6 +384,21 @@ export async function runEdgeDeviceIdentifierMigration() {
}
}
export async function runFleetEdgeRawDeviceEndpointMigration() {
try {
const sqlFilePath = path.join(
__dirname,
"../../db/migrations/312_add_device_endpoint_to_fleet_edge_raw_data.sql"
);
if (!fs.existsSync(sqlFilePath)) return;
const sqlContent = fs.readFileSync(sqlFilePath, "utf8");
await PostgreSQLService.query(sqlContent);
console.log("✅ fleet_edge_raw_data host/port 컬럼 추가 완료");
} catch (error) {
console.error("❌ fleet_edge_raw_data host/port 마이그레이션 실패:", error);
}
}
export async function runOpenClawMigration() {
try {
console.log("🔄 OpenClaw AI 에이전트 마이그레이션 시작...");
@@ -366,6 +366,8 @@ router.post("/:id/test-chain", async (req: AuthenticatedRequest, res: Response)
connectionName: "test",
protocol: "",
companyCode: "*",
host: null,
port: null,
timestamp: new Date().toISOString(),
plcState: "connected",
errorMessage: null,
@@ -420,6 +420,9 @@ function buildPayload(cfg: ForwarderConfig, data: CollectedData): string {
edge_id: cfg.edge_id,
device_id: String(data.connectionId),
connection_name: data.connectionName,
protocol: data.protocol,
host: data.host,
port: data.port,
tags: data.tags,
priority: 2,
company_id: cfg.company_id,
@@ -65,6 +65,8 @@ export interface CollectedData {
connectionName: string;
protocol: string;
companyCode: string;
host: string | null;
port: number | null;
timestamp: string;
plcState: "connected" | "disconnected" | "error";
errorMessage: string | null;
@@ -151,6 +153,8 @@ export async function collectDevice(connectionId: number): Promise<CollectedData
connectionName: device.connection_name,
protocol: device.protocol,
companyCode: device.company_code || "",
host: device.host || null,
port: device.port ?? null,
timestamp: new Date().toISOString(),
plcState: "disconnected",
errorMessage: null,
@@ -0,0 +1,206 @@
/**
* Edge Device Config Reporter
*
* 파이프라인의 pipeline_device_connections + pipeline_equipment 정보를
* IDC TimescaleDB 의 edge_device_config_1 테이블에 주기적으로 적재한다.
*
* 디지털 트윈은 엣지 서버에 직접 접속할 수 없으므로, 장비별 IP/Protocol
* 메타데이터를 텔레메트리와 같은 경로(IDC TimescaleDB)로 흘려야 한다.
*
* 환경변수 (edgeStatusReporter.ts 와 공유):
* PIPELINE_EDGE_REPORTER=true 기동 시 시작
* PIPELINE_EDGE_TARGET_DB_ID=3 external_db_connections.id (IDC TimescaleDB)
* PIPELINE_EDGE_ID=edge-0f4d04ed
* PIPELINE_EDGE_COMPANY_ID=spifox
* PIPELINE_EDGE_DEVICE_CONFIG_TABLE=edge_device_config_1
* PIPELINE_EDGE_DEVICE_CONFIG_INTERVAL_SEC=300 (기본 5분)
*
* 적재 주기:
* 기동 시 1회 + intervalSec 마다 1회. 변경 빈도가 낮으므로 5분이 기본값.
* 디지털 트윈 백엔드는 DISTINCT ON (equipment_code) 으로 최신 1행만 조회.
*/
import { logger } from "../../utils/logger";
import { query } from "../../database/db";
import { executeExternalQuery } from "../externalDbHelper";
interface ConfigReporterConfig {
targetDbId: number;
edgeId: string;
companyId: string;
table: string;
intervalSec: number;
}
let cfg: ConfigReporterConfig | null = null;
let timer: NodeJS.Timeout | null = null;
function loadConfig(): ConfigReporterConfig | null {
if (process.env.PIPELINE_EDGE_REPORTER !== "true") return null;
const targetDbId = Number(process.env.PIPELINE_EDGE_TARGET_DB_ID);
const edgeId = process.env.PIPELINE_EDGE_ID;
const companyId = process.env.PIPELINE_EDGE_COMPANY_ID;
if (!Number.isFinite(targetDbId) || !edgeId || !companyId) return null;
return {
targetDbId,
edgeId,
companyId,
table:
process.env.PIPELINE_EDGE_DEVICE_CONFIG_TABLE || "edge_device_config_1",
intervalSec:
Number(process.env.PIPELINE_EDGE_DEVICE_CONFIG_INTERVAL_SEC) || 300,
};
}
interface PipelineConnRow {
id: number;
equipment_id: number | null;
equipment_code: string | null;
equipment_name: string | null;
connection_name: string;
protocol: string;
host: string;
port: number;
polling_interval_ms: number | null;
timeout_ms: number | null;
retry_count: number | null;
status: string | null;
last_test_date: Date | null;
last_test_result: string | null;
last_test_message: string | null;
protocol_config: unknown;
company_code: string | null;
}
async function fetchActiveConnections(
companyCode: string
): Promise<PipelineConnRow[]> {
return await query<PipelineConnRow>(
`SELECT d.id,
d.equipment_id,
e.equipment_code,
e.equipment_name,
d.connection_name,
d.protocol,
d.host,
d.port,
d.polling_interval_ms,
d.timeout_ms,
d.retry_count,
d.status,
d.last_test_date,
d.last_test_result,
d.last_test_message,
d.protocol_config,
d.company_code
FROM pipeline_device_connections d
LEFT JOIN pipeline_equipment e ON d.equipment_id = e.id
WHERE d.is_active = 'Y'
AND ($1 = '*' OR d.company_code = $1)
AND e.equipment_code IS NOT NULL`,
[companyCode]
);
}
async function writeOneSnapshot(): Promise<void> {
if (!cfg) return;
let rows: PipelineConnRow[] = [];
try {
rows = await fetchActiveConnections(cfg.companyId);
} catch (err) {
logger.warn(
`[EdgeDeviceConfigReporter] 연결 조회 실패: ${(err as Error).message}`
);
return;
}
if (rows.length === 0) {
logger.debug(
`[EdgeDeviceConfigReporter] 적재 대상 연결 없음 (company=${cfg.companyId})`
);
return;
}
let ok = 0;
let fail = 0;
for (const r of rows) {
try {
await executeExternalQuery(
cfg.targetDbId,
`INSERT INTO ${cfg.table}
(time, company_id, edge_id, equipment_code, equipment_name,
connection_id, connection_name, protocol, host, port,
polling_interval_ms, timeout_ms, retry_count, status,
last_test_date, last_test_result, last_test_message,
protocol_config, metadata)
VALUES (NOW(), $1, $2, $3, $4,
$5, $6, $7, $8, $9,
$10, $11, $12, $13,
$14, $15, $16,
$17::jsonb, $18::jsonb)`,
[
cfg.companyId,
cfg.edgeId,
r.equipment_code,
r.equipment_name,
r.id,
r.connection_name,
r.protocol,
r.host,
r.port,
r.polling_interval_ms ?? null,
r.timeout_ms ?? null,
r.retry_count ?? null,
r.status ?? null,
r.last_test_date ?? null,
r.last_test_result ?? null,
r.last_test_message ?? null,
r.protocol_config ? JSON.stringify(r.protocol_config) : null,
JSON.stringify({ source: "pipeline-backend" }),
]
);
ok += 1;
} catch (err) {
fail += 1;
logger.warn(
`[EdgeDeviceConfigReporter] INSERT 실패 (${r.equipment_code}): ${
(err as Error).message
}`
);
}
}
logger.info(
`[EdgeDeviceConfigReporter] 적재 완료: ok=${ok} fail=${fail} total=${rows.length}`
);
}
export async function startEdgeDeviceConfigReporter(): Promise<void> {
cfg = loadConfig();
if (!cfg) {
logger.info(
"[EdgeDeviceConfigReporter] 비활성 (PIPELINE_EDGE_REPORTER!=true 또는 환경변수 누락)"
);
return;
}
logger.info(
`[EdgeDeviceConfigReporter] 시작 target_db=${cfg.targetDbId} edge=${cfg.edgeId} ` +
`table=${cfg.table} interval=${cfg.intervalSec}s`
);
// 기동 시 1회 즉시 적재
await writeOneSnapshot();
timer = setInterval(() => {
writeOneSnapshot().catch(() => undefined);
}, cfg.intervalSec * 1000);
}
export async function stopEdgeDeviceConfigReporter(): Promise<void> {
if (timer) {
clearInterval(timer);
timer = null;
}
cfg = null;
}
export function isEdgeDeviceConfigReporterActive(): boolean {
return cfg !== null;
}