4c1dc4082e
Build and Push Images / build-and-push (push) Has been cancelled
이전 세션들에서 작업된 아래 범위를 모두 포함: Fleet 서브시스템 (src/fleet/) - fleetDeviceService / fleetCommandService / fleetDeploymentService / fleetReleaseService - fleetMetricsService, fleetScriptService, fleetEdgeConfigService - Edge 디바이스 관리, 커맨드 발행, 배포/릴리스, 스크립트 동기화 Collector 확장 - centralMqttForwarder / centralForwarderConfigService - equipmentStateService, pythonHookRunner, scriptCache - Modbus/OPC-UA/S7/XGT 프로토콜 클라이언트 - targetDbIntrospection (저장 DB 조회) Routes / API - automationDashboardRoutes, centralForwarderRoutes, equipmentStateRoutes DB - importEdgeConfig (Python cached config → Pipeline DB) - seedDataSources (external_db_connections 초기 시드) 엣지 배포 리소스 - docker/edge/Dockerfile.backend.prod, Dockerfile.frontend.prod - docker/edge/docker-compose.edge.yml 프론트엔드 - admin/automaticMng (centralForwarder, dashboard, equipmentState) - admin/fleet (commands, devices, deployments, releases, scripts, alerts) - admin/pipeline-device 개선 (저장 DB 드롭다운, 태그 매핑 등) - ExternalDbConnectionModal, ScriptsManagerDialog 등 신규 컴포넌트 - lib/api: automationDashboard, centralForwarder, equipmentState, fleet docs/ - EDGE_SERVER_STRUCTURE, FLEET_COMPLETE, FLEET_EDGE_INTEGRATION, FLEET_HOOK_INTEGRATION Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
12 KiB
12 KiB
Fleet Hook - 웹에서 Python 로직 편집 가이드
엣지 Data Collector의 동작을 웹에서 Python 스크립트로 커스터마이징하는 기능입니다.
개념
┌─ Pipeline 웹 UI ─────────────┐
│ 사용자가 Python 함수 편집 │
│ (Monaco 에디터) │
│ ↓ │
│ [테스트] 버튼으로 미리 검증 │
│ ↓ │
│ [저장] → fleet_edge_scripts │
└──────────────┬───────────────┘
│
│ /api/fleet/v1/edges/{id}/config
│ (ETag 캐싱)
▼
┌─ 엣지 Data Collector (Python) ┐
│ scripts = config["scripts"] │
│ for script in scripts: │
│ load_hook(script) │
│ │
│ 수집 사이클마다: │
│ ├ raw_value = read_plc() │
│ ├ value = transform(...) │ ← Hook 1
│ ├ tags.update(derived(...)) │ ← Hook 2
│ ├ if not filter_data(...): │ ← Hook 3
│ │ skip │
│ ├ alarm_info = alarm(...) │ ← Hook 4
│ ├ payload = pre_send(...) │ ← Hook 5
│ └ publish_mqtt(payload) │
└───────────────────────────────┘
5가지 Hook 종류
| Hook | 시점 | 입력 | 출력 | 용도 |
|---|---|---|---|---|
| transform | 원시값 변환 | tag_name, raw_value, context | 변환된 값 | 센서 스케일링, 단위 변환 |
| derived_tags | 파생 태그 계산 | tags 딕셔너리, context | 새 태그 딕셔너리 | 여러 태그 조합 (전력 = V×I) |
| filter | 발행 여부 판단 | tags, context | bool | 조건부 수집 (가동 중만) |
| alarm | 알람 판정 | tag_name, value, context | dict 또는 None | 임계값 초과 알람 |
| pre_send | MQTT 발행 전 | payload, context | 가공된 payload | 최종 메타데이터 추가 |
적용 범위 (scope)
- global: 모든 엣지에 적용
- equipment: 특정 장비만 (pipeline_equipment)
- connection: 특정 통신 연결만 (pipeline_device_connections)
- device: 특정 엣지 디바이스만 (fleet_devices)
Python 엣지 쪽 hook loader 샘플
기존 Data Collector 프로젝트(/Users/chpark/workspace/data-collector/src/data_collector/)에 추가할 파일:
hooks/hook_loader.py
"""
Hook Loader - Fleet API에서 받은 Python 스크립트를 로드/실행
"""
import logging
from typing import Any, Callable, Dict, List, Optional
import structlog
logger = structlog.get_logger(__name__)
# 허용된 내장 함수/모듈 (보안)
ALLOWED_BUILTINS = {
'abs', 'all', 'any', 'bool', 'bytes', 'dict', 'enumerate', 'filter',
'float', 'int', 'len', 'list', 'map', 'max', 'min', 'print', 'range',
'round', 'set', 'sorted', 'str', 'sum', 'tuple', 'type', 'zip',
'isinstance', 'hasattr', 'getattr', 'True', 'False', 'None',
}
class HookRegistry:
"""Hook 스크립트 등록 및 실행"""
# hook_type → [(script_id, priority, scope, callable, meta)]
hooks: Dict[str, List[Dict[str, Any]]] = {}
# 스크립트 ID → 컴파일된 함수 캐시
compiled: Dict[int, Dict[str, Callable]] = {}
@classmethod
def load_from_config(cls, scripts: List[Dict[str, Any]]) -> None:
"""
Fleet API에서 받은 스크립트 목록을 로드
각 hook별로 priority 순으로 정렬
"""
cls.hooks = {}
cls.compiled = {}
func_name_map = {
"transform": "transform",
"derived_tags": "derived_tags",
"filter": "filter_data",
"alarm": "alarm",
"pre_send": "pre_send",
}
for script in scripts:
try:
hook_type = script["hook_type"]
func_name = func_name_map.get(hook_type)
if not func_name:
continue
# 제한된 네임스페이스에서 컴파일
import math
from datetime import datetime, date
allowed_globals = {
"__builtins__": {k: __builtins__[k] for k in ALLOWED_BUILTINS if k in dir(__builtins__)},
"math": math,
"datetime": datetime,
"date": date,
}
exec(script["code"], allowed_globals)
func = allowed_globals.get(func_name)
if not callable(func):
logger.warning(f"함수 {func_name}가 정의되지 않음: script id={script['id']}")
continue
cls.hooks.setdefault(hook_type, []).append({
"script_id": script["id"],
"script_name": script.get("script_name", ""),
"scope": script.get("scope", "global"),
"equipment_id": script.get("equipment_id"),
"connection_id": script.get("connection_id"),
"priority": script.get("priority", 100),
"timeout_ms": script.get("timeout_ms", 1000),
"func": func,
})
logger.info(f"Hook 로드: {hook_type} / script_id={script['id']} v{script.get('version', 1)}")
except Exception as e:
logger.error(f"Hook 컴파일 실패 (id={script.get('id')}): {e}")
# 우선순위 정렬
for hooks in cls.hooks.values():
hooks.sort(key=lambda h: h["priority"])
@classmethod
def _match_scope(cls, hook: Dict[str, Any], equipment_id: Optional[int], connection_id: Optional[int]) -> bool:
"""스코프 매칭"""
scope = hook.get("scope", "global")
if scope == "global":
return True
if scope == "equipment" and hook.get("equipment_id") == equipment_id:
return True
if scope == "connection" and hook.get("connection_id") == connection_id:
return True
return False
@classmethod
def run_transform(cls, tag_name: str, raw_value: Any, context: dict) -> Any:
"""transform hook 실행 (파이프라인 - 순차 적용)"""
value = raw_value
for hook in cls.hooks.get("transform", []):
if not cls._match_scope(hook, context.get("equipment_id"), context.get("connection_id")):
continue
try:
value = hook["func"](tag_name, value, context)
except Exception as e:
logger.warning(f"transform 실패 (script_id={hook['script_id']}): {e}")
return value
@classmethod
def run_derived_tags(cls, tags: dict, context: dict) -> dict:
"""derived_tags hook 실행 (모든 hook 결과 병합)"""
result = {}
for hook in cls.hooks.get("derived_tags", []):
if not cls._match_scope(hook, context.get("equipment_id"), context.get("connection_id")):
continue
try:
new_tags = hook["func"](tags, context) or {}
if isinstance(new_tags, dict):
result.update(new_tags)
except Exception as e:
logger.warning(f"derived_tags 실패: {e}")
return result
@classmethod
def run_filter(cls, tags: dict, context: dict) -> bool:
"""filter hook 실행 (AND - 모두 True여야 발행)"""
for hook in cls.hooks.get("filter", []):
if not cls._match_scope(hook, context.get("equipment_id"), context.get("connection_id")):
continue
try:
if not hook["func"](tags, context):
return False
except Exception as e:
logger.warning(f"filter 실패: {e}")
return True
@classmethod
def run_alarm(cls, tag_name: str, value: Any, context: dict) -> List[dict]:
"""alarm hook 실행 (모든 알람 수집)"""
alarms = []
for hook in cls.hooks.get("alarm", []):
if not cls._match_scope(hook, context.get("equipment_id"), context.get("connection_id")):
continue
try:
alarm_info = hook["func"](tag_name, value, context)
if alarm_info:
alarm_info["script_id"] = hook["script_id"]
alarms.append(alarm_info)
except Exception as e:
logger.warning(f"alarm 실패: {e}")
return alarms
@classmethod
def run_pre_send(cls, payload: dict, context: dict) -> dict:
"""pre_send hook 실행 (순차 적용)"""
result = payload
for hook in cls.hooks.get("pre_send", []):
if not cls._match_scope(hook, context.get("equipment_id"), context.get("connection_id")):
continue
try:
result = hook["func"](result, context) or result
except Exception as e:
logger.warning(f"pre_send 실패: {e}")
return result
수집 파이프라인 통합 (collectors/manager.py)
# 수집 루프 안에서...
from data_collector.hooks.hook_loader import HookRegistry
async def collect_and_publish(self, device):
raw_data = await self.collector.collect()
context = {
"device_id": self.device_id,
"equipment_id": device.equipment_id,
"connection_id": device.id,
"company_code": self.company_code,
}
# 1. transform 각 태그에 적용
tags = {}
for tag_name, raw_value in raw_data.items():
tags[tag_name] = HookRegistry.run_transform(tag_name, raw_value, context)
# 2. derived_tags 병합
tags.update(HookRegistry.run_derived_tags(tags, context))
# 3. filter 체크
if not HookRegistry.run_filter(tags, context):
logger.debug("filter로 스킵")
return
# 4. alarm 판정
alarms = []
for tag_name, value in tags.items():
alarms.extend(HookRegistry.run_alarm(tag_name, value, context))
if alarms:
# 알람 발행 (MQTT vexplor/devices/{id}/alarms 등)
self.publish_alarms(alarms)
# 5. 최종 payload 가공
payload = {
"timestamp": datetime.now().isoformat(),
"equipment_id": device.equipment_id,
"connection_id": device.id,
"tags": tags,
}
payload = HookRegistry.run_pre_send(payload, context)
# 6. MQTT 발행
self.mqtt.publish(f"vexplor/devices/{self.device_id}/data", payload)
config_syncer에 hook 로드 추가
async def fetch_config(self):
# ... 기존 설정 조회 ...
# Hook 스크립트 로드
if config.get("scripts"):
from data_collector.hooks.hook_loader import HookRegistry
HookRegistry.load_from_config(config["scripts"])
logger.info(f"Hook 스크립트 로드: {len(config['scripts'])}개")
로컬 테스트
Pipeline 웹에서:
- 시스템 관리 > Python Hook 메뉴 접근
- 새 스크립트 → Hook 타입 선택 → 예제 코드 자동 로드
- 우측 Monaco 에디터에서 편집
- 좌측 하단 테스트 입력 JSON 작성 → 실행 버튼
- 결과 확인 후 저장
API 엔드포인트
| 메서드 | 경로 | 용도 |
|---|---|---|
| GET | /api/fleet/scripts/hook-types |
Hook 타입 5종 + 예제 코드 |
| GET | /api/fleet/scripts |
스크립트 목록 |
| POST | /api/fleet/scripts |
생성 |
| PUT | /api/fleet/scripts/:id |
수정 (자동 버전 증가) |
| DELETE | /api/fleet/scripts/:id |
삭제 |
| POST | /api/fleet/scripts/dry-run |
저장 전 테스트 실행 |
| GET | /api/fleet/scripts/:id/versions |
버전 이력 |
| POST | /api/fleet/scripts/:id/rollback/:version |
롤백 |
| GET | /api/fleet/v1/edges/:id/config |
엣지용 전체 설정 (scripts 포함) |
보안 사항
- Python
exec()실행 시 제한된 네임스페이스 (ALLOWED_BUILTINS만) import제한 (math, datetime, json만 허용)- 파일 시스템 / 네트워크 접근 차단
- 각 hook 실행 타임아웃 (기본 1초)
- Dry-run 시 Python 서브프로세스 격리
실시간 반영
- 웹에서 수정 → PUT API 호출
- DB UPDATE 트리거 → version 증가 + 이력 저장
- Python이 다음 config sync 주기(기본 30초) 시 새 버전 감지
HookRegistry.load_from_config()재실행 → 즉시 적용- Python 재시작 불필요