e4bca14a90
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
286 lines
13 KiB
Markdown
286 lines
13 KiB
Markdown
# Pipeline
|
|
|
|
산업 현장 **장비/외부 DB 데이터 수집** + **AI 에이전트 오케스트레이션**을 한 플랫폼에서 통합 관리하는 풀스택 애플리케이션. 엣지 서버에서 PLC·센서·외부 DB 의 데이터를 표준 포맷으로 수집해 IDC 중앙 시계열 DB(TimescaleDB) 에 적재하고, 동시에 LLM 기반 다중 에이전트 워크플로를 정의·실행하는 운영 플랫폼.
|
|
|
|
기존 Python 기반 `data-collector + fleet-agent + kafka-to-central-mqtt` 의 모든 기능을 흡수해 단일 Node.js 컨테이너로 대체.
|
|
|
|
---
|
|
|
|
## 핵심 기능
|
|
|
|
### 1. 장비 통신 (다중 프로토콜 수집기)
|
|
|
|
[backend-node/src/services/collector/](backend-node/src/services/collector/)
|
|
|
|
| 프로토콜 | 클라이언트 | 용도 |
|
|
|---|---|---|
|
|
| **LS XGT FEnet** | `protocols/xgtClient.ts` | LS Electric XGK/XGI/XGR PLC (TCP 2004) |
|
|
| **Modbus TCP** | `protocols/modbusClient.ts` | 표준 Modbus, byte_order 4종 (BIG/LITTLE/BIG_SWAP/LITTLE_SWAP) |
|
|
| **OPC UA** | `protocols/opcuaClient.ts` | OPC UA 서버 (lazy-load `node-opcua`) |
|
|
| **Siemens S7** | `protocols/s7Client.ts` | S7-1200/1500 (`snap7` 기반) |
|
|
| **MQTT 구독 수집** | `protocols/mqttCollectorClient.ts` | 외부 브로커 토픽 구독 → 태그 캐시, JSON path 추출 |
|
|
| **MSSQL 쿼리** | `protocols/mssqlClient.ts` | watermark 기반 증분 SELECT, batch_mode (row 당 장비), timestamp_expression |
|
|
|
|
설정은 DB(`pipeline_device_connections` + `pipeline_tag_mappings`) 기반. UI 에서 연결 추가 → 태그 정의 → 저장 DB/테이블/컬럼 매핑 → 폴링 자동 시작.
|
|
|
|
### 2. 데이터 적재 (Long/Wide 자동 감지)
|
|
|
|
[deviceCollectorService.ts](backend-node/src/services/collector/deviceCollectorService.ts)
|
|
|
|
- **Long 포맷** 자동 감지 (`time, tag_name, value` 컬럼 존재 시)
|
|
→ 태그 1개당 1행 INSERT, `metadata` JSONB 에 `device_id(UUID) / forwarded_at(ISO) / priority` 저장
|
|
→ IDC `edge_telemetry` / `edge_telemetry_1` 등 시계열 테이블 호환
|
|
- **Wide 포맷**: 1행에 다중 컬럼 (전통적 RDB 스키마)
|
|
- **타겟 DB retry queue** (`pipeline_target_retry_queue` 테이블, 30초 워커, exponential backoff, 최대 10회)
|
|
IDC TimescaleDB 일시 다운 시 데이터 유실 방지
|
|
- **Watermark 영속화** (`pipeline_collector_watermark`)
|
|
MSSQL/SQL 증분 수집 재기동 시 중복 없음
|
|
|
|
### 3. Edge 자가 보고 (heartbeat + 이벤트)
|
|
|
|
[edgeStatusReporter.ts](backend-node/src/services/collector/edgeStatusReporter.ts)
|
|
|
|
- `edge_status_1` 60초 간격 heartbeat (`status: online`, ip_address, host, source)
|
|
- `edge_events_1` 이벤트 기록 (PLC 연결/해제/오류 자동 + pipeline 기동/종료)
|
|
- 환경변수 `PIPELINE_EDGE_*` 로 edge_id / company_id / company_uuid / table / interval 주입
|
|
|
|
### 4. MQTT 통합 (3 in 1)
|
|
|
|
| 역할 | 위치 | 설명 |
|
|
|---|---|---|
|
|
| **내장 MQTT 브로커** | [fleet/mqttBroker.ts](backend-node/src/fleet/mqttBroker.ts) (aedes) | TCP `:1883` + WebSocket `:8083` 동시 노출. 외부 Mosquitto/EMQX 불필요 |
|
|
| **중앙 MQTT 포워더** | [centralMqttForwarder.ts](backend-node/src/services/collector/centralMqttForwarder.ts) | 수집 데이터 → 원격 IDC MQTT (`dt/v1/data/{co}/{edge}`), 60s heartbeat (`dt/v1/status/...`) |
|
|
| **MQTT 구독 수집기** | [protocols/mqttCollectorClient.ts](backend-node/src/services/collector/protocols/mqttCollectorClient.ts) | 외부 브로커 토픽을 태그로 끌어옴 |
|
|
|
|
### 5. 외부 DB 연결 관리
|
|
|
|
[externalDbConnectionService.ts](backend-node/src/services/externalDbConnectionService.ts) + [externalDbHelper.ts](backend-node/src/services/externalDbHelper.ts)
|
|
|
|
- **PostgreSQL / MySQL/MariaDB / MSSQL / Oracle** 통합 풀
|
|
- 비밀번호 AES 암호화 저장 (`PASSWORD_ENCRYPTION_KEY`)
|
|
- UI 에서 연결 등록 → 테이블/컬럼 introspection → 매핑 정의 (배치, 수집 타겟 등)
|
|
|
|
### 6. AI 에이전트 오케스트레이션
|
|
|
|
`backend-node/src/services/`
|
|
|
|
| 모듈 | 역할 |
|
|
|---|---|
|
|
| `aiAgentService.ts` | 에이전트 정의 (모델/프롬프트/툴) |
|
|
| `aiAgentGroupService.ts` | 그룹(다중 에이전트 묶음) + 멤버 + 실행 순서 |
|
|
| `aiAgentProviderService.ts` | LLM provider (Anthropic / OpenAI / 로컬 등) |
|
|
| `aiAgentApiKeyService.ts` | provider API key 보관 (암호화) |
|
|
| `aiAgentUsageService.ts` | 토큰/요청 사용량 집계 |
|
|
| `aiAgentConversationService.ts` | 대화 히스토리 관리 |
|
|
| `multiAgentExecutionEngine.ts` | **그룹 실행 엔진** — 순차/병렬/조건 분기, connector 결과 주입, 토큰 집계 |
|
|
| `llmClient.ts` | LLM 호출 래퍼 |
|
|
| `aiSchedulerService.ts` | cron 기반 자동 실행 (ai_agent_schedules) |
|
|
| `aiAnalysisLogService.ts` | 실행 로그/감사 |
|
|
|
|
UI: [admin/aiAssistant](frontend/app/(main)/admin/aiAssistant/) (knowledge / workspace / agents / groups …)
|
|
|
|
### 7. Fleet (엣지 디바이스 관리)
|
|
|
|
[backend-node/src/fleet/](backend-node/src/fleet/)
|
|
|
|
- `fleetDeviceService` — 디바이스 등록/heartbeat 수신
|
|
- `fleetCommandService` — 원격 명령 (재시작 등) 발행/추적
|
|
- `fleetDeploymentService` + `fleetReleaseService` — OTA 배포/릴리스 관리
|
|
- `fleetScriptService` — 디바이스 스크립트 동기화
|
|
- `fleetAlertRuleService` — heartbeat/메트릭 임계값 알림
|
|
- `fleetHarborService` — Harbor 레지스트리 이미지 동기화
|
|
|
|
### 8. Python Hook (수집 후처리)
|
|
|
|
[pythonHookRunner.ts](backend-node/src/services/collector/pythonHookRunner.ts) + [scriptCache.ts](backend-node/src/services/collector/scriptCache.ts)
|
|
|
|
- `transform`(태그 단위) → `filter` → `derived_tags` 순으로 적용
|
|
- 사용자가 작성한 Python 스니펫을 sandbox 실행 (timeout/메모리 제한)
|
|
- 스크립트 priority 로 실행 순서 제어
|
|
|
|
### 9. 배치 (외부 API/DB → 내부 동기화)
|
|
|
|
`batch_configs` + `batch_mappings` + [batchSchedulerService.ts](backend-node/src/services/batchSchedulerService.ts)
|
|
|
|
- REST API / 외부 DB → 내부 테이블로 column 매핑 일괄 import
|
|
- save_mode: `INSERT` / `UPSERT` (conflict_key 지정)
|
|
- cron 스케줄 + 수동 실행
|
|
- 실 사용 예: AAS 장비목록·자산 토폴로지 동기화, 환율, 기상특보
|
|
|
|
### 10. 데이터플로 (시각 워크플로)
|
|
|
|
`flow_definitions` + [flowExecutionService.ts](backend-node/src/services/flowExecutionService.ts)
|
|
|
|
- 노드 기반 데이터 이동/조건/변환
|
|
- `dataflowDiagramService.ts` — 다이어그램 저장
|
|
- `enhancedDataflowControlService.ts` — 단계 제어
|
|
- AI 그룹 실행에서도 connector 로 호출 가능
|
|
|
|
---
|
|
|
|
## 기술 스택
|
|
|
|
### Frontend
|
|
- **Next.js** (App Router, Turbopack) + **TypeScript**
|
|
- **shadcn/ui** + **Radix UI** + Tailwind CSS
|
|
- TanStack Query + React Context
|
|
- Lucide React 아이콘
|
|
|
|
### Backend
|
|
- **Node.js 20+** / **TypeScript**
|
|
- **Express** + express-rate-limit + express-async-errors
|
|
- **PostgreSQL** (`pg`) — 메인 메타데이터 DB
|
|
- **mssql / mysql2 / oracledb (lazy)** — 외부 DB 통합
|
|
- **mqtt** + **aedes** — 클라이언트 + 내장 브로커
|
|
- **dockerode** — Docker 컨테이너 제어 (Fleet 배포)
|
|
- **bcryptjs / jsonwebtoken** — 인증
|
|
- **AES** 암호화 — DB 비번/API key 보호
|
|
|
|
### 인프라
|
|
- **Docker** + Docker Compose (개발/프로덕션 분리)
|
|
- **Harbor** 프라이빗 레지스트리 (`harbor.wace.me/vexplor_fleet`)
|
|
- **Watchtower** — 엣지 자동 이미지 업데이트
|
|
- **TimescaleDB** — 시계열 적재 (IDC NodePort `:30543`)
|
|
|
|
---
|
|
|
|
## 디렉터리 구조
|
|
|
|
```
|
|
.
|
|
├── backend-node/
|
|
│ └── src/
|
|
│ ├── app.ts # Express 부트스트랩 + 부팅 시 폴링/리포터/배치 기동
|
|
│ ├── routes/ # 100+ 라우터 (도메인별)
|
|
│ ├── controllers/
|
|
│ ├── services/
|
|
│ │ ├── collector/ # 장비 수집 (XGT/Modbus/OPCUA/S7/MQTT/MSSQL)
|
|
│ │ │ ├── deviceCollectorService.ts
|
|
│ │ │ ├── edgeStatusReporter.ts
|
|
│ │ │ ├── centralMqttForwarder.ts
|
|
│ │ │ └── protocols/
|
|
│ │ ├── aiAgent*.ts # AI 에이전트 묶음
|
|
│ │ ├── multiAgentExecutionEngine.ts
|
|
│ │ ├── batchSchedulerService.ts
|
|
│ │ ├── flow*.ts # 데이터플로
|
|
│ │ └── externalDb*.ts # 외부 DB 통합
|
|
│ ├── fleet/ # 엣지 디바이스 관리
|
|
│ ├── database/
|
|
│ └── utils/
|
|
├── frontend/
|
|
│ ├── app/(main)/admin/ # 관리자 페이지
|
|
│ │ ├── pipeline-device/ # 장비 통신 설정
|
|
│ │ ├── automaticMng/ # 배치/대시보드/포워더/장비상태
|
|
│ │ ├── aiAssistant/ # AI 에이전트
|
|
│ │ └── fleet/ # Fleet 관리
|
|
│ ├── components/
|
|
│ └── lib/api/ # 백엔드 API 클라이언트
|
|
├── docker/
|
|
│ ├── dev/ # 로컬 개발 compose
|
|
│ └── edge/ # 엣지 배포 compose + Dockerfile
|
|
└── docs/ # 운영 가이드
|
|
```
|
|
|
|
---
|
|
|
|
## 빠른 시작 (로컬 개발)
|
|
|
|
```bash
|
|
# 1. 백엔드
|
|
cd backend-node
|
|
cp .env.example .env # DATABASE_URL, JWT_SECRET, PASSWORD_ENCRYPTION_KEY 설정
|
|
npm install
|
|
npm run dev # nodemon, ts-node, hot reload
|
|
|
|
# 2. 프론트엔드 (별도 터미널)
|
|
cd frontend
|
|
npm install
|
|
npm run dev # http://localhost:9771
|
|
|
|
# OR Docker Compose (둘 다 한 번에)
|
|
docker compose -f docker/dev/docker-compose.backend.mac.yml up -d
|
|
docker compose -f docker/dev/docker-compose.frontend.mac.yml up -d
|
|
```
|
|
|
|
기본 접속: `http://localhost:9771` (프론트) / `http://localhost:8080/api` (백엔드)
|
|
|
|
### 주요 환경변수
|
|
|
|
| 변수 | 설명 |
|
|
|---|---|
|
|
| `DATABASE_URL` | Pipeline 메타 PostgreSQL 연결 |
|
|
| `JWT_SECRET` | JWT 서명 키 |
|
|
| `PASSWORD_ENCRYPTION_KEY` | 외부 DB 비번/API key AES 키 (32 bytes) |
|
|
| `CORS_ORIGIN` | 허용 origin (콤마 구분) |
|
|
| `MQTT_PORT` / `MQTT_WS_PORT` | 내장 브로커 포트 (기본 1883/8083) |
|
|
| `ENABLE_AUTO_COLLECTOR` | 부팅 시 활성 연결 자동 폴링 시작 (`true/false`) |
|
|
| `PIPELINE_EDGE_REPORTER` | edge_status/edge_events 자동 적재 활성 |
|
|
| `PIPELINE_EDGE_*` | edge_id, company_id/uuid, target_db_id, table, interval |
|
|
|
|
---
|
|
|
|
## 엣지 배포
|
|
|
|
```bash
|
|
# 이미지 빌드 + Harbor push (amd64)
|
|
docker buildx build --platform linux/amd64 \
|
|
-f docker/edge/Dockerfile.backend.prod \
|
|
-t harbor.wace.me/vexplor_fleet/pipeline-backend:latest --push ./backend-node
|
|
|
|
# 엣지 서버에서
|
|
cd /home/wace/pipeline-edge
|
|
docker compose -f docker-compose.edge.yml pull
|
|
docker compose -f docker-compose.edge.yml up -d
|
|
```
|
|
|
|
`docker/edge/docker-compose.edge.yml` 에 backend + front + 내장 MQTT 브로커 포트(1883/8083) 노출.
|
|
|
|
`watchtower` 가 Harbor 폴링하면서 새 이미지 자동 pull + recreate.
|
|
|
|
---
|
|
|
|
## 데이터 흐름 (실제 운영)
|
|
|
|
```
|
|
PLC/센서/외부 DB
|
|
│
|
|
▼ (다중 프로토콜 수집)
|
|
pipeline-backend (엣지)
|
|
│
|
|
├──→ MQTT 발행 (내부/UI 실시간 스트리밍)
|
|
├──→ pipeline_collected_data (메타 DB 시계열 원본)
|
|
├──→ Python 훅 (transform/filter/derived)
|
|
├──→ central MQTT (옵션, 외부 시스템 호환)
|
|
└──→ IDC TimescaleDB (edge_telemetry / _1) ◀── 핵심 적재 경로
|
|
실패 시 → pipeline_target_retry_queue → 30s 워커 재시도
|
|
|
|
별도 경로:
|
|
edge_status_1 ◀── 60초 heartbeat
|
|
edge_events_1 ◀── PLC 연결 전이 + 기동/종료 이벤트
|
|
```
|
|
|
|
---
|
|
|
|
## 주요 DB 테이블
|
|
|
|
| 영역 | 테이블 |
|
|
|---|---|
|
|
| 장비 통신 | `pipeline_device_connections`, `pipeline_tag_mappings`, `pipeline_equipment` |
|
|
| 수집 결과 | `pipeline_collected_data`, `pipeline_target_retry_queue`, `pipeline_collector_watermark` |
|
|
| 외부 연동 | `external_db_connections`, `external_rest_api_connections`, `flow_external_db_connection` |
|
|
| 배치 | `batch_configs`, `batch_mappings`, `batch_execution_logs` |
|
|
| 데이터플로 | `flow_definitions`, `flow_connections`, `flow_executions` |
|
|
| AI | `ai_agents`, `ai_agent_groups`, `ai_agent_group_members`, `ai_agent_schedules`, `ai_analysis_logs` |
|
|
| Fleet | `fleet_devices`, `fleet_heartbeats`, `fleet_commands`, `fleet_releases`, `fleet_deployments` |
|
|
| 멀티테넌시 | `user_info`, `user_dept`, `menu_info`, `rel_menu_auth` |
|
|
|
|
---
|
|
|
|
## 인증 / 권한
|
|
|
|
- JWT 토큰 (Access 24h / Refresh 7d 기본)
|
|
- bcrypt 비번 해싱
|
|
- 회사 격리 (`company_code`), `*` = 슈퍼관리자
|
|
- 라우트 단위 미들웨어 (`authMiddleware`) + 메뉴 권한 (`rel_menu_auth`)
|
|
|