Files
pipeline/docs/FLEET_EDGE_INTEGRATION.md
chpark 4c1dc4082e
Build and Push Images / build-and-push (push) Has been cancelled
feat: Fleet/Collector/엣지 배포 관련 누적 작업 일괄 커밋
이전 세션들에서 작업된 아래 범위를 모두 포함:

Fleet 서브시스템 (src/fleet/)
- fleetDeviceService / fleetCommandService / fleetDeploymentService / fleetReleaseService
- fleetMetricsService, fleetScriptService, fleetEdgeConfigService
- Edge 디바이스 관리, 커맨드 발행, 배포/릴리스, 스크립트 동기화

Collector 확장
- centralMqttForwarder / centralForwarderConfigService
- equipmentStateService, pythonHookRunner, scriptCache
- Modbus/OPC-UA/S7/XGT 프로토콜 클라이언트
- targetDbIntrospection (저장 DB 조회)

Routes / API
- automationDashboardRoutes, centralForwarderRoutes, equipmentStateRoutes

DB
- importEdgeConfig (Python cached config → Pipeline DB)
- seedDataSources (external_db_connections 초기 시드)

엣지 배포 리소스
- docker/edge/Dockerfile.backend.prod, Dockerfile.frontend.prod
- docker/edge/docker-compose.edge.yml

프론트엔드
- admin/automaticMng (centralForwarder, dashboard, equipmentState)
- admin/fleet (commands, devices, deployments, releases, scripts, alerts)
- admin/pipeline-device 개선 (저장 DB 드롭다운, 태그 매핑 등)
- ExternalDbConnectionModal, ScriptsManagerDialog 등 신규 컴포넌트
- lib/api: automationDashboard, centralForwarder, equipmentState, fleet

docs/
- EDGE_SERVER_STRUCTURE, FLEET_COMPLETE, FLEET_EDGE_INTEGRATION, FLEET_HOOK_INTEGRATION

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-23 20:00:06 +09:00

5.2 KiB
Raw Permalink Blame History

Fleet × Edge Data Collector 연동 가이드

로컬 Pipeline과 엣지(공장) Python Data Collector를 연동하는 방법입니다.

연동 방식

[Python Data Collector]           [Pipeline (로컬)]
  ▲                                 ▲
  │ 1. GET /api/fleet/v1/edges/     │
  │      {edgeId}/config            │
  │                                 │
  │                                 │
  │ 2. PLC 수집 수행                │
  │                                 │
  │ 3. vexplor/devices/{edgeId}/    │
  │      data 로 MQTT publish       │
  │      vexplor/devices/{edgeId}/  │
  └──── status (heartbeat) ────────▶│
                                    │
                                    ▼
                               fleet_edge_raw_data
                               fleet_heartbeats

엣지 설정 (.env)

기존 엣지 /home/wace/data-collector/.env 수정:

# Pipeline 서버 URL (Fleet API + MQTT)
EDGE_SERVER_URL=http://<pipeline-host>:8080
MQTT_BROKER_URL=mqtt://<pipeline-host>:1883

# 기존 유지
DEVICE_ID=spifox-001
COMPANY_CODE=spifox
EDGE_CONFIG_SOURCE=api   # 'aas' 대신 'api' 선택 시 Pipeline Fleet API 호출
LOG_LEVEL=INFO

# Kafka는 로컬에서 불필요 (Pipeline 내장 MQTT 사용)
# KAFKA_BROKERS= (비워두기)

Pipeline API 엔드포인트

Python Data Collector가 호출하는 엔드포인트:

메서드 경로 용도
GET /api/fleet/v1/edges/{edgeId}/config 수집 설정 조회 (ETag 캐싱)
GET /api/fleet/edge/{edgeId}/config 위와 동일 (alias)

응답 형식 (Python EdgeConfig 모델과 호환):

{
  "version": "2026-04-17T07:25:26.766Z",
  "edge_id": "edge-spifox-001",
  "edge_name": "스피폭스 엣지 #1",
  "devices": [
    {
      "id": "1",
      "name": "CASE프레스_PLC_01",
      "protocol": "plc_ethernet",
      "connection": {
        "host": "192.168.1.10",
        "port": 2004,
        "unit_id": 1
      },
      "interval_ms": 1000,
      "enabled": true,
      "tags": [
        {
          "name": "temperature",
          "address": "40001",
          "data_type": "UINT16",
          "byte_order": "BIG_ENDIAN",
          "scale": 0.1,
          "offset": 0,
          "unit": "°C"
        }
      ]
    }
  ],
  "aggregation_interval_sec": 60,
  "local_retention_days": 7
}

MQTT 토픽 규칙

Python이 발행하는 토픽:

토픽 페이로드 주기
vexplor/devices/{edgeId}/status heartbeat (CPU/메모리/디스크) 30초
vexplor/devices/{edgeId}/data 태그 값 (아래 참조) interval_ms
vexplor/devices/{edgeId}/responses 커맨드 응답 요청 시

데이터 페이로드 예시

{
  "timestamp": "2026-04-17T08:00:00.123Z",
  "equipment_id": 4,
  "connection_id": 1,
  "tags": {
    "temperature": 25.4,
    "pressure": 11.2,
    "status": true,
    "mode": "AUTO"
  }
}

Pipeline은 이 데이터를 fleet_edge_raw_data 테이블에 자동 적재합니다.

로컬 테스트

Pipeline이 로컬에 떠있는 상태에서 테스트 엣지 시뮬레이터:

# MQTT heartbeat 발송 (자동 등록)
docker exec pipeline-backend node -e "
const mqtt = require('mqtt');
const c = mqtt.connect('mqtt://127.0.0.1:1883');
c.on('connect', () => {
  c.publish('vexplor/devices/edge-test-001/status', JSON.stringify({
    cpu_percent: 25, memory_percent: 45, disk_percent: 60,
    ip_address: '192.168.1.100', status: 'online'
  }), { qos: 1 }, () => c.end(true));
});
"

# 설정 조회
curl http://localhost:8080/api/fleet/edge/edge-test-001/config

# 태그 데이터 발송
docker exec pipeline-backend node -e "
const mqtt = require('mqtt');
const c = mqtt.connect('mqtt://127.0.0.1:1883');
c.on('connect', () => {
  c.publish('vexplor/devices/edge-test-001/data', JSON.stringify({
    timestamp: new Date().toISOString(),
    equipment_id: 4,
    tags: { temperature: 25.5, pressure: 11.2 }
  }), { qos: 1 }, () => c.end(true));
});
"

포트 정리

로컬 Pipeline이 노출하는 포트:

포트 용도
8080 REST API (Fleet + Pipeline)
1883 MQTT TCP 브로커 (내장 aedes)
8083 MQTT WebSocket (브라우저 클라이언트)
9771 프론트엔드

흐름 요약

  1. 엣지 부팅: Python이 Pipeline에 heartbeat 발행 → fleet_devices에 자동 등록
  2. 설정 조회: Python이 /api/fleet/v1/edges/{id}/config 호출 → 현재 장비/태그 설정 받음
  3. PLC 수집: 설정된 대로 Modbus/OPC UA/S7 등으로 주기 수집
  4. MQTT 발행: vexplor/devices/{id}/data 로 실시간 값 발행
  5. Pipeline 저장: MQTT 구독 → fleet_edge_raw_data 적재
  6. 대시보드 표시: /admin/fleet/data 에서 실시간 차트 + 최신값 조회

설정 변경 시 반영

사용자가 웹에서 태그 설정을 변경하면:

  • pipeline_tag_mappings UPDATE
  • Python이 다음 config sync 주기(기본 30초) 시 변경 감지
  • version (ETag) 기반이라 변경 없으면 304 응답 (트래픽 절약)
  • Python이 자동으로 새 설정으로 수집 재시작

Python 재시작 불필요 — 설정은 런타임에 동적 반영됩니다.