4c1dc4082e
Build and Push Images / build-and-push (push) Has been cancelled
이전 세션들에서 작업된 아래 범위를 모두 포함: Fleet 서브시스템 (src/fleet/) - fleetDeviceService / fleetCommandService / fleetDeploymentService / fleetReleaseService - fleetMetricsService, fleetScriptService, fleetEdgeConfigService - Edge 디바이스 관리, 커맨드 발행, 배포/릴리스, 스크립트 동기화 Collector 확장 - centralMqttForwarder / centralForwarderConfigService - equipmentStateService, pythonHookRunner, scriptCache - Modbus/OPC-UA/S7/XGT 프로토콜 클라이언트 - targetDbIntrospection (저장 DB 조회) Routes / API - automationDashboardRoutes, centralForwarderRoutes, equipmentStateRoutes DB - importEdgeConfig (Python cached config → Pipeline DB) - seedDataSources (external_db_connections 초기 시드) 엣지 배포 리소스 - docker/edge/Dockerfile.backend.prod, Dockerfile.frontend.prod - docker/edge/docker-compose.edge.yml 프론트엔드 - admin/automaticMng (centralForwarder, dashboard, equipmentState) - admin/fleet (commands, devices, deployments, releases, scripts, alerts) - admin/pipeline-device 개선 (저장 DB 드롭다운, 태그 매핑 등) - ExternalDbConnectionModal, ScriptsManagerDialog 등 신규 컴포넌트 - lib/api: automationDashboard, centralForwarder, equipmentState, fleet docs/ - EDGE_SERVER_STRUCTURE, FLEET_COMPLETE, FLEET_EDGE_INTEGRATION, FLEET_HOOK_INTEGRATION Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
5.2 KiB
5.2 KiB
Fleet × Edge Data Collector 연동 가이드
로컬 Pipeline과 엣지(공장) Python Data Collector를 연동하는 방법입니다.
연동 방식
[Python Data Collector] [Pipeline (로컬)]
▲ ▲
│ 1. GET /api/fleet/v1/edges/ │
│ {edgeId}/config │
│ │
│ │
│ 2. PLC 수집 수행 │
│ │
│ 3. vexplor/devices/{edgeId}/ │
│ data 로 MQTT publish │
│ vexplor/devices/{edgeId}/ │
└──── status (heartbeat) ────────▶│
│
▼
fleet_edge_raw_data
fleet_heartbeats
엣지 설정 (.env)
기존 엣지 /home/wace/data-collector/.env 수정:
# Pipeline 서버 URL (Fleet API + MQTT)
EDGE_SERVER_URL=http://<pipeline-host>:8080
MQTT_BROKER_URL=mqtt://<pipeline-host>:1883
# 기존 유지
DEVICE_ID=spifox-001
COMPANY_CODE=spifox
EDGE_CONFIG_SOURCE=api # 'aas' 대신 'api' 선택 시 Pipeline Fleet API 호출
LOG_LEVEL=INFO
# Kafka는 로컬에서 불필요 (Pipeline 내장 MQTT 사용)
# KAFKA_BROKERS= (비워두기)
Pipeline API 엔드포인트
Python Data Collector가 호출하는 엔드포인트:
| 메서드 | 경로 | 용도 |
|---|---|---|
GET |
/api/fleet/v1/edges/{edgeId}/config |
수집 설정 조회 (ETag 캐싱) |
GET |
/api/fleet/edge/{edgeId}/config |
위와 동일 (alias) |
응답 형식 (Python EdgeConfig 모델과 호환):
{
"version": "2026-04-17T07:25:26.766Z",
"edge_id": "edge-spifox-001",
"edge_name": "스피폭스 엣지 #1",
"devices": [
{
"id": "1",
"name": "CASE프레스_PLC_01",
"protocol": "plc_ethernet",
"connection": {
"host": "192.168.1.10",
"port": 2004,
"unit_id": 1
},
"interval_ms": 1000,
"enabled": true,
"tags": [
{
"name": "temperature",
"address": "40001",
"data_type": "UINT16",
"byte_order": "BIG_ENDIAN",
"scale": 0.1,
"offset": 0,
"unit": "°C"
}
]
}
],
"aggregation_interval_sec": 60,
"local_retention_days": 7
}
MQTT 토픽 규칙
Python이 발행하는 토픽:
| 토픽 | 페이로드 | 주기 |
|---|---|---|
vexplor/devices/{edgeId}/status |
heartbeat (CPU/메모리/디스크) | 30초 |
vexplor/devices/{edgeId}/data |
태그 값 (아래 참조) | interval_ms |
vexplor/devices/{edgeId}/responses |
커맨드 응답 | 요청 시 |
데이터 페이로드 예시
{
"timestamp": "2026-04-17T08:00:00.123Z",
"equipment_id": 4,
"connection_id": 1,
"tags": {
"temperature": 25.4,
"pressure": 11.2,
"status": true,
"mode": "AUTO"
}
}
Pipeline은 이 데이터를 fleet_edge_raw_data 테이블에 자동 적재합니다.
로컬 테스트
Pipeline이 로컬에 떠있는 상태에서 테스트 엣지 시뮬레이터:
# MQTT heartbeat 발송 (자동 등록)
docker exec pipeline-backend node -e "
const mqtt = require('mqtt');
const c = mqtt.connect('mqtt://127.0.0.1:1883');
c.on('connect', () => {
c.publish('vexplor/devices/edge-test-001/status', JSON.stringify({
cpu_percent: 25, memory_percent: 45, disk_percent: 60,
ip_address: '192.168.1.100', status: 'online'
}), { qos: 1 }, () => c.end(true));
});
"
# 설정 조회
curl http://localhost:8080/api/fleet/edge/edge-test-001/config
# 태그 데이터 발송
docker exec pipeline-backend node -e "
const mqtt = require('mqtt');
const c = mqtt.connect('mqtt://127.0.0.1:1883');
c.on('connect', () => {
c.publish('vexplor/devices/edge-test-001/data', JSON.stringify({
timestamp: new Date().toISOString(),
equipment_id: 4,
tags: { temperature: 25.5, pressure: 11.2 }
}), { qos: 1 }, () => c.end(true));
});
"
포트 정리
로컬 Pipeline이 노출하는 포트:
| 포트 | 용도 |
|---|---|
8080 |
REST API (Fleet + Pipeline) |
1883 |
MQTT TCP 브로커 (내장 aedes) |
8083 |
MQTT WebSocket (브라우저 클라이언트) |
9771 |
프론트엔드 |
흐름 요약
- 엣지 부팅: Python이 Pipeline에 heartbeat 발행 →
fleet_devices에 자동 등록 - 설정 조회: Python이
/api/fleet/v1/edges/{id}/config호출 → 현재 장비/태그 설정 받음 - PLC 수집: 설정된 대로 Modbus/OPC UA/S7 등으로 주기 수집
- MQTT 발행:
vexplor/devices/{id}/data로 실시간 값 발행 - Pipeline 저장: MQTT 구독 →
fleet_edge_raw_data적재 - 대시보드 표시:
/admin/fleet/data에서 실시간 차트 + 최신값 조회
설정 변경 시 반영
사용자가 웹에서 태그 설정을 변경하면:
pipeline_tag_mappingsUPDATE- Python이 다음 config sync 주기(기본 30초) 시 변경 감지
version(ETag) 기반이라 변경 없으면 304 응답 (트래픽 절약)- Python이 자동으로 새 설정으로 수집 재시작
Python 재시작 불필요 — 설정은 런타임에 동적 반영됩니다.