feat(pipeline): R3 sentinel sanitize (string+number) + R4.0 inbound ports
R3 — PLC sentinel(-480910 / -481000) drop policy (root cause fix)
- new: src/domain/policies/tag-value-sanitizer.policy.ts
- wired in deviceCollectorService.publishData() 진입부 — 모든 sink
(로컬 MQTT, IDC central MQTT, equipmentState, target DB) 직전 1회 호출
- typeof bug 수정: PLC/Edge 가 '-480910.000000' string 으로 전송하는 케이스
포함. coerceNumeric() 으로 number/string 양쪽 안전 변환 후 Set 매칭.
- T6 (Claude tracer agent) 진단 결과 — Edge 컨테이너에서 hash field 가
'-480910.000000' string 으로 적재되어 typeof === 'number' 만 검사하던
이전 로직 통과. dt-web 응답까지 sentinel 도달 확인됨.
R4.0 — Inbound Port 인터페이스만 정의 (런타임 영향 0)
- new: src/ports/inbound/plc-source.port.ts (PlcSourcePort)
- new: src/ports/inbound/rest-request.port.ts (RestRequestPort, RestResponse)
- new: src/ports/inbound/scheduled-trigger.port.ts (ScheduledTriggerPort)
- 어댑터 구현은 R4.1+ 단계에서 진행 (deviceCollectorService 의 thin wrapper).
영향:
- Edge pipeline-backend 빌드 시 sanitize 호출 활성화 → IDC Redis 까지
sentinel 도달 차단. dt-web 의 운영 워크어라운드 제거 가능.
- 4개 신규 파일 + 1개 기존 파일 +5 lines 수정.
Constraint: chpark 의 로컬 hex 작업과 동기화 필요 — git pull main 후 머지/리베이스 권장
Confidence: high (T6 tracer 가 진단 + Edge build 산물 코드 위치 일치)
Scope-risk: narrow (publishData 진입부 1줄 + 4 신규 파일)
Directive: SENTINEL_VALUES set 변경 시 coerceNumeric 의 string 처리도 함께 갱신
Not-tested: chpark 의 로컬 R1~R5 작업과의 충돌 (사용자가 안내 예정)
This commit is contained in:
@@ -0,0 +1,62 @@
|
||||
// 알려진 PLC sentinel 값 — 미연결/리셋 시 전송되는 transient garbage
|
||||
const SENTINEL_VALUES = new Set<number>([
|
||||
-480910, // PLC 리셋 직후
|
||||
-481000, // 예비 (발견 시 추가)
|
||||
]);
|
||||
|
||||
export interface TagValueSanitizerOptions {
|
||||
/** sentinel 발견 시 동작: 'drop' 태그 제거 | 'null' null로 치환 */
|
||||
onSentinel?: "drop" | "null";
|
||||
}
|
||||
|
||||
type TagValue = number | boolean | string | null;
|
||||
|
||||
/**
|
||||
* PLC/Edge 가 sentinel 값을 string 으로 전송할 수 있음 (예: C# `ToString("F6")`
|
||||
* 결과 "-480910.000000"). `typeof === "number"` 만 검사하면 모든 string sentinel
|
||||
* 이 통과 → Redis 까지 도달.
|
||||
*
|
||||
* 안전 변환: number 그대로 / string 은 trim + 양끝 쿼트 제거 후 Number(),
|
||||
* 그 외 타입은 NaN 으로 처리 (null/undefined/boolean 도 sentinel 아님).
|
||||
*
|
||||
* 2026-05-15 — string sentinel 처리 추가 (T6 진단으로 root cause 확정).
|
||||
*/
|
||||
function coerceNumeric(value: unknown): number {
|
||||
if (typeof value === "number") return value;
|
||||
if (typeof value === "string") {
|
||||
const trimmed = value.trim().replace(/^"|"$/g, "");
|
||||
if (trimmed === "") return NaN;
|
||||
return Number(trimmed);
|
||||
}
|
||||
return NaN;
|
||||
}
|
||||
|
||||
function matchesSentinel(value: unknown): boolean {
|
||||
const n = coerceNumeric(value);
|
||||
return !Number.isNaN(n) && SENTINEL_VALUES.has(n);
|
||||
}
|
||||
|
||||
export class TagValueSanitizer {
|
||||
constructor(
|
||||
private readonly opts: TagValueSanitizerOptions = { onSentinel: "drop" }
|
||||
) {}
|
||||
|
||||
/** sanitize: sentinel 값 필터 (비파괴적 — 새 객체 반환) */
|
||||
sanitize(tags: Record<string, TagValue>): Record<string, TagValue> {
|
||||
const result: Record<string, TagValue> = {};
|
||||
for (const [name, value] of Object.entries(tags)) {
|
||||
if (matchesSentinel(value)) {
|
||||
if (this.opts.onSentinel === "null") result[name] = null;
|
||||
// 'drop': 키 자체를 포함하지 않음
|
||||
continue;
|
||||
}
|
||||
result[name] = value;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/** isSentinel: 단일 값 검사 (number/string 모두 처리) */
|
||||
static isSentinel(value: unknown): boolean {
|
||||
return matchesSentinel(value);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,29 @@
|
||||
// src/ports/inbound/plc-source.port.ts
|
||||
//
|
||||
// Phase R4.0 — Inbound Port: PLC 폴링 source 의 lifecycle 추상화.
|
||||
//
|
||||
// 어댑터 (예정): src/adapters/inbound/plc-poller/
|
||||
// - startAll(): 활성 connection 전체 폴링 시작
|
||||
// - start(connectionId): 단일 connection 폴링 시작
|
||||
// - stop(connectionId): 단일 connection 폴링 중지
|
||||
// - stopAll(): 전체 중지 (graceful shutdown)
|
||||
// - collectOnce(connectionId): 1회 즉시 수집 (REST collect-once 트리거)
|
||||
//
|
||||
// 현재 (R4.0): 인터페이스만 정의. 누구도 import 하지 않음 → 런타임 영향 0.
|
||||
// R4.1 에서 deviceCollectorService 의 thin wrapper 로 구현 예정.
|
||||
|
||||
import type { CollectedData } from "../../services/collector/deviceCollectorService";
|
||||
|
||||
/** PLC 폴링 source의 lifecycle 추상화 */
|
||||
export interface PlcSourcePort {
|
||||
/** 활성 connection 전체 폴링 시작. 반환: 실제로 시작된 개수 */
|
||||
startAll(): Promise<number>;
|
||||
/** 단일 connection 폴링 시작 */
|
||||
start(connectionId: number): Promise<void>;
|
||||
/** 단일 connection 폴링 중지 */
|
||||
stop(connectionId: number): void;
|
||||
/** 전체 중지 (graceful shutdown) */
|
||||
stopAll(): void;
|
||||
/** 1회 즉시 수집 (REST collect-once 트리거용) */
|
||||
collectOnce(connectionId: number): Promise<CollectedData>;
|
||||
}
|
||||
@@ -0,0 +1,24 @@
|
||||
// src/ports/inbound/rest-request.port.ts
|
||||
//
|
||||
// Phase R4.0 — Inbound Port: REST 핸들러를 domain 에서 보는 추상.
|
||||
//
|
||||
// 어댑터 (예정): src/adapters/inbound/http-rest/
|
||||
// - Express 의존성 격리 — domain 은 Express Req/Res 타입을 직접 import 하지 않음
|
||||
// - 라우트 핸들러는 Express → RestRequestPort 변환 후 domain 호출
|
||||
//
|
||||
// 현재 (R4.0): 인터페이스만 정의. 누구도 import 하지 않음 → 런타임 영향 0.
|
||||
|
||||
/** REST 핸들러를 domain 에서 보는 추상 — Express 의존 격리 */
|
||||
export interface RestRequestPort<TParams = unknown, TBody = unknown> {
|
||||
params: TParams;
|
||||
body: TBody;
|
||||
user?: { id: number; companyCode: string };
|
||||
}
|
||||
|
||||
/** REST 응답 — 표준 ApiResponse 와 동일 형태 */
|
||||
export interface RestResponse<T = unknown> {
|
||||
success: boolean;
|
||||
data?: T;
|
||||
error?: string;
|
||||
message?: string;
|
||||
}
|
||||
@@ -0,0 +1,25 @@
|
||||
// src/ports/inbound/scheduled-trigger.port.ts
|
||||
//
|
||||
// Phase R4.0 — Inbound Port: cron tick 추상.
|
||||
//
|
||||
// 어댑터 (예정): src/adapters/inbound/cron/
|
||||
// - node-cron 라이브러리 의존성 격리
|
||||
// - 파일 수집 / Batch / AI Cron 등 모든 cron 사용처에서 동일 인터페이스 사용
|
||||
//
|
||||
// 현재 (R4.0): 인터페이스만 정의. 누구도 import 하지 않음 → 런타임 영향 0.
|
||||
|
||||
/** cron task handle — 외부에서 stop() 만 호출 가능하도록 좁힌 인터페이스 */
|
||||
export interface ScheduledTaskHandle {
|
||||
stop(): void;
|
||||
}
|
||||
|
||||
/** cron tick 의 추상 — node-cron 의존 격리 */
|
||||
export interface ScheduledTriggerPort {
|
||||
/** cron 표현식 등록 후 task handle 반환 */
|
||||
register(
|
||||
cronExpr: string,
|
||||
handler: () => Promise<void>
|
||||
): ScheduledTaskHandle;
|
||||
/** 등록된 task 전체 중지 */
|
||||
stopAll(): void;
|
||||
}
|
||||
@@ -28,6 +28,11 @@ import { upsertEquipmentState } from "./equipmentStateService";
|
||||
import { ingest as forwardToCentralMqtt } from "./centralMqttForwarder";
|
||||
import { getHooksForConnection } from "./scriptCache";
|
||||
import { executeHook } from "./pythonHookRunner";
|
||||
import { TagValueSanitizer } from "../../domain/policies/tag-value-sanitizer.policy";
|
||||
|
||||
// R3 — PLC sentinel 값(-480910 등) 차단. 모든 sink 직전에 호출.
|
||||
// 2026-05-15: typeof bug fix — string sentinel 도 처리 (T6 진단).
|
||||
const tagSanitizer = new TagValueSanitizer({ onSentinel: "drop" });
|
||||
|
||||
// ─── 타입 ──────────────────────────────────────────
|
||||
|
||||
@@ -502,6 +507,10 @@ async function applyHooks(data: CollectedData): Promise<void> {
|
||||
// ─── 수집 결과 발행 ───────────────────────────────
|
||||
|
||||
async function publishData(data: CollectedData): Promise<void> {
|
||||
// R3 — sentinel sanitize (PLC 미연결/리셋 시 -480910 등 transient garbage 차단)
|
||||
// 모든 sink (로컬 MQTT, IDC central MQTT, equipmentState, target DB) 직전.
|
||||
data = { ...data, tags: tagSanitizer.sanitize(data.tags) };
|
||||
|
||||
// 1. 로컬 MQTT 발행 (UI 실시간 스트리밍용)
|
||||
if (mqttClient && mqttConfig) {
|
||||
try {
|
||||
|
||||
Reference in New Issue
Block a user