Files
pipeline/backend-node/src/services/collector/deviceCollectorService.ts
T
h.offthatmuz 931127505a feat(pipeline): R3 sentinel sanitize (string+number) + R4.0 inbound ports
R3 — PLC sentinel(-480910 / -481000) drop policy (root cause fix)
  - new: src/domain/policies/tag-value-sanitizer.policy.ts
  - wired in deviceCollectorService.publishData() 진입부 — 모든 sink
    (로컬 MQTT, IDC central MQTT, equipmentState, target DB) 직전 1회 호출
  - typeof bug 수정: PLC/Edge 가 '-480910.000000' string 으로 전송하는 케이스
    포함. coerceNumeric() 으로 number/string 양쪽 안전 변환 후 Set 매칭.
  - T6 (Claude tracer agent) 진단 결과 — Edge 컨테이너에서 hash field 가
    '-480910.000000' string 으로 적재되어 typeof === 'number' 만 검사하던
    이전 로직 통과. dt-web 응답까지 sentinel 도달 확인됨.

R4.0 — Inbound Port 인터페이스만 정의 (런타임 영향 0)
  - new: src/ports/inbound/plc-source.port.ts (PlcSourcePort)
  - new: src/ports/inbound/rest-request.port.ts (RestRequestPort, RestResponse)
  - new: src/ports/inbound/scheduled-trigger.port.ts (ScheduledTriggerPort)
  - 어댑터 구현은 R4.1+ 단계에서 진행 (deviceCollectorService 의 thin wrapper).

영향:
  - Edge pipeline-backend 빌드 시 sanitize 호출 활성화 → IDC Redis 까지
    sentinel 도달 차단. dt-web 의 운영 워크어라운드 제거 가능.
  - 4개 신규 파일 + 1개 기존 파일 +5 lines 수정.

Constraint: chpark 의 로컬 hex 작업과 동기화 필요 — git pull main 후 머지/리베이스 권장
Confidence: high (T6 tracer 가 진단 + Edge build 산물 코드 위치 일치)
Scope-risk: narrow (publishData 진입부 1줄 + 4 신규 파일)
Directive: SENTINEL_VALUES set 변경 시 coerceNumeric 의 string 처리도 함께 갱신
Not-tested: chpark 의 로컬 R1~R5 작업과의 충돌 (사용자가 안내 예정)
2026-05-15 10:30:30 +09:00

1032 lines
36 KiB
TypeScript

/**
* Device Collector Service
* - pipeline_device_connections + pipeline_tag_mappings 설정 기반
* - 프로토콜별 PLC 읽기 (XGT, Modbus 등)
* - 읽은 데이터 → MQTT 발행 + DB 저장
* - 오프라인 버퍼 (발행 실패 시 재시도)
*
* Python data-collector의 EdgeAgent + CollectorManager 포팅
*/
import { query } from "../../database/db";
import { logger } from "../../utils/logger";
import { XgtClient, getXgtClient, closeAllXgtConnections } from "./protocols/xgtClient";
import { ModbusClient } from "./protocols/modbusClient";
import { OpcuaClient } from "./protocols/opcuaClient";
import { S7Client } from "./protocols/s7Client";
import { getMssqlClient, MssqlClient, MssqlTagConfig } from "./protocols/mssqlClient";
import {
getMqttCollectorClient,
MqttCollectorClient,
MqttCollectorTag,
} from "./protocols/mqttCollectorClient";
import type { XgtTagConfig, XgtReadResult } from "./protocols/xgtClient";
import type { ModbusTagConfig, ModbusReadResult } from "./protocols/modbusClient";
import type { OpcuaTagConfig } from "./protocols/opcuaClient";
import type { S7TagConfig } from "./protocols/s7Client";
import { upsertEquipmentState } from "./equipmentStateService";
import { ingest as forwardToCentralMqtt } from "./centralMqttForwarder";
import { getHooksForConnection } from "./scriptCache";
import { executeHook } from "./pythonHookRunner";
import { TagValueSanitizer } from "../../domain/policies/tag-value-sanitizer.policy";
// R3 — PLC sentinel 값(-480910 등) 차단. 모든 sink 직전에 호출.
// 2026-05-15: typeof bug fix — string sentinel 도 처리 (T6 진단).
const tagSanitizer = new TagValueSanitizer({ onSentinel: "drop" });
// ─── 타입 ──────────────────────────────────────────
interface DeviceConnection {
id: number;
connection_name: string;
protocol: string;
host: string;
port: number;
protocol_config: Record<string, unknown>;
polling_interval_ms: number;
timeout_ms: number;
retry_count: number;
status: string;
company_code: string;
}
interface TagMapping {
id: number;
connection_id: number;
tag_name: string;
tag_display_name: string | null;
tag_unit: string | null;
tag_data_type: string;
address: string;
address_type: string | null;
scale_factor: number;
offset_value: number;
min_value: number | null;
max_value: number | null;
}
export interface CollectedData {
connectionId: number;
connectionName: string;
protocol: string;
companyCode: string;
host: string | null;
port: number | null;
timestamp: string;
plcState: "connected" | "disconnected" | "error";
errorMessage: string | null;
tags: Record<string, number | boolean | string | null>;
}
// ─── 폴링 타이머 관리 ─────────────────────────────
const pollingTimers = new Map<number, NodeJS.Timeout>();
const clientCache = new Map<
number,
XgtClient | ModbusClient | OpcuaClient | S7Client | MssqlClient | MqttCollectorClient
>();
const lastPlcState = new Map<number, CollectedData["plcState"]>();
// ─── 오프라인 버퍼 (메모리 기반, 추후 SQLite 확장 가능) ───
const retryQueue: CollectedData[] = [];
const MAX_RETRY_QUEUE = 10000;
// ─── MQTT 발행 (옵션) ─────────────────────────────
let mqttClient: { publish: (topic: string, message: string) => void } | null = null;
let mqttConfig: { brokerUrl: string; topic: string } | null = null;
export function setMqttPublisher(config: { brokerUrl: string; topic: string }, client: { publish: (topic: string, message: string) => void }) {
mqttConfig = config;
mqttClient = client;
logger.info(`[Collector] MQTT 퍼블리셔 설정: ${config.brokerUrl}${config.topic}`);
}
// ─── 태그 매핑 → 프로토콜 태그 변환 ────────────────
function toXgtTags(tags: TagMapping[]): XgtTagConfig[] {
return tags.map(t => ({
tagName: t.tag_name,
address: t.address,
dataType: mapDataType(t.tag_data_type),
scaleFactor: t.scale_factor ?? 1,
offsetValue: t.offset_value ?? 0,
}));
}
function toModbusTags(tags: TagMapping[]): ModbusTagConfig[] {
return tags.map(t => ({
tagName: t.tag_name,
address: t.address,
dataType: t.tag_data_type as ModbusTagConfig["dataType"],
scaleFactor: t.scale_factor ?? 1,
offsetValue: t.offset_value ?? 0,
}));
}
function mapDataType(dt: string): "BOOL" | "INT16" | "UINT16" | "INT32" | "UINT32" | "FLOAT32" {
switch (dt.toUpperCase()) {
case "BOOLEAN": return "BOOL";
case "INT16": return "INT16";
case "INT32": return "INT32";
case "FLOAT32": return "FLOAT32";
case "FLOAT64": return "FLOAT32"; // Node.js에서는 FLOAT32로 처리
default: return "INT16";
}
}
// ─── 단일 디바이스 수집 실행 ──────────────────────
export async function collectDevice(connectionId: number): Promise<CollectedData> {
// DB에서 연결 + 태그 조회
const connections = await query<DeviceConnection>(
"SELECT * FROM pipeline_device_connections WHERE id = $1",
[connectionId]
);
if (!connections.length) throw new Error(`연결 ID ${connectionId}를 찾을 수 없습니다.`);
const device = connections[0];
const tags = await query<TagMapping>(
"SELECT * FROM pipeline_tag_mappings WHERE connection_id = $1 AND is_active = 'Y' ORDER BY tag_name",
[connectionId]
);
if (!tags.length) throw new Error(`태그가 없습니다 (connection_id=${connectionId})`);
const result: CollectedData = {
connectionId: device.id,
connectionName: device.connection_name,
protocol: device.protocol,
companyCode: device.company_code || "",
host: device.host || null,
port: device.port ?? null,
timestamp: new Date().toISOString(),
plcState: "disconnected",
errorMessage: null,
tags: {},
};
try {
switch (device.protocol) {
case "PLC_ETHERNET":
case "LS_XGT": {
// LS XGT FEnet
const xgtPort = device.port || 2004;
const client = getXgtClient(device.host, xgtPort, device.timeout_ms || 3000);
if (!client.isConnected()) await client.connect();
clientCache.set(device.id, client);
const xgtTags = toXgtTags(tags);
const readings = await client.readTags(xgtTags);
for (const r of readings) {
result.tags[r.tagName] = r.quality === "good" ? r.value : null;
}
result.plcState = "connected";
break;
}
case "MODBUS_TCP": {
const unitId = (device.protocol_config?.unit_id as number) || 1;
let client = clientCache.get(device.id) as ModbusClient;
if (!client || !client.isConnected()) {
client = new ModbusClient(device.host, device.port || 502, unitId, device.timeout_ms || 3000);
await client.connect();
clientCache.set(device.id, client);
}
const modbusTags = toModbusTags(tags);
const readings = await client.readTags(modbusTags);
for (const r of readings) {
result.tags[r.tagName] = r.quality === "good" ? r.value : null;
}
result.plcState = "connected";
break;
}
case "OPCUA": {
const endpointUrl =
(device.protocol_config?.endpoint_url as string) ||
`opc.tcp://${device.host}:${device.port || 4840}`;
const securityMode =
((device.protocol_config?.security_mode as string) as "None" | "Sign" | "SignAndEncrypt") || "None";
const username = device.protocol_config?.username as string | undefined;
const password = device.protocol_config?.password as string | undefined;
let client = clientCache.get(device.id) as OpcuaClient;
if (!client || !client.isConnected()) {
client = new OpcuaClient(endpointUrl, securityMode, username, password, device.timeout_ms || 5000);
await client.connect();
clientCache.set(device.id, client);
}
const opcuaTags: OpcuaTagConfig[] = tags.map(t => ({
tagName: t.tag_name,
address: t.address,
dataType: t.tag_data_type,
scaleFactor: t.scale_factor ?? 1,
offsetValue: t.offset_value ?? 0,
}));
const readings = await client.readTags(opcuaTags);
for (const r of readings) {
result.tags[r.tagName] = r.quality === "good" ? r.value : null;
}
result.plcState = "connected";
break;
}
case "S7":
case "SIEMENS_S7": {
const rack = (device.protocol_config?.rack as number) ?? 0;
const slot = (device.protocol_config?.slot as number) ?? 1;
let client = clientCache.get(device.id) as S7Client;
if (!client || !client.isConnected()) {
client = new S7Client(device.host, rack, slot, device.port || 102, device.timeout_ms || 5000);
await client.connect();
clientCache.set(device.id, client);
}
const s7Tags: S7TagConfig[] = tags.map(t => ({
tagName: t.tag_name,
address: t.address,
dataType: t.tag_data_type,
scaleFactor: t.scale_factor ?? 1,
offsetValue: t.offset_value ?? 0,
}));
const readings = await client.readTags(s7Tags);
for (const r of readings) {
result.tags[r.tagName] = r.quality === "good" ? r.value : null;
}
result.plcState = "connected";
break;
}
case "MQTT_SUB": {
// MQTT 구독 기반 수집 (tag.address = topic)
const pc = device.protocol_config || {};
const cfg = {
brokerUrl: (pc.broker_url as string) || `mqtt://${device.host}:${device.port || 1883}`,
username: pc.username as string | undefined,
password: pc.password as string | undefined,
clientId: (pc.client_id as string) || `pipeline-coll-${device.id}`,
qos: ((pc.qos as number) ?? 0) as 0 | 1 | 2,
keepaliveSec: (pc.keepalive_sec as number) ?? 60,
};
const mqttTags: MqttCollectorTag[] = tags.map(t => ({
tagName: t.tag_name,
topic: t.address,
dataType: t.tag_data_type,
jsonPath: (t as any).json_path as string | undefined,
scaleFactor: t.scale_factor ?? 1,
offsetValue: t.offset_value ?? 0,
}));
let client = clientCache.get(device.id) as MqttCollectorClient | undefined;
if (!client || !client.isConnected()) {
client = getMqttCollectorClient(cfg);
await client.connect(mqttTags);
clientCache.set(device.id, client);
}
const readings = await client.readTags(mqttTags);
for (const r of readings) {
result.tags[r.tagName] = r.quality === "good" ? r.value : null;
}
result.plcState = "connected";
break;
}
case "MSSQL_DB": {
// SQL Server 기반 수집 (Python sql_collector.py 의 MSSQL 모드 포팅)
// protocol_config 로부터 연결/쿼리/watermark 파라미터 읽음
const pc = device.protocol_config || {};
const mssqlCfg = {
host: device.host,
port: device.port || 1433,
database: (pc.database as string) || "",
username: (pc.username as string) || "",
password: (pc.password as string) || "",
query: (pc.query as string) || "",
timestampColumn: pc.timestamp_column as string | undefined,
timestampExpression: pc.timestamp_expression as string | undefined,
idColumn: pc.id_column as string | undefined,
nameColumn: pc.name_column as string | undefined,
batchMode: (pc.batch_mode as boolean) ?? false,
fetchSize: (pc.fetch_size as number) ?? 500,
deviceIdPrefix: (pc.device_id_prefix as string) || "sql",
tdsVersion: pc.tds_version as string | undefined,
timeout: device.timeout_ms || 30000,
};
let client = clientCache.get(device.id) as MssqlClient | undefined;
if (!client || !client.isConnected()) {
client = getMssqlClient(mssqlCfg);
await client.connect();
clientCache.set(device.id, client);
}
const mssqlTags: MssqlTagConfig[] = tags.map(t => ({
tagName: t.tag_name,
columnName: (t as any).column_name || t.address || t.tag_name,
dataType: t.tag_data_type,
scaleFactor: t.scale_factor ?? 1,
offsetValue: t.offset_value ?? 0,
}));
const watermark = await loadWatermark(device.id);
const { batches, newWatermark } = await client.readBatch(mssqlTags, watermark);
if (batches.length === 0) {
// 신규 row 없음 — plcState connected 로 두되 태그 비어있음
result.plcState = "connected";
} else if (mssqlCfg.batchMode) {
// batch_mode: 여러 row → 각 row 별 발행은 processOnePoll 에서 불가.
// 우선 첫 row 의 값을 기본 tags 로, 나머지 row 들은 별도 write 로 보낸다.
for (const b of batches) {
// 태그명에 equipmentId 접두 (중복 방지)
for (const [k, v] of Object.entries(b.tags)) {
const key = b.equipmentId ? `${b.equipmentId}_${k}` : k;
result.tags[key] = v;
}
}
result.plcState = "connected";
} else {
const last = batches[batches.length - 1];
for (const [k, v] of Object.entries(last.tags)) result.tags[k] = v;
result.plcState = "connected";
}
if (newWatermark && newWatermark.getTime() !== watermark?.getTime()) {
await saveWatermark(device.id, newWatermark);
}
break;
}
default:
throw new Error(`지원하지 않는 프로토콜: ${device.protocol}`);
}
// 연결 상태 업데이트
await query(
"UPDATE pipeline_device_connections SET status = 'active', last_test_date = NOW(), last_test_result = 'success', last_test_message = $1 WHERE id = $2",
[`수집 성공: ${Object.keys(result.tags).length}개 태그`, device.id]
).catch(() => {});
} catch (err) {
result.plcState = "error";
result.errorMessage = (err as Error).message;
logger.error(`[Collector] 수집 실패 (${device.connection_name}): ${result.errorMessage}`);
await query(
"UPDATE pipeline_device_connections SET status = 'error', last_test_date = NOW(), last_test_result = 'failure', last_test_message = $1 WHERE id = $2",
[result.errorMessage, device.id]
).catch(() => {});
}
// 수집 후 Python 훅 실행 (transform → filter → derived_tags 순)
if (result.plcState === "connected") {
await applyHooks(result);
}
// 상태 전이 시 edge_events 기록
const prev = lastPlcState.get(device.id);
if (prev !== result.plcState) {
lastPlcState.set(device.id, result.plcState);
if (prev !== undefined) {
const { recordEdgeEvent } = await import("./edgeStatusReporter");
if (result.plcState === "connected") {
await recordEdgeEvent(
"collector",
"plc_connected",
`[${device.connection_name}] PLC 연결 복구`,
3,
{ connection_id: device.id, protocol: device.protocol, host: device.host },
);
} else if (result.plcState === "error") {
await recordEdgeEvent(
"collector",
"plc_error",
`[${device.connection_name}] PLC 수집 오류: ${result.errorMessage ?? ""}`,
5,
{ connection_id: device.id, protocol: device.protocol, host: device.host },
);
} else {
await recordEdgeEvent(
"collector",
"plc_disconnected",
`[${device.connection_name}] PLC 연결 해제`,
4,
{ connection_id: device.id, protocol: device.protocol, host: device.host },
);
}
}
}
return result;
}
// ─── Python 훅 적용 ────────────────────────────────
async function applyHooks(data: CollectedData): Promise<void> {
try {
// 1. transform 훅: 각 태그 값 변환
const transforms = await getHooksForConnection(data.connectionId, "transform");
if (transforms.length > 0) {
for (const [tagName, rawValue] of Object.entries(data.tags)) {
let value = rawValue;
for (const hook of transforms) {
const res = await executeHook({
hook_type: "transform",
code: hook.code,
tag_name: tagName,
raw_value: value,
context: { hook_id: hook.id, hook_name: hook.script_name },
timeout_ms: hook.timeout_ms,
});
if (!res.success) {
logger.warn(
`[Collector] transform 훅 실패 (${hook.script_name}, tag=${tagName}): ${res.error}`
);
break;
}
value = res.value as typeof rawValue;
}
data.tags[tagName] = value;
}
}
// 2. filter 훅: False 반환 시 태그 제거
const filters = await getHooksForConnection(data.connectionId, "filter");
if (filters.length > 0) {
for (const tagName of Object.keys(data.tags)) {
let keep = true;
for (const hook of filters) {
const res = await executeHook({
hook_type: "filter",
code: hook.code,
tag_name: tagName,
value: data.tags[tagName],
context: { hook_id: hook.id },
timeout_ms: hook.timeout_ms,
});
if (!res.success) {
logger.warn(`[Collector] filter 훅 실패 (${hook.script_name}): ${res.error}`);
continue;
}
if (res.skip) {
keep = false;
break;
}
}
if (!keep) delete data.tags[tagName];
}
}
// 3. derived_tags 훅: 새 태그 추가
const derived = await getHooksForConnection(data.connectionId, "derived_tags");
for (const hook of derived) {
const res = await executeHook({
hook_type: "derived_tags",
code: hook.code,
device_data: { tags: data.tags, device_id: String(data.connectionId) },
context: { hook_id: hook.id },
timeout_ms: hook.timeout_ms,
});
if (res.success && res.derived) {
Object.assign(data.tags, res.derived as Record<string, typeof data.tags[string]>);
} else if (!res.success) {
logger.warn(
`[Collector] derived_tags 훅 실패 (${hook.script_name}): ${res.error}`
);
}
}
} catch (err) {
logger.error(`[Collector] 훅 적용 중 오류: ${(err as Error).message}`);
}
}
// ─── 수집 결과 발행 ───────────────────────────────
async function publishData(data: CollectedData): Promise<void> {
// R3 — sentinel sanitize (PLC 미연결/리셋 시 -480910 등 transient garbage 차단)
// 모든 sink (로컬 MQTT, IDC central MQTT, equipmentState, target DB) 직전.
data = { ...data, tags: tagSanitizer.sanitize(data.tags) };
// 1. 로컬 MQTT 발행 (UI 실시간 스트리밍용)
if (mqttClient && mqttConfig) {
try {
const topic = `${mqttConfig.topic}/${data.companyCode}/${data.connectionId}`;
mqttClient.publish(topic, JSON.stringify(data));
} catch (err) {
logger.warn(`[Collector] 로컬 MQTT 발행 실패 — 재시도 큐에 추가`);
if (retryQueue.length < MAX_RETRY_QUEUE) retryQueue.push(data);
}
}
// 2. IDC 중앙 MQTT로 포워딩 (Pipeline이 엣지 역할 수행)
try {
await forwardToCentralMqtt(data);
} catch (err) {
logger.warn(`[Collector] IDC 포워딩 실패: ${(err as Error).message}`);
}
// 3. 장비 현재값 스냅샷 업데이트
try {
await upsertEquipmentState(data);
} catch (err) {
logger.debug(`[Collector] 현재값 업데이트 실패: ${(err as Error).message}`);
}
// 4. 시계열 원본 저장 (pipeline_collected_data)
try {
await query(
`INSERT INTO pipeline_collected_data (connection_id, collected_at, plc_state, tag_values, error_message)
VALUES ($1, $2, $3, $4::jsonb, $5)
ON CONFLICT DO NOTHING`,
[data.connectionId, data.timestamp, data.plcState, JSON.stringify(data.tags), data.errorMessage]
);
} catch {
// 테이블이 없을 수 있음 — 무시
}
// 5. 사용자 지정 외부 DB로 INSERT (옵션)
try {
await writeToTargetDb(data);
} catch (err) {
const msg = (err as Error).message;
logger.warn(`[Collector] target DB 저장 실패 — retry queue 에 적재: ${msg}`);
await enqueueTargetRetry(data, msg).catch(() => undefined);
}
}
// ─── 사용자 지정 외부 DB INSERT ────────────────────
// 타겟 테이블 스키마 캐시 (dbId:tableName → columnSet)
const targetSchemaCache = new Map<string, Set<string>>();
const TARGET_SCHEMA_CACHE_TTL_MS = 5 * 60 * 1000;
const targetSchemaCacheExpiry = new Map<string, number>();
async function getTargetTableColumns(
dbId: number,
tableName: string
): Promise<Set<string>> {
const key = `${dbId}:${tableName}`;
const expiry = targetSchemaCacheExpiry.get(key) ?? 0;
if (Date.now() < expiry && targetSchemaCache.has(key)) {
return targetSchemaCache.get(key)!;
}
const { listColumns } = await import("../targetDbIntrospection");
const cols = await listColumns(dbId, tableName);
const set = new Set(cols.map(c => c.column_name.toLowerCase()));
targetSchemaCache.set(key, set);
targetSchemaCacheExpiry.set(key, Date.now() + TARGET_SCHEMA_CACHE_TTL_MS);
return set;
}
/**
* 타겟 테이블이 Long 포맷인지 감지.
* (time, tag_name, value 컬럼 모두 존재하면 Long으로 간주)
*/
function isLongFormat(cols: Set<string>): boolean {
return cols.has("tag_name") && cols.has("value");
}
async function runTargetInsert(
dbId: number,
sql: string,
values: unknown[]
): Promise<void> {
if (dbId === 0) {
await query(sql, values);
} else {
const { executeExternalQuery } = await import("../externalDbHelper");
await executeExternalQuery(dbId, sql, values);
}
}
export async function writeToTargetDb(data: CollectedData): Promise<void> {
const connRows = await query<{
target_db_connection_id: number | null;
target_table_name: string | null;
target_time_column: string | null;
target_insert_mode: string | null;
}>(
`SELECT target_db_connection_id, target_table_name, target_time_column, target_insert_mode
FROM pipeline_device_connections
WHERE id = $1`,
[data.connectionId]
);
if (!connRows.length) return;
const cfg = connRows[0];
if (cfg.target_db_connection_id === null || cfg.target_db_connection_id === undefined) return;
if (!cfg.target_table_name) return;
const dbId = cfg.target_db_connection_id;
const table = cfg.target_table_name;
const timeCol = cfg.target_time_column || "timestamp";
const tagEntries = Object.entries(data.tags);
if (tagEntries.length === 0) return;
// 타겟 테이블 컬럼 조회 (캐시)
let targetCols: Set<string>;
try {
targetCols = await getTargetTableColumns(dbId, table);
} catch (err) {
throw new Error(
`target 테이블 스키마 조회 실패 (${dbId}/${table}): ${(err as Error).message}`
);
}
if (isLongFormat(targetCols)) {
// ─── Long 포맷: 태그당 1행 INSERT ─────────────────
await writeLongFormat(dbId, table, timeCol, targetCols, data);
} else {
// ─── Wide 포맷: 1행에 다중 컬럼 (기존 방식) ───────
await writeWideFormat(dbId, table, timeCol, data);
}
}
async function writeLongFormat(
dbId: number,
table: string,
timeCol: string,
cols: Set<string>,
data: CollectedData
): Promise<void> {
const hasCompany = cols.has("company_id");
const hasEdge = cols.has("edge_id");
const hasQuality = cols.has("quality");
const hasMetadata = cols.has("metadata");
// 연결 메타 정보 조회 (edge_identifier, device_identifier, company_id)
const connMeta = await query<{
edge_identifier: string | null;
device_identifier: string | null;
equipment_code: string | null;
company_id_full: string | null;
}>(
`SELECT c.edge_identifier,
c.device_identifier,
e.equipment_code,
COALESCE(NULLIF(c.company_code, '*'), '') AS company_id_full
FROM pipeline_device_connections c
LEFT JOIN pipeline_equipment e ON c.equipment_id = e.id
WHERE c.id = $1`,
[data.connectionId]
);
const meta = connMeta[0] || {} as any;
// edge_id / device_id 결정 (설정된 값 > equipment_code > fallback)
const edgeIdOut =
meta.edge_identifier ||
`edge-conn-${data.connectionId}`;
const deviceIdOut =
meta.device_identifier ||
meta.equipment_code ||
`conn-${data.connectionId}`;
const companyIdOut =
meta.company_id_full || data.companyCode || "*";
// IDC edge_telemetry 원본 metadata 포맷 (실제 프로덕션 row 기준)
// { priority, device_id:<UUID>, forwarded_at:<ISO> }
const nowIso = new Date().toISOString();
const baseMetadata: Record<string, unknown> = {
priority: 2,
device_id: deviceIdOut,
forwarded_at: nowIso,
};
const allCols: string[] = [timeCol, "tag_name", "value"];
if (hasCompany) allCols.push("company_id");
if (hasEdge) allCols.push("edge_id");
if (hasQuality) allCols.push("quality");
if (hasMetadata) allCols.push("metadata");
const rows: unknown[] = [];
const placeholderRows: string[] = [];
let idx = 1;
for (const [tagName, rawValue] of Object.entries(data.tags)) {
const numericValue =
typeof rawValue === "number"
? rawValue
: typeof rawValue === "boolean"
? rawValue
? 1
: 0
: rawValue !== null && rawValue !== undefined && !Number.isNaN(Number(rawValue))
? Number(rawValue)
: null;
const placeholders: string[] = [];
rows.push(data.timestamp);
placeholders.push(`$${idx++}`);
rows.push(tagName);
placeholders.push(`$${idx++}`);
rows.push(numericValue);
placeholders.push(`$${idx++}`);
if (hasCompany) {
rows.push(companyIdOut);
placeholders.push(`$${idx++}`);
}
if (hasEdge) {
rows.push(edgeIdOut);
placeholders.push(`$${idx++}`);
}
if (hasQuality) {
rows.push(rawValue === null || rawValue === undefined ? "bad" : "good");
placeholders.push(`$${idx++}`);
}
if (hasMetadata) {
rows.push(JSON.stringify(baseMetadata));
placeholders.push(`$${idx++}::jsonb`);
}
placeholderRows.push(`(${placeholders.join(", ")})`);
}
const sql = `INSERT INTO ${table} (${allCols.join(", ")}) VALUES ${placeholderRows.join(", ")}`;
await runTargetInsert(dbId, sql, rows);
logger.debug(
`[Collector] target DB INSERT 성공 (Long): ${table} ${placeholderRows.length}행 edge=${edgeIdOut} device=${deviceIdOut}`
);
}
async function writeWideFormat(
dbId: number,
table: string,
timeCol: string,
data: CollectedData
): Promise<void> {
// 태그 → target 컬럼명 매핑 조회
const tagRows = await query<{
tag_name: string;
target_column_name: string | null;
}>(
`SELECT tag_name, target_column_name FROM pipeline_tag_mappings
WHERE connection_id = $1 AND is_active = 'Y'`,
[data.connectionId]
);
const colMap = new Map<string, string>();
for (const t of tagRows) colMap.set(t.tag_name, t.target_column_name || t.tag_name);
const columns: string[] = [timeCol];
const placeholders: string[] = ["$1"];
const values: unknown[] = [data.timestamp];
let idx = 2;
for (const [tagName, value] of Object.entries(data.tags)) {
const col = colMap.get(tagName) || tagName;
columns.push(col);
placeholders.push(`$${idx++}`);
values.push(value);
}
if (columns.length <= 1) return;
const sql = `INSERT INTO ${table} (${columns.join(", ")}) VALUES (${placeholders.join(", ")})`;
await runTargetInsert(dbId, sql, values);
logger.debug(
`[Collector] target DB INSERT 성공 (Wide): ${table} (${columns.length - 1}컬럼)`
);
}
// ─── 폴링 시작/중지 ──────────────────────────────
export async function startPolling(connectionId: number): Promise<void> {
if (pollingTimers.has(connectionId)) {
logger.warn(`[Collector] 이미 폴링 중: connection_id=${connectionId}`);
return;
}
const connections = await query<DeviceConnection>(
"SELECT * FROM pipeline_device_connections WHERE id = $1 AND is_active = 'Y'",
[connectionId]
);
if (!connections.length) throw new Error(`활성 연결을 찾을 수 없습니다: ${connectionId}`);
const device = connections[0];
const interval = device.polling_interval_ms || 1000;
logger.info(`[Collector] 폴링 시작: ${device.connection_name} (${device.protocol}, ${interval}ms 간격)`);
// 즉시 한 번 실행
const data = await collectDevice(connectionId);
await publishData(data);
// 주기적 폴링
const timer = setInterval(async () => {
try {
const collected = await collectDevice(connectionId);
await publishData(collected);
} catch (err) {
logger.error(`[Collector] 폴링 에러 (${device.connection_name}): ${(err as Error).message}`);
}
}, interval);
pollingTimers.set(connectionId, timer);
}
export function stopPolling(connectionId: number): void {
const timer = pollingTimers.get(connectionId);
if (timer) {
clearInterval(timer);
pollingTimers.delete(connectionId);
logger.info(`[Collector] 폴링 중지: connection_id=${connectionId}`);
}
// 클라이언트 연결도 정리
const client = clientCache.get(connectionId);
if (client) {
client.disconnect();
clientCache.delete(connectionId);
}
}
export function stopAllPolling(): void {
for (const [id] of pollingTimers) {
stopPolling(id);
}
closeAllXgtConnections();
logger.info("[Collector] 모든 폴링 중지");
}
// ─── 활성 연결 전체 폴링 시작 ────────────────────
export async function startAllActivePolling(): Promise<number> {
const connections = await query<DeviceConnection>(
"SELECT * FROM pipeline_device_connections WHERE is_active = 'Y' AND status != 'error' ORDER BY id"
);
let started = 0;
for (const conn of connections) {
try {
await startPolling(conn.id);
started++;
} catch (err) {
logger.error(`[Collector] 폴링 시작 실패 (${conn.connection_name}): ${(err as Error).message}`);
}
}
logger.info(`[Collector] 전체 폴링 시작: ${started}/${connections.length}개 연결`);
return started;
}
// ─── 상태 조회 ────────────────────────────────────
export function getPollingStatus(): { connectionId: number; active: boolean }[] {
const result: { connectionId: number; active: boolean }[] = [];
for (const [id] of pollingTimers) {
result.push({ connectionId: id, active: true });
}
return result;
}
export function getRetryQueueSize(): number {
return retryQueue.length;
}
// ─── MSSQL watermark 영속화 ───────────────────────
// pipeline_collector_watermark(connection_id, watermark) 테이블에 저장.
// 테이블이 없으면 자동 생성 (첫 사용 시).
let watermarkTableReady = false;
async function ensureWatermarkTable(): Promise<void> {
if (watermarkTableReady) return;
try {
await query(`
CREATE TABLE IF NOT EXISTS pipeline_collector_watermark (
connection_id INTEGER PRIMARY KEY,
watermark TIMESTAMPTZ,
updated_at TIMESTAMPTZ DEFAULT NOW()
)
`);
watermarkTableReady = true;
} catch (err) {
logger.warn(`[Collector] watermark 테이블 생성 실패: ${(err as Error).message}`);
}
}
async function loadWatermark(connectionId: number): Promise<Date | null> {
await ensureWatermarkTable();
try {
const rows = await query<{ watermark: Date | null }>(
"SELECT watermark FROM pipeline_collector_watermark WHERE connection_id=$1",
[connectionId],
);
return rows[0]?.watermark ?? null;
} catch {
return null;
}
}
async function saveWatermark(connectionId: number, ts: Date): Promise<void> {
await ensureWatermarkTable();
try {
await query(
`INSERT INTO pipeline_collector_watermark (connection_id, watermark, updated_at)
VALUES ($1, $2, NOW())
ON CONFLICT (connection_id)
DO UPDATE SET watermark = EXCLUDED.watermark, updated_at = NOW()`,
[connectionId, ts],
);
} catch (err) {
logger.warn(`[Collector] watermark 저장 실패: ${(err as Error).message}`);
}
}
// ─── 영속 retry queue (target DB INSERT 실패분) ───
// pipeline_target_retry_queue 테이블에 원본 CollectedData JSON 저장.
// 워커가 주기적으로 꺼내서 재시도.
let retryTableReady = false;
async function ensureRetryTable(): Promise<void> {
if (retryTableReady) return;
try {
await query(`
CREATE TABLE IF NOT EXISTS pipeline_target_retry_queue (
id BIGSERIAL PRIMARY KEY,
connection_id INTEGER NOT NULL,
payload JSONB NOT NULL,
retry_count INTEGER DEFAULT 0,
last_error TEXT,
next_retry_at TIMESTAMPTZ DEFAULT NOW(),
created_at TIMESTAMPTZ DEFAULT NOW()
)
`);
await query(`
CREATE INDEX IF NOT EXISTS idx_pipeline_target_retry_queue_next
ON pipeline_target_retry_queue(next_retry_at)
`);
retryTableReady = true;
} catch (err) {
logger.warn(`[Collector] retry 테이블 생성 실패: ${(err as Error).message}`);
}
}
async function enqueueTargetRetry(data: CollectedData, err: string): Promise<void> {
await ensureRetryTable();
try {
await query(
`INSERT INTO pipeline_target_retry_queue (connection_id, payload, last_error)
VALUES ($1, $2::jsonb, $3)`,
[data.connectionId, JSON.stringify(data), err.substring(0, 500)],
);
} catch {}
}
const RETRY_WORKER_INTERVAL_MS = 30_000;
const RETRY_MAX_ATTEMPTS = 10;
let retryWorkerTimer: NodeJS.Timeout | null = null;
export function startRetryWorker(): void {
if (retryWorkerTimer) return;
retryWorkerTimer = setInterval(() => {
processRetryBatch().catch(() => undefined);
}, RETRY_WORKER_INTERVAL_MS);
logger.info(`[Collector] target retry worker 시작 (${RETRY_WORKER_INTERVAL_MS}ms 주기)`);
}
export function stopRetryWorker(): void {
if (retryWorkerTimer) {
clearInterval(retryWorkerTimer);
retryWorkerTimer = null;
}
}
async function processRetryBatch(): Promise<void> {
await ensureRetryTable();
const rows = await query<{
id: number;
payload: CollectedData;
retry_count: number;
}>(
`SELECT id, payload, retry_count
FROM pipeline_target_retry_queue
WHERE next_retry_at <= NOW() AND retry_count < $1
ORDER BY id ASC
LIMIT 50`,
[RETRY_MAX_ATTEMPTS],
);
if (rows.length === 0) return;
logger.info(`[Collector] retry worker: ${rows.length}건 재시도`);
for (const row of rows) {
try {
await writeToTargetDb(row.payload);
await query(`DELETE FROM pipeline_target_retry_queue WHERE id=$1`, [row.id]);
} catch (err) {
const newCount = row.retry_count + 1;
const backoffSec = Math.min(60 * Math.pow(2, newCount), 3600);
await query(
`UPDATE pipeline_target_retry_queue
SET retry_count=$1, last_error=$2,
next_retry_at = NOW() + ($3 || ' seconds')::interval
WHERE id=$4`,
[newCount, String((err as Error).message).substring(0, 500), backoffSec, row.id],
);
}
}
}
// publishData 내부에서 target DB 실패 시 호출할 수 있도록 export
export { enqueueTargetRetry };