Files
pipeline/backend-node/src/services/batchSchedulerService.ts
T
chpark 4c1dc4082e
Build and Push Images / build-and-push (push) Has been cancelled
feat: Fleet/Collector/엣지 배포 관련 누적 작업 일괄 커밋
이전 세션들에서 작업된 아래 범위를 모두 포함:

Fleet 서브시스템 (src/fleet/)
- fleetDeviceService / fleetCommandService / fleetDeploymentService / fleetReleaseService
- fleetMetricsService, fleetScriptService, fleetEdgeConfigService
- Edge 디바이스 관리, 커맨드 발행, 배포/릴리스, 스크립트 동기화

Collector 확장
- centralMqttForwarder / centralForwarderConfigService
- equipmentStateService, pythonHookRunner, scriptCache
- Modbus/OPC-UA/S7/XGT 프로토콜 클라이언트
- targetDbIntrospection (저장 DB 조회)

Routes / API
- automationDashboardRoutes, centralForwarderRoutes, equipmentStateRoutes

DB
- importEdgeConfig (Python cached config → Pipeline DB)
- seedDataSources (external_db_connections 초기 시드)

엣지 배포 리소스
- docker/edge/Dockerfile.backend.prod, Dockerfile.frontend.prod
- docker/edge/docker-compose.edge.yml

프론트엔드
- admin/automaticMng (centralForwarder, dashboard, equipmentState)
- admin/fleet (commands, devices, deployments, releases, scripts, alerts)
- admin/pipeline-device 개선 (저장 DB 드롭다운, 태그 매핑 등)
- ExternalDbConnectionModal, ScriptsManagerDialog 등 신규 컴포넌트
- lib/api: automationDashboard, centralForwarder, equipmentState, fleet

docs/
- EDGE_SERVER_STRUCTURE, FLEET_COMPLETE, FLEET_EDGE_INTEGRATION, FLEET_HOOK_INTEGRATION

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-23 20:00:06 +09:00

767 lines
29 KiB
TypeScript

import cron, { ScheduledTask } from "node-cron";
import { BatchService } from "./batchService";
import { BatchExecutionLogService } from "./batchExecutionLogService";
import { logger } from "../utils/logger";
import { query } from "../database/db";
export class BatchSchedulerService {
private static scheduledTasks: Map<number, ScheduledTask> = new Map();
/**
* 모든 활성 배치의 스케줄링 초기화
*/
static async initializeScheduler() {
try {
logger.info("배치 스케줄러 초기화 시작");
const batchConfigsResponse = await BatchService.getBatchConfigs({
is_active: "Y",
});
if (!batchConfigsResponse.success || !batchConfigsResponse.data) {
logger.warn("스케줄링할 활성 배치 설정이 없습니다.");
return;
}
const batchConfigs = batchConfigsResponse.data;
logger.info(`${batchConfigs.length}개의 배치 설정 스케줄링 등록`);
for (const config of batchConfigs) {
await this.scheduleBatch(config);
}
logger.info("배치 스케줄러 초기화 완료");
} catch (error) {
logger.error("배치 스케줄러 초기화 중 오류 발생:", error);
}
}
/**
* 개별 배치 작업 스케줄링
*/
static async scheduleBatch(config: any) {
try {
// 기존 스케줄이 있으면 제거
if (this.scheduledTasks.has(config.id)) {
this.scheduledTasks.get(config.id)?.stop();
this.scheduledTasks.delete(config.id);
}
if (config.is_active !== "Y") {
logger.info(
`배치 스케줄링 건너뜀 (비활성 상태): ${config.batch_name} (ID: ${config.id})`
);
return;
}
if (!cron.validate(config.cron_schedule)) {
logger.error(
`유효하지 않은 Cron 표현식: ${config.cron_schedule} (Batch ID: ${config.id})`
);
return;
}
logger.info(
`배치 스케줄 등록: ${config.batch_name} (ID: ${config.id}, Cron: ${config.cron_schedule})`
);
const task = cron.schedule(
config.cron_schedule,
async () => {
logger.info(
`스케줄에 의한 배치 실행 시작: ${config.batch_name} (ID: ${config.id})`
);
await this.executeBatchConfig(config);
},
{
timezone: "Asia/Seoul", // 한국 시간 기준으로 스케줄 실행
}
);
this.scheduledTasks.set(config.id, task);
} catch (error) {
logger.error(`배치 스케줄링 중 오류 발생 (ID: ${config.id}):`, error);
}
}
/**
* 배치 스케줄 업데이트 (설정 변경 시 호출)
*/
static async updateBatchSchedule(
configId: number,
executeImmediately: boolean = true
) {
try {
const result = await BatchService.getBatchConfigById(configId);
if (!result.success || !result.data) {
// 설정이 없으면 스케줄 제거
if (this.scheduledTasks.has(configId)) {
this.scheduledTasks.get(configId)?.stop();
this.scheduledTasks.delete(configId);
}
return;
}
const config = result.data;
// 스케줄 재등록
await this.scheduleBatch(config);
// 즉시 실행 옵션이 있으면 실행
/*
if (executeImmediately && config.is_active === "Y") {
logger.info(`배치 설정 변경 후 즉시 실행: ${config.batch_name}`);
this.executeBatchConfig(config).catch((err) =>
logger.error(`즉시 실행 중 오류 발생:`, err)
);
}
*/
} catch (error) {
logger.error(`배치 스케줄 업데이트 실패: ID ${configId}`, error);
}
}
/**
* 배치 설정 실행 - execution_type에 따라 매핑 또는 노드 플로우 실행
*/
static async executeBatchConfig(config: any) {
const startTime = new Date();
let executionLog: any = null;
try {
logger.info(`배치 실행 시작: ${config.batch_name} (ID: ${config.id}, type: ${config.execution_type || "mapping"})`);
// 상세 조회 (매핑 또는 노드플로우 정보가 없을 수 있음)
if (!config.execution_type || config.execution_type === "mapping" || config.execution_type === "rest_api_sync") {
if (!config.batch_mappings || config.batch_mappings.length === 0) {
const fullConfig = await BatchService.getBatchConfigById(config.id);
if (fullConfig.success && fullConfig.data) {
config = fullConfig.data;
}
}
}
// 실행 로그 생성
const executionLogResponse =
await BatchExecutionLogService.createExecutionLog({
batch_config_id: config.id,
company_code: config.company_code,
execution_status: "RUNNING",
start_time: startTime,
total_records: 0,
success_records: 0,
failed_records: 0,
});
if (!executionLogResponse.success || !executionLogResponse.data) {
logger.error(
`배치 실행 로그 생성 실패: ${config.batch_name}`,
executionLogResponse.message
);
return {
totalRecords: 0,
successRecords: 0,
failedRecords: 1,
};
}
executionLog = executionLogResponse.data;
let result: { totalRecords: number; successRecords: number; failedRecords: number };
if (config.execution_type === "node_flow") {
result = await this.executeNodeFlow(config);
} else if (config.execution_type === "ai_agent") {
result = await this.executeAiAgent(config);
} else if (config.execution_type === "rest_api_sync") {
// REST API 동기화 (mapping과 동일 로직이지만 타입 구분)
result = await this.executeBatchMappings(config);
} else if (config.execution_type === "device_collection") {
result = await this.executeDeviceCollection(config);
} else if (config.execution_type === "crawling") {
result = await this.executeCrawling(config);
} else {
result = await this.executeBatchMappings(config);
}
// 실행 로그 업데이트 (성공)
await BatchExecutionLogService.updateExecutionLog(executionLog.id, {
execution_status: result.failedRecords > 0 ? "PARTIAL" : "SUCCESS",
end_time: new Date(),
duration_ms: Date.now() - startTime.getTime(),
total_records: result.totalRecords,
success_records: result.successRecords,
failed_records: result.failedRecords,
});
logger.info(
`배치 실행 완료: ${config.batch_name} (처리된 레코드: ${result.totalRecords})`
);
return result;
} catch (error) {
logger.error(`배치 실행 중 오류 발생: ${config.batch_name}`, error);
if (executionLog) {
await BatchExecutionLogService.updateExecutionLog(executionLog.id, {
execution_status: "FAILED",
end_time: new Date(),
duration_ms: Date.now() - startTime.getTime(),
error_message:
error instanceof Error ? error.message : "알 수 없는 오류",
});
}
return {
totalRecords: 0,
successRecords: 0,
failedRecords: 1,
};
}
}
/**
* 노드 플로우 실행 - NodeFlowExecutionService에 위임
*/
private static async executeNodeFlow(config: any) {
if (!config.node_flow_id) {
throw new Error("노드 플로우 ID가 설정되지 않았습니다.");
}
const { NodeFlowExecutionService } = await import(
"./nodeFlowExecutionService"
);
const contextData: Record<string, any> = {
companyCode: config.company_code,
batchConfigId: config.id,
batchName: config.batch_name,
executionSource: "batch_scheduler",
...(config.node_flow_context || {}),
};
logger.info(
`노드 플로우 실행: flowId=${config.node_flow_id}, batch=${config.batch_name}`
);
const flowResult = await NodeFlowExecutionService.executeFlow(
config.node_flow_id,
contextData
);
// 노드 플로우 실행 결과를 배치 로그 형식으로 변환
return {
totalRecords: flowResult.summary.total,
successRecords: flowResult.summary.success,
failedRecords: flowResult.summary.failed,
};
}
/**
* AI 멀티 에이전트 실행 - MultiAgentExecutionEngine에 위임
*/
private static async executeAiAgent(config: any) {
const { MultiAgentExecutionEngine } = await import("./multiAgentExecutionEngine");
const { AiAnalysisLogService } = await import("./aiAnalysisLogService");
const groupId = config.node_flow_context?.ai_group_id || config.ai_group_id;
const inputMessage = config.node_flow_context?.ai_input_message || config.description || "분석을 실행해주세요";
if (!groupId) {
throw new Error("AI 에이전트 그룹 ID가 설정되지 않았습니다.");
}
logger.info(`AI 에이전트 실행: groupId=${groupId}, batch=${config.batch_name}`);
const result = await MultiAgentExecutionEngine.execute(groupId, inputMessage, {
userId: config.created_by || "batch_scheduler",
});
// 알림 발송 (notification 설정이 있으면)
const notification = config.node_flow_context?.notification;
if (notification) {
const title = `[AI] ${config.batch_name} 실행 결과`;
const summary = result.finalSummary.substring(0, 2000);
// 메신저 알림 (시스템 내 채팅)
if (notification.messenger) {
try {
const { query: dbQuery } = await import("../database/db");
const recipients = notification.messenger_recipients || [];
const sender = config.created_by || "system";
const companyCode = config.company_code || "*";
for (const recipientId of recipients) {
// DM 방 찾기 또는 생성
let room = await dbQuery<any>(
`SELECT r.id FROM messenger_rooms r
JOIN messenger_participants p1 ON p1.room_id = r.id AND p1.user_id = $1
JOIN messenger_participants p2 ON p2.room_id = r.id AND p2.user_id = $2
WHERE r.company_code = $3 AND r.room_type = 'dm' LIMIT 1`,
[sender, recipientId, companyCode]
);
let roomId = room?.[0]?.id;
if (!roomId) {
const created = await dbQuery<any>(
`INSERT INTO messenger_rooms (company_code, room_type, created_by) VALUES ($1, 'dm', $2) RETURNING id`,
[companyCode, sender]
);
roomId = created[0].id;
await dbQuery(
`INSERT INTO messenger_participants (room_id, user_id) VALUES ($1, $2), ($1, $3)`,
[roomId, sender, recipientId]
);
}
await dbQuery(
`INSERT INTO messenger_messages (room_id, sender_id, company_code, content, message_type)
VALUES ($1, $2, $3, $4, 'text')`,
[roomId, sender, companyCode, `${title}\n\n${summary}`]
);
await dbQuery(`UPDATE messenger_rooms SET updated_at = NOW() WHERE id = $1`, [roomId]);
}
} catch (e) { logger.warn("메신저 알림 실패:", e); }
}
// 이메일 알림
if (notification.email && Array.isArray(notification.email) && notification.email.length > 0) {
try {
const { mailSendSimpleService } = await import("./mailSendSimpleService");
for (const to of notification.email) {
await mailSendSimpleService.sendMail({
to,
subject: title,
html: `<h3>${config.batch_name}</h3><pre>${summary}</pre>`,
} as any).catch(() => {});
}
} catch (e) { logger.warn("이메일 알림 실패:", e); }
}
// 웹훅
if (notification.webhook) {
try {
const axios = (await import("axios")).default;
await axios.post(notification.webhook, {
text: `🤖 [${config.batch_name}] 실행 완료\n${result.finalSummary.substring(0, 1000)}`,
}, { timeout: 10000 });
} catch { /* ignore */ }
}
}
return {
totalRecords: result.steps.length,
successRecords: result.steps.filter((s) => !s.response.startsWith("[실행 실패]")).length,
failedRecords: result.steps.filter((s) => s.response.startsWith("[실행 실패]")).length,
};
}
/**
* 장비 데이터 수집 실행 — 실제 PLC 통신
*/
private static async executeDeviceCollection(config: any) {
const context = config.node_flow_context || {};
const connectionId = context.device_connection_id;
if (!connectionId) {
throw new Error("장비 연결 ID가 설정되지 않았습니다.");
}
// DeviceCollectorService로 실제 PLC 데이터 수집
const { collectDevice } = await import("./collector/deviceCollectorService");
const result = await collectDevice(connectionId);
const tagCount = Object.keys(result.tags).length;
const successCount = Object.values(result.tags).filter(v => v !== null).length;
const failedCount = tagCount - successCount;
logger.info(
`장비 데이터 수집 완료: ${result.connectionName} (${result.protocol}) - ` +
`${successCount}/${tagCount}개 태그 | PLC: ${result.plcState}`
);
// 대상 테이블에 수집 결과 저장 (설정된 경우)
const targetTable = context.target_table;
if (targetTable) {
try {
await query(
`INSERT INTO ${targetTable} (connection_id, connection_name, collected_at, plc_state, tag_values, error_message)
VALUES ($1, $2, NOW(), $3, $4::jsonb, $5)`,
[connectionId, result.connectionName, result.plcState, JSON.stringify(result.tags), result.errorMessage]
);
} catch (err) {
logger.warn(`수집 결과 저장 실패 (${targetTable}): ${(err as Error).message}`);
}
}
return { totalRecords: tagCount, successRecords: successCount, failedRecords: failedCount };
}
/**
* 크롤링 실행
*/
private static async executeCrawling(config: any) {
const context = config.node_flow_context || {};
const configId = context.crawl_config_id;
if (!configId) {
throw new Error("크롤링 설정 ID가 지정되지 않았습니다.");
}
const crawlConfig = await query<any>("SELECT * FROM crawl_configs WHERE id = $1", [configId]);
if (!crawlConfig.length) throw new Error("크롤링 설정을 찾을 수 없습니다.");
const cfg = crawlConfig[0];
logger.info(`크롤링 실행: ${cfg.name} (${cfg.url})`);
// 간단한 HTTP GET으로 데이터 수집
try {
const axios = (await import("axios")).default;
const response = await axios.get(cfg.url, { timeout: 30000 });
const targetTable = context.target_table;
if (targetTable) {
// 결과를 지정된 테이블에 저장
await query(
`INSERT INTO ${targetTable} (url, content, status_code, crawled_at) VALUES ($1, $2, $3, NOW())`,
[cfg.url, typeof response.data === "string" ? response.data : JSON.stringify(response.data), response.status]
).catch(() => {});
}
return { totalRecords: 1, successRecords: 1, failedRecords: 0 };
} catch (e: any) {
logger.warn(`크롤링 실패: ${cfg.url} - ${e.message}`);
return { totalRecords: 1, successRecords: 0, failedRecords: 1 };
}
}
/**
* 배치 매핑 실행 (수동 실행과 동일한 로직)
*/
private static async executeBatchMappings(config: any) {
let totalRecords = 0;
let successRecords = 0;
let failedRecords = 0;
if (!config.batch_mappings || config.batch_mappings.length === 0) {
logger.warn(`배치 매핑이 없습니다: ${config.batch_name}`);
return { totalRecords, successRecords, failedRecords };
}
// 테이블별로 매핑을 그룹화
// 고정값 매핑(mapping_type === 'fixed')은 별도 그룹으로 분리하지 않고 나중에 처리
const tableGroups = new Map<string, typeof config.batch_mappings>();
const fixedMappingsGlobal: typeof config.batch_mappings = [];
for (const mapping of config.batch_mappings) {
// 고정값 매핑은 별도로 모아둠 (FROM 소스가 필요 없음)
if (mapping.mapping_type === "fixed") {
fixedMappingsGlobal.push(mapping);
continue;
}
const key = `${mapping.from_connection_type}:${mapping.from_connection_id || "internal"}:${mapping.from_table_name}`;
if (!tableGroups.has(key)) {
tableGroups.set(key, []);
}
tableGroups.get(key)!.push(mapping);
}
// 고정값 매핑만 있고 일반 매핑이 없는 경우 처리
if (tableGroups.size === 0 && fixedMappingsGlobal.length > 0) {
logger.warn(
`일반 매핑이 없고 고정값 매핑만 있습니다. 고정값만으로는 배치를 실행할 수 없습니다.`
);
return { totalRecords, successRecords, failedRecords };
}
// 각 테이블 그룹별로 처리
for (const [tableKey, mappings] of tableGroups) {
try {
const firstMapping = mappings[0];
logger.info(
`테이블 처리 시작: ${tableKey} -> ${mappings.length}개 컬럼 매핑`
);
let fromData: any[] = [];
// FROM 데이터 조회 (DB 또는 REST API)
if (firstMapping.from_connection_type === "restapi") {
// from_api_url이 없으면 external_rest_api_connections에서 조회
let apiUrl = firstMapping.from_api_url;
let apiMethod = firstMapping.from_api_method;
let apiKey = firstMapping.from_api_key || "";
if (!apiUrl && firstMapping.from_connection_id) {
const connRes = await query<any>(
`SELECT base_url, endpoint_path, default_method, auth_type, auth_config
FROM external_rest_api_connections WHERE id = $1`,
[firstMapping.from_connection_id]
);
if (connRes.length > 0) {
const conn = connRes[0];
const base = (conn.base_url || "").replace(/\/$/, "");
const path = conn.endpoint_path || "";
apiUrl = base + (path.startsWith("/") ? path : `/${path}`);
apiMethod = conn.default_method || "GET";
if (conn.auth_type === "bearer" && conn.auth_config?.token) {
apiKey = conn.auth_config.token;
} else if (conn.auth_type === "apikey" && conn.auth_config?.key) {
apiKey = conn.auth_config.key;
}
logger.info(`API 연결 조회 성공: ${apiUrl} (method: ${apiMethod})`);
}
}
// REST API에서 데이터 조회
logger.info(
`REST API에서 데이터 조회: ${apiUrl}`
);
const { BatchExternalDbService } = await import(
"./batchExternalDbService"
);
// auth_service_name이 설정된 경우 auth_tokens에서 토큰 조회 (멀티테넌시 적용)
if (config.auth_service_name) {
let tokenQuery: string;
let tokenParams: any[];
if (config.company_code === "*") {
// 최고 관리자 배치: 모든 회사 토큰 조회 가능
tokenQuery = `SELECT access_token FROM auth_tokens
WHERE service_name = $1
ORDER BY created_date DESC LIMIT 1`;
tokenParams = [config.auth_service_name];
} else {
// 일반 회사 배치: 자신의 회사 토큰만 조회
tokenQuery = `SELECT access_token FROM auth_tokens
WHERE service_name = $1 AND company_code = $2
ORDER BY created_date DESC LIMIT 1`;
tokenParams = [config.auth_service_name, config.company_code];
}
const tokenResult = await query<{ access_token: string }>(
tokenQuery,
tokenParams
);
if (tokenResult.length > 0 && tokenResult[0].access_token) {
apiKey = tokenResult[0].access_token;
logger.info(
`auth_tokens에서 토큰 조회 성공: ${config.auth_service_name}`
);
} else {
logger.warn(
`auth_tokens에서 토큰을 찾을 수 없음: ${config.auth_service_name}`
);
}
}
// 👇 Body 파라미터 추가 (POST 요청 시)
const apiResult = await BatchExternalDbService.getDataFromRestApi(
apiUrl!,
apiKey,
firstMapping.from_table_name,
(apiMethod as "GET" | "POST" | "PUT" | "DELETE") || "GET",
mappings.map((m: any) => m.from_column_name),
100, // limit
// 파라미터 정보 전달
firstMapping.from_api_param_type,
firstMapping.from_api_param_name,
firstMapping.from_api_param_value,
firstMapping.from_api_param_source,
// 👇 Body 전달 (FROM - REST API - POST 요청)
firstMapping.from_api_body
);
if (apiResult.success && apiResult.data) {
// apiResult.data가 이미 배열 형태(BatchExternalDbService가 뽑아낸 레코드)면 그대로 사용
// 객체 형태(API 응답 원본)면 data_array_path로 추출
if (Array.isArray(apiResult.data) && apiResult.data.length > 0 && apiResult.data[0] && typeof apiResult.data[0] === "object" && !Array.isArray(apiResult.data[0])) {
// 이미 레코드 배열 형태
fromData = apiResult.data;
logger.info(`REST API에서 ${fromData.length}개 레코드 수신 (배열 형태)`);
} else if (config.data_array_path) {
// 원본 응답 객체에서 경로로 배열 추출
const extractArrayByPath = (obj: any, path: string): any[] => {
if (!path) return Array.isArray(obj) ? obj : [obj];
const keys = path.split(".");
let current = obj;
for (const key of keys) {
if (current === null || current === undefined) return [];
current = current[key];
}
return Array.isArray(current)
? current
: current
? [current]
: [];
};
const rawData =
Array.isArray(apiResult.data) && apiResult.data.length === 1
? apiResult.data[0]
: apiResult.data;
fromData = extractArrayByPath(rawData, config.data_array_path);
logger.info(
`데이터 배열 경로 '${config.data_array_path}'에서 ${fromData.length}개 레코드 추출`
);
} else {
fromData = Array.isArray(apiResult.data) ? apiResult.data : [apiResult.data];
}
} else {
throw new Error(`REST API 데이터 조회 실패: ${apiResult.message}`);
}
} else {
// DB에서 데이터 조회
const fromColumns = mappings.map((m: any) => m.from_column_name);
fromData = await BatchService.getDataFromTableWithColumns(
firstMapping.from_table_name,
fromColumns,
firstMapping.from_connection_type as "internal" | "external",
firstMapping.from_connection_id || undefined
);
}
totalRecords += fromData.length;
// 컬럼 매핑 적용하여 TO 테이블 형식으로 변환
// 유틸리티 함수: 점 표기법을 사용하여 중첩된 객체 값 가져오기
const getValueByPath = (obj: any, path: string) => {
if (!path) return undefined;
// path가 'response.access_token' 처럼 점을 포함하는 경우
if (path.includes(".")) {
return path.split(".").reduce((acc, part) => acc && acc[part], obj);
}
// 단순 키인 경우
return obj[path];
};
const mappedData = fromData.map((row) => {
const mappedRow: any = {};
for (const mapping of mappings) {
// 고정값 매핑은 이미 분리되어 있으므로 여기서는 처리하지 않음
if (mapping.mapping_type === "fixed") {
continue;
}
// DB → REST API 배치인지 확인
if (
firstMapping.to_connection_type === "restapi" &&
mapping.to_api_body
) {
// DB → REST API: 원본 컬럼명을 키로 사용 (템플릿 처리용)
mappedRow[mapping.from_column_name] =
row[mapping.from_column_name];
} else {
// REST API -> DB (POST 요청 포함) 또는 DB -> DB
// row[mapping.from_column_name] 대신 getValueByPath 사용
const value = getValueByPath(row, mapping.from_column_name);
mappedRow[mapping.to_column_name] = value;
}
}
// 고정값 매핑 적용 (전역으로 분리된 fixedMappingsGlobal 사용)
for (const fixedMapping of fixedMappingsGlobal) {
// from_column_name에 고정값이 저장되어 있음
mappedRow[fixedMapping.to_column_name] =
fixedMapping.from_column_name;
}
// 멀티테넌시: TO가 DB일 때 company_code 자동 주입
// - 배치 설정에 company_code가 있고
// - 매핑에서 company_code를 명시적으로 다루지 않은 경우만
if (
firstMapping.to_connection_type !== "restapi" &&
config.company_code &&
mappedRow.company_code === undefined
) {
mappedRow.company_code = config.company_code;
}
return mappedRow;
});
// TO 테이블에 데이터 삽입 (DB 또는 REST API)
let insertResult: { successCount: number; failedCount: number };
if (firstMapping.to_connection_type === "restapi") {
// REST API로 데이터 전송
logger.info(
`REST API로 데이터 전송: ${firstMapping.to_api_url}${firstMapping.to_table_name}`
);
const { BatchExternalDbService } = await import(
"./batchExternalDbService"
);
// DB → REST API 배치인지 확인 (to_api_body가 있으면 템플릿 기반)
const hasTemplate = mappings.some((m: any) => m.to_api_body);
if (hasTemplate) {
// 템플릿 기반 REST API 전송 (DB → REST API 배치)
const templateBody = firstMapping.to_api_body || "{}";
logger.info(
`템플릿 기반 REST API 전송, Request Body 템플릿: ${templateBody}`
);
// URL 경로 컬럼 찾기 (PUT/DELETE용)
const urlPathColumn = mappings.find(
(m: any) => m.to_column_name === "URL_PATH_PARAM"
)?.from_column_name;
const apiResult =
await BatchExternalDbService.sendDataToRestApiWithTemplate(
firstMapping.to_api_url!,
firstMapping.to_api_key!,
firstMapping.to_table_name,
(firstMapping.to_api_method as "POST" | "PUT" | "DELETE") ||
"POST",
templateBody,
mappedData,
urlPathColumn
);
if (apiResult.success && apiResult.data) {
insertResult = apiResult.data;
} else {
throw new Error(
`템플릿 기반 REST API 데이터 전송 실패: ${apiResult.message}`
);
}
} else {
// 기존 REST API 전송 (REST API → DB 배치) - 사실 이 경우는 거의 없음 (REST to REST)
// 지원하지 않음
logger.warn(
"REST API -> REST API (단순 매핑)은 아직 지원하지 않습니다."
);
insertResult = { successCount: 0, failedCount: 0 };
}
} else {
// DB에 데이터 삽입 (save_mode, conflict_key 지원)
insertResult = await BatchService.insertDataToTable(
firstMapping.to_table_name,
mappedData,
firstMapping.to_connection_type as "internal" | "external",
firstMapping.to_connection_id || undefined,
(config.save_mode as "INSERT" | "UPSERT") || "INSERT",
config.conflict_key || undefined
);
}
successRecords += insertResult.successCount;
failedRecords += insertResult.failedCount;
} catch (error) {
logger.error(`테이블 처리 중 오류 발생: ${tableKey}`, error);
// 해당 테이블 처리 실패는 전체 실패로 간주하지 않고, 실패 카운트만 증가?
// 여기서는 일단 실패 로그만 남기고 계속 진행 (필요시 정책 변경)
}
}
return { totalRecords, successRecords, failedRecords };
}
/**
* 개별 배치 작업 스케줄링 (scheduleBatch의 별칭)
*/
static async scheduleBatchConfig(config: any) {
return this.scheduleBatch(config);
}
}