fix(pipeline): XGT 프로토콜/이벤트 리포터/라우트 순서 수정
XGT FEnet 클라이언트
- 응답 data_length offset [12] → [16] (요청 시에만 [12,16] 둘 다 씀; 응답은 PLC가 [12:14]를 CPU 정보로 덮음)
- socket.setTimeout idle 타임아웃 제거 → connect 전용 수동 타이머 + setKeepAlive(10s). 폴링 간격(5s)마다 재연결되던 문제 해결
Edge 이벤트 리포터
- edgeStatusReporter.ts 추가: 60초 간격 edge_status_1 하트비트 + edge_events_1 이벤트 기록
- 기동/종료 이벤트 + PLC 상태 전이(connected/disconnected/error) 자동 기록
- PIPELINE_EDGE_* env 로 edge_id/company_id/UUID/table/interval 주입
edge_telemetry metadata 포맷 교정
- migrated_at → forwarded_at, _pipeline 추가 블록 제거
- 프로덕션 원본 스키마와 완전 호환: {priority, device_id(UUID), forwarded_at}
라우트 순서 버그 수정
- pipelineDeviceConnectionRoutes.ts: /target-databases* 가 /:id 뒤에 있어 /:id 가 먼저 매칭됨 → UI 저장 DB 드롭다운 비어있던 문제
- 정적 경로를 /:id 위로 이동
프론트 API URL 해석 일반화
- NEXT_PUBLIC_API_URL 가 localhost인데 브라우저는 원격이면 env 무시하고 현재 origin의 :8080 사용 → 엣지 원격 접속 시 API 연결 보장
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
+69
-2
@@ -40,22 +40,30 @@ process.on("uncaughtException", (error: Error) => {
|
||||
});
|
||||
|
||||
// SIGTERM 시그널 처리 (Docker/Kubernetes 환경)
|
||||
process.on("SIGTERM", () => {
|
||||
process.on("SIGTERM", async () => {
|
||||
logger.info("📴 SIGTERM 시그널 수신, graceful shutdown 시작...");
|
||||
const { stopAiAssistant } = require("./utils/startAiAssistant");
|
||||
stopAiAssistant();
|
||||
const { imapConnectionPool } = require("./services/imapConnectionPool");
|
||||
imapConnectionPool.destroyAll();
|
||||
try {
|
||||
const { stopEdgeReporter } = require("./services/collector/edgeStatusReporter");
|
||||
await stopEdgeReporter();
|
||||
} catch {}
|
||||
process.exit(0);
|
||||
});
|
||||
|
||||
// SIGINT 시그널 처리 (Ctrl+C)
|
||||
process.on("SIGINT", () => {
|
||||
process.on("SIGINT", async () => {
|
||||
logger.info("📴 SIGINT 시그널 수신, graceful shutdown 시작...");
|
||||
const { stopAiAssistant } = require("./utils/startAiAssistant");
|
||||
stopAiAssistant();
|
||||
const { imapConnectionPool } = require("./services/imapConnectionPool");
|
||||
imapConnectionPool.destroyAll();
|
||||
try {
|
||||
const { stopEdgeReporter } = require("./services/collector/edgeStatusReporter");
|
||||
await stopEdgeReporter();
|
||||
} catch {}
|
||||
process.exit(0);
|
||||
});
|
||||
|
||||
@@ -87,6 +95,9 @@ import dataRoutes from "./routes/dataRoutes";
|
||||
import testButtonDataflowRoutes from "./routes/testButtonDataflowRoutes";
|
||||
import externalDbConnectionRoutes from "./routes/externalDbConnectionRoutes";
|
||||
import externalRestApiConnectionRoutes from "./routes/externalRestApiConnectionRoutes";
|
||||
import centralForwarderRoutes from "./routes/centralForwarderRoutes";
|
||||
import equipmentStateRoutes from "./routes/equipmentStateRoutes";
|
||||
import automationDashboardRoutes from "./routes/automationDashboardRoutes";
|
||||
import multiConnectionRoutes from "./routes/multiConnectionRoutes";
|
||||
import screenFileRoutes from "./routes/screenFileRoutes";
|
||||
//import dbTypeCategoryRoutes from "./routes/dbTypeCategoryRoutes";
|
||||
@@ -327,6 +338,9 @@ app.use("/api/data", dataRoutes);
|
||||
app.use("/api/test-button-dataflow", testButtonDataflowRoutes);
|
||||
app.use("/api/external-db-connections", externalDbConnectionRoutes);
|
||||
app.use("/api/external-rest-api-connections", externalRestApiConnectionRoutes);
|
||||
app.use("/api/central-forwarder", centralForwarderRoutes);
|
||||
app.use("/api/equipment-state", equipmentStateRoutes);
|
||||
app.use("/api/automation-dashboard", automationDashboardRoutes);
|
||||
app.use("/api/multi-connection", multiConnectionRoutes);
|
||||
app.use("/api/screen-files", screenFileRoutes);
|
||||
// app.use("/api/batch-configs", batchRoutes); // 레거시 → batchManagementRoutes로 통합
|
||||
@@ -399,6 +413,10 @@ app.use("/api/ai-knowledge", knowledgeRoutes); // AI 지식 파일 라이브러
|
||||
app.use("/api/ai-schedules", aiScheduleRoutes); // AI 스케줄러
|
||||
app.use("/api/ai/v1", aiProxyRoutes); // AI 엔진 (자체 LLM 클라이언트)
|
||||
app.use("/api/pipeline-device-connections", pipelineDeviceConnectionRoutes); // 파이프라인 장비 연결
|
||||
|
||||
// Fleet Management (엣지 디바이스 원격 관리)
|
||||
import fleetRoutes from "./fleet/fleetRoutes";
|
||||
app.use("/api/fleet", fleetRoutes);
|
||||
app.use("/api/vehicle", vehicleTripRoutes); // 차량 운행 이력 관리
|
||||
app.use("/api/approval", approvalRoutes); // 결재 시스템
|
||||
app.use("/api/user-mail", userMailRoutes); // 사용자 메일 계정
|
||||
@@ -458,6 +476,15 @@ const server = app.listen(PORT, HOST, async () => {
|
||||
logger.error("❌ Socket.IO initialization failed:", error);
|
||||
}
|
||||
|
||||
// Fleet 모듈 초기화 (내장 MQTT 브로커 + 디바이스 관리)
|
||||
try {
|
||||
const { initializeFleet } = await import("./fleet");
|
||||
await initializeFleet();
|
||||
logger.info("🚁 Fleet 모듈 초기화 완료");
|
||||
} catch (error) {
|
||||
logger.error("❌ Fleet 초기화 실패:", error);
|
||||
}
|
||||
|
||||
// 비동기 초기화 작업 (에러가 발생해도 서버는 유지)
|
||||
initializeServices().catch(err => {
|
||||
logger.error('❌ 서비스 초기화 중 치명적 에러 발생:', err);
|
||||
@@ -482,6 +509,10 @@ async function initializeServices() {
|
||||
runMenuRenameMigration,
|
||||
runKnowledgeLibraryMigration,
|
||||
runPipelineDeviceMigration,
|
||||
runCentralForwarderMigration,
|
||||
runProtocolConstraintMigration,
|
||||
runDataTargetMigration,
|
||||
runEdgeDeviceIdentifierMigration,
|
||||
} = await import("./database/runMigration");
|
||||
|
||||
await runDashboardMigration();
|
||||
@@ -497,6 +528,16 @@ async function initializeServices() {
|
||||
await runMenuRenameMigration();
|
||||
await runKnowledgeLibraryMigration();
|
||||
await runPipelineDeviceMigration();
|
||||
await runCentralForwarderMigration();
|
||||
await runProtocolConstraintMigration();
|
||||
await runDataTargetMigration();
|
||||
await runEdgeDeviceIdentifierMigration();
|
||||
|
||||
// 기본 데이터 소스 연결 시드 (IDC 엣지 관련 연결)
|
||||
const { seedDefaultDataSources } = await import(
|
||||
"./database/seedDataSources"
|
||||
);
|
||||
await seedDefaultDataSources();
|
||||
} catch (error) {
|
||||
logger.error(`❌ 마이그레이션 실패:`, error);
|
||||
}
|
||||
@@ -515,6 +556,32 @@ async function initializeServices() {
|
||||
const { initSmartFactoryScheduler } = await import("./utils/smartFactoryLog");
|
||||
await initSmartFactoryScheduler();
|
||||
logger.info(`🏭 스마트공장 로그 스케줄러가 시작되었습니다.`);
|
||||
|
||||
// 중앙 MQTT 포워더 초기화 (is_enabled='Y' 설정 자동 시작)
|
||||
const { startAllEnabled } = await import(
|
||||
"./services/collector/centralMqttForwarder"
|
||||
);
|
||||
await startAllEnabled();
|
||||
logger.info(`📡 중앙 MQTT 포워더가 시작되었습니다.`);
|
||||
|
||||
// 장비 수집기 자동 시작 (엣지 배포 시 env=true로 켜기)
|
||||
if (process.env.ENABLE_AUTO_COLLECTOR === "true") {
|
||||
const { startAllActivePolling } = await import(
|
||||
"./services/collector/deviceCollectorService"
|
||||
);
|
||||
const started = await startAllActivePolling();
|
||||
logger.info(`🔌 장비 수집기 자동 시작: ${started}개 연결`);
|
||||
} else {
|
||||
logger.info(
|
||||
`🔌 장비 수집기 자동 시작 OFF (ENABLE_AUTO_COLLECTOR=true로 켜세요)`
|
||||
);
|
||||
}
|
||||
|
||||
// Edge Reporter (edge_status_1 하트비트 + edge_events_1)
|
||||
const { startEdgeReporter } = await import(
|
||||
"./services/collector/edgeStatusReporter"
|
||||
);
|
||||
await startEdgeReporter();
|
||||
} catch (error) {
|
||||
logger.error(`❌ 배치 스케줄러 초기화 실패:`, error);
|
||||
}
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
import { Router, Response } from "express";
|
||||
import { PipelineDeviceConnectionService } from "../services/pipelineDeviceConnectionService";
|
||||
import { PROTOCOL_OPTIONS, PROTOCOL_DEFAULTS, TAG_DATA_TYPE_OPTIONS, ADDRESS_TYPE_OPTIONS } from "../types/pipelineDeviceTypes";
|
||||
import { PROTOCOL_OPTIONS, PROTOCOL_DEFAULTS, TAG_DATA_TYPE_OPTIONS, ADDRESS_TYPE_OPTIONS, BYTE_ORDER_OPTIONS } from "../types/pipelineDeviceTypes";
|
||||
import { authenticateToken } from "../middleware/authMiddleware";
|
||||
import { AuthenticatedRequest } from "../types/auth";
|
||||
import { query } from "../database/db";
|
||||
|
||||
const router = Router();
|
||||
|
||||
@@ -13,10 +14,45 @@ router.use(authenticateToken);
|
||||
router.get("/protocols", async (req: AuthenticatedRequest, res: Response) => {
|
||||
res.json({
|
||||
success: true,
|
||||
data: { protocols: PROTOCOL_OPTIONS, defaults: PROTOCOL_DEFAULTS, tagDataTypes: TAG_DATA_TYPE_OPTIONS, addressTypes: ADDRESS_TYPE_OPTIONS },
|
||||
data: {
|
||||
protocols: PROTOCOL_OPTIONS,
|
||||
defaults: PROTOCOL_DEFAULTS,
|
||||
tagDataTypes: TAG_DATA_TYPE_OPTIONS,
|
||||
addressTypes: ADDRESS_TYPE_OPTIONS,
|
||||
byteOrders: BYTE_ORDER_OPTIONS,
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
// ===== 장비 목록 (pipeline_equipment) - 드롭다운용 =====
|
||||
router.get("/equipment-list", async (req: AuthenticatedRequest, res: Response) => {
|
||||
try {
|
||||
const userCompanyCode = req.user?.companyCode;
|
||||
const search = (req.query.search as string) || "";
|
||||
const params: any[] = [];
|
||||
let where = "WHERE is_active = 'Y'";
|
||||
|
||||
if (userCompanyCode && userCompanyCode !== "*") {
|
||||
params.push(userCompanyCode);
|
||||
where += ` AND (company_code = $${params.length} OR company_code = '공통')`;
|
||||
}
|
||||
if (search.trim()) {
|
||||
params.push(`%${search.trim()}%`);
|
||||
where += ` AND (equipment_name ILIKE $${params.length} OR equipment_code ILIKE $${params.length})`;
|
||||
}
|
||||
|
||||
const rows = await query<any>(
|
||||
`SELECT id, equipment_code, equipment_name, equipment_type, manufacturer, model, serial_number
|
||||
FROM pipeline_equipment ${where}
|
||||
ORDER BY equipment_name LIMIT 500`,
|
||||
params,
|
||||
);
|
||||
res.json({ success: true, data: rows });
|
||||
} catch (e: any) {
|
||||
res.status(500).json({ success: false, message: e.message });
|
||||
}
|
||||
});
|
||||
|
||||
// ===== 태그 수정/삭제 (정적 경로 우선) =====
|
||||
router.put("/tags/:tagId", async (req: AuthenticatedRequest, res: Response) => {
|
||||
try {
|
||||
@@ -66,6 +102,42 @@ router.get("/", async (req: AuthenticatedRequest, res: Response) => {
|
||||
}
|
||||
});
|
||||
|
||||
// ===== Target DB 조회 (정적 경로는 반드시 /:id 위에) =====
|
||||
router.get("/target-databases", async (req: AuthenticatedRequest, res: Response) => {
|
||||
try {
|
||||
const { listTargetDatabases } = await import("../services/targetDbIntrospection");
|
||||
const companyCode =
|
||||
req.user?.companyCode === "*" ? (req.query.company_code as string | undefined) : req.user?.companyCode;
|
||||
const data = await listTargetDatabases(companyCode);
|
||||
res.json({ success: true, data });
|
||||
} catch (e: any) {
|
||||
res.status(500).json({ success: false, message: e.message });
|
||||
}
|
||||
});
|
||||
|
||||
router.get("/target-databases/:dbId/tables", async (req: AuthenticatedRequest, res: Response) => {
|
||||
try {
|
||||
const { listTables } = await import("../services/targetDbIntrospection");
|
||||
const data = await listTables(Number(req.params.dbId));
|
||||
res.json({ success: true, data });
|
||||
} catch (e: any) {
|
||||
res.status(500).json({ success: false, message: e.message });
|
||||
}
|
||||
});
|
||||
|
||||
router.get(
|
||||
"/target-databases/:dbId/tables/:tableName/columns",
|
||||
async (req: AuthenticatedRequest, res: Response) => {
|
||||
try {
|
||||
const { listColumns } = await import("../services/targetDbIntrospection");
|
||||
const data = await listColumns(Number(req.params.dbId), req.params.tableName);
|
||||
res.json({ success: true, data });
|
||||
} catch (e: any) {
|
||||
res.status(500).json({ success: false, message: e.message });
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
router.get("/:id", async (req: AuthenticatedRequest, res: Response) => {
|
||||
try {
|
||||
const result = await PipelineDeviceConnectionService.getConnectionById(parseInt(req.params.id));
|
||||
@@ -144,4 +216,178 @@ router.post("/:id/tags", async (req: AuthenticatedRequest, res: Response) => {
|
||||
}
|
||||
});
|
||||
|
||||
// ===== 태그 컬럼 매핑 일괄 업데이트 =====
|
||||
router.put("/:id/tag-column-mapping", async (req: AuthenticatedRequest, res: Response) => {
|
||||
try {
|
||||
const connectionId = parseInt(req.params.id);
|
||||
const { mapping } = req.body as { mapping: Array<{ tag_id: number; target_column_name: string | null }> };
|
||||
if (!Array.isArray(mapping)) {
|
||||
return res.status(400).json({ success: false, message: "mapping 배열 필요" });
|
||||
}
|
||||
const { query } = await import("../database/db");
|
||||
for (const m of mapping) {
|
||||
await query(
|
||||
`UPDATE pipeline_tag_mappings SET target_column_name = $1, updated_at = NOW()
|
||||
WHERE id = $2 AND connection_id = $3`,
|
||||
[m.target_column_name, m.tag_id, connectionId]
|
||||
);
|
||||
}
|
||||
res.json({ success: true, updated: mapping.length });
|
||||
} catch (e: any) {
|
||||
res.status(500).json({ success: false, message: e.message });
|
||||
}
|
||||
});
|
||||
|
||||
// ===== 수동 1회 수집 트리거 (변경 사항 즉시 검증용) =====
|
||||
router.post("/:id/collect-once", async (req: AuthenticatedRequest, res: Response) => {
|
||||
try {
|
||||
const connectionId = parseInt(req.params.id);
|
||||
const { collectDevice, writeToTargetDb } = await import(
|
||||
"../services/collector/deviceCollectorService"
|
||||
);
|
||||
const { upsertEquipmentState } = await import(
|
||||
"../services/collector/equipmentStateService"
|
||||
);
|
||||
|
||||
const data = await collectDevice(connectionId);
|
||||
|
||||
// 수집 성공 시 DB 저장 (Long/Wide 자동 감지)
|
||||
if (data.plcState === "connected" && Object.keys(data.tags).length > 0) {
|
||||
await upsertEquipmentState(data).catch(() => {});
|
||||
await writeToTargetDb(data).catch(err =>
|
||||
res.locals.targetDbError = (err as Error).message
|
||||
);
|
||||
}
|
||||
|
||||
return res.json({
|
||||
success: true,
|
||||
data: {
|
||||
plc_state: data.plcState,
|
||||
error_message: data.errorMessage,
|
||||
tags_count: Object.keys(data.tags).length,
|
||||
tags: data.tags,
|
||||
target_db_error: res.locals.targetDbError,
|
||||
timestamp: data.timestamp,
|
||||
},
|
||||
});
|
||||
} catch (e: any) {
|
||||
return res.status(500).json({ success: false, message: e.message });
|
||||
}
|
||||
});
|
||||
|
||||
// ===== 훅 체인 테스트 (수정 화면에서 값 넣고 실행) =====
|
||||
router.post("/:id/test-chain", async (req: AuthenticatedRequest, res: Response) => {
|
||||
try {
|
||||
const connectionId = parseInt(req.params.id);
|
||||
const { tag_name, raw_value, save_to_db } = req.body as {
|
||||
tag_name: string;
|
||||
raw_value: unknown;
|
||||
save_to_db?: boolean;
|
||||
};
|
||||
if (!tag_name) {
|
||||
return res.status(400).json({ success: false, message: "tag_name 필수" });
|
||||
}
|
||||
|
||||
const { getHooksForConnection } = await import(
|
||||
"../services/collector/scriptCache"
|
||||
);
|
||||
const { executeHook } = await import("../services/collector/pythonHookRunner");
|
||||
|
||||
const steps: Array<{
|
||||
stage: string;
|
||||
hook_id?: number;
|
||||
hook_name?: string;
|
||||
input: unknown;
|
||||
output: unknown;
|
||||
error?: string;
|
||||
duration_ms?: number;
|
||||
}> = [];
|
||||
|
||||
// 1. transform 체인
|
||||
let value = raw_value;
|
||||
const transforms = await getHooksForConnection(connectionId, "transform");
|
||||
for (const h of transforms) {
|
||||
const r = await executeHook({
|
||||
hook_type: "transform",
|
||||
code: h.code,
|
||||
tag_name,
|
||||
raw_value: value,
|
||||
context: { hook_id: h.id, hook_name: h.script_name },
|
||||
timeout_ms: h.timeout_ms,
|
||||
});
|
||||
steps.push({
|
||||
stage: "transform",
|
||||
hook_id: h.id,
|
||||
hook_name: h.script_name,
|
||||
input: value,
|
||||
output: r.value,
|
||||
error: r.error,
|
||||
duration_ms: r.duration_ms,
|
||||
});
|
||||
if (!r.success) break;
|
||||
value = r.value;
|
||||
}
|
||||
|
||||
// 2. filter
|
||||
let kept = true;
|
||||
const filters = await getHooksForConnection(connectionId, "filter");
|
||||
for (const h of filters) {
|
||||
const r = await executeHook({
|
||||
hook_type: "filter",
|
||||
code: h.code,
|
||||
tag_name,
|
||||
value,
|
||||
context: { hook_id: h.id },
|
||||
timeout_ms: h.timeout_ms,
|
||||
});
|
||||
steps.push({
|
||||
stage: "filter",
|
||||
hook_id: h.id,
|
||||
hook_name: h.script_name,
|
||||
input: value,
|
||||
output: r.skip ? "DROP" : "PASS",
|
||||
error: r.error,
|
||||
duration_ms: r.duration_ms,
|
||||
});
|
||||
if (r.success && r.skip) {
|
||||
kept = false;
|
||||
}
|
||||
}
|
||||
|
||||
// 3. DB 저장 옵션
|
||||
let saved = false;
|
||||
let saved_where: string | null = null;
|
||||
if (save_to_db && kept) {
|
||||
const { upsertEquipmentState } = await import(
|
||||
"../services/collector/equipmentStateService"
|
||||
);
|
||||
await upsertEquipmentState({
|
||||
connectionId,
|
||||
connectionName: "test",
|
||||
protocol: "",
|
||||
companyCode: "*",
|
||||
timestamp: new Date().toISOString(),
|
||||
plcState: "connected",
|
||||
errorMessage: null,
|
||||
tags: { [tag_name]: value as any },
|
||||
});
|
||||
saved = true;
|
||||
saved_where = "equipment_current_state";
|
||||
}
|
||||
|
||||
return res.json({
|
||||
success: true,
|
||||
data: {
|
||||
steps,
|
||||
final_value: value,
|
||||
filter_kept: kept,
|
||||
saved,
|
||||
saved_where,
|
||||
},
|
||||
});
|
||||
} catch (e: any) {
|
||||
return res.status(500).json({ success: false, message: e.message });
|
||||
}
|
||||
});
|
||||
|
||||
export default router;
|
||||
|
||||
@@ -12,8 +12,16 @@ import { query } from "../../database/db";
|
||||
import { logger } from "../../utils/logger";
|
||||
import { XgtClient, getXgtClient, closeAllXgtConnections } from "./protocols/xgtClient";
|
||||
import { ModbusClient } from "./protocols/modbusClient";
|
||||
import { OpcuaClient } from "./protocols/opcuaClient";
|
||||
import { S7Client } from "./protocols/s7Client";
|
||||
import type { XgtTagConfig, XgtReadResult } from "./protocols/xgtClient";
|
||||
import type { ModbusTagConfig, ModbusReadResult } from "./protocols/modbusClient";
|
||||
import type { OpcuaTagConfig } from "./protocols/opcuaClient";
|
||||
import type { S7TagConfig } from "./protocols/s7Client";
|
||||
import { upsertEquipmentState } from "./equipmentStateService";
|
||||
import { ingest as forwardToCentralMqtt } from "./centralMqttForwarder";
|
||||
import { getHooksForConnection } from "./scriptCache";
|
||||
import { executeHook } from "./pythonHookRunner";
|
||||
|
||||
// ─── 타입 ──────────────────────────────────────────
|
||||
|
||||
@@ -60,7 +68,8 @@ export interface CollectedData {
|
||||
// ─── 폴링 타이머 관리 ─────────────────────────────
|
||||
|
||||
const pollingTimers = new Map<number, NodeJS.Timeout>();
|
||||
const clientCache = new Map<number, XgtClient | ModbusClient>();
|
||||
const clientCache = new Map<number, XgtClient | ModbusClient | OpcuaClient | S7Client>();
|
||||
const lastPlcState = new Map<number, CollectedData["plcState"]>();
|
||||
|
||||
// ─── 오프라인 버퍼 (메모리 기반, 추후 SQLite 확장 가능) ───
|
||||
|
||||
@@ -141,7 +150,8 @@ export async function collectDevice(connectionId: number): Promise<CollectedData
|
||||
|
||||
try {
|
||||
switch (device.protocol) {
|
||||
case "PLC_ETHERNET": {
|
||||
case "PLC_ETHERNET":
|
||||
case "LS_XGT": {
|
||||
// LS XGT FEnet
|
||||
const xgtPort = device.port || 2004;
|
||||
const client = getXgtClient(device.host, xgtPort, device.timeout_ms || 3000);
|
||||
@@ -177,6 +187,65 @@ export async function collectDevice(connectionId: number): Promise<CollectedData
|
||||
break;
|
||||
}
|
||||
|
||||
case "OPCUA": {
|
||||
const endpointUrl =
|
||||
(device.protocol_config?.endpoint_url as string) ||
|
||||
`opc.tcp://${device.host}:${device.port || 4840}`;
|
||||
const securityMode =
|
||||
((device.protocol_config?.security_mode as string) as "None" | "Sign" | "SignAndEncrypt") || "None";
|
||||
const username = device.protocol_config?.username as string | undefined;
|
||||
const password = device.protocol_config?.password as string | undefined;
|
||||
|
||||
let client = clientCache.get(device.id) as OpcuaClient;
|
||||
if (!client || !client.isConnected()) {
|
||||
client = new OpcuaClient(endpointUrl, securityMode, username, password, device.timeout_ms || 5000);
|
||||
await client.connect();
|
||||
clientCache.set(device.id, client);
|
||||
}
|
||||
|
||||
const opcuaTags: OpcuaTagConfig[] = tags.map(t => ({
|
||||
tagName: t.tag_name,
|
||||
address: t.address,
|
||||
dataType: t.tag_data_type,
|
||||
scaleFactor: t.scale_factor ?? 1,
|
||||
offsetValue: t.offset_value ?? 0,
|
||||
}));
|
||||
const readings = await client.readTags(opcuaTags);
|
||||
|
||||
for (const r of readings) {
|
||||
result.tags[r.tagName] = r.quality === "good" ? r.value : null;
|
||||
}
|
||||
result.plcState = "connected";
|
||||
break;
|
||||
}
|
||||
|
||||
case "S7":
|
||||
case "SIEMENS_S7": {
|
||||
const rack = (device.protocol_config?.rack as number) ?? 0;
|
||||
const slot = (device.protocol_config?.slot as number) ?? 1;
|
||||
let client = clientCache.get(device.id) as S7Client;
|
||||
if (!client || !client.isConnected()) {
|
||||
client = new S7Client(device.host, rack, slot, device.port || 102, device.timeout_ms || 5000);
|
||||
await client.connect();
|
||||
clientCache.set(device.id, client);
|
||||
}
|
||||
|
||||
const s7Tags: S7TagConfig[] = tags.map(t => ({
|
||||
tagName: t.tag_name,
|
||||
address: t.address,
|
||||
dataType: t.tag_data_type,
|
||||
scaleFactor: t.scale_factor ?? 1,
|
||||
offsetValue: t.offset_value ?? 0,
|
||||
}));
|
||||
const readings = await client.readTags(s7Tags);
|
||||
|
||||
for (const r of readings) {
|
||||
result.tags[r.tagName] = r.quality === "good" ? r.value : null;
|
||||
}
|
||||
result.plcState = "connected";
|
||||
break;
|
||||
}
|
||||
|
||||
default:
|
||||
throw new Error(`지원하지 않는 프로토콜: ${device.protocol}`);
|
||||
}
|
||||
@@ -198,24 +267,157 @@ export async function collectDevice(connectionId: number): Promise<CollectedData
|
||||
).catch(() => {});
|
||||
}
|
||||
|
||||
// 수집 후 Python 훅 실행 (transform → filter → derived_tags 순)
|
||||
if (result.plcState === "connected") {
|
||||
await applyHooks(result);
|
||||
}
|
||||
|
||||
// 상태 전이 시 edge_events 기록
|
||||
const prev = lastPlcState.get(device.id);
|
||||
if (prev !== result.plcState) {
|
||||
lastPlcState.set(device.id, result.plcState);
|
||||
if (prev !== undefined) {
|
||||
const { recordEdgeEvent } = await import("./edgeStatusReporter");
|
||||
if (result.plcState === "connected") {
|
||||
await recordEdgeEvent(
|
||||
"collector",
|
||||
"plc_connected",
|
||||
`[${device.connection_name}] PLC 연결 복구`,
|
||||
3,
|
||||
{ connection_id: device.id, protocol: device.protocol, host: device.host },
|
||||
);
|
||||
} else if (result.plcState === "error") {
|
||||
await recordEdgeEvent(
|
||||
"collector",
|
||||
"plc_error",
|
||||
`[${device.connection_name}] PLC 수집 오류: ${result.errorMessage ?? ""}`,
|
||||
5,
|
||||
{ connection_id: device.id, protocol: device.protocol, host: device.host },
|
||||
);
|
||||
} else {
|
||||
await recordEdgeEvent(
|
||||
"collector",
|
||||
"plc_disconnected",
|
||||
`[${device.connection_name}] PLC 연결 해제`,
|
||||
4,
|
||||
{ connection_id: device.id, protocol: device.protocol, host: device.host },
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
// ─── Python 훅 적용 ────────────────────────────────
|
||||
|
||||
async function applyHooks(data: CollectedData): Promise<void> {
|
||||
try {
|
||||
// 1. transform 훅: 각 태그 값 변환
|
||||
const transforms = await getHooksForConnection(data.connectionId, "transform");
|
||||
if (transforms.length > 0) {
|
||||
for (const [tagName, rawValue] of Object.entries(data.tags)) {
|
||||
let value = rawValue;
|
||||
for (const hook of transforms) {
|
||||
const res = await executeHook({
|
||||
hook_type: "transform",
|
||||
code: hook.code,
|
||||
tag_name: tagName,
|
||||
raw_value: value,
|
||||
context: { hook_id: hook.id, hook_name: hook.script_name },
|
||||
timeout_ms: hook.timeout_ms,
|
||||
});
|
||||
if (!res.success) {
|
||||
logger.warn(
|
||||
`[Collector] transform 훅 실패 (${hook.script_name}, tag=${tagName}): ${res.error}`
|
||||
);
|
||||
break;
|
||||
}
|
||||
value = res.value as typeof rawValue;
|
||||
}
|
||||
data.tags[tagName] = value;
|
||||
}
|
||||
}
|
||||
|
||||
// 2. filter 훅: False 반환 시 태그 제거
|
||||
const filters = await getHooksForConnection(data.connectionId, "filter");
|
||||
if (filters.length > 0) {
|
||||
for (const tagName of Object.keys(data.tags)) {
|
||||
let keep = true;
|
||||
for (const hook of filters) {
|
||||
const res = await executeHook({
|
||||
hook_type: "filter",
|
||||
code: hook.code,
|
||||
tag_name: tagName,
|
||||
value: data.tags[tagName],
|
||||
context: { hook_id: hook.id },
|
||||
timeout_ms: hook.timeout_ms,
|
||||
});
|
||||
if (!res.success) {
|
||||
logger.warn(`[Collector] filter 훅 실패 (${hook.script_name}): ${res.error}`);
|
||||
continue;
|
||||
}
|
||||
if (res.skip) {
|
||||
keep = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!keep) delete data.tags[tagName];
|
||||
}
|
||||
}
|
||||
|
||||
// 3. derived_tags 훅: 새 태그 추가
|
||||
const derived = await getHooksForConnection(data.connectionId, "derived_tags");
|
||||
for (const hook of derived) {
|
||||
const res = await executeHook({
|
||||
hook_type: "derived_tags",
|
||||
code: hook.code,
|
||||
device_data: { tags: data.tags, device_id: String(data.connectionId) },
|
||||
context: { hook_id: hook.id },
|
||||
timeout_ms: hook.timeout_ms,
|
||||
});
|
||||
if (res.success && res.derived) {
|
||||
Object.assign(data.tags, res.derived as Record<string, typeof data.tags[string]>);
|
||||
} else if (!res.success) {
|
||||
logger.warn(
|
||||
`[Collector] derived_tags 훅 실패 (${hook.script_name}): ${res.error}`
|
||||
);
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
logger.error(`[Collector] 훅 적용 중 오류: ${(err as Error).message}`);
|
||||
}
|
||||
}
|
||||
|
||||
// ─── 수집 결과 발행 ───────────────────────────────
|
||||
|
||||
async function publishData(data: CollectedData): Promise<void> {
|
||||
// 1. MQTT 발행
|
||||
// 1. 로컬 MQTT 발행 (UI 실시간 스트리밍용)
|
||||
if (mqttClient && mqttConfig) {
|
||||
try {
|
||||
const topic = `${mqttConfig.topic}/${data.companyCode}/${data.connectionId}`;
|
||||
mqttClient.publish(topic, JSON.stringify(data));
|
||||
} catch (err) {
|
||||
logger.warn(`[Collector] MQTT 발행 실패 — 재시도 큐에 추가`);
|
||||
logger.warn(`[Collector] 로컬 MQTT 발행 실패 — 재시도 큐에 추가`);
|
||||
if (retryQueue.length < MAX_RETRY_QUEUE) retryQueue.push(data);
|
||||
}
|
||||
}
|
||||
|
||||
// 2. DB 저장 (collected_data 테이블이 있으면)
|
||||
// 2. IDC 중앙 MQTT로 포워딩 (Pipeline이 엣지 역할 수행)
|
||||
try {
|
||||
await forwardToCentralMqtt(data);
|
||||
} catch (err) {
|
||||
logger.warn(`[Collector] IDC 포워딩 실패: ${(err as Error).message}`);
|
||||
}
|
||||
|
||||
// 3. 장비 현재값 스냅샷 업데이트
|
||||
try {
|
||||
await upsertEquipmentState(data);
|
||||
} catch (err) {
|
||||
logger.debug(`[Collector] 현재값 업데이트 실패: ${(err as Error).message}`);
|
||||
}
|
||||
|
||||
// 4. 시계열 원본 저장 (pipeline_collected_data)
|
||||
try {
|
||||
await query(
|
||||
`INSERT INTO pipeline_collected_data (connection_id, collected_at, plc_state, tag_values, error_message)
|
||||
@@ -226,6 +428,245 @@ async function publishData(data: CollectedData): Promise<void> {
|
||||
} catch {
|
||||
// 테이블이 없을 수 있음 — 무시
|
||||
}
|
||||
|
||||
// 5. 사용자 지정 외부 DB로 INSERT (옵션)
|
||||
try {
|
||||
await writeToTargetDb(data);
|
||||
} catch (err) {
|
||||
logger.warn(`[Collector] target DB 저장 실패: ${(err as Error).message}`);
|
||||
}
|
||||
}
|
||||
|
||||
// ─── 사용자 지정 외부 DB INSERT ────────────────────
|
||||
|
||||
// 타겟 테이블 스키마 캐시 (dbId:tableName → columnSet)
|
||||
const targetSchemaCache = new Map<string, Set<string>>();
|
||||
const TARGET_SCHEMA_CACHE_TTL_MS = 5 * 60 * 1000;
|
||||
const targetSchemaCacheExpiry = new Map<string, number>();
|
||||
|
||||
async function getTargetTableColumns(
|
||||
dbId: number,
|
||||
tableName: string
|
||||
): Promise<Set<string>> {
|
||||
const key = `${dbId}:${tableName}`;
|
||||
const expiry = targetSchemaCacheExpiry.get(key) ?? 0;
|
||||
if (Date.now() < expiry && targetSchemaCache.has(key)) {
|
||||
return targetSchemaCache.get(key)!;
|
||||
}
|
||||
const { listColumns } = await import("../targetDbIntrospection");
|
||||
const cols = await listColumns(dbId, tableName);
|
||||
const set = new Set(cols.map(c => c.column_name.toLowerCase()));
|
||||
targetSchemaCache.set(key, set);
|
||||
targetSchemaCacheExpiry.set(key, Date.now() + TARGET_SCHEMA_CACHE_TTL_MS);
|
||||
return set;
|
||||
}
|
||||
|
||||
/**
|
||||
* 타겟 테이블이 Long 포맷인지 감지.
|
||||
* (time, tag_name, value 컬럼 모두 존재하면 Long으로 간주)
|
||||
*/
|
||||
function isLongFormat(cols: Set<string>): boolean {
|
||||
return cols.has("tag_name") && cols.has("value");
|
||||
}
|
||||
|
||||
async function runTargetInsert(
|
||||
dbId: number,
|
||||
sql: string,
|
||||
values: unknown[]
|
||||
): Promise<void> {
|
||||
if (dbId === 0) {
|
||||
await query(sql, values);
|
||||
} else {
|
||||
const { executeExternalQuery } = await import("../externalDbHelper");
|
||||
await executeExternalQuery(dbId, sql, values);
|
||||
}
|
||||
}
|
||||
|
||||
export async function writeToTargetDb(data: CollectedData): Promise<void> {
|
||||
const connRows = await query<{
|
||||
target_db_connection_id: number | null;
|
||||
target_table_name: string | null;
|
||||
target_time_column: string | null;
|
||||
target_insert_mode: string | null;
|
||||
}>(
|
||||
`SELECT target_db_connection_id, target_table_name, target_time_column, target_insert_mode
|
||||
FROM pipeline_device_connections
|
||||
WHERE id = $1`,
|
||||
[data.connectionId]
|
||||
);
|
||||
if (!connRows.length) return;
|
||||
|
||||
const cfg = connRows[0];
|
||||
if (cfg.target_db_connection_id === null || cfg.target_db_connection_id === undefined) return;
|
||||
if (!cfg.target_table_name) return;
|
||||
const dbId = cfg.target_db_connection_id;
|
||||
const table = cfg.target_table_name;
|
||||
const timeCol = cfg.target_time_column || "timestamp";
|
||||
|
||||
const tagEntries = Object.entries(data.tags);
|
||||
if (tagEntries.length === 0) return;
|
||||
|
||||
// 타겟 테이블 컬럼 조회 (캐시)
|
||||
let targetCols: Set<string>;
|
||||
try {
|
||||
targetCols = await getTargetTableColumns(dbId, table);
|
||||
} catch (err) {
|
||||
throw new Error(
|
||||
`target 테이블 스키마 조회 실패 (${dbId}/${table}): ${(err as Error).message}`
|
||||
);
|
||||
}
|
||||
|
||||
if (isLongFormat(targetCols)) {
|
||||
// ─── Long 포맷: 태그당 1행 INSERT ─────────────────
|
||||
await writeLongFormat(dbId, table, timeCol, targetCols, data);
|
||||
} else {
|
||||
// ─── Wide 포맷: 1행에 다중 컬럼 (기존 방식) ───────
|
||||
await writeWideFormat(dbId, table, timeCol, data);
|
||||
}
|
||||
}
|
||||
|
||||
async function writeLongFormat(
|
||||
dbId: number,
|
||||
table: string,
|
||||
timeCol: string,
|
||||
cols: Set<string>,
|
||||
data: CollectedData
|
||||
): Promise<void> {
|
||||
const hasCompany = cols.has("company_id");
|
||||
const hasEdge = cols.has("edge_id");
|
||||
const hasQuality = cols.has("quality");
|
||||
const hasMetadata = cols.has("metadata");
|
||||
|
||||
// 연결 메타 정보 조회 (edge_identifier, device_identifier, company_id)
|
||||
const connMeta = await query<{
|
||||
edge_identifier: string | null;
|
||||
device_identifier: string | null;
|
||||
equipment_code: string | null;
|
||||
company_id_full: string | null;
|
||||
}>(
|
||||
`SELECT c.edge_identifier,
|
||||
c.device_identifier,
|
||||
e.equipment_code,
|
||||
COALESCE(NULLIF(c.company_code, '*'), '') AS company_id_full
|
||||
FROM pipeline_device_connections c
|
||||
LEFT JOIN pipeline_equipment e ON c.equipment_id = e.id
|
||||
WHERE c.id = $1`,
|
||||
[data.connectionId]
|
||||
);
|
||||
const meta = connMeta[0] || {} as any;
|
||||
|
||||
// edge_id / device_id 결정 (설정된 값 > equipment_code > fallback)
|
||||
const edgeIdOut =
|
||||
meta.edge_identifier ||
|
||||
`edge-conn-${data.connectionId}`;
|
||||
const deviceIdOut =
|
||||
meta.device_identifier ||
|
||||
meta.equipment_code ||
|
||||
`conn-${data.connectionId}`;
|
||||
const companyIdOut =
|
||||
meta.company_id_full || data.companyCode || "*";
|
||||
|
||||
// IDC edge_telemetry 원본 metadata 포맷 (실제 프로덕션 row 기준)
|
||||
// { priority, device_id:<UUID>, forwarded_at:<ISO> }
|
||||
const nowIso = new Date().toISOString();
|
||||
const baseMetadata: Record<string, unknown> = {
|
||||
priority: 2,
|
||||
device_id: deviceIdOut,
|
||||
forwarded_at: nowIso,
|
||||
};
|
||||
|
||||
const allCols: string[] = [timeCol, "tag_name", "value"];
|
||||
if (hasCompany) allCols.push("company_id");
|
||||
if (hasEdge) allCols.push("edge_id");
|
||||
if (hasQuality) allCols.push("quality");
|
||||
if (hasMetadata) allCols.push("metadata");
|
||||
|
||||
const rows: unknown[] = [];
|
||||
const placeholderRows: string[] = [];
|
||||
let idx = 1;
|
||||
|
||||
for (const [tagName, rawValue] of Object.entries(data.tags)) {
|
||||
const numericValue =
|
||||
typeof rawValue === "number"
|
||||
? rawValue
|
||||
: typeof rawValue === "boolean"
|
||||
? rawValue
|
||||
? 1
|
||||
: 0
|
||||
: rawValue !== null && rawValue !== undefined && !Number.isNaN(Number(rawValue))
|
||||
? Number(rawValue)
|
||||
: null;
|
||||
|
||||
const placeholders: string[] = [];
|
||||
rows.push(data.timestamp);
|
||||
placeholders.push(`$${idx++}`);
|
||||
rows.push(tagName);
|
||||
placeholders.push(`$${idx++}`);
|
||||
rows.push(numericValue);
|
||||
placeholders.push(`$${idx++}`);
|
||||
if (hasCompany) {
|
||||
rows.push(companyIdOut);
|
||||
placeholders.push(`$${idx++}`);
|
||||
}
|
||||
if (hasEdge) {
|
||||
rows.push(edgeIdOut);
|
||||
placeholders.push(`$${idx++}`);
|
||||
}
|
||||
if (hasQuality) {
|
||||
rows.push(rawValue === null || rawValue === undefined ? "bad" : "good");
|
||||
placeholders.push(`$${idx++}`);
|
||||
}
|
||||
if (hasMetadata) {
|
||||
rows.push(JSON.stringify(baseMetadata));
|
||||
placeholders.push(`$${idx++}::jsonb`);
|
||||
}
|
||||
placeholderRows.push(`(${placeholders.join(", ")})`);
|
||||
}
|
||||
|
||||
const sql = `INSERT INTO ${table} (${allCols.join(", ")}) VALUES ${placeholderRows.join(", ")}`;
|
||||
await runTargetInsert(dbId, sql, rows);
|
||||
logger.debug(
|
||||
`[Collector] target DB INSERT 성공 (Long): ${table} ${placeholderRows.length}행 edge=${edgeIdOut} device=${deviceIdOut}`
|
||||
);
|
||||
}
|
||||
|
||||
async function writeWideFormat(
|
||||
dbId: number,
|
||||
table: string,
|
||||
timeCol: string,
|
||||
data: CollectedData
|
||||
): Promise<void> {
|
||||
// 태그 → target 컬럼명 매핑 조회
|
||||
const tagRows = await query<{
|
||||
tag_name: string;
|
||||
target_column_name: string | null;
|
||||
}>(
|
||||
`SELECT tag_name, target_column_name FROM pipeline_tag_mappings
|
||||
WHERE connection_id = $1 AND is_active = 'Y'`,
|
||||
[data.connectionId]
|
||||
);
|
||||
const colMap = new Map<string, string>();
|
||||
for (const t of tagRows) colMap.set(t.tag_name, t.target_column_name || t.tag_name);
|
||||
|
||||
const columns: string[] = [timeCol];
|
||||
const placeholders: string[] = ["$1"];
|
||||
const values: unknown[] = [data.timestamp];
|
||||
let idx = 2;
|
||||
|
||||
for (const [tagName, value] of Object.entries(data.tags)) {
|
||||
const col = colMap.get(tagName) || tagName;
|
||||
columns.push(col);
|
||||
placeholders.push(`$${idx++}`);
|
||||
values.push(value);
|
||||
}
|
||||
|
||||
if (columns.length <= 1) return;
|
||||
|
||||
const sql = `INSERT INTO ${table} (${columns.join(", ")}) VALUES (${placeholders.join(", ")})`;
|
||||
await runTargetInsert(dbId, sql, values);
|
||||
logger.debug(
|
||||
`[Collector] target DB INSERT 성공 (Wide): ${table} (${columns.length - 1}컬럼)`
|
||||
);
|
||||
}
|
||||
|
||||
// ─── 폴링 시작/중지 ──────────────────────────────
|
||||
|
||||
@@ -0,0 +1,161 @@
|
||||
/**
|
||||
* Edge Status / Event Reporter
|
||||
*
|
||||
* IDC TimescaleDB 의 edge_status / edge_events 테이블에 직접 INSERT.
|
||||
* (기존: data-collector → Kafka → kafka-to-central-mqtt → MQTT → IDC consumer → edge_status)
|
||||
* (교체: pipeline-backend → IDC TimescaleDB 직접)
|
||||
*
|
||||
* 환경변수:
|
||||
* PIPELINE_EDGE_REPORTER=true 기동 시 시작
|
||||
* PIPELINE_EDGE_TARGET_DB_ID=3 external_db_connections.id
|
||||
* PIPELINE_EDGE_ID=edge-0f4d04ed
|
||||
* PIPELINE_EDGE_COMPANY_ID=spifox edge_status.company_id (varchar)
|
||||
* PIPELINE_EDGE_COMPANY_UUID=... edge_events.company_id (uuid)
|
||||
* PIPELINE_EDGE_STATUS_TABLE=edge_status_1
|
||||
* PIPELINE_EDGE_EVENTS_TABLE=edge_events_1
|
||||
* PIPELINE_EDGE_STATUS_INTERVAL_SEC=60
|
||||
*/
|
||||
import os from "os";
|
||||
import { logger } from "../../utils/logger";
|
||||
import { executeExternalQuery } from "../externalDbHelper";
|
||||
|
||||
interface ReporterConfig {
|
||||
targetDbId: number;
|
||||
edgeId: string;
|
||||
companyId: string;
|
||||
companyUuid: string | null;
|
||||
statusTable: string;
|
||||
eventsTable: string;
|
||||
intervalSec: number;
|
||||
}
|
||||
|
||||
let cfg: ReporterConfig | null = null;
|
||||
let statusTimer: NodeJS.Timeout | null = null;
|
||||
|
||||
function loadConfig(): ReporterConfig | null {
|
||||
if (process.env.PIPELINE_EDGE_REPORTER !== "true") return null;
|
||||
const targetDbId = Number(process.env.PIPELINE_EDGE_TARGET_DB_ID);
|
||||
const edgeId = process.env.PIPELINE_EDGE_ID;
|
||||
const companyId = process.env.PIPELINE_EDGE_COMPANY_ID;
|
||||
if (!Number.isFinite(targetDbId) || !edgeId || !companyId) {
|
||||
logger.warn("[EdgeReporter] 환경변수 누락 (TARGET_DB_ID/EDGE_ID/COMPANY_ID) — 비활성");
|
||||
return null;
|
||||
}
|
||||
return {
|
||||
targetDbId,
|
||||
edgeId,
|
||||
companyId,
|
||||
companyUuid: process.env.PIPELINE_EDGE_COMPANY_UUID || null,
|
||||
statusTable: process.env.PIPELINE_EDGE_STATUS_TABLE || "edge_status_1",
|
||||
eventsTable: process.env.PIPELINE_EDGE_EVENTS_TABLE || "edge_events_1",
|
||||
intervalSec: Number(process.env.PIPELINE_EDGE_STATUS_INTERVAL_SEC) || 60,
|
||||
};
|
||||
}
|
||||
|
||||
function primaryIpAddress(): string | null {
|
||||
const ifaces = os.networkInterfaces();
|
||||
for (const name of Object.keys(ifaces)) {
|
||||
for (const it of ifaces[name] || []) {
|
||||
if (it.family === "IPv4" && !it.internal) return it.address;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
async function writeStatusRow(status: "online" | "offline"): Promise<void> {
|
||||
if (!cfg) return;
|
||||
try {
|
||||
await executeExternalQuery(
|
||||
cfg.targetDbId,
|
||||
`INSERT INTO ${cfg.statusTable} (time, company_id, edge_id, status, ip_address, firmware_version, metadata)
|
||||
VALUES (NOW(), $1, $2, $3, $4, $5, $6::jsonb)`,
|
||||
[
|
||||
cfg.companyId,
|
||||
cfg.edgeId,
|
||||
status,
|
||||
primaryIpAddress(),
|
||||
process.env.PIPELINE_FIRMWARE_VERSION || null,
|
||||
JSON.stringify({ source: "pipeline-backend", host: os.hostname() }),
|
||||
],
|
||||
);
|
||||
} catch (err) {
|
||||
logger.warn(`[EdgeReporter] status INSERT 실패: ${(err as Error).message}`);
|
||||
}
|
||||
}
|
||||
|
||||
export async function recordEdgeEvent(
|
||||
eventType: string,
|
||||
eventCode: string,
|
||||
message: string,
|
||||
severity = 3,
|
||||
metadata: Record<string, unknown> = {},
|
||||
): Promise<void> {
|
||||
if (!cfg) return;
|
||||
if (!cfg.companyUuid) {
|
||||
// edge_events.company_id 는 UUID 타입이라 UUID 없으면 INSERT 불가 → 경고만
|
||||
logger.debug("[EdgeReporter] event 기록 스킵: PIPELINE_EDGE_COMPANY_UUID 미설정");
|
||||
return;
|
||||
}
|
||||
try {
|
||||
await executeExternalQuery(
|
||||
cfg.targetDbId,
|
||||
`INSERT INTO ${cfg.eventsTable} (time, company_id, edge_id, event_type, event_code, message, severity, acknowledged, metadata)
|
||||
VALUES (NOW(), $1::uuid, $2, $3, $4, $5, $6, false, $7::jsonb)`,
|
||||
[
|
||||
cfg.companyUuid,
|
||||
cfg.edgeId,
|
||||
eventType,
|
||||
eventCode,
|
||||
message,
|
||||
severity,
|
||||
JSON.stringify({ source: "pipeline-backend", ...metadata }),
|
||||
],
|
||||
);
|
||||
logger.info(`[EdgeReporter] 이벤트 기록: ${eventType}/${eventCode} — ${message}`);
|
||||
} catch (err) {
|
||||
logger.warn(`[EdgeReporter] event INSERT 실패: ${(err as Error).message}`);
|
||||
}
|
||||
}
|
||||
|
||||
export async function startEdgeReporter(): Promise<void> {
|
||||
cfg = loadConfig();
|
||||
if (!cfg) {
|
||||
logger.info("[EdgeReporter] 비활성 (PIPELINE_EDGE_REPORTER!=true)");
|
||||
return;
|
||||
}
|
||||
logger.info(
|
||||
`[EdgeReporter] 시작 target_db=${cfg.targetDbId} edge=${cfg.edgeId} ` +
|
||||
`status_tbl=${cfg.statusTable} events_tbl=${cfg.eventsTable} interval=${cfg.intervalSec}s`,
|
||||
);
|
||||
|
||||
// 기동 이벤트 + 첫 하트비트
|
||||
await recordEdgeEvent(
|
||||
"pipeline",
|
||||
"startup",
|
||||
"pipeline-backend 기동 — Edge Reporter 활성",
|
||||
3,
|
||||
);
|
||||
await writeStatusRow("online");
|
||||
|
||||
statusTimer = setInterval(() => {
|
||||
writeStatusRow("online").catch(() => undefined);
|
||||
}, cfg.intervalSec * 1000);
|
||||
}
|
||||
|
||||
export async function stopEdgeReporter(): Promise<void> {
|
||||
if (statusTimer) {
|
||||
clearInterval(statusTimer);
|
||||
statusTimer = null;
|
||||
}
|
||||
if (cfg) {
|
||||
await writeStatusRow("offline").catch(() => undefined);
|
||||
await recordEdgeEvent("pipeline", "shutdown", "pipeline-backend 정지", 3).catch(
|
||||
() => undefined,
|
||||
);
|
||||
}
|
||||
cfg = null;
|
||||
}
|
||||
|
||||
export function isEdgeReporterActive(): boolean {
|
||||
return cfg !== null;
|
||||
}
|
||||
@@ -68,30 +68,25 @@ function parseAddress(address: string): { memType: string; xgtAddress: string; o
|
||||
const prefix = MEMORY_TYPES[memLetter];
|
||||
if (!prefix) throw new Error(`지원하지 않는 메모리 영역: ${memLetter}`);
|
||||
|
||||
// XGT는 %DW 뒤에 주소를 바이트 오프셋이 아닌 워드 번호로 씀
|
||||
// D100 → %DW100 (워드 100번)
|
||||
const xgtAddress = `${prefix}${String(num).padStart(5, "0")}`;
|
||||
// LS XGT 표준: Python data-collector와 동일 — 4자리 zero-padding
|
||||
// D1174 → %DW1174, D100 → %DW0100
|
||||
const xgtAddress = `${prefix}${String(num).padStart(4, "0")}`;
|
||||
return { memType: memLetter, xgtAddress, offset: num };
|
||||
}
|
||||
|
||||
// ─── XGT FEnet 프레임 빌더 ────────────────────────
|
||||
|
||||
function buildReadFrame(xgtAddress: string, wordCount: number, invokeId: number = 0): Buffer {
|
||||
// Application data
|
||||
// Python data-collector와 100% 동일 구조:
|
||||
// Header (20B) + AppData (block_count + name_len + name + count)
|
||||
// command/data_type/reserved 는 포함 안 함 (LS XGT 실제 동작과 일치)
|
||||
const addrBytes = Buffer.from(xgtAddress, "ascii");
|
||||
const addrLen = addrBytes.length;
|
||||
|
||||
// App data: command(2) + dataType(2) + reserved(2) + blockCount(2) + addrLen(2) + addr + readCount(2)
|
||||
const appDataLen = 2 + 2 + 2 + 2 + 2 + addrLen + 2;
|
||||
// App data: blockCount(2) + addrLen(2) + addr + readCount(2)
|
||||
const appDataLen = 2 + 2 + addrLen + 2;
|
||||
const appData = Buffer.alloc(appDataLen);
|
||||
let offset = 0;
|
||||
|
||||
// Command: 0x0054 = Read request
|
||||
appData.writeUInt16LE(0x0054, offset); offset += 2;
|
||||
// Data type: 0x0014 = Word (continuous)
|
||||
appData.writeUInt16LE(0x0014, offset); offset += 2;
|
||||
// Reserved
|
||||
appData.writeUInt16LE(0x0000, offset); offset += 2;
|
||||
// Block count: 1
|
||||
appData.writeUInt16LE(0x0001, offset); offset += 2;
|
||||
// Address string length
|
||||
@@ -103,28 +98,17 @@ function buildReadFrame(xgtAddress: string, wordCount: number, invokeId: number
|
||||
|
||||
// Header (20 bytes)
|
||||
const header = Buffer.alloc(20);
|
||||
// Company ID: "LSIS"
|
||||
header.write("LSIS", 0, 4, "ascii");
|
||||
// Reserved (4-5)
|
||||
header.writeUInt16LE(0x0000, 4);
|
||||
// PLC Info (6-7)
|
||||
header.writeUInt16LE(0x0000, 6);
|
||||
// CPU Info: XGK
|
||||
header.writeUInt8(0xa0, 8);
|
||||
// Source: 0x33 (PC → PLC)
|
||||
header.writeUInt8(0x33, 9);
|
||||
// Invoke ID
|
||||
header.writeUInt16LE(0x0000, 4); // Reserved
|
||||
header.writeUInt16LE(0x0000, 6); // PLC Info
|
||||
header.writeUInt8(0xa0, 8); // CPU Info: XGK
|
||||
header.writeUInt8(0x33, 9); // Source: PC→PLC
|
||||
header.writeUInt16LE(invokeId & 0xffff, 10);
|
||||
// Data length
|
||||
header.writeUInt16LE(appDataLen, 12);
|
||||
// Station No
|
||||
header.writeUInt8(0x00, 14);
|
||||
// Network No
|
||||
header.writeUInt8(0x00, 15);
|
||||
// Data length (repeated)
|
||||
header.writeUInt16LE(appDataLen, 16);
|
||||
// Reserved
|
||||
header.writeUInt16LE(0x0000, 18);
|
||||
header.writeUInt16LE(appDataLen, 12); // Data length
|
||||
header.writeUInt8(0x00, 14); // Station No
|
||||
header.writeUInt8(0x00, 15); // Network No
|
||||
header.writeUInt16LE(appDataLen, 16); // Data length (repeated)
|
||||
header.writeUInt16LE(0x0000, 18); // Reserved
|
||||
|
||||
return Buffer.concat([header, appData]);
|
||||
}
|
||||
@@ -138,29 +122,27 @@ function parseReadResponse(response: Buffer): number[] {
|
||||
const companyId = response.toString("ascii", 0, 4);
|
||||
if (companyId !== "LSIS") throw new Error(`잘못된 응답 헤더: ${companyId}`);
|
||||
|
||||
// Data length
|
||||
const dataLen = response.readUInt16LE(12);
|
||||
// Data length — 응답에서는 [16:18] 사용 (Python 참조와 동일)
|
||||
// [12:14] 는 요청 시 data_length 이지만 응답에서는 PLC가 CPU 정보로 덮어씀
|
||||
const dataLen = response.readUInt16LE(16);
|
||||
if (response.length < 20 + dataLen) throw new Error("응답 데이터 불완전");
|
||||
|
||||
// Application data 시작: offset 20
|
||||
// Response: command(2) + dataType(2) + reserved(2) + errorState(2) + blockCount(2) + dataLen(2) + data...
|
||||
// Python data-collector와 동일 구조:
|
||||
// Header(20B) + AppData: block_count(2) + data_len(2) + data
|
||||
// (command/data_type/reserved/errorState 는 응답에도 없음)
|
||||
const appOffset = 20;
|
||||
|
||||
// Error state 확인
|
||||
const errorState = response.readUInt16LE(appOffset + 6);
|
||||
if (errorState !== 0) throw new Error(`PLC 에러 코드: 0x${errorState.toString(16)}`);
|
||||
|
||||
// Block count
|
||||
const blockCount = response.readUInt16LE(appOffset + 8);
|
||||
const blockCount = response.readUInt16LE(appOffset);
|
||||
if (blockCount === 0) return [];
|
||||
|
||||
// Data length (bytes)
|
||||
const wordDataLen = response.readUInt16LE(appOffset + 10);
|
||||
const wordDataLen = response.readUInt16LE(appOffset + 2);
|
||||
const wordCount = wordDataLen / 2;
|
||||
|
||||
// Word 데이터 읽기
|
||||
// Word 데이터
|
||||
const words: number[] = [];
|
||||
const dataStart = appOffset + 12;
|
||||
const dataStart = appOffset + 4;
|
||||
for (let i = 0; i < wordCount; i++) {
|
||||
if (dataStart + i * 2 + 2 <= response.length) {
|
||||
words.push(response.readUInt16LE(dataStart + i * 2));
|
||||
@@ -246,29 +228,35 @@ export class XgtClient {
|
||||
if (this.connected && this.socket) return;
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
this.socket = new net.Socket();
|
||||
this.socket.setTimeout(this.timeout);
|
||||
const sock = new net.Socket();
|
||||
this.socket = sock;
|
||||
|
||||
this.socket.connect(this.port, this.host, () => {
|
||||
// connect 단계 타임아웃 (socket.setTimeout 은 idle 감지라 폴링 간격마다 끊김 → 제거)
|
||||
const connectTimer = setTimeout(() => {
|
||||
sock.destroy();
|
||||
this.connected = false;
|
||||
reject(new Error(`[XGT] 연결 타임아웃: ${this.timeout}ms`));
|
||||
}, this.timeout);
|
||||
|
||||
sock.once("connect", () => {
|
||||
clearTimeout(connectTimer);
|
||||
this.connected = true;
|
||||
sock.setKeepAlive(true, 10_000);
|
||||
logger.info(`[XGT] PLC 연결 성공: ${this.host}:${this.port}`);
|
||||
resolve();
|
||||
});
|
||||
|
||||
this.socket.on("error", (err) => {
|
||||
sock.on("error", (err) => {
|
||||
clearTimeout(connectTimer);
|
||||
this.connected = false;
|
||||
reject(new Error(`[XGT] 연결 실패: ${err.message}`));
|
||||
});
|
||||
|
||||
this.socket.on("timeout", () => {
|
||||
this.socket?.destroy();
|
||||
sock.on("close", () => {
|
||||
this.connected = false;
|
||||
reject(new Error(`[XGT] 연결 타임아웃: ${this.timeout}ms`));
|
||||
});
|
||||
|
||||
this.socket.on("close", () => {
|
||||
this.connected = false;
|
||||
});
|
||||
sock.connect(this.port, this.host);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -284,7 +272,7 @@ export class XgtClient {
|
||||
return this.connected;
|
||||
}
|
||||
|
||||
// 단일 주소 읽기
|
||||
// 단일 주소 읽기 (Python 패턴: 헤더 먼저 → data_length만큼 이어받기)
|
||||
private async rawRead(xgtAddress: string, wordCount: number): Promise<number[]> {
|
||||
if (!this.socket || !this.connected) throw new Error("[XGT] 연결되지 않음");
|
||||
|
||||
@@ -293,11 +281,12 @@ export class XgtClient {
|
||||
return new Promise((resolve, reject) => {
|
||||
const chunks: Buffer[] = [];
|
||||
let resolved = false;
|
||||
let expectedTotal = -1; // -1 = 아직 모름 (헤더 파싱 후 설정)
|
||||
|
||||
const timeout = setTimeout(() => {
|
||||
if (!resolved) {
|
||||
resolved = true;
|
||||
this.socket?.removeAllListeners("data");
|
||||
this.socket?.removeListener("data", onData);
|
||||
reject(new Error(`[XGT] 읽기 타임아웃: ${xgtAddress}`));
|
||||
}
|
||||
}, this.timeout);
|
||||
@@ -306,13 +295,20 @@ export class XgtClient {
|
||||
chunks.push(data);
|
||||
const response = Buffer.concat(chunks);
|
||||
|
||||
// 헤더(20) + 최소 응답 데이터(12) 이상 받았는지 확인
|
||||
if (response.length >= 32) {
|
||||
// 1) 헤더(20바이트) 받았으면 data_length 확인
|
||||
// 응답에서는 [16:18] 에 위치 (Python 참조와 동일)
|
||||
if (expectedTotal < 0 && response.length >= 20) {
|
||||
const dataLen = response.readUInt16LE(16);
|
||||
expectedTotal = 20 + dataLen;
|
||||
}
|
||||
|
||||
// 2) 기대 길이 도달하면 파싱
|
||||
if (expectedTotal > 0 && response.length >= expectedTotal) {
|
||||
clearTimeout(timeout);
|
||||
resolved = true;
|
||||
this.socket?.removeListener("data", onData);
|
||||
try {
|
||||
const words = parseReadResponse(response);
|
||||
const words = parseReadResponse(response.slice(0, expectedTotal));
|
||||
resolve(words);
|
||||
} catch (e) {
|
||||
reject(e);
|
||||
|
||||
+18
-10
@@ -10,26 +10,34 @@ const authLog = (event: string, detail: string) => {
|
||||
}
|
||||
};
|
||||
|
||||
// API URL 동적 설정 - 환경변수 우선 사용
|
||||
// API URL 동적 설정
|
||||
// 우선순위: NEXT_PUBLIC_API_URL > 현재 브라우저 호스트 기반 > localhost
|
||||
// 엣지 배포 시에도 프론트를 접속한 호스트를 그대로 재사용해야 CORS/접근성 문제 없음.
|
||||
const getApiBaseUrl = (): string => {
|
||||
if (process.env.NEXT_PUBLIC_API_URL) {
|
||||
return process.env.NEXT_PUBLIC_API_URL;
|
||||
// env 강제 지정이 있고 명시적으로 절대 URL인 경우 그대로 사용
|
||||
// (단, 'http://localhost:' 로 시작하면서 실제로는 다른 호스트에서 접속된 케이스가 많아
|
||||
// 배포용 컨테이너에선 origin fallback 을 우선시)
|
||||
const envUrl = process.env.NEXT_PUBLIC_API_URL;
|
||||
if (envUrl && typeof window !== "undefined") {
|
||||
const currentHost = window.location.hostname;
|
||||
const envIsLocalhost = /^https?:\/\/(localhost|127\.0\.0\.1)(:|\/|$)/.test(envUrl);
|
||||
const browserIsLocalhost = currentHost === "localhost" || currentHost === "127.0.0.1";
|
||||
// 브라우저가 원격에서 접속인데 env 는 localhost 면 env 무시 (원격에서 localhost:8080 못 감)
|
||||
if (!(envIsLocalhost && !browserIsLocalhost)) return envUrl;
|
||||
} else if (envUrl) {
|
||||
return envUrl;
|
||||
}
|
||||
|
||||
if (typeof window !== "undefined") {
|
||||
const currentHost = window.location.hostname;
|
||||
const currentPort = window.location.port;
|
||||
|
||||
if (currentHost === "v1.vexplor.com") {
|
||||
return "https://api.vexplor.com/api";
|
||||
}
|
||||
|
||||
if (
|
||||
(currentHost === "localhost" || currentHost === "127.0.0.1") &&
|
||||
(currentPort === "9771" || currentPort === "3000")
|
||||
) {
|
||||
return "http://localhost:8080/api";
|
||||
}
|
||||
// 접속 host 기준 포트 8080 백엔드로 — 엣지 배포 일반화 경로
|
||||
const protocol = window.location.protocol;
|
||||
return `${protocol}//${currentHost}:8080/api`;
|
||||
}
|
||||
|
||||
return "http://localhost:8080/api";
|
||||
|
||||
Reference in New Issue
Block a user