feat(collector): MSSQL/MQTT-Sub 수집기 + target DB retry queue + watermark 영속화

MSSQL 수집기 (새 파일 mssqlClient.ts)
- Python sql_collector.py 의 MSSQL 모드 포팅
- watermark 기반 증분 SELECT, batch_mode (row 당 장비), fetch_size 지원
- timestamp_expression 으로 CONVERT/LEFT/RIGHT 등 복합 타임스탬프 표현 가능
- Protocol: MSSQL_DB

MQTT 구독 수집기 (새 파일 mqttCollectorClient.ts)
- tag.address = 토픽, 메시지 캐시 후 readTags() 호출 시 최신값 반환
- payload JSON 파싱 시 json_path 로 특정 필드 추출 가능
- Protocol: MQTT_SUB

Target DB retry queue (pipeline_target_retry_queue)
- 기존 in-memory Array → PG 영속화
- 실패 → enqueue(exp.backoff), 30s 워커가 재시도, 10회 초과 시 폐기
- IDC TimescaleDB 일시 다운 시 데이터 유실 방지 (기존 단순 warn → 적재)

Watermark 영속화 (pipeline_collector_watermark)
- connection_id 당 마지막으로 읽은 타임스탬프 기록
- MSSQL 증분 수집의 핵심, 재기동에도 중복 없음

deviceCollectorService.ts
- case "MSSQL_DB", "MQTT_SUB" 분기 추가
- clientCache 타입에 MssqlClient, MqttCollectorClient 추가
- publishData 5단계 실패 시 enqueueTargetRetry 호출
- startRetryWorker()/stopRetryWorker() 수명 주기 관리

app.ts
- 기동 시 startRetryWorker() 호출

pipelineDeviceTypes.ts
- DeviceProtocol union 에 MSSQL_DB, MQTT_SUB 추가
- PROTOCOL_OPTIONS / PROTOCOL_DEFAULTS 에 등록
- UI 드롭다운 및 기본값 지원

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
chpark
2026-04-23 18:20:31 +09:00
parent 156dd1ddb1
commit 01625d9efd
5 changed files with 863 additions and 30 deletions
+7
View File
@@ -582,6 +582,13 @@ async function initializeServices() {
"./services/collector/edgeStatusReporter"
);
await startEdgeReporter();
// Target DB Retry Worker (IDC 일시 다운 시 INSERT 실패분 재시도)
const { startRetryWorker } = await import(
"./services/collector/deviceCollectorService"
);
startRetryWorker();
logger.info(`🔁 target DB retry worker 가동`);
} catch (error) {
logger.error(`❌ 배치 스케줄러 초기화 실패:`, error);
}
@@ -14,6 +14,12 @@ import { XgtClient, getXgtClient, closeAllXgtConnections } from "./protocols/xgt
import { ModbusClient } from "./protocols/modbusClient";
import { OpcuaClient } from "./protocols/opcuaClient";
import { S7Client } from "./protocols/s7Client";
import { getMssqlClient, MssqlClient, MssqlTagConfig } from "./protocols/mssqlClient";
import {
getMqttCollectorClient,
MqttCollectorClient,
MqttCollectorTag,
} from "./protocols/mqttCollectorClient";
import type { XgtTagConfig, XgtReadResult } from "./protocols/xgtClient";
import type { ModbusTagConfig, ModbusReadResult } from "./protocols/modbusClient";
import type { OpcuaTagConfig } from "./protocols/opcuaClient";
@@ -68,7 +74,10 @@ export interface CollectedData {
// ─── 폴링 타이머 관리 ─────────────────────────────
const pollingTimers = new Map<number, NodeJS.Timeout>();
const clientCache = new Map<number, XgtClient | ModbusClient | OpcuaClient | S7Client>();
const clientCache = new Map<
number,
XgtClient | ModbusClient | OpcuaClient | S7Client | MssqlClient | MqttCollectorClient
>();
const lastPlcState = new Map<number, CollectedData["plcState"]>();
// ─── 오프라인 버퍼 (메모리 기반, 추후 SQLite 확장 가능) ───
@@ -246,6 +255,103 @@ export async function collectDevice(connectionId: number): Promise<CollectedData
break;
}
case "MQTT_SUB": {
// MQTT 구독 기반 수집 (tag.address = topic)
const pc = device.protocol_config || {};
const cfg = {
brokerUrl: (pc.broker_url as string) || `mqtt://${device.host}:${device.port || 1883}`,
username: pc.username as string | undefined,
password: pc.password as string | undefined,
clientId: (pc.client_id as string) || `pipeline-coll-${device.id}`,
qos: ((pc.qos as number) ?? 0) as 0 | 1 | 2,
keepaliveSec: (pc.keepalive_sec as number) ?? 60,
};
const mqttTags: MqttCollectorTag[] = tags.map(t => ({
tagName: t.tag_name,
topic: t.address,
dataType: t.tag_data_type,
jsonPath: (t as any).json_path as string | undefined,
scaleFactor: t.scale_factor ?? 1,
offsetValue: t.offset_value ?? 0,
}));
let client = clientCache.get(device.id) as MqttCollectorClient | undefined;
if (!client || !client.isConnected()) {
client = getMqttCollectorClient(cfg);
await client.connect(mqttTags);
clientCache.set(device.id, client);
}
const readings = await client.readTags(mqttTags);
for (const r of readings) {
result.tags[r.tagName] = r.quality === "good" ? r.value : null;
}
result.plcState = "connected";
break;
}
case "MSSQL_DB": {
// SQL Server 기반 수집 (Python sql_collector.py 의 MSSQL 모드 포팅)
// protocol_config 로부터 연결/쿼리/watermark 파라미터 읽음
const pc = device.protocol_config || {};
const mssqlCfg = {
host: device.host,
port: device.port || 1433,
database: (pc.database as string) || "",
username: (pc.username as string) || "",
password: (pc.password as string) || "",
query: (pc.query as string) || "",
timestampColumn: pc.timestamp_column as string | undefined,
timestampExpression: pc.timestamp_expression as string | undefined,
idColumn: pc.id_column as string | undefined,
nameColumn: pc.name_column as string | undefined,
batchMode: (pc.batch_mode as boolean) ?? false,
fetchSize: (pc.fetch_size as number) ?? 500,
deviceIdPrefix: (pc.device_id_prefix as string) || "sql",
tdsVersion: pc.tds_version as string | undefined,
timeout: device.timeout_ms || 30000,
};
let client = clientCache.get(device.id) as MssqlClient | undefined;
if (!client || !client.isConnected()) {
client = getMssqlClient(mssqlCfg);
await client.connect();
clientCache.set(device.id, client);
}
const mssqlTags: MssqlTagConfig[] = tags.map(t => ({
tagName: t.tag_name,
columnName: (t as any).column_name || t.address || t.tag_name,
dataType: t.tag_data_type,
scaleFactor: t.scale_factor ?? 1,
offsetValue: t.offset_value ?? 0,
}));
const watermark = await loadWatermark(device.id);
const { batches, newWatermark } = await client.readBatch(mssqlTags, watermark);
if (batches.length === 0) {
// 신규 row 없음 — plcState connected 로 두되 태그 비어있음
result.plcState = "connected";
} else if (mssqlCfg.batchMode) {
// batch_mode: 여러 row → 각 row 별 발행은 processOnePoll 에서 불가.
// 우선 첫 row 의 값을 기본 tags 로, 나머지 row 들은 별도 write 로 보낸다.
for (const b of batches) {
// 태그명에 equipmentId 접두 (중복 방지)
for (const [k, v] of Object.entries(b.tags)) {
const key = b.equipmentId ? `${b.equipmentId}_${k}` : k;
result.tags[key] = v;
}
}
result.plcState = "connected";
} else {
const last = batches[batches.length - 1];
for (const [k, v] of Object.entries(last.tags)) result.tags[k] = v;
result.plcState = "connected";
}
if (newWatermark && newWatermark.getTime() !== watermark?.getTime()) {
await saveWatermark(device.id, newWatermark);
}
break;
}
default:
throw new Error(`지원하지 않는 프로토콜: ${device.protocol}`);
}
@@ -433,7 +539,9 @@ async function publishData(data: CollectedData): Promise<void> {
try {
await writeToTargetDb(data);
} catch (err) {
logger.warn(`[Collector] target DB 저장 실패: ${(err as Error).message}`);
const msg = (err as Error).message;
logger.warn(`[Collector] target DB 저장 실패 — retry queue 에 적재: ${msg}`);
await enqueueTargetRetry(data, msg).catch(() => undefined);
}
}
@@ -763,3 +871,148 @@ export function getPollingStatus(): { connectionId: number; active: boolean }[]
export function getRetryQueueSize(): number {
return retryQueue.length;
}
// ─── MSSQL watermark 영속화 ───────────────────────
// pipeline_collector_watermark(connection_id, watermark) 테이블에 저장.
// 테이블이 없으면 자동 생성 (첫 사용 시).
let watermarkTableReady = false;
async function ensureWatermarkTable(): Promise<void> {
if (watermarkTableReady) return;
try {
await query(`
CREATE TABLE IF NOT EXISTS pipeline_collector_watermark (
connection_id INTEGER PRIMARY KEY,
watermark TIMESTAMPTZ,
updated_at TIMESTAMPTZ DEFAULT NOW()
)
`);
watermarkTableReady = true;
} catch (err) {
logger.warn(`[Collector] watermark 테이블 생성 실패: ${(err as Error).message}`);
}
}
async function loadWatermark(connectionId: number): Promise<Date | null> {
await ensureWatermarkTable();
try {
const rows = await query<{ watermark: Date | null }>(
"SELECT watermark FROM pipeline_collector_watermark WHERE connection_id=$1",
[connectionId],
);
return rows[0]?.watermark ?? null;
} catch {
return null;
}
}
async function saveWatermark(connectionId: number, ts: Date): Promise<void> {
await ensureWatermarkTable();
try {
await query(
`INSERT INTO pipeline_collector_watermark (connection_id, watermark, updated_at)
VALUES ($1, $2, NOW())
ON CONFLICT (connection_id)
DO UPDATE SET watermark = EXCLUDED.watermark, updated_at = NOW()`,
[connectionId, ts],
);
} catch (err) {
logger.warn(`[Collector] watermark 저장 실패: ${(err as Error).message}`);
}
}
// ─── 영속 retry queue (target DB INSERT 실패분) ───
// pipeline_target_retry_queue 테이블에 원본 CollectedData JSON 저장.
// 워커가 주기적으로 꺼내서 재시도.
let retryTableReady = false;
async function ensureRetryTable(): Promise<void> {
if (retryTableReady) return;
try {
await query(`
CREATE TABLE IF NOT EXISTS pipeline_target_retry_queue (
id BIGSERIAL PRIMARY KEY,
connection_id INTEGER NOT NULL,
payload JSONB NOT NULL,
retry_count INTEGER DEFAULT 0,
last_error TEXT,
next_retry_at TIMESTAMPTZ DEFAULT NOW(),
created_at TIMESTAMPTZ DEFAULT NOW()
)
`);
await query(`
CREATE INDEX IF NOT EXISTS idx_pipeline_target_retry_queue_next
ON pipeline_target_retry_queue(next_retry_at)
`);
retryTableReady = true;
} catch (err) {
logger.warn(`[Collector] retry 테이블 생성 실패: ${(err as Error).message}`);
}
}
async function enqueueTargetRetry(data: CollectedData, err: string): Promise<void> {
await ensureRetryTable();
try {
await query(
`INSERT INTO pipeline_target_retry_queue (connection_id, payload, last_error)
VALUES ($1, $2::jsonb, $3)`,
[data.connectionId, JSON.stringify(data), err.substring(0, 500)],
);
} catch {}
}
const RETRY_WORKER_INTERVAL_MS = 30_000;
const RETRY_MAX_ATTEMPTS = 10;
let retryWorkerTimer: NodeJS.Timeout | null = null;
export function startRetryWorker(): void {
if (retryWorkerTimer) return;
retryWorkerTimer = setInterval(() => {
processRetryBatch().catch(() => undefined);
}, RETRY_WORKER_INTERVAL_MS);
logger.info(`[Collector] target retry worker 시작 (${RETRY_WORKER_INTERVAL_MS}ms 주기)`);
}
export function stopRetryWorker(): void {
if (retryWorkerTimer) {
clearInterval(retryWorkerTimer);
retryWorkerTimer = null;
}
}
async function processRetryBatch(): Promise<void> {
await ensureRetryTable();
const rows = await query<{
id: number;
payload: CollectedData;
retry_count: number;
}>(
`SELECT id, payload, retry_count
FROM pipeline_target_retry_queue
WHERE next_retry_at <= NOW() AND retry_count < $1
ORDER BY id ASC
LIMIT 50`,
[RETRY_MAX_ATTEMPTS],
);
if (rows.length === 0) return;
logger.info(`[Collector] retry worker: ${rows.length}건 재시도`);
for (const row of rows) {
try {
await writeToTargetDb(row.payload);
await query(`DELETE FROM pipeline_target_retry_queue WHERE id=$1`, [row.id]);
} catch (err) {
const newCount = row.retry_count + 1;
const backoffSec = Math.min(60 * Math.pow(2, newCount), 3600);
await query(
`UPDATE pipeline_target_retry_queue
SET retry_count=$1, last_error=$2,
next_retry_at = NOW() + ($3 || ' seconds')::interval
WHERE id=$4`,
[newCount, String((err as Error).message).substring(0, 500), backoffSec, row.id],
);
}
}
}
// publishData 내부에서 target DB 실패 시 호출할 수 있도록 export
export { enqueueTargetRetry };
@@ -0,0 +1,221 @@
/**
* MQTT Collector - MQTT 토픽 구독으로 태그 값 수집.
*
* Python data_collector/collectors/mqtt_collector.py 포팅.
*
* 동작:
* 1) 지정 브로커에 연결, 태그의 address 필드를 topic 으로 구독
* 2) 메시지 수신 시 topic → 최신 값 캐시
* 3) readTags() 호출 시 캐시의 최근 값 반환 (pull 방식)
*
* 연결 파라미터(protocol_config 예):
* {
* "broker_url": "mqtt://192.168.1.10:1883",
* "username": "user",
* "password": "pass",
* "client_id": "pipeline-collector",
* "qos": 1,
* "keepalive_sec": 60
* }
*
* tag.address 가 MQTT topic 으로 사용됨. payload 는
* (1) 순수 숫자/불리언 문자열, 또는
* (2) JSON (tag.json_path 로 특정 필드 추출, 미설정이면 전체) 가능.
*/
import mqtt, { MqttClient as MqttLib, IClientOptions } from "mqtt";
import { logger } from "../../../utils/logger";
export interface MqttCollectorTag {
tagName: string;
topic: string;
dataType: string; // "BOOL" | "INT16" | "UINT16" | "INT32" | "UINT32" | "FLOAT32" | "STRING"
jsonPath?: string; // 점 표기 ("a.b.c"), 생략 시 payload 전체
scaleFactor?: number;
offsetValue?: number;
}
export interface MqttCollectorReadResult {
tagName: string;
topic: string;
value: number | string | boolean | null;
quality: "good" | "bad";
timestamp: Date;
}
export interface MqttCollectorConfig {
brokerUrl: string; // "mqtt://host:port" or "mqtts://..."
username?: string;
password?: string;
clientId?: string;
qos?: 0 | 1 | 2;
keepaliveSec?: number;
}
interface CacheEntry {
raw: any;
at: Date;
}
export class MqttCollectorClient {
private cfg: MqttCollectorConfig;
private client: MqttLib | null = null;
private cache = new Map<string, CacheEntry>();
private subscribed = new Set<string>();
private connected = false;
constructor(cfg: MqttCollectorConfig) {
this.cfg = cfg;
}
async connect(tags: MqttCollectorTag[]): Promise<void> {
if (this.connected && this.client) {
// 새 태그 토픽이 있으면 구독
this.subscribeTopics(tags);
return;
}
const opts: IClientOptions = {
clientId: this.cfg.clientId || `pipeline-collector-${Date.now()}`,
username: this.cfg.username,
password: this.cfg.password,
keepalive: this.cfg.keepaliveSec ?? 60,
reconnectPeriod: 5000,
connectTimeout: 10000,
};
return new Promise((resolve, reject) => {
const c = mqtt.connect(this.cfg.brokerUrl, opts);
this.client = c;
const onConnect = () => {
this.connected = true;
c.off("error", onError);
logger.info(`[MQTT-Collector] 연결 성공: ${this.cfg.brokerUrl}`);
this.subscribeTopics(tags);
resolve();
};
const onError = (err: Error) => {
this.connected = false;
c.off("connect", onConnect);
reject(new Error(`[MQTT-Collector] 연결 실패: ${err.message}`));
};
c.once("connect", onConnect);
c.once("error", onError);
c.on("message", (topic, payload) => {
try {
const str = payload.toString("utf-8");
let parsed: any = str;
try {
parsed = JSON.parse(str);
} catch {}
this.cache.set(topic, { raw: parsed, at: new Date() });
} catch (err) {
logger.debug(`[MQTT-Collector] 메시지 처리 실패 topic=${topic}: ${(err as Error).message}`);
}
});
c.on("close", () => {
this.connected = false;
});
});
}
private subscribeTopics(tags: MqttCollectorTag[]): void {
if (!this.client) return;
const qos = this.cfg.qos ?? 0;
for (const t of tags) {
if (!t.topic || this.subscribed.has(t.topic)) continue;
this.client.subscribe(t.topic, { qos }, (err) => {
if (err) logger.warn(`[MQTT-Collector] 구독 실패 ${t.topic}: ${err.message}`);
else {
this.subscribed.add(t.topic);
logger.debug(`[MQTT-Collector] 구독 성공 topic=${t.topic} qos=${qos}`);
}
});
}
}
async disconnect(): Promise<void> {
return new Promise((resolve) => {
if (!this.client) {
this.connected = false;
return resolve();
}
this.client.end(true, {}, () => {
this.client = null;
this.connected = false;
this.cache.clear();
this.subscribed.clear();
resolve();
});
});
}
isConnected(): boolean {
return this.connected;
}
async readTags(tags: MqttCollectorTag[]): Promise<MqttCollectorReadResult[]> {
const out: MqttCollectorReadResult[] = [];
const now = new Date();
for (const t of tags) {
const cached = this.cache.get(t.topic);
if (!cached) {
out.push({ tagName: t.tagName, topic: t.topic, value: null, quality: "bad", timestamp: now });
continue;
}
let raw = cached.raw;
if (t.jsonPath && typeof raw === "object" && raw !== null) {
raw = t.jsonPath.split(".").reduce((acc: any, k) => (acc == null ? acc : acc[k]), raw);
}
const value = coerce(raw, t.dataType, t.scaleFactor, t.offsetValue);
out.push({
tagName: t.tagName,
topic: t.topic,
value,
quality: value === null ? "bad" : "good",
timestamp: cached.at,
});
}
return out;
}
}
function coerce(
raw: any,
dataType: string,
scale?: number,
offset?: number,
): number | string | boolean | null {
if (raw === null || raw === undefined) return null;
const t = (dataType || "").toUpperCase();
try {
if (t === "BOOL") {
if (typeof raw === "boolean") return raw;
if (typeof raw === "number") return raw !== 0;
const s = String(raw).trim().toLowerCase();
return s === "true" || s === "1" || s === "on";
}
if (t === "STRING") return String(raw);
let v = Number(raw);
if (!Number.isFinite(v)) return null;
if (scale && scale !== 1) v = v * scale;
if (offset) v = v + offset;
if (t.startsWith("INT") || t.startsWith("UINT")) return Math.trunc(v);
return v;
} catch {
return null;
}
}
// ─── pool ─────────────────────────────────────────
const pool = new Map<string, MqttCollectorClient>();
export function getMqttCollectorClient(cfg: MqttCollectorConfig): MqttCollectorClient {
const key = `${cfg.brokerUrl}@${cfg.clientId || "anon"}`;
if (!pool.has(key)) pool.set(key, new MqttCollectorClient(cfg));
return pool.get(key)!;
}
export function closeAllMqttCollectors(): void {
for (const [, c] of pool) c.disconnect().catch(() => undefined);
pool.clear();
}
@@ -0,0 +1,236 @@
/**
* MSSQL Collector - SQL Server 기반 데이터 수집
*
* Python data_collector/collectors/sql_collector.py 의 MSSQL 모드 포팅.
*
* 동작:
* 1) 대상 DB 에 SELECT 쿼리 실행 (watermark 기반 증분)
* 2) 결과 row 의 지정 컬럼을 태그 값으로 매핑
* 3) batch 모드: 1 row = 1 장비 (id_column 이 equipment ID)
* 4) watermark 저장 (다음 수집 시 그 이후 row만)
*
* 연결 파라미터(protocol_config JSON 예):
* {
* "query": "SELECT * FROM dbo.설비가동정보",
* "database": "ALPET",
* "username": "wace",
* "password": "wace",
* "timestamp_column": "일자",
* "timestamp_expression": "CONVERT(datetime, 일자 + ' ' + LEFT(시간,2) + ':' + RIGHT(시간,2))",
* "id_column": "설비", // batch_mode 시 장비 식별 컬럼
* "name_column": "설비",
* "batch_mode": true,
* "fetch_size": 500,
* "device_id_prefix": "alpet",
* "tds_version": "7.3"
* }
*
* tag 매핑: tag.address 또는 별도 column_name 필드를 row 컬럼명으로 사용.
*/
import { logger } from "../../../utils/logger";
export interface MssqlTagConfig {
tagName: string;
columnName: string; // row 에서 추출할 컬럼명
dataType: string; // "string" | "float32" | "int32" | "bool" ...
scaleFactor?: number;
offsetValue?: number;
}
export interface MssqlReadBatch {
equipmentId: string | null; // batch_mode=true 일 때 row 별 장비 ID
timestamp: Date;
tags: Record<string, number | string | boolean | null>;
}
export interface MssqlConnectionConfig {
host: string;
port: number;
database: string;
username: string;
password: string;
query: string;
timestampColumn?: string;
timestampExpression?: string;
idColumn?: string;
nameColumn?: string;
batchMode?: boolean;
fetchSize?: number;
deviceIdPrefix?: string;
tdsVersion?: string;
timeout?: number;
}
export class MssqlClient {
private cfg: MssqlConnectionConfig;
private pool: any | null = null;
private connected = false;
constructor(cfg: MssqlConnectionConfig) {
this.cfg = cfg;
}
async connect(): Promise<void> {
if (this.connected && this.pool) return;
const sql = require("mssql");
const config: any = {
user: this.cfg.username,
password: this.cfg.password,
server: this.cfg.host,
port: this.cfg.port,
database: this.cfg.database,
options: {
encrypt: true,
trustServerCertificate: true,
enableArithAbort: true,
tdsVersion: this.cfg.tdsVersion || "7_4",
},
requestTimeout: this.cfg.timeout || 30000,
connectionTimeout: this.cfg.timeout || 30000,
pool: { max: 2, min: 0, idleTimeoutMillis: 30000 },
};
this.pool = await sql.connect(config);
this.connected = true;
logger.info(`[MSSQL] 연결 성공: ${this.cfg.host}:${this.cfg.port}/${this.cfg.database}`);
}
async disconnect(): Promise<void> {
if (this.pool) {
try {
await this.pool.close();
} catch {}
this.pool = null;
this.connected = false;
}
}
isConnected(): boolean {
return this.connected;
}
/**
* watermark 이후 row 들을 읽어 태그 배열로 반환.
* batchMode=true 이면 row 당 1개 MssqlReadBatch (equipment 분리),
* false 이면 모든 row 를 합친 단일 MssqlReadBatch.
*/
async readBatch(
tags: MssqlTagConfig[],
watermark: Date | null,
): Promise<{ batches: MssqlReadBatch[]; newWatermark: Date | null }> {
if (!this.pool) throw new Error("[MSSQL] 연결되지 않음");
const sql = require("mssql");
const tsCol = this.cfg.timestampColumn;
const tsExpr = this.cfg.timestampExpression;
const limit = this.cfg.fetchSize ?? 500;
// WHERE 절 — watermark 이후
// timestamp_expression 이 있으면 그 expr 기준, 없으면 단순 컬럼
let sqlQuery = this.cfg.query.trim();
// SELECT 로 시작하는 쿼리를 서브쿼리로 감싸서 WHERE/ORDER BY 주입
if (watermark && tsCol) {
const filterExpr = tsExpr || tsCol;
sqlQuery = `SELECT TOP (${limit}) * FROM (${sqlQuery}) AS _src
WHERE ${filterExpr} > @watermark
ORDER BY ${filterExpr} ASC`;
} else {
sqlQuery = `SELECT TOP (${limit}) * FROM (${sqlQuery}) AS _src`;
if (tsCol) {
const filterExpr = tsExpr || tsCol;
sqlQuery += ` ORDER BY ${filterExpr} ASC`;
}
}
const request = this.pool.request();
if (watermark) request.input("watermark", sql.DateTime2, watermark);
const res = await request.query(sqlQuery);
const rows: any[] = res.recordset || [];
if (rows.length === 0) {
return { batches: [], newWatermark: watermark };
}
const batches: MssqlReadBatch[] = [];
let maxTs: Date | null = watermark;
const rowToBatch = (row: any): MssqlReadBatch => {
const tagValues: Record<string, number | string | boolean | null> = {};
for (const t of tags) {
const raw = row[t.columnName];
if (raw === undefined || raw === null) {
tagValues[t.tagName] = null;
continue;
}
tagValues[t.tagName] = coerceValue(raw, t.dataType, t.scaleFactor, t.offsetValue);
}
let rowTs: Date;
if (tsCol && row[tsCol] instanceof Date) rowTs = row[tsCol];
else if (tsCol && row[tsCol] !== undefined) rowTs = new Date(row[tsCol]);
else rowTs = new Date();
if (!maxTs || rowTs > maxTs) maxTs = rowTs;
const equipId = this.cfg.batchMode && this.cfg.idColumn ? String(row[this.cfg.idColumn] ?? "") : null;
return { equipmentId: equipId, timestamp: rowTs, tags: tagValues };
};
if (this.cfg.batchMode) {
for (const row of rows) batches.push(rowToBatch(row));
} else {
// 모든 row 를 하나로 합침 — 마지막 row 가 "최신값" 을 갖는 것으로 취급
const merged = rowToBatch(rows[rows.length - 1]);
batches.push(merged);
}
return { batches, newWatermark: maxTs };
}
}
function coerceValue(
raw: any,
dataType: string,
scale?: number,
offset?: number,
): number | string | boolean | null {
const t = (dataType || "").toLowerCase();
try {
if (t === "bool" || t === "boolean") {
return typeof raw === "boolean" ? raw : Boolean(Number(raw));
}
if (t === "string" || t === "varchar" || t === "text") {
return String(raw);
}
if (t.startsWith("int") || t === "uint16" || t === "uint32") {
let v = parseInt(raw, 10);
if (!Number.isFinite(v)) return null;
if (scale && scale !== 1) v = v * scale;
if (offset) v = v + offset;
return v;
}
if (t.startsWith("float") || t === "double" || t === "real" || t === "number") {
let v = Number(raw);
if (!Number.isFinite(v)) return null;
if (scale && scale !== 1) v = v * scale;
if (offset) v = v + offset;
return v;
}
// fallback: 숫자로 시도, 실패하면 문자열
const n = Number(raw);
return Number.isFinite(n) ? n : String(raw);
} catch {
return String(raw);
}
}
// ─── Connection Pool (host:db key) ────────────────
const pool = new Map<string, MssqlClient>();
export function getMssqlClient(cfg: MssqlConnectionConfig): MssqlClient {
const key = `${cfg.host}:${cfg.port}/${cfg.database}@${cfg.username}`;
if (!pool.has(key)) pool.set(key, new MssqlClient(cfg));
return pool.get(key)!;
}
export function closeAllMssqlConnections(): void {
for (const [, c] of pool) c.disconnect().catch(() => undefined);
pool.clear();
}
+144 -28
View File
@@ -1,13 +1,42 @@
// 파이프라인 장비 연결 관련 타입 정의
// 파이프라인 장비 통신 관련 타입 정의
// data-collector 프로젝트의 모델을 참고하여 확장
export type DeviceProtocol =
| "MODBUS_TCP"
| "MODBUS_RTU"
| "OPCUA"
| "SIEMENS_S7"
| "LS_XGT"
| "MQTT"
| "MQTT_SUB"
| "MSSQL_DB"
| "REST_API";
export type ByteOrder =
| "BIG_ENDIAN"
| "LITTLE_ENDIAN"
| "BIG_ENDIAN_SWAP"
| "LITTLE_ENDIAN_SWAP";
export type TagDataType =
| "BOOL"
| "UINT16"
| "INT16"
| "UINT32"
| "INT32"
| "FLOAT32"
| "FLOAT64"
| "STRING";
export interface PipelineDeviceConnection {
id?: number;
equipment_id?: number | null; // pipeline_equipment FK
connection_name: string;
description?: string | null;
protocol: "PLC_ETHERNET" | "MODBUS_TCP" | "OPCUA" | "MQTT" | "REST_API";
protocol: DeviceProtocol;
host: string;
port: number;
protocol_config?: Record<string, unknown>;
protocol_config?: ProtocolConfig;
polling_interval_ms?: number;
timeout_ms?: number;
retry_count?: number;
@@ -23,19 +52,75 @@ export interface PipelineDeviceConnection {
// 조인 필드
tag_count?: number;
company_name?: string;
equipment_name?: string;
equipment_code?: string;
}
// 프로토콜별 세부 설정 (protocol_config JSONB에 저장)
export interface ModbusTcpConfig {
unit_id: number; // Slave ID (1-255)
}
export interface ModbusRtuConfig {
serial_port: string; // /dev/ttyUSB0 또는 COM3
baudrate: number; // 9600, 19200, ...
parity: "N" | "E" | "O";
stopbits: 1 | 2;
unit_id: number;
}
export interface OpcUaConfig {
security_policy?: "None" | "Basic128Rsa15" | "Basic256" | "Basic256Sha256";
username?: string;
password?: string;
use_subscription?: boolean;
publishing_interval_ms?: number;
}
export interface SiemensS7Config {
rack: number; // 0
slot: number; // 1 (S7-300/400: 2, S7-1200/1500: 1)
cpu_type?: "S7-300" | "S7-400" | "S7-1200" | "S7-1500";
}
export interface LsXgtConfig {
cpu_type?: "XGK" | "XGI" | "XGR";
slot?: number;
}
export interface MqttConfig {
username?: string;
password?: string;
client_id?: string;
qos?: 0 | 1 | 2;
keepalive_sec?: number;
use_tls?: boolean;
topics?: string[]; // 구독할 토픽 리스트
}
export type ProtocolConfig =
| ModbusTcpConfig
| ModbusRtuConfig
| OpcUaConfig
| SiemensS7Config
| LsXgtConfig
| MqttConfig
| Record<string, unknown>;
export interface PipelineTagMapping {
id?: number;
connection_id: number;
tag_name: string;
tag_display_name?: string | null;
tag_unit?: string | null;
tag_data_type: "INT16" | "INT32" | "FLOAT32" | "FLOAT64" | "BOOLEAN" | "STRING";
address: string;
tag_data_type: TagDataType;
address: string; // 프로토콜별 주소 (Modbus: 레지스터번호, OPCUA: node_id, MQTT: topic)
address_type?: "WORD" | "DWORD" | "FLOAT" | "BIT" | "STRING" | null;
scale_factor?: number;
byte_order?: ByteOrder;
bit_index?: number | null; // UINT16에서 비트 추출
scale_factor?: number; // value * scale + offset
offset_value?: number;
deadband?: number | null; // 변경 임계값 (노이즈 필터)
min_value?: number | null;
max_value?: number | null;
description?: string | null;
@@ -50,6 +135,7 @@ export interface PipelineDeviceConnectionFilter {
company_code?: string;
search?: string;
status?: string;
equipment_id?: number;
}
export interface DeviceConnectionTestResult {
@@ -67,39 +153,69 @@ export interface DeviceConnectionTestResult {
};
}
// 프로토콜 옵션
// 프로토콜 옵션 (UI 표시용)
export const PROTOCOL_OPTIONS = [
{ value: "PLC_ETHERNET", label: "PLC Ethernet (MC Protocol)" },
{ value: "MODBUS_TCP", label: "Modbus TCP" },
{ value: "OPCUA", label: "OPC-UA" },
{ value: "MQTT", label: "MQTT" },
{ value: "REST_API", label: "REST API" },
{ value: "MODBUS_TCP", label: "Modbus TCP", color: "#F59E0B", defaultPort: 502 },
{ value: "MODBUS_RTU", label: "Modbus RTU (Serial)", color: "#F97316", defaultPort: 0 },
{ value: "OPCUA", label: "OPC UA", color: "#10B981", defaultPort: 4840 },
{ value: "SIEMENS_S7", label: "Siemens S7", color: "#6366F1", defaultPort: 102 },
{ value: "LS_XGT", label: "LS XGT (LS Electric)", color: "#8B5CF6", defaultPort: 2004 },
{ value: "MQTT", label: "MQTT (발행)", color: "#EC4899", defaultPort: 1883 },
{ value: "MQTT_SUB", label: "MQTT (구독 수집)", color: "#DB2777", defaultPort: 1883 },
{ value: "MSSQL_DB", label: "MSSQL 쿼리 수집", color: "#14B8A6", defaultPort: 1433 },
{ value: "REST_API", label: "REST API", color: "#3B82F6", defaultPort: 443 },
];
// 프로토콜별 기본 포트
export const PROTOCOL_DEFAULTS: Record<string, { port: number }> = {
PLC_ETHERNET: { port: 5000 },
MODBUS_TCP: { port: 502 },
OPCUA: { port: 4840 },
MQTT: { port: 1883 },
REST_API: { port: 443 },
export const PROTOCOL_DEFAULTS: Record<string, { port: number; config: ProtocolConfig }> = {
MODBUS_TCP: { port: 502, config: { unit_id: 1 } },
MODBUS_RTU: { port: 0, config: { serial_port: "/dev/ttyUSB0", baudrate: 9600, parity: "N", stopbits: 1, unit_id: 1 } },
OPCUA: { port: 4840, config: { security_policy: "None", use_subscription: false } },
SIEMENS_S7: { port: 102, config: { rack: 0, slot: 1, cpu_type: "S7-1200" } },
LS_XGT: { port: 2004, config: { cpu_type: "XGK", slot: 0 } },
MQTT: { port: 1883, config: { qos: 0, keepalive_sec: 60, use_tls: false } },
MQTT_SUB: { port: 1883, config: { qos: 0, keepalive_sec: 60, client_id: "" } },
MSSQL_DB: {
port: 1433,
config: {
database: "",
username: "",
password: "",
query: "",
timestamp_column: "",
timestamp_expression: "",
id_column: "",
name_column: "",
batch_mode: false,
fetch_size: 500,
device_id_prefix: "sql",
tds_version: "7_4",
},
},
REST_API: { port: 443, config: {} },
};
// 태그 데이터 타입 옵션
export const TAG_DATA_TYPE_OPTIONS = [
{ value: "INT16", label: "INT16 (정수 16비트)" },
{ value: "INT32", label: "INT32 (정수 32비트)" },
{ value: "BOOL", label: "BOOL (불리언)" },
{ value: "UINT16", label: "UINT16 (부호없는 16비트)" },
{ value: "INT16", label: "INT16 (부호있는 16비트)" },
{ value: "UINT32", label: "UINT32 (부호없는 32비트)" },
{ value: "INT32", label: "INT32 (부호있는 32비트)" },
{ value: "FLOAT32", label: "FLOAT32 (실수 32비트)" },
{ value: "FLOAT64", label: "FLOAT64 (실수 64비트)" },
{ value: "BOOLEAN", label: "BOOLEAN (불리언)" },
{ value: "STRING", label: "STRING (문자열)" },
];
// 주소 타입 옵션
export const BYTE_ORDER_OPTIONS = [
{ value: "BIG_ENDIAN", label: "Big Endian (ABCD)" },
{ value: "LITTLE_ENDIAN", label: "Little Endian (DCBA)" },
{ value: "BIG_ENDIAN_SWAP", label: "Big Endian Byte Swap (BADC)" },
{ value: "LITTLE_ENDIAN_SWAP", label: "Little Endian Byte Swap (CDAB)" },
];
export const ADDRESS_TYPE_OPTIONS = [
{ value: "WORD", label: "WORD" },
{ value: "DWORD", label: "DWORD" },
{ value: "FLOAT", label: "FLOAT" },
{ value: "BIT", label: "BIT" },
{ value: "WORD", label: "WORD (Holding Register)" },
{ value: "DWORD", label: "DWORD (Holding Register x2)" },
{ value: "FLOAT", label: "FLOAT (IEEE 754)" },
{ value: "BIT", label: "BIT (Coil)" },
{ value: "STRING", label: "STRING" },
];