Files
pipeline/backend-node/src/services/collector/protocols/mqttCollectorClient.ts
T
chpark 01625d9efd feat(collector): MSSQL/MQTT-Sub 수집기 + target DB retry queue + watermark 영속화
MSSQL 수집기 (새 파일 mssqlClient.ts)
- Python sql_collector.py 의 MSSQL 모드 포팅
- watermark 기반 증분 SELECT, batch_mode (row 당 장비), fetch_size 지원
- timestamp_expression 으로 CONVERT/LEFT/RIGHT 등 복합 타임스탬프 표현 가능
- Protocol: MSSQL_DB

MQTT 구독 수집기 (새 파일 mqttCollectorClient.ts)
- tag.address = 토픽, 메시지 캐시 후 readTags() 호출 시 최신값 반환
- payload JSON 파싱 시 json_path 로 특정 필드 추출 가능
- Protocol: MQTT_SUB

Target DB retry queue (pipeline_target_retry_queue)
- 기존 in-memory Array → PG 영속화
- 실패 → enqueue(exp.backoff), 30s 워커가 재시도, 10회 초과 시 폐기
- IDC TimescaleDB 일시 다운 시 데이터 유실 방지 (기존 단순 warn → 적재)

Watermark 영속화 (pipeline_collector_watermark)
- connection_id 당 마지막으로 읽은 타임스탬프 기록
- MSSQL 증분 수집의 핵심, 재기동에도 중복 없음

deviceCollectorService.ts
- case "MSSQL_DB", "MQTT_SUB" 분기 추가
- clientCache 타입에 MssqlClient, MqttCollectorClient 추가
- publishData 5단계 실패 시 enqueueTargetRetry 호출
- startRetryWorker()/stopRetryWorker() 수명 주기 관리

app.ts
- 기동 시 startRetryWorker() 호출

pipelineDeviceTypes.ts
- DeviceProtocol union 에 MSSQL_DB, MQTT_SUB 추가
- PROTOCOL_OPTIONS / PROTOCOL_DEFAULTS 에 등록
- UI 드롭다운 및 기본값 지원

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-23 18:20:31 +09:00

222 lines
6.6 KiB
TypeScript

/**
* MQTT Collector - MQTT 토픽 구독으로 태그 값 수집.
*
* Python data_collector/collectors/mqtt_collector.py 포팅.
*
* 동작:
* 1) 지정 브로커에 연결, 태그의 address 필드를 topic 으로 구독
* 2) 메시지 수신 시 topic → 최신 값 캐시
* 3) readTags() 호출 시 캐시의 최근 값 반환 (pull 방식)
*
* 연결 파라미터(protocol_config 예):
* {
* "broker_url": "mqtt://192.168.1.10:1883",
* "username": "user",
* "password": "pass",
* "client_id": "pipeline-collector",
* "qos": 1,
* "keepalive_sec": 60
* }
*
* tag.address 가 MQTT topic 으로 사용됨. payload 는
* (1) 순수 숫자/불리언 문자열, 또는
* (2) JSON (tag.json_path 로 특정 필드 추출, 미설정이면 전체) 가능.
*/
import mqtt, { MqttClient as MqttLib, IClientOptions } from "mqtt";
import { logger } from "../../../utils/logger";
export interface MqttCollectorTag {
tagName: string;
topic: string;
dataType: string; // "BOOL" | "INT16" | "UINT16" | "INT32" | "UINT32" | "FLOAT32" | "STRING"
jsonPath?: string; // 점 표기 ("a.b.c"), 생략 시 payload 전체
scaleFactor?: number;
offsetValue?: number;
}
export interface MqttCollectorReadResult {
tagName: string;
topic: string;
value: number | string | boolean | null;
quality: "good" | "bad";
timestamp: Date;
}
export interface MqttCollectorConfig {
brokerUrl: string; // "mqtt://host:port" or "mqtts://..."
username?: string;
password?: string;
clientId?: string;
qos?: 0 | 1 | 2;
keepaliveSec?: number;
}
interface CacheEntry {
raw: any;
at: Date;
}
export class MqttCollectorClient {
private cfg: MqttCollectorConfig;
private client: MqttLib | null = null;
private cache = new Map<string, CacheEntry>();
private subscribed = new Set<string>();
private connected = false;
constructor(cfg: MqttCollectorConfig) {
this.cfg = cfg;
}
async connect(tags: MqttCollectorTag[]): Promise<void> {
if (this.connected && this.client) {
// 새 태그 토픽이 있으면 구독
this.subscribeTopics(tags);
return;
}
const opts: IClientOptions = {
clientId: this.cfg.clientId || `pipeline-collector-${Date.now()}`,
username: this.cfg.username,
password: this.cfg.password,
keepalive: this.cfg.keepaliveSec ?? 60,
reconnectPeriod: 5000,
connectTimeout: 10000,
};
return new Promise((resolve, reject) => {
const c = mqtt.connect(this.cfg.brokerUrl, opts);
this.client = c;
const onConnect = () => {
this.connected = true;
c.off("error", onError);
logger.info(`[MQTT-Collector] 연결 성공: ${this.cfg.brokerUrl}`);
this.subscribeTopics(tags);
resolve();
};
const onError = (err: Error) => {
this.connected = false;
c.off("connect", onConnect);
reject(new Error(`[MQTT-Collector] 연결 실패: ${err.message}`));
};
c.once("connect", onConnect);
c.once("error", onError);
c.on("message", (topic, payload) => {
try {
const str = payload.toString("utf-8");
let parsed: any = str;
try {
parsed = JSON.parse(str);
} catch {}
this.cache.set(topic, { raw: parsed, at: new Date() });
} catch (err) {
logger.debug(`[MQTT-Collector] 메시지 처리 실패 topic=${topic}: ${(err as Error).message}`);
}
});
c.on("close", () => {
this.connected = false;
});
});
}
private subscribeTopics(tags: MqttCollectorTag[]): void {
if (!this.client) return;
const qos = this.cfg.qos ?? 0;
for (const t of tags) {
if (!t.topic || this.subscribed.has(t.topic)) continue;
this.client.subscribe(t.topic, { qos }, (err) => {
if (err) logger.warn(`[MQTT-Collector] 구독 실패 ${t.topic}: ${err.message}`);
else {
this.subscribed.add(t.topic);
logger.debug(`[MQTT-Collector] 구독 성공 topic=${t.topic} qos=${qos}`);
}
});
}
}
async disconnect(): Promise<void> {
return new Promise((resolve) => {
if (!this.client) {
this.connected = false;
return resolve();
}
this.client.end(true, {}, () => {
this.client = null;
this.connected = false;
this.cache.clear();
this.subscribed.clear();
resolve();
});
});
}
isConnected(): boolean {
return this.connected;
}
async readTags(tags: MqttCollectorTag[]): Promise<MqttCollectorReadResult[]> {
const out: MqttCollectorReadResult[] = [];
const now = new Date();
for (const t of tags) {
const cached = this.cache.get(t.topic);
if (!cached) {
out.push({ tagName: t.tagName, topic: t.topic, value: null, quality: "bad", timestamp: now });
continue;
}
let raw = cached.raw;
if (t.jsonPath && typeof raw === "object" && raw !== null) {
raw = t.jsonPath.split(".").reduce((acc: any, k) => (acc == null ? acc : acc[k]), raw);
}
const value = coerce(raw, t.dataType, t.scaleFactor, t.offsetValue);
out.push({
tagName: t.tagName,
topic: t.topic,
value,
quality: value === null ? "bad" : "good",
timestamp: cached.at,
});
}
return out;
}
}
function coerce(
raw: any,
dataType: string,
scale?: number,
offset?: number,
): number | string | boolean | null {
if (raw === null || raw === undefined) return null;
const t = (dataType || "").toUpperCase();
try {
if (t === "BOOL") {
if (typeof raw === "boolean") return raw;
if (typeof raw === "number") return raw !== 0;
const s = String(raw).trim().toLowerCase();
return s === "true" || s === "1" || s === "on";
}
if (t === "STRING") return String(raw);
let v = Number(raw);
if (!Number.isFinite(v)) return null;
if (scale && scale !== 1) v = v * scale;
if (offset) v = v + offset;
if (t.startsWith("INT") || t.startsWith("UINT")) return Math.trunc(v);
return v;
} catch {
return null;
}
}
// ─── pool ─────────────────────────────────────────
const pool = new Map<string, MqttCollectorClient>();
export function getMqttCollectorClient(cfg: MqttCollectorConfig): MqttCollectorClient {
const key = `${cfg.brokerUrl}@${cfg.clientId || "anon"}`;
if (!pool.has(key)) pool.set(key, new MqttCollectorClient(cfg));
return pool.get(key)!;
}
export function closeAllMqttCollectors(): void {
for (const [, c] of pool) c.disconnect().catch(() => undefined);
pool.clear();
}