Files
chpark e4bca14a90 docs(README): 운영 메모 / 라이선스 섹션 제거
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 18:30:54 +09:00

286 lines
13 KiB
Markdown

# Pipeline
산업 현장 **장비/외부 DB 데이터 수집** + **AI 에이전트 오케스트레이션**을 한 플랫폼에서 통합 관리하는 풀스택 애플리케이션. 엣지 서버에서 PLC·센서·외부 DB 의 데이터를 표준 포맷으로 수집해 IDC 중앙 시계열 DB(TimescaleDB) 에 적재하고, 동시에 LLM 기반 다중 에이전트 워크플로를 정의·실행하는 운영 플랫폼.
기존 Python 기반 `data-collector + fleet-agent + kafka-to-central-mqtt` 의 모든 기능을 흡수해 단일 Node.js 컨테이너로 대체.
---
## 핵심 기능
### 1. 장비 통신 (다중 프로토콜 수집기)
[backend-node/src/services/collector/](backend-node/src/services/collector/)
| 프로토콜 | 클라이언트 | 용도 |
|---|---|---|
| **LS XGT FEnet** | `protocols/xgtClient.ts` | LS Electric XGK/XGI/XGR PLC (TCP 2004) |
| **Modbus TCP** | `protocols/modbusClient.ts` | 표준 Modbus, byte_order 4종 (BIG/LITTLE/BIG_SWAP/LITTLE_SWAP) |
| **OPC UA** | `protocols/opcuaClient.ts` | OPC UA 서버 (lazy-load `node-opcua`) |
| **Siemens S7** | `protocols/s7Client.ts` | S7-1200/1500 (`snap7` 기반) |
| **MQTT 구독 수집** | `protocols/mqttCollectorClient.ts` | 외부 브로커 토픽 구독 → 태그 캐시, JSON path 추출 |
| **MSSQL 쿼리** | `protocols/mssqlClient.ts` | watermark 기반 증분 SELECT, batch_mode (row 당 장비), timestamp_expression |
설정은 DB(`pipeline_device_connections` + `pipeline_tag_mappings`) 기반. UI 에서 연결 추가 → 태그 정의 → 저장 DB/테이블/컬럼 매핑 → 폴링 자동 시작.
### 2. 데이터 적재 (Long/Wide 자동 감지)
[deviceCollectorService.ts](backend-node/src/services/collector/deviceCollectorService.ts)
- **Long 포맷** 자동 감지 (`time, tag_name, value` 컬럼 존재 시)
→ 태그 1개당 1행 INSERT, `metadata` JSONB 에 `device_id(UUID) / forwarded_at(ISO) / priority` 저장
→ IDC `edge_telemetry` / `edge_telemetry_1` 등 시계열 테이블 호환
- **Wide 포맷**: 1행에 다중 컬럼 (전통적 RDB 스키마)
- **타겟 DB retry queue** (`pipeline_target_retry_queue` 테이블, 30초 워커, exponential backoff, 최대 10회)
IDC TimescaleDB 일시 다운 시 데이터 유실 방지
- **Watermark 영속화** (`pipeline_collector_watermark`)
MSSQL/SQL 증분 수집 재기동 시 중복 없음
### 3. Edge 자가 보고 (heartbeat + 이벤트)
[edgeStatusReporter.ts](backend-node/src/services/collector/edgeStatusReporter.ts)
- `edge_status_1` 60초 간격 heartbeat (`status: online`, ip_address, host, source)
- `edge_events_1` 이벤트 기록 (PLC 연결/해제/오류 자동 + pipeline 기동/종료)
- 환경변수 `PIPELINE_EDGE_*` 로 edge_id / company_id / company_uuid / table / interval 주입
### 4. MQTT 통합 (3 in 1)
| 역할 | 위치 | 설명 |
|---|---|---|
| **내장 MQTT 브로커** | [fleet/mqttBroker.ts](backend-node/src/fleet/mqttBroker.ts) (aedes) | TCP `:1883` + WebSocket `:8083` 동시 노출. 외부 Mosquitto/EMQX 불필요 |
| **중앙 MQTT 포워더** | [centralMqttForwarder.ts](backend-node/src/services/collector/centralMqttForwarder.ts) | 수집 데이터 → 원격 IDC MQTT (`dt/v1/data/{co}/{edge}`), 60s heartbeat (`dt/v1/status/...`) |
| **MQTT 구독 수집기** | [protocols/mqttCollectorClient.ts](backend-node/src/services/collector/protocols/mqttCollectorClient.ts) | 외부 브로커 토픽을 태그로 끌어옴 |
### 5. 외부 DB 연결 관리
[externalDbConnectionService.ts](backend-node/src/services/externalDbConnectionService.ts) + [externalDbHelper.ts](backend-node/src/services/externalDbHelper.ts)
- **PostgreSQL / MySQL/MariaDB / MSSQL / Oracle** 통합 풀
- 비밀번호 AES 암호화 저장 (`PASSWORD_ENCRYPTION_KEY`)
- UI 에서 연결 등록 → 테이블/컬럼 introspection → 매핑 정의 (배치, 수집 타겟 등)
### 6. AI 에이전트 오케스트레이션
`backend-node/src/services/`
| 모듈 | 역할 |
|---|---|
| `aiAgentService.ts` | 에이전트 정의 (모델/프롬프트/툴) |
| `aiAgentGroupService.ts` | 그룹(다중 에이전트 묶음) + 멤버 + 실행 순서 |
| `aiAgentProviderService.ts` | LLM provider (Anthropic / OpenAI / 로컬 등) |
| `aiAgentApiKeyService.ts` | provider API key 보관 (암호화) |
| `aiAgentUsageService.ts` | 토큰/요청 사용량 집계 |
| `aiAgentConversationService.ts` | 대화 히스토리 관리 |
| `multiAgentExecutionEngine.ts` | **그룹 실행 엔진** — 순차/병렬/조건 분기, connector 결과 주입, 토큰 집계 |
| `llmClient.ts` | LLM 호출 래퍼 |
| `aiSchedulerService.ts` | cron 기반 자동 실행 (ai_agent_schedules) |
| `aiAnalysisLogService.ts` | 실행 로그/감사 |
UI: [admin/aiAssistant](frontend/app/(main)/admin/aiAssistant/) (knowledge / workspace / agents / groups …)
### 7. Fleet (엣지 디바이스 관리)
[backend-node/src/fleet/](backend-node/src/fleet/)
- `fleetDeviceService` — 디바이스 등록/heartbeat 수신
- `fleetCommandService` — 원격 명령 (재시작 등) 발행/추적
- `fleetDeploymentService` + `fleetReleaseService` — OTA 배포/릴리스 관리
- `fleetScriptService` — 디바이스 스크립트 동기화
- `fleetAlertRuleService` — heartbeat/메트릭 임계값 알림
- `fleetHarborService` — Harbor 레지스트리 이미지 동기화
### 8. Python Hook (수집 후처리)
[pythonHookRunner.ts](backend-node/src/services/collector/pythonHookRunner.ts) + [scriptCache.ts](backend-node/src/services/collector/scriptCache.ts)
- `transform`(태그 단위) → `filter``derived_tags` 순으로 적용
- 사용자가 작성한 Python 스니펫을 sandbox 실행 (timeout/메모리 제한)
- 스크립트 priority 로 실행 순서 제어
### 9. 배치 (외부 API/DB → 내부 동기화)
`batch_configs` + `batch_mappings` + [batchSchedulerService.ts](backend-node/src/services/batchSchedulerService.ts)
- REST API / 외부 DB → 내부 테이블로 column 매핑 일괄 import
- save_mode: `INSERT` / `UPSERT` (conflict_key 지정)
- cron 스케줄 + 수동 실행
- 실 사용 예: AAS 장비목록·자산 토폴로지 동기화, 환율, 기상특보
### 10. 데이터플로 (시각 워크플로)
`flow_definitions` + [flowExecutionService.ts](backend-node/src/services/flowExecutionService.ts)
- 노드 기반 데이터 이동/조건/변환
- `dataflowDiagramService.ts` — 다이어그램 저장
- `enhancedDataflowControlService.ts` — 단계 제어
- AI 그룹 실행에서도 connector 로 호출 가능
---
## 기술 스택
### Frontend
- **Next.js** (App Router, Turbopack) + **TypeScript**
- **shadcn/ui** + **Radix UI** + Tailwind CSS
- TanStack Query + React Context
- Lucide React 아이콘
### Backend
- **Node.js 20+** / **TypeScript**
- **Express** + express-rate-limit + express-async-errors
- **PostgreSQL** (`pg`) — 메인 메타데이터 DB
- **mssql / mysql2 / oracledb (lazy)** — 외부 DB 통합
- **mqtt** + **aedes** — 클라이언트 + 내장 브로커
- **dockerode** — Docker 컨테이너 제어 (Fleet 배포)
- **bcryptjs / jsonwebtoken** — 인증
- **AES** 암호화 — DB 비번/API key 보호
### 인프라
- **Docker** + Docker Compose (개발/프로덕션 분리)
- **Harbor** 프라이빗 레지스트리 (`harbor.wace.me/vexplor_fleet`)
- **Watchtower** — 엣지 자동 이미지 업데이트
- **TimescaleDB** — 시계열 적재 (IDC NodePort `:30543`)
---
## 디렉터리 구조
```
.
├── backend-node/
│ └── src/
│ ├── app.ts # Express 부트스트랩 + 부팅 시 폴링/리포터/배치 기동
│ ├── routes/ # 100+ 라우터 (도메인별)
│ ├── controllers/
│ ├── services/
│ │ ├── collector/ # 장비 수집 (XGT/Modbus/OPCUA/S7/MQTT/MSSQL)
│ │ │ ├── deviceCollectorService.ts
│ │ │ ├── edgeStatusReporter.ts
│ │ │ ├── centralMqttForwarder.ts
│ │ │ └── protocols/
│ │ ├── aiAgent*.ts # AI 에이전트 묶음
│ │ ├── multiAgentExecutionEngine.ts
│ │ ├── batchSchedulerService.ts
│ │ ├── flow*.ts # 데이터플로
│ │ └── externalDb*.ts # 외부 DB 통합
│ ├── fleet/ # 엣지 디바이스 관리
│ ├── database/
│ └── utils/
├── frontend/
│ ├── app/(main)/admin/ # 관리자 페이지
│ │ ├── pipeline-device/ # 장비 통신 설정
│ │ ├── automaticMng/ # 배치/대시보드/포워더/장비상태
│ │ ├── aiAssistant/ # AI 에이전트
│ │ └── fleet/ # Fleet 관리
│ ├── components/
│ └── lib/api/ # 백엔드 API 클라이언트
├── docker/
│ ├── dev/ # 로컬 개발 compose
│ └── edge/ # 엣지 배포 compose + Dockerfile
└── docs/ # 운영 가이드
```
---
## 빠른 시작 (로컬 개발)
```bash
# 1. 백엔드
cd backend-node
cp .env.example .env # DATABASE_URL, JWT_SECRET, PASSWORD_ENCRYPTION_KEY 설정
npm install
npm run dev # nodemon, ts-node, hot reload
# 2. 프론트엔드 (별도 터미널)
cd frontend
npm install
npm run dev # http://localhost:9771
# OR Docker Compose (둘 다 한 번에)
docker compose -f docker/dev/docker-compose.backend.mac.yml up -d
docker compose -f docker/dev/docker-compose.frontend.mac.yml up -d
```
기본 접속: `http://localhost:9771` (프론트) / `http://localhost:8080/api` (백엔드)
### 주요 환경변수
| 변수 | 설명 |
|---|---|
| `DATABASE_URL` | Pipeline 메타 PostgreSQL 연결 |
| `JWT_SECRET` | JWT 서명 키 |
| `PASSWORD_ENCRYPTION_KEY` | 외부 DB 비번/API key AES 키 (32 bytes) |
| `CORS_ORIGIN` | 허용 origin (콤마 구분) |
| `MQTT_PORT` / `MQTT_WS_PORT` | 내장 브로커 포트 (기본 1883/8083) |
| `ENABLE_AUTO_COLLECTOR` | 부팅 시 활성 연결 자동 폴링 시작 (`true/false`) |
| `PIPELINE_EDGE_REPORTER` | edge_status/edge_events 자동 적재 활성 |
| `PIPELINE_EDGE_*` | edge_id, company_id/uuid, target_db_id, table, interval |
---
## 엣지 배포
```bash
# 이미지 빌드 + Harbor push (amd64)
docker buildx build --platform linux/amd64 \
-f docker/edge/Dockerfile.backend.prod \
-t harbor.wace.me/vexplor_fleet/pipeline-backend:latest --push ./backend-node
# 엣지 서버에서
cd /home/wace/pipeline-edge
docker compose -f docker-compose.edge.yml pull
docker compose -f docker-compose.edge.yml up -d
```
`docker/edge/docker-compose.edge.yml` 에 backend + front + 내장 MQTT 브로커 포트(1883/8083) 노출.
`watchtower` 가 Harbor 폴링하면서 새 이미지 자동 pull + recreate.
---
## 데이터 흐름 (실제 운영)
```
PLC/센서/외부 DB
▼ (다중 프로토콜 수집)
pipeline-backend (엣지)
├──→ MQTT 발행 (내부/UI 실시간 스트리밍)
├──→ pipeline_collected_data (메타 DB 시계열 원본)
├──→ Python 훅 (transform/filter/derived)
├──→ central MQTT (옵션, 외부 시스템 호환)
└──→ IDC TimescaleDB (edge_telemetry / _1) ◀── 핵심 적재 경로
실패 시 → pipeline_target_retry_queue → 30s 워커 재시도
별도 경로:
edge_status_1 ◀── 60초 heartbeat
edge_events_1 ◀── PLC 연결 전이 + 기동/종료 이벤트
```
---
## 주요 DB 테이블
| 영역 | 테이블 |
|---|---|
| 장비 통신 | `pipeline_device_connections`, `pipeline_tag_mappings`, `pipeline_equipment` |
| 수집 결과 | `pipeline_collected_data`, `pipeline_target_retry_queue`, `pipeline_collector_watermark` |
| 외부 연동 | `external_db_connections`, `external_rest_api_connections`, `flow_external_db_connection` |
| 배치 | `batch_configs`, `batch_mappings`, `batch_execution_logs` |
| 데이터플로 | `flow_definitions`, `flow_connections`, `flow_executions` |
| AI | `ai_agents`, `ai_agent_groups`, `ai_agent_group_members`, `ai_agent_schedules`, `ai_analysis_logs` |
| Fleet | `fleet_devices`, `fleet_heartbeats`, `fleet_commands`, `fleet_releases`, `fleet_deployments` |
| 멀티테넌시 | `user_info`, `user_dept`, `menu_info`, `rel_menu_auth` |
---
## 인증 / 권한
- JWT 토큰 (Access 24h / Refresh 7d 기본)
- bcrypt 비번 해싱
- 회사 격리 (`company_code`), `*` = 슈퍼관리자
- 라우트 단위 미들웨어 (`authMiddleware`) + 메뉴 권한 (`rel_menu_auth`)