feat: Fleet/Collector/엣지 배포 관련 누적 작업 일괄 커밋
Build and Push Images / build-and-push (push) Has been cancelled

이전 세션들에서 작업된 아래 범위를 모두 포함:

Fleet 서브시스템 (src/fleet/)
- fleetDeviceService / fleetCommandService / fleetDeploymentService / fleetReleaseService
- fleetMetricsService, fleetScriptService, fleetEdgeConfigService
- Edge 디바이스 관리, 커맨드 발행, 배포/릴리스, 스크립트 동기화

Collector 확장
- centralMqttForwarder / centralForwarderConfigService
- equipmentStateService, pythonHookRunner, scriptCache
- Modbus/OPC-UA/S7/XGT 프로토콜 클라이언트
- targetDbIntrospection (저장 DB 조회)

Routes / API
- automationDashboardRoutes, centralForwarderRoutes, equipmentStateRoutes

DB
- importEdgeConfig (Python cached config → Pipeline DB)
- seedDataSources (external_db_connections 초기 시드)

엣지 배포 리소스
- docker/edge/Dockerfile.backend.prod, Dockerfile.frontend.prod
- docker/edge/docker-compose.edge.yml

프론트엔드
- admin/automaticMng (centralForwarder, dashboard, equipmentState)
- admin/fleet (commands, devices, deployments, releases, scripts, alerts)
- admin/pipeline-device 개선 (저장 DB 드롭다운, 태그 매핑 등)
- ExternalDbConnectionModal, ScriptsManagerDialog 등 신규 컴포넌트
- lib/api: automationDashboard, centralForwarder, equipmentState, fleet

docs/
- EDGE_SERVER_STRUCTURE, FLEET_COMPLETE, FLEET_EDGE_INTEGRATION, FLEET_HOOK_INTEGRATION

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
chpark
2026-04-23 20:00:06 +09:00
parent 01625d9efd
commit 4c1dc4082e
77 changed files with 14639 additions and 205 deletions
+404
View File
@@ -0,0 +1,404 @@
# 엣지(스피폭스) ↔ IDC 중앙 수집 파이프라인 — 기존 기능 전수 조사 및 파이프라인 이식 가이드
> 조사 대상
> - **엣지 서버(고객사 수집서버)**: `112.168.212.142` — `waceserver` (Ubuntu, Docker Compose)
> - **IDC 중앙 서버**: `211.115.91.170` — `waceserver01` (Ubuntu, **Kubernetes v1.28 single-node**)
> 조사 일자: 2026-04-20
> 목적: 현재 엣지+IDC가 운용 중인 "수집 → 전송 → 적재 → 조회" 전 기능을 **Pipeline 애플리케이션(vexplor_Pipeline)**에 이식하기 위한 스펙 정리
---
## 0. TL;DR — 파이프라인에 넣어야 할 기능 한 줄 요약
| # | 기능 | 현재 위치 | 파이프라인 이식 방식 |
|---|---|---|---|
| 1 | 다중 프로토콜 수집 (XGT/Modbus/OPC UA/S7/MQTT/SQL/CAS) | 엣지 `data-collector` (Python) | **Pipeline Backend 내부 `collectors/` 모듈**로 이식 |
| 2 | Bootstrap(MAC→UUID) / Config 원격 동기화 | 엣지 `data-collector/bootstrap/` | Pipeline 측 `/api/edge/provision`, `/api/edge/config` 제공 |
| 3 | Store & Forward (로컬 Kafka 버퍼 + RetryQueue) | 엣지 Kafka + `publishers/retry_queue.py` | Pipeline 내부 큐(Kafka or Redis Streams) + 재시도 정책 |
| 4 | Kafka → 중앙 MQTT 배치 포워딩 | 엣지 `kafka-to-central-mqtt` (Python, stateless) | Pipeline `services/forwarder/` 서비스로 이식 |
| 5 | MQTT 공유구독 → TimescaleDB 배치 INSERT | **IDC `digital-twin-web-backend` Node.js** (`mqtt-ingestion.service.js`) | Pipeline Backend의 **데이터 소스(TimescaleDB)** 뒤단에 동일 ingestion 서비스 |
| 6 | Fleet Agent 원격 관리(컨테이너 제어/헬스/오프라인큐) | 엣지 `fleet-agent` (Node.js, `device-supervisor`) | Pipeline이 Fleet API(`fleet-api.vexplor.com`) 소비 측으로 통합 |
| 7 | 이미지 자동 배포 체인 | Harbor → Watchtower 5분 폴링 → 라벨 기반 교체 | Pipeline CI/CD에서 Harbor push + 라벨 규약 유지 |
| 8 | 설비 상태 동기화 (개별 `device_id`별) | IDC 백엔드 `equipment-status-sync.service.js` | Pipeline의 `equipmentStatus` 실시간 갱신 모듈 |
**2026-04-20 파이프라인 작업자 발언 (정책 결정)**:
> "그 엣지 코드 변경되서 커밋하면 harbor에 이미지 올라가는데 플릿 에이전트가 주기적으로 harbor에 있는 이미지가 최신값인지 확인해서 변경사항이 있으면 엣지서버 최신화 될거에요"
>
> ⚠️ **사실 보정**: 실제로 Harbor 폴링을 하는 주체는 **Fleet Agent가 아니라 Watchtower 컨테이너**입니다 (5분 간격, `com.centurylinklabs.watchtower.enable=true` 라벨 기준). Fleet Agent는 **원격 제어/상태 보고**만 담당. 파이프라인에 이식할 때 이 부분을 혼동하지 않도록 구분해야 합니다.
---
## 1. 엣지(스피폭스) 서버 — 현재 구성
### 1.1 전체 구성
- **OS**: Ubuntu, Linux 6.8.0-110-generic
- **오케스트레이션**: Docker Compose 전용 (`kubectl`/`kubeadm` 바이너리는 있지만 클러스터는 `10.10.0.74:6443` 연결 거부로 꺼져 있음)
- **이미지 소스**: `harbor.wace.me/vexplor_fleet/*`
- **자동 업데이트**: Watchtower 컨테이너 (`nickfedor/watchtower:latest`, 5분 폴링, 라벨 기반)
### 1.2 기동 중인 컨테이너 (`docker ps` 시점)
| 컨테이너 | 이미지 | 역할 |
|---|---|---|
| `data-collector` | `harbor.wace.me/vexplor_fleet/data-collector:latest` | 메인 수집기 (XGT/Modbus/OPC UA/S7/MQTT/SQL/CAS) |
| `data-collector-alpet` | 동일 | 알펫 전용 (MSSQL, `network_mode: host`, `EDGE_ID=ALPET-001`) |
| `fleet-agent` | `harbor.wace.me/vexplor_fleet/device-supervisor:latest` | 원격 관리/헬스/컨테이너 제어 |
| `kafka-to-central-mqtt` | `harbor.wace.me/vexplor_fleet/kafka-to-central-mqtt:latest` | 로컬 Kafka → 중앙 MQTT 포워더 |
| `watchtower` | `nickfedor/watchtower:latest` | Harbor 폴링 자동 배포 |
| `kafka` | `confluentinc/cp-kafka:7.5.0` (KRaft) | 로컬 Store & Forward 버퍼 |
> `timescaledb`, `kafka-to-timescale`, `emqx`는 통합 compose에 정의만 존재. **현재 미기동** — TimescaleDB는 IDC로 이전됨.
### 1.3 Data Collector 내부 (이식 대상 핵심)
**컨테이너 내부 경로**: `/app/src/data_collector/`, 엔트리 `python -m data_collector.main`
```
data_collector/
├── main.py # EdgeAgent 메인 루프 (bootstrap → config sync → collect → publish)
├── models.py # DeviceData, TagValue
├── bootstrap/
│ ├── aas_client.py # AAS(Asset Admin Shell) API 클라이언트
│ ├── bootstrapper.py # MAC → UUID 프로비저닝
│ └── config_syncer.py # 서버 Config 주기 pull (기본 5분)
├── collectors/
│ ├── base.py / manager.py
│ ├── cas_collector.py / cas_protocol.py
│ ├── modbus_collector.py
│ ├── mqtt_collector.py
│ ├── opcua_collector.py
│ ├── s7_collector.py # Siemens S7
│ ├── sql_collector.py # MSSQL 등
│ ├── xgt_collector.py + xgt_connection_pool.py # LS XGT
├── processors/
│ ├── aggregator.py / converter.py / filter.py
├── publishers/
│ ├── kafka_publisher.py # 로컬 Kafka publish
│ └── retry_queue.py # Store & Forward (max 100,000건)
├── consumers/
│ └── kafka_to_central_mqtt.py # (임베디드 포워더 변형 — 실행은 별도 컨테이너에서)
└── config/
└── settings.py
```
**`EdgeAgent` 책임 (main.py)**:
1. **Bootstrap** — MAC 주소로 VEX Flow 서버(`https://collectormanager.vexplor.com`)에서 UUID 발급
2. **Config Sync**`EDGE_CONFIG_SOURCE=api | aas` 모드로 주기 pull
3. **Collector Manager** — 태그/프로토콜별 Collector 기동
4. **Kafka Publish** — 수집→`edge-raw-data` 토픽, 실패시 `RetryQueue`
5. **변경 감지**`_last_values`로 중복 송신 억제
**실제 운용 환경변수 (스피폭스)**:
```
EDGE_SERVER_URL=https://collectormanager.vexplor.com
EDGE_CONFIG_SOURCE=api
EDGE_KAFKA_BROKERS=kafka:9092
EDGE_MQTT_BROKER_URL=mqtt://emqx:1883 # 로컬 EMQX (현재 미기동)
EDGE_MQTT_ENABLED=true
DEVICE_ID=edge-0f4d04ed
COMPANY_ID=7f5c058c-ef65-45e3-838e-cebaec2d6170 # spifox
```
### 1.4 Fleet Agent (`device-supervisor`) 내부
**언어/구성**: Node.js + TypeScript 빌드 산출물, 패키지명 `device-supervisor` v1.0.2
```
/app/dist/
├── index.js # 엔트리
├── docker.js # dockerode 기반 컨테이너 제어 (/var/run/docker.sock:ro 마운트)
├── heartbeat.js # 주기 하트비트 (HEARTBEAT_INTERVAL=30)
├── metrics.js # systeminformation 기반 시스템 지표
├── mqtt.js # 중앙 MQTT/Fleet API 통신
├── offline/
│ ├── store.js # better-sqlite3 오프라인 큐
│ └── sync.js # 복구 시 재전송
└── config.js
```
**주요 의존성**: `dockerode`, `mqtt`, `systeminformation`, `node-cron`, `better-sqlite3`, `winston`, `axios`
**엔드포인트**: `FLEET_API_URL=https://fleet-api.vexplor.com`, MQTT `mqtt://211.115.91.170:31883`
**관리 대상**: `MANAGED_CONTAINERS=data-collector,kafka` 등 (env로 주입)
**역할 명확화** (⚠️ 전 담당자 발언 보정): Fleet Agent는 **원격 제어/상태 보고/오프라인 큐** 담당. **Harbor 폴링/이미지 교체는 Watchtower가 수행**하며 Fleet Agent와 무관.
### 1.5 Kafka → 중앙 MQTT 포워더 (Stateless Multi-Tenant)
**엔트리**: `python -u /app/forwarder.py`
**토픽 규칙**:
- 데이터: `dt/v1/data/{company_id}/{edge_id}`
- 하트비트: `dt/v1/status/{company_id}/{edge_id}`
- QoS 1, MQTTv5
- 배치: `BATCH_SIZE=50` 또는 `BATCH_TIMEOUT_MS=3000`
**설계 포인트**:
- **Stateless**: 메시지 페이로드의 `edge_id`로 토픽 동적 라우팅 → 하나의 포워더가 다수 Edge 처리 가능
- **Config API** 지원 (선택): `CONFIG_API_URL`이 있으면 CCM/DT Config API에서 `central_mqtt.{host,port,username,password}` 덮어씀
- `edge_stats`로 edge_id별 forwarded/failed/first_seen/last_seen 통계 추적
**Edge → 중앙 최종 MQTT 페이로드**:
```json
{
"timestamp": "2026-04-11 11:20:14.922601",
"edge_id": "aff81fbf-9b4c-43e0-9395-566bf47c3f9c",
"device_id": "75570e41-821c-4813-a212-1131fc6fb538",
"tags": { "태그명1": value, "태그명2": value },
"priority": 2,
"company_id": "spifox",
"forwarded_at": "..."
}
```
(실 Kafka 메시지엔 `plc_state`, `error_message` 같은 부가 필드 존재)
### 1.6 Watchtower 자동 배포
- 컨테이너가 5분(`--interval 300`)마다 Harbor 폴링
- `WATCHTOWER_LABEL_ENABLE=true` — 라벨 `com.centurylinklabs.watchtower.enable=true`가 붙은 컨테이너만 교체
- `WATCHTOWER_CLEANUP=true` — 구 이미지 자동 삭제
- `~/.docker/config.json` 마운트 → Harbor 인증 사용
**라벨 정책**:
- ON (자동 업데이트): `data-collector`, `data-collector-alpet`, `fleet-agent`, `kafka-to-central-mqtt`, `kafka-to-timescale`
- OFF (보수적): `kafka`, `timescaledb`, `watchtower` 자신
---
## 2. IDC 중앙 서버 — 현재 구성
### 2.1 전체 구성
- **OS**: Ubuntu, Linux 6.8.0-101-generic
- **오케스트레이션**: **Kubernetes v1.28.0 single-node** (control-plane = `waceserver01`, flannel CNI)
- **네임스페이스**: `digital-twin`, `fleet`, `ingress-nginx`, `logic-studio`, `wace-business-management`
- **이미지 레지스트리**: `192.168.1.100:5001/digital-twin/*` (내부 Harbor 프록시)
### 2.2 `digital-twin` 네임스페이스 핵심 파드
| Pod | 역할 |
|---|---|
| `digital-twin-mqtt-*` | **EMQX 브로커** (Edge에서 들어오는 원격 MQTT) |
| `digital-twin-timescale-0` | **TimescaleDB** (`edge_telemetry` DB, 시계열 적재) |
| `digital-twin-web-backend` | **MQTT 구독 + TimescaleDB 적재 + API 서버** (Node.js, Express) |
| `digital-twin-web-frontend` | 웹 UI (2 replicas) |
| `digital-twin-web-postgres-0` | 메타데이터 PostgreSQL |
| `digital-twin-web-redis` | 세션/캐시 |
| `basyx-*` | Eclipse BaSyx AAS 스택 (aas-discovery/env/registry, submodel-registry, cd-repository, web-ui, mongodb) |
| `unity-webgl-server` | Unity 3D 뷰어 |
| `vexspace-postgres-0` | Vex Space 전용 Postgres |
### 2.3 NodePort 외부 노출 (211.115.91.170:*)
| 서비스 | NodePort | 내부 포트 | 용도 |
|---|---|---|---|
| `digital-twin-mqtt-external` | **31883** | 1883 (MQTT) | **Edge → 중앙 MQTT 인입** |
| `digital-twin-mqtt-external` | 31084 | 8083 (WS) | MQTT WebSocket |
| `digital-twin-mqtt-external` | 31183 | 18083 | EMQX Dashboard |
| `digital-twin-timescale-external` | **30543** | 5432 | **TimescaleDB 직접 조회** (파이프라인이 붙는 곳) |
| `digital-twin-web-postgres-external` | 30533 | 5432 | 메타 Postgres |
| `vexspace-postgres-external` | 31141 | 5432 | Vex Space DB |
| `fleet-emqx` | 31884 | 1883 | Fleet 네임스페이스 별도 MQTT |
| `fleet-postgres` | 31985 | 5432 | Fleet 메타 DB |
| `ingress-nginx-controller` | 31878/30361/31591 | 80/443/1884 | 공용 ingress (1884는 MQTT over ingress) |
> 프론트엔드의 **"데이터 소스 - PLC_탑씰"**(`211.115.91.170:30543 / edge_telemetry / telemetry_user`)이 바로 `digital-twin-timescale-external`입니다.
### 2.4 MQTT → TimescaleDB 적재 로직 (핵심, 이식 대상)
**위치**: `digital-twin-web-backend` 컨테이너 내 `src/services/ingestion/mqtt-ingestion.service.js`
**언어/스택**: Node.js, `mqtt` 5.14, `pg` 8.17, `sequelize` 6.35 (단, ingestion은 생 `pg` Pool 사용)
**EMQX 접속**:
```
MQTT_BROKER_URL=mqtt://digital-twin-mqtt:1883
MQTT_INGESTION_USER=ingestion
MQTT_INGESTION_PASSWORD=ingestion_secret # ⚠️ 외부용은 ingestion_secret_prod (엣지 .env 기준)
```
**TimescaleDB 접속** (envVar):
```
TIMESCALE_HOST=digital-twin-timescale
TIMESCALE_PORT=5432
TIMESCALE_DB=edge_telemetry
TIMESCALE_USER=telemetry_user
TIMESCALE_PASSWORD=***MASKED***
```
**구독 패턴 (공유구독 — 수평 확장 가능)**:
```
$share/ingestion-group/dt/v1/data/+/+
$share/ingestion-group/dt/v1/status/+/+
```
- `$share/<group>/...` EMQX 공유구독으로 여러 백엔드 replica 간 메시지 분배
- `+/+` 와일드카드로 `{company_id}/{edge_id}` 모두 수신 (ACL 이슈로 `#` 대신 `+/+` 사용)
**처리 흐름 (`handleTelemetryData`)**:
1. 토픽 파싱 → `[company_id, edge_id]`
2. JSON 파싱
3. `item.tags` 딕셔너리면 각 태그마다 row 1건 생성:
```
time, company_id, edge_id, tag_name, value(DOUBLE), quality, metadata(JSON)
```
4. 단일 태그 형식(`tag_name/value`)도 지원
5. **buffer**에 쌓고 `BATCH_SIZE=1000` 또는 `FLUSH_INTERVAL=5s` 도달 시 `batchInsert('edge_telemetry', rows, cols)`
6. Status(하트비트)는 `edge_status` 테이블에 적재 (`status, ip_address, firmware_version, metadata`)
**신뢰성 기능**:
- **Circuit Breaker**: 연속 실패 5회(`CIRCUIT_BREAKER_MAX_FAILURES=5`) 시 OPEN, 60초 후 HALF_OPEN 회복
- **Exponential backoff 재연결** (1s → 60s)
- **버퍼 오버플로우 방지**: `MAX_BUFFER_SIZE=100,000` 초과 시 오래된 80%부터 drop
- **재시도 큐**: 실패 배치 최대 5,000건 재주입 (`MAX_RETRY_BUFFER_SIZE=10,000`)
- **stats 노출**: `messagesReceived/telemetryInserted/statusInserted/errors/droppedMessages/circuitBreakerTrips`
**설비 상태 동기화 (`handleEquipmentDataReceived`)**:
- 메시지 내 `device_id`별로 원본 값(문자열 포함) 보존
- 별도 서비스 `equipment-status-sync.service.js`가 개별 설비 UUID로 조회해 마지막 수신 시각/값 갱신 (Heartbeat도 포함)
### 2.5 TimescaleDB 스키마 (추정 + 기존 코드 근거)
`timescale.config.js`의 `batchInsert` 호출 컬럼과 과거 `kafka_to_timescale.py` INSERT를 조합하면 다음 형태:
**`edge_telemetry`** (hypertable 가능성, time 기준):
| 컬럼 | 타입 | 설명 |
|---|---|---|
| `time` | TIMESTAMPTZ | 수집 시각 |
| `company_id` | TEXT/UUID | 고객사 ID |
| `edge_id` | TEXT | 엣지 장치 ID |
| `tag_name` | TEXT | 태그명 |
| `value` | DOUBLE PRECISION | 수치값 (비수치는 NULL) |
| `quality` | TEXT | `good` 기본 |
| `metadata` | JSONB | `{device_id, priority, forwarded_at, ...}` |
**`edge_status`**:
| 컬럼 | 타입 |
|---|---|
| `time`, `company_id`, `edge_id` | 공통 |
| `status` | TEXT (`online` 기본) |
| `ip_address`, `firmware_version` | TEXT |
| `metadata` | JSONB |
> 실제 `\d+` 확인은 `digital-twin-timescale-0` 파드의 psql 비밀번호가 로컬 환경에서 필요 (envVar `TIMESCALE_PASSWORD`) — 다음 접속 시 실 스키마/인덱스/리텐션 정책/연속집계(continuous aggregate) 확인 필요.
---
## 3. 전체 데이터 흐름
```
[현장 PLC/장비 — 스피폭스 공장]
│ (XGT / Modbus / OPC UA / S7 / MQTT / MSSQL / CAS)
[엣지 서버: data-collector 컨테이너]
· bootstrap (MAC→UUID)
· config sync (5분마다 collectormanager.vexplor.com)
· 프로토콜별 Collector → processors(filter/aggregate/convert) → publish
[로컬 Kafka — edge-raw-data 토픽] ◀─── RetryQueue (실패 재시도, 최대 10만건)
[kafka-to-central-mqtt 포워더]
· batch 50건 / 3초
· 토픽 동적 라우팅: dt/v1/data/{company_id}/{edge_id}
· QoS 1, MQTTv5
▼ (인터넷 경유)
═══════════════════════════════════════════════════════════════
[IDC 중앙: 211.115.91.170 K8s]
[EMQX (digital-twin-mqtt, NodePort 31883)]
· user=ingestion / pass=ingestion_secret_prod
▼ (공유구독 $share/ingestion-group/dt/v1/+/+/+)
[digital-twin-web-backend: mqtt-ingestion.service.js]
· buffer 1000건 / 5초 flush
· Circuit Breaker, Exponential backoff, 버퍼오버플로 방지
· device_id별 → equipment-status-sync.service
▼ pg.batchInsert (ON CONFLICT DO NOTHING)
[TimescaleDB: edge_telemetry DB]
· edge_telemetry (시계열)
· edge_status (하트비트)
▲ NodePort 30543
[Pipeline Frontend — 데이터 소스 "PLC_탑씰"] ← 현재 조회용 read 연결
[Fleet 관리 루프]
fleet-agent(엣지) ──MQTT/HTTPS── fleet-api.vexplor.com ── fleet-emqx(IDC)
└─ dockerode → 엣지 컨테이너 start/stop/restart
[자동 배포 루프]
Harbor(harbor.wace.me) ◀──push── 엣지 코드 CI
│ 5분 폴링 (Watchtower, label=enable)
Watchtower(엣지) ── docker pull & recreate ──▶ 대상 컨테이너 교체
```
---
## 4. Pipeline 애플리케이션에 이식해야 할 기능 (작업 체크리스트)
### 4.1 백엔드 (`backend-node`)
- [ ] **`/api/datasource/timescale`** — TimescaleDB 커넥션 풀 (`pg`) 추가
- envVar: `TIMESCALE_HOST/PORT/DB/USER/PASSWORD` (기본 `211.115.91.170:30543 / edge_telemetry / telemetry_user`)
- `timescale.config.js`의 `batchInsert(table, rows, columns)` 패턴 그대로 포팅 (ON CONFLICT DO NOTHING)
- [ ] **`services/ingestion/mqtt-ingestion.service`** — EMQX 공유구독 + 버퍼 + Circuit Breaker
- 토픽: `$share/<groupId>/dt/v1/data/+/+`, `dt/v1/status/+/+`
- envVar: `MQTT_BROKER_URL`, `MQTT_INGESTION_USER/PASSWORD`, `INGESTION_BATCH_SIZE=1000`, `INGESTION_FLUSH_INTERVAL=5000`, `INGESTION_MAX_BUFFER_SIZE=100000`, `CIRCUIT_BREAKER_MAX_FAILURES=5`, `CIRCUIT_BREAKER_RESET_MS=60000`
- `edge_telemetry` / `edge_status` 2개 테이블 적재 분기
- [ ] **`services/forwarder/kafka-to-mqtt.service`** — (엣지 수집을 파이프라인이 직접 도맡을 경우) 기존 Python `kafka_to_central_mqtt.py`를 Node로 포팅
- [ ] **`services/collectors/*`** — 프로토콜별 수집기 (XGT/Modbus/OPC UA/S7/MQTT/SQL/CAS) Node 이식
- 라이브러리 후보: `modbus-serial`, `node-opcua`, `nodes7`, `mqtt`, `mssql/mysql2/pg`, `ls-electric-xgt`(자체 구현 필요)
- [ ] **`services/bootstrap/provisioning`** — 엣지의 `bootstrap/aas_client.py` + `bootstrapper.py` 역할
- `POST /api/edge/provision`으로 `{mac_address, company_id}` 받아 UUID/access_token 발급
- `GET /api/edge/config?edge_id=...`로 수집 태그/주기 Config 반환 (기존 `config_syncer.py` 호환)
- [ ] **`services/equipment-status-sync`** — `device_id`별 마지막 수신시각/값 갱신
- 기존 프로젝트의 [backend-node/src/services/batchSchedulerService.ts](../backend-node/src/services/batchSchedulerService.ts)와 통합 고려
- [ ] **`services/fleet-agent-bridge`** — Fleet API 소비자
- 엣지에서 올라오는 heartbeat/metrics를 UI에 노출
- 파이프라인 자체를 Fleet 피관리 대상으로도 등록 가능하게 (원격 재시작 허용)
### 4.2 프론트엔드 (`frontend`)
- [ ] 데이터 소스 관리 화면([frontend/app/(main)/admin/automaticMng/batchmngList/](../frontend/app/(main)/admin/automaticMng/batchmngList/))에 **TimescaleDB 타입** 추가 (현재는 MariaDB/PostgreSQL만)
- [ ] 엣지 디바이스 목록(Fleet 연동) 화면 — DEVICE_ID/COMPANY_ID/last_seen/image_version 노출
- [ ] Ingestion 실시간 통계 대시보드 — `messagesReceived/telemetryInserted/droppedMessages/circuitBreakerTrips`
- [ ] 태그별 시계열 조회 — `edge_telemetry` 쿼리 (time_bucket, continuous aggregate 활용)
### 4.3 CI/CD / 배포
- [ ] **Harbor 푸시 파이프라인** — 엣지 컴포넌트(`data-collector`, `fleet-agent`, `kafka-to-central-mqtt`) 이미지 빌드/푸시 단계를 Jenkinsfile에 통합
- [ ] **Watchtower 라벨 정책 유지** — 새 컨테이너는 반드시 `com.centurylinklabs.watchtower.enable=true` 라벨을 명시적으로 붙이거나 떼기 (불투명한 자동 롤아웃 방지)
- [ ] **릴리스 게이트** — `:latest` 즉시 롤아웃을 피할 필요가 있으면 `:stable`/`:canary` 태그 도입 검토
### 4.4 보안/비밀 관리
- [ ] TimescaleDB 비밀번호, MQTT `ingestion` 계정, Harbor 자격, Fleet API 토큰은 **K8s Secret / `.env` 중 한 곳에서만 관리**하고 소스 커밋 금지
- [ ] 현재 IDC `digital-twin-web-backend` Deployment에 **평문으로 `TIMESCALE_PASSWORD` 노출** 중 → 파이프라인 이식 시 `secretKeyRef`로 전환 권장
---
## 5. 외부 엔드포인트 레퍼런스
| 대상 | 주소 | 용도 |
|---|---|---|
| VEX Flow (프로비저닝/Config) | `https://collectormanager.vexplor.com` | data-collector `EDGE_SERVER_URL` |
| Fleet Manager API | `https://fleet-api.vexplor.com` | fleet-agent 원격관리 |
| 중앙 MQTT (EMQX) | `211.115.91.170:31883` → svc `digital-twin-mqtt` | 엣지 → 중앙 데이터 인입 |
| 중앙 TimescaleDB | `211.115.91.170:30543` → svc `digital-twin-timescale` | 시계열 조회/적재 |
| Harbor 레지스트리 | `harbor.wace.me` | 모든 엣지 이미지 소스 |
| 내부 Harbor 프록시(IDC) | `192.168.1.100:5001` | K8s 이미지 풀 경로 |
---
## 6. 추후 확인 필요 사항 (다음 접속 시)
1. **TimescaleDB 실제 스키마** — `\d+ edge_telemetry`, `\d+ edge_status`, hypertable 여부, continuous aggregate, retention policy
2. **`equipment-status-sync.service.js` 전체 소스** — 개별 설비 매칭 로직(equipmentId vs edgeDeviceId fallback)
3. **Fleet Manager API 엔드포인트 계약** — `device-supervisor` 측 `mqtt.js`/`heartbeat.js`의 호출 패턴
4. **EMQX ACL 설정** — `ingestion` 계정이 어떤 토픽에 write/read 권한 갖는지 (로그에서 `#` 구독은 거부 확인됨)
5. **Harbor repository 목록** — `vexplor_fleet/*`, `digital-twin/*` 태깅 규약
6. **Watchtower 라벨 전수 목록** — 각 엣지별로 어떤 컨테이너가 자동배포 대상인지 확정
7. **백엔드 `run-migration` init container** — TimescaleDB 마이그레이션 스크립트(`/app/migrations` 또는 `/app/scripts`) 확인하면 정확한 스키마 확보 가능
---
## 7. 관련 기존 문서
- [FLEET_EDGE_INTEGRATION.md](FLEET_EDGE_INTEGRATION.md)
- [FLEET_HOOK_INTEGRATION.md](FLEET_HOOK_INTEGRATION.md)
- [../customer-snapshot.md](../customer-snapshot.md)
+209
View File
@@ -0,0 +1,209 @@
# Fleet Management - 전체 통합 문서
vexplor_fleet의 모든 기능이 Pipeline으로 통합되었습니다.
## 구조
```
Pipeline (단일 배포)
├─ 백엔드 (Node.js/Express)
│ ├─ Fleet API (/api/fleet/*)
│ ├─ 내장 MQTT 브로커 (aedes, port 1883)
│ ├─ 서비스 레이어
│ │ ├─ fleetDeviceService - 디바이스 등록/관리
│ │ ├─ fleetCommandService - 커맨드 실행 (9종)
│ │ ├─ fleetReleaseService - 릴리즈 관리
│ │ ├─ fleetDeploymentService - 배포 오케스트레이션 (카나리/롤링)
│ │ ├─ fleetHarborService - Harbor Registry 조회
│ │ ├─ fleetTagTemplateService - 태그 템플릿 + 일괄 적용
│ │ ├─ fleetAlertRuleService - 알림 규칙 CRUD
│ │ ├─ fleetProvisionService - DPS 프로비저닝
│ │ ├─ fleetV1MappingService - 레거시 PLC 매핑
│ │ ├─ fleetPlcStatusService - PLC 연결 실시간 상태
│ │ ├─ fleetAuditService - 감사 로그
│ │ ├─ fleetMetricsService - Prometheus 메트릭
│ │ ├─ fleetScriptService - Python Hook 스크립트
│ │ ├─ fleetEdgeConfigService - 엣지 설정 제공
│ │ └─ fleetDataService - 실시간 수집 데이터
│ └─ MQTT 핸들러
│ ├─ vexplor/devices/+/status → 디바이스 heartbeat
│ ├─ vexplor/devices/+/metrics → 메트릭
│ ├─ vexplor/devices/+/responses → 커맨드 응답
│ ├─ vexplor/devices/+/data → 태그 데이터
│ └─ vexplor/devices/+/plc-status → PLC 연결 상태
└─ 프론트엔드 (Next.js, 시스템 관리 메뉴)
├─ 엣지 디바이스 (/admin/fleet/devices)
├─ Fleet 커맨드 (/admin/fleet/commands)
├─ Fleet 알림 (/admin/fleet/alerts)
├─ 실시간 수집 (/admin/fleet/data)
├─ Python Hook (/admin/fleet/scripts)
├─ 배포 관리 (/admin/fleet/deployments)
├─ 릴리즈 관리 (/admin/fleet/releases)
├─ 알림 규칙 (/admin/fleet/rules)
└─ 감사 로그 (/admin/fleet/audit)
```
## DB 스키마 (총 18개 Fleet 테이블)
| 테이블 | 용도 |
|---|---|
| fleet_devices | 엣지 디바이스 레지스트리 |
| fleet_heartbeats | 디바이스 상태 시계열 (30초마다) |
| fleet_commands / command_types | 9종 원격 커맨드 |
| fleet_releases | 릴리즈 버전 관리 |
| fleet_deployments / deployment_status | 배포 작업 + 디바이스별 상태 |
| fleet_alert_rules / alerts | 알림 규칙 + 발생 기록 |
| fleet_edge_raw_data | 실시간 수집 데이터 (시계열) |
| fleet_edge_scripts / script_versions / hook_types | Python Hook (5종, 버전 관리) |
| fleet_plc_connections | PLC 연결 실시간 상태 |
| fleet_tag_templates | 회사/장비별 태그 템플릿 |
| fleet_audit_logs | 전체 이벤트 감사 |
| fleet_users | Fleet 운영자 (SSO) |
| fleet_v1_plc_mapping | 레거시 v1 PLC 태그 매핑 |
## API 엔드포인트 (60+)
### 공개 (인증 없음)
```
POST /api/fleet/provision - DPS 자동 등록
GET /api/fleet/edge/:id/config - 엣지 설정 (ETag 캐싱)
GET /api/fleet/v1/edges/:id/config - 호환 alias
```
### 디바이스 (12개)
```
GET /api/fleet/devices
POST /api/fleet/devices/register
GET /api/fleet/devices/:id
PATCH /api/fleet/devices/:id
DELETE /api/fleet/devices/:id
GET /api/fleet/devices/:id/metrics
GET /api/fleet/devices/:id/latest-values
GET /api/fleet/devices/:id/tags/:tag/timeseries
GET /api/fleet/data/stats
GET /api/fleet/provision/pre-registered
POST /api/fleet/provision/pre-register
GET /api/fleet/stats
```
### 커맨드 (4개)
```
GET /api/fleet/commands
GET /api/fleet/commands/types
POST /api/fleet/commands
```
### Python Hook (10개)
```
GET /api/fleet/scripts/hook-types
GET /api/fleet/scripts
GET /api/fleet/scripts/:id
POST /api/fleet/scripts
PUT /api/fleet/scripts/:id
DELETE /api/fleet/scripts/:id
POST /api/fleet/scripts/dry-run
GET /api/fleet/scripts/:id/versions
GET /api/fleet/scripts/:id/versions/:v
POST /api/fleet/scripts/:id/rollback/:v
```
### 릴리즈 (6개)
```
GET /api/fleet/releases
GET /api/fleet/releases/:id
POST /api/fleet/releases
PUT /api/fleet/releases/:id
DELETE /api/fleet/releases/:id
POST /api/fleet/releases/:id/transition
```
### 배포 (8개)
```
GET /api/fleet/deployments
GET /api/fleet/deployments/:id
GET /api/fleet/deployments/:id/status
POST /api/fleet/deployments
POST /api/fleet/deployments/:id/start
POST /api/fleet/deployments/:id/cancel
POST /api/fleet/deployments/:id/rollback
```
### Harbor (4개)
```
GET /api/fleet/harbor/projects
GET /api/fleet/harbor/projects/:project/repos
GET /api/fleet/harbor/projects/:project/repos/:repo/tags
GET /api/fleet/harbor/ping
```
### 태그 템플릿 (6개)
```
GET /api/fleet/tag-templates
GET /api/fleet/tag-templates/:id
POST /api/fleet/tag-templates
PUT /api/fleet/tag-templates/:id
DELETE /api/fleet/tag-templates/:id
POST /api/fleet/tag-templates/:id/apply/:connectionId
```
### 알림 (7개)
```
GET /api/fleet/alerts
POST /api/fleet/alerts/:id/ack
POST /api/fleet/alerts/:id/resolve
GET /api/fleet/alert-rules
POST /api/fleet/alert-rules
PUT /api/fleet/alert-rules/:id
DELETE /api/fleet/alert-rules/:id
POST /api/fleet/alert-rules/:id/toggle
```
### V1 PLC 매핑 (4개)
```
GET /api/fleet/v1-mappings
POST /api/fleet/v1-mappings
PUT /api/fleet/v1-mappings/:id
DELETE /api/fleet/v1-mappings/:id
```
### PLC 상태, Audit, Metrics
```
GET /api/fleet/plc-status
GET /api/fleet/plc-status/summary
GET /api/fleet/audit-logs
GET /api/fleet/audit-logs/stats
GET /api/fleet/prometheus - Prometheus text format
```
## Device Supervisor 포팅 (엣지 에이전트)
Python Data Collector는 **그대로 유지**하고, 추가로 Node.js Device Supervisor를 엣지에서 돌릴 때는 기존 `vexplor_fleet/device-supervisor/src/` 코드를 그대로 사용합니다. Pipeline 중앙이 MQTT 브로커 역할을 하므로 변경할 건 환경변수만:
```bash
# device-supervisor .env
FLEET_API_URL=http://pipeline.wace.me:8080/api/fleet
MQTT_BROKER_URL=mqtt://pipeline.wace.me:1883
DEVICE_ID=edge-001
COMPANY_CODE=spifox
HEARTBEAT_INTERVAL=30
```
## 환경변수
| 이름 | 기본값 | 설명 |
|---|---|---|
| MQTT_PORT | 1883 | 내장 MQTT TCP |
| MQTT_WS_PORT | 8083 | MQTT WebSocket |
| HARBOR_URL | https://harbor.wace.me | Harbor Registry |
| HARBOR_USER | - | Harbor 사용자 |
| HARBOR_PASSWORD | - | Harbor 비밀번호 |
| FLEET_API_URL | http://localhost:8080/api/fleet | Provisioning 응답용 |
| FLEET_MQTT_BROKER | mqtt://localhost:1883 | Provisioning 응답용 |
## 다음 단계 (선택)
- Grafana 임베드 (Metrics 탭)
- 프로비저닝 토큰 JWT 전환
- 배포 롤아웃 진행률 실시간 WebSocket
- Python 실행 RestrictedPython 적용 (보안 강화)
+181
View File
@@ -0,0 +1,181 @@
# Fleet × Edge Data Collector 연동 가이드
로컬 Pipeline과 엣지(공장) Python Data Collector를 연동하는 방법입니다.
## 연동 방식
```
[Python Data Collector] [Pipeline (로컬)]
▲ ▲
│ 1. GET /api/fleet/v1/edges/ │
│ {edgeId}/config │
│ │
│ │
│ 2. PLC 수집 수행 │
│ │
│ 3. vexplor/devices/{edgeId}/ │
│ data 로 MQTT publish │
│ vexplor/devices/{edgeId}/ │
└──── status (heartbeat) ────────▶│
fleet_edge_raw_data
fleet_heartbeats
```
## 엣지 설정 (.env)
기존 엣지 `/home/wace/data-collector/.env` 수정:
```bash
# Pipeline 서버 URL (Fleet API + MQTT)
EDGE_SERVER_URL=http://<pipeline-host>:8080
MQTT_BROKER_URL=mqtt://<pipeline-host>:1883
# 기존 유지
DEVICE_ID=spifox-001
COMPANY_CODE=spifox
EDGE_CONFIG_SOURCE=api # 'aas' 대신 'api' 선택 시 Pipeline Fleet API 호출
LOG_LEVEL=INFO
# Kafka는 로컬에서 불필요 (Pipeline 내장 MQTT 사용)
# KAFKA_BROKERS= (비워두기)
```
## Pipeline API 엔드포인트
Python Data Collector가 호출하는 엔드포인트:
| 메서드 | 경로 | 용도 |
|---|---|---|
| `GET` | `/api/fleet/v1/edges/{edgeId}/config` | 수집 설정 조회 (ETag 캐싱) |
| `GET` | `/api/fleet/edge/{edgeId}/config` | 위와 동일 (alias) |
응답 형식 (Python `EdgeConfig` 모델과 호환):
```json
{
"version": "2026-04-17T07:25:26.766Z",
"edge_id": "edge-spifox-001",
"edge_name": "스피폭스 엣지 #1",
"devices": [
{
"id": "1",
"name": "CASE프레스_PLC_01",
"protocol": "plc_ethernet",
"connection": {
"host": "192.168.1.10",
"port": 2004,
"unit_id": 1
},
"interval_ms": 1000,
"enabled": true,
"tags": [
{
"name": "temperature",
"address": "40001",
"data_type": "UINT16",
"byte_order": "BIG_ENDIAN",
"scale": 0.1,
"offset": 0,
"unit": "°C"
}
]
}
],
"aggregation_interval_sec": 60,
"local_retention_days": 7
}
```
## MQTT 토픽 규칙
Python이 발행하는 토픽:
| 토픽 | 페이로드 | 주기 |
|---|---|---|
| `vexplor/devices/{edgeId}/status` | heartbeat (CPU/메모리/디스크) | 30초 |
| `vexplor/devices/{edgeId}/data` | 태그 값 (아래 참조) | interval_ms |
| `vexplor/devices/{edgeId}/responses` | 커맨드 응답 | 요청 시 |
### 데이터 페이로드 예시
```json
{
"timestamp": "2026-04-17T08:00:00.123Z",
"equipment_id": 4,
"connection_id": 1,
"tags": {
"temperature": 25.4,
"pressure": 11.2,
"status": true,
"mode": "AUTO"
}
}
```
Pipeline은 이 데이터를 `fleet_edge_raw_data` 테이블에 자동 적재합니다.
## 로컬 테스트
Pipeline이 로컬에 떠있는 상태에서 테스트 엣지 시뮬레이터:
```bash
# MQTT heartbeat 발송 (자동 등록)
docker exec pipeline-backend node -e "
const mqtt = require('mqtt');
const c = mqtt.connect('mqtt://127.0.0.1:1883');
c.on('connect', () => {
c.publish('vexplor/devices/edge-test-001/status', JSON.stringify({
cpu_percent: 25, memory_percent: 45, disk_percent: 60,
ip_address: '192.168.1.100', status: 'online'
}), { qos: 1 }, () => c.end(true));
});
"
# 설정 조회
curl http://localhost:8080/api/fleet/edge/edge-test-001/config
# 태그 데이터 발송
docker exec pipeline-backend node -e "
const mqtt = require('mqtt');
const c = mqtt.connect('mqtt://127.0.0.1:1883');
c.on('connect', () => {
c.publish('vexplor/devices/edge-test-001/data', JSON.stringify({
timestamp: new Date().toISOString(),
equipment_id: 4,
tags: { temperature: 25.5, pressure: 11.2 }
}), { qos: 1 }, () => c.end(true));
});
"
```
## 포트 정리
로컬 Pipeline이 노출하는 포트:
| 포트 | 용도 |
|---|---|
| `8080` | REST API (Fleet + Pipeline) |
| `1883` | MQTT TCP 브로커 (내장 aedes) |
| `8083` | MQTT WebSocket (브라우저 클라이언트) |
| `9771` | 프론트엔드 |
## 흐름 요약
1. **엣지 부팅**: Python이 Pipeline에 heartbeat 발행 → `fleet_devices`에 자동 등록
2. **설정 조회**: Python이 `/api/fleet/v1/edges/{id}/config` 호출 → 현재 장비/태그 설정 받음
3. **PLC 수집**: 설정된 대로 Modbus/OPC UA/S7 등으로 주기 수집
4. **MQTT 발행**: `vexplor/devices/{id}/data` 로 실시간 값 발행
5. **Pipeline 저장**: MQTT 구독 → `fleet_edge_raw_data` 적재
6. **대시보드 표시**: `/admin/fleet/data` 에서 실시간 차트 + 최신값 조회
## 설정 변경 시 반영
사용자가 **웹에서 태그 설정을 변경**하면:
- `pipeline_tag_mappings` UPDATE
- Python이 다음 config sync 주기(기본 30초) 시 변경 감지
- `version` (ETag) 기반이라 변경 없으면 304 응답 (트래픽 절약)
- Python이 자동으로 새 설정으로 수집 재시작
**Python 재시작 불필요** — 설정은 런타임에 동적 반영됩니다.
+327
View File
@@ -0,0 +1,327 @@
# Fleet Hook - 웹에서 Python 로직 편집 가이드
엣지 Data Collector의 동작을 웹에서 Python 스크립트로 커스터마이징하는 기능입니다.
## 개념
```
┌─ Pipeline 웹 UI ─────────────┐
│ 사용자가 Python 함수 편집 │
│ (Monaco 에디터) │
│ ↓ │
│ [테스트] 버튼으로 미리 검증 │
│ ↓ │
│ [저장] → fleet_edge_scripts │
└──────────────┬───────────────┘
│ /api/fleet/v1/edges/{id}/config
│ (ETag 캐싱)
┌─ 엣지 Data Collector (Python) ┐
│ scripts = config["scripts"] │
│ for script in scripts: │
│ load_hook(script) │
│ │
│ 수집 사이클마다: │
│ ├ raw_value = read_plc() │
│ ├ value = transform(...) │ ← Hook 1
│ ├ tags.update(derived(...)) │ ← Hook 2
│ ├ if not filter_data(...): │ ← Hook 3
│ │ skip │
│ ├ alarm_info = alarm(...) │ ← Hook 4
│ ├ payload = pre_send(...) │ ← Hook 5
│ └ publish_mqtt(payload) │
└───────────────────────────────┘
```
## 5가지 Hook 종류
| Hook | 시점 | 입력 | 출력 | 용도 |
|---|---|---|---|---|
| **transform** | 원시값 변환 | tag_name, raw_value, context | 변환된 값 | 센서 스케일링, 단위 변환 |
| **derived_tags** | 파생 태그 계산 | tags 딕셔너리, context | 새 태그 딕셔너리 | 여러 태그 조합 (전력 = V×I) |
| **filter** | 발행 여부 판단 | tags, context | bool | 조건부 수집 (가동 중만) |
| **alarm** | 알람 판정 | tag_name, value, context | dict 또는 None | 임계값 초과 알람 |
| **pre_send** | MQTT 발행 전 | payload, context | 가공된 payload | 최종 메타데이터 추가 |
## 적용 범위 (scope)
- **global**: 모든 엣지에 적용
- **equipment**: 특정 장비만 (pipeline_equipment)
- **connection**: 특정 통신 연결만 (pipeline_device_connections)
- **device**: 특정 엣지 디바이스만 (fleet_devices)
## Python 엣지 쪽 hook loader 샘플
기존 Data Collector 프로젝트(`/Users/chpark/workspace/data-collector/src/data_collector/`)에 추가할 파일:
### `hooks/hook_loader.py`
```python
"""
Hook Loader - Fleet API에서 받은 Python 스크립트를 로드/실행
"""
import logging
from typing import Any, Callable, Dict, List, Optional
import structlog
logger = structlog.get_logger(__name__)
# 허용된 내장 함수/모듈 (보안)
ALLOWED_BUILTINS = {
'abs', 'all', 'any', 'bool', 'bytes', 'dict', 'enumerate', 'filter',
'float', 'int', 'len', 'list', 'map', 'max', 'min', 'print', 'range',
'round', 'set', 'sorted', 'str', 'sum', 'tuple', 'type', 'zip',
'isinstance', 'hasattr', 'getattr', 'True', 'False', 'None',
}
class HookRegistry:
"""Hook 스크립트 등록 및 실행"""
# hook_type → [(script_id, priority, scope, callable, meta)]
hooks: Dict[str, List[Dict[str, Any]]] = {}
# 스크립트 ID → 컴파일된 함수 캐시
compiled: Dict[int, Dict[str, Callable]] = {}
@classmethod
def load_from_config(cls, scripts: List[Dict[str, Any]]) -> None:
"""
Fleet API에서 받은 스크립트 목록을 로드
각 hook별로 priority 순으로 정렬
"""
cls.hooks = {}
cls.compiled = {}
func_name_map = {
"transform": "transform",
"derived_tags": "derived_tags",
"filter": "filter_data",
"alarm": "alarm",
"pre_send": "pre_send",
}
for script in scripts:
try:
hook_type = script["hook_type"]
func_name = func_name_map.get(hook_type)
if not func_name:
continue
# 제한된 네임스페이스에서 컴파일
import math
from datetime import datetime, date
allowed_globals = {
"__builtins__": {k: __builtins__[k] for k in ALLOWED_BUILTINS if k in dir(__builtins__)},
"math": math,
"datetime": datetime,
"date": date,
}
exec(script["code"], allowed_globals)
func = allowed_globals.get(func_name)
if not callable(func):
logger.warning(f"함수 {func_name}가 정의되지 않음: script id={script['id']}")
continue
cls.hooks.setdefault(hook_type, []).append({
"script_id": script["id"],
"script_name": script.get("script_name", ""),
"scope": script.get("scope", "global"),
"equipment_id": script.get("equipment_id"),
"connection_id": script.get("connection_id"),
"priority": script.get("priority", 100),
"timeout_ms": script.get("timeout_ms", 1000),
"func": func,
})
logger.info(f"Hook 로드: {hook_type} / script_id={script['id']} v{script.get('version', 1)}")
except Exception as e:
logger.error(f"Hook 컴파일 실패 (id={script.get('id')}): {e}")
# 우선순위 정렬
for hooks in cls.hooks.values():
hooks.sort(key=lambda h: h["priority"])
@classmethod
def _match_scope(cls, hook: Dict[str, Any], equipment_id: Optional[int], connection_id: Optional[int]) -> bool:
"""스코프 매칭"""
scope = hook.get("scope", "global")
if scope == "global":
return True
if scope == "equipment" and hook.get("equipment_id") == equipment_id:
return True
if scope == "connection" and hook.get("connection_id") == connection_id:
return True
return False
@classmethod
def run_transform(cls, tag_name: str, raw_value: Any, context: dict) -> Any:
"""transform hook 실행 (파이프라인 - 순차 적용)"""
value = raw_value
for hook in cls.hooks.get("transform", []):
if not cls._match_scope(hook, context.get("equipment_id"), context.get("connection_id")):
continue
try:
value = hook["func"](tag_name, value, context)
except Exception as e:
logger.warning(f"transform 실패 (script_id={hook['script_id']}): {e}")
return value
@classmethod
def run_derived_tags(cls, tags: dict, context: dict) -> dict:
"""derived_tags hook 실행 (모든 hook 결과 병합)"""
result = {}
for hook in cls.hooks.get("derived_tags", []):
if not cls._match_scope(hook, context.get("equipment_id"), context.get("connection_id")):
continue
try:
new_tags = hook["func"](tags, context) or {}
if isinstance(new_tags, dict):
result.update(new_tags)
except Exception as e:
logger.warning(f"derived_tags 실패: {e}")
return result
@classmethod
def run_filter(cls, tags: dict, context: dict) -> bool:
"""filter hook 실행 (AND - 모두 True여야 발행)"""
for hook in cls.hooks.get("filter", []):
if not cls._match_scope(hook, context.get("equipment_id"), context.get("connection_id")):
continue
try:
if not hook["func"](tags, context):
return False
except Exception as e:
logger.warning(f"filter 실패: {e}")
return True
@classmethod
def run_alarm(cls, tag_name: str, value: Any, context: dict) -> List[dict]:
"""alarm hook 실행 (모든 알람 수집)"""
alarms = []
for hook in cls.hooks.get("alarm", []):
if not cls._match_scope(hook, context.get("equipment_id"), context.get("connection_id")):
continue
try:
alarm_info = hook["func"](tag_name, value, context)
if alarm_info:
alarm_info["script_id"] = hook["script_id"]
alarms.append(alarm_info)
except Exception as e:
logger.warning(f"alarm 실패: {e}")
return alarms
@classmethod
def run_pre_send(cls, payload: dict, context: dict) -> dict:
"""pre_send hook 실행 (순차 적용)"""
result = payload
for hook in cls.hooks.get("pre_send", []):
if not cls._match_scope(hook, context.get("equipment_id"), context.get("connection_id")):
continue
try:
result = hook["func"](result, context) or result
except Exception as e:
logger.warning(f"pre_send 실패: {e}")
return result
```
### 수집 파이프라인 통합 (`collectors/manager.py`)
```python
# 수집 루프 안에서...
from data_collector.hooks.hook_loader import HookRegistry
async def collect_and_publish(self, device):
raw_data = await self.collector.collect()
context = {
"device_id": self.device_id,
"equipment_id": device.equipment_id,
"connection_id": device.id,
"company_code": self.company_code,
}
# 1. transform 각 태그에 적용
tags = {}
for tag_name, raw_value in raw_data.items():
tags[tag_name] = HookRegistry.run_transform(tag_name, raw_value, context)
# 2. derived_tags 병합
tags.update(HookRegistry.run_derived_tags(tags, context))
# 3. filter 체크
if not HookRegistry.run_filter(tags, context):
logger.debug("filter로 스킵")
return
# 4. alarm 판정
alarms = []
for tag_name, value in tags.items():
alarms.extend(HookRegistry.run_alarm(tag_name, value, context))
if alarms:
# 알람 발행 (MQTT vexplor/devices/{id}/alarms 등)
self.publish_alarms(alarms)
# 5. 최종 payload 가공
payload = {
"timestamp": datetime.now().isoformat(),
"equipment_id": device.equipment_id,
"connection_id": device.id,
"tags": tags,
}
payload = HookRegistry.run_pre_send(payload, context)
# 6. MQTT 발행
self.mqtt.publish(f"vexplor/devices/{self.device_id}/data", payload)
```
### config_syncer에 hook 로드 추가
```python
async def fetch_config(self):
# ... 기존 설정 조회 ...
# Hook 스크립트 로드
if config.get("scripts"):
from data_collector.hooks.hook_loader import HookRegistry
HookRegistry.load_from_config(config["scripts"])
logger.info(f"Hook 스크립트 로드: {len(config['scripts'])}")
```
## 로컬 테스트
Pipeline 웹에서:
1. **시스템 관리 > Python Hook** 메뉴 접근
2. **새 스크립트** → Hook 타입 선택 → 예제 코드 자동 로드
3. 우측 Monaco 에디터에서 편집
4. 좌측 하단 **테스트 입력 JSON** 작성 → **실행** 버튼
5. 결과 확인 후 **저장**
## API 엔드포인트
| 메서드 | 경로 | 용도 |
|---|---|---|
| GET | `/api/fleet/scripts/hook-types` | Hook 타입 5종 + 예제 코드 |
| GET | `/api/fleet/scripts` | 스크립트 목록 |
| POST | `/api/fleet/scripts` | 생성 |
| PUT | `/api/fleet/scripts/:id` | 수정 (자동 버전 증가) |
| DELETE | `/api/fleet/scripts/:id` | 삭제 |
| POST | `/api/fleet/scripts/dry-run` | 저장 전 테스트 실행 |
| GET | `/api/fleet/scripts/:id/versions` | 버전 이력 |
| POST | `/api/fleet/scripts/:id/rollback/:version` | 롤백 |
| GET | `/api/fleet/v1/edges/:id/config` | 엣지용 전체 설정 (scripts 포함) |
## 보안 사항
- Python `exec()` 실행 시 제한된 네임스페이스 (ALLOWED_BUILTINS만)
- `import` 제한 (math, datetime, json만 허용)
- 파일 시스템 / 네트워크 접근 차단
- 각 hook 실행 타임아웃 (기본 1초)
- Dry-run 시 Python 서브프로세스 격리
## 실시간 반영
1. 웹에서 수정 → PUT API 호출
2. DB UPDATE 트리거 → version 증가 + 이력 저장
3. Python이 다음 config sync 주기(기본 30초) 시 새 버전 감지
4. `HookRegistry.load_from_config()` 재실행 → 즉시 적용
5. **Python 재시작 불필요**