Enhance batch management functionality by adding node flow execution support and improving batch configuration creation. Introduce new API endpoint for retrieving node flows and update existing batch services to handle execution types. Update frontend components to support new scheduling options and node flow selection.

This commit is contained in:
DDD1542
2026-03-19 15:07:07 +09:00
parent 7f781b0177
commit 43cf91e748
41 changed files with 4020 additions and 3155 deletions
@@ -126,29 +126,41 @@ export class BatchManagementController {
*/
static async createBatchConfig(req: AuthenticatedRequest, res: Response) {
try {
const { batchName, description, cronSchedule, mappings, isActive } =
req.body;
const {
batchName, description, cronSchedule, mappings, isActive,
executionType, nodeFlowId, nodeFlowContext,
} = req.body;
const companyCode = req.user?.companyCode;
if (
!batchName ||
!cronSchedule ||
!mappings ||
!Array.isArray(mappings)
) {
if (!batchName || !cronSchedule) {
return res.status(400).json({
success: false,
message:
"필수 필드가 누락되었습니다. (batchName, cronSchedule, mappings)",
message: "필수 필드가 누락되었습니다. (batchName, cronSchedule)",
});
}
const batchConfig = await BatchService.createBatchConfig({
batchName,
description,
cronSchedule,
mappings,
isActive: isActive !== undefined ? isActive : true,
} as CreateBatchConfigRequest);
// 노드 플로우 타입은 매핑 없이 생성 가능
if (executionType !== "node_flow" && (!mappings || !Array.isArray(mappings))) {
return res.status(400).json({
success: false,
message: "매핑 타입은 mappings 배열이 필요합니다.",
});
}
const batchConfig = await BatchService.createBatchConfig(
{
batchName,
description,
cronSchedule,
mappings: mappings || [],
isActive: isActive === false || isActive === "N" ? "N" : "Y",
companyCode: companyCode || "",
executionType: executionType || "mapping",
nodeFlowId: nodeFlowId || null,
nodeFlowContext: nodeFlowContext || null,
} as CreateBatchConfigRequest,
req.user?.userId
);
return res.status(201).json({
success: true,
@@ -769,6 +781,55 @@ export class BatchManagementController {
}
}
/**
* 노드 플로우 목록 조회 (배치 설정에서 플로우 선택용)
* GET /api/batch-management/node-flows
*/
static async getNodeFlows(req: AuthenticatedRequest, res: Response) {
try {
const companyCode = req.user?.companyCode;
let flowQuery: string;
let flowParams: any[] = [];
if (companyCode === "*") {
flowQuery = `
SELECT flow_id, flow_name, flow_description AS description, company_code,
COALESCE(jsonb_array_length(
CASE WHEN flow_data IS NOT NULL AND flow_data::text != ''
THEN (flow_data::jsonb -> 'nodes')
ELSE '[]'::jsonb END
), 0) AS node_count
FROM node_flows
ORDER BY flow_name
`;
} else {
flowQuery = `
SELECT flow_id, flow_name, flow_description AS description, company_code,
COALESCE(jsonb_array_length(
CASE WHEN flow_data IS NOT NULL AND flow_data::text != ''
THEN (flow_data::jsonb -> 'nodes')
ELSE '[]'::jsonb END
), 0) AS node_count
FROM node_flows
WHERE company_code = $1
ORDER BY flow_name
`;
flowParams = [companyCode];
}
const result = await query(flowQuery, flowParams);
return res.json({ success: true, data: result });
} catch (error) {
console.error("노드 플로우 목록 조회 오류:", error);
return res.status(500).json({
success: false,
message: "노드 플로우 목록 조회 실패",
error: error instanceof Error ? error.message : "알 수 없는 오류",
});
}
}
/**
* 배치 대시보드 통계 조회
* GET /api/batch-management/stats
@@ -14,6 +14,12 @@ const router = Router();
*/
router.get("/stats", authenticateToken, BatchManagementController.getBatchStats);
/**
* GET /api/batch-management/node-flows
* 배치 설정에서 노드 플로우 선택용 목록 조회
*/
router.get("/node-flows", authenticateToken, BatchManagementController.getNodeFlows);
/**
* GET /api/batch-management/connections
* 사용 가능한 커넥션 목록 조회
+56 -3
View File
@@ -13,7 +13,54 @@ import { auditLogService, getClientIp } from "../../services/auditLogService";
const router = Router();
/**
* 플로우 목록 조회
* flow_data에서 요약 정보 추출
*/
function extractFlowSummary(flowData: any) {
try {
const parsed = typeof flowData === "string" ? JSON.parse(flowData) : flowData;
const nodes = parsed?.nodes || [];
const edges = parsed?.edges || [];
const nodeTypes: Record<string, number> = {};
nodes.forEach((n: any) => {
const t = n.type || "unknown";
nodeTypes[t] = (nodeTypes[t] || 0) + 1;
});
// 미니 토폴로지용 간소화된 좌표 (0~1 정규화)
let topology = null;
if (nodes.length > 0) {
const xs = nodes.map((n: any) => n.position?.x || 0);
const ys = nodes.map((n: any) => n.position?.y || 0);
const minX = Math.min(...xs), maxX = Math.max(...xs);
const minY = Math.min(...ys), maxY = Math.max(...ys);
const rangeX = maxX - minX || 1;
const rangeY = maxY - minY || 1;
topology = {
nodes: nodes.map((n: any) => ({
id: n.id,
type: n.type,
x: (((n.position?.x || 0) - minX) / rangeX),
y: (((n.position?.y || 0) - minY) / rangeY),
})),
edges: edges.map((e: any) => [e.source, e.target]),
};
}
return {
nodeCount: nodes.length,
edgeCount: edges.length,
nodeTypes,
topology,
};
} catch {
return { nodeCount: 0, edgeCount: 0, nodeTypes: {}, topology: null };
}
}
/**
* 플로우 목록 조회 (summary 포함)
*/
router.get("/", async (req: AuthenticatedRequest, res: Response) => {
try {
@@ -24,6 +71,7 @@ router.get("/", async (req: AuthenticatedRequest, res: Response) => {
flow_id as "flowId",
flow_name as "flowName",
flow_description as "flowDescription",
flow_data as "flowData",
company_code as "companyCode",
created_at as "createdAt",
updated_at as "updatedAt"
@@ -32,7 +80,6 @@ router.get("/", async (req: AuthenticatedRequest, res: Response) => {
const params: any[] = [];
// 슈퍼 관리자가 아니면 회사별 필터링
if (userCompanyCode && userCompanyCode !== "*") {
sqlQuery += ` WHERE company_code = $1`;
params.push(userCompanyCode);
@@ -42,9 +89,15 @@ router.get("/", async (req: AuthenticatedRequest, res: Response) => {
const flows = await query(sqlQuery, params);
const flowsWithSummary = flows.map((flow: any) => {
const summary = extractFlowSummary(flow.flowData);
const { flowData, ...rest } = flow;
return { ...rest, summary };
});
return res.json({
success: true,
data: flows,
data: flowsWithSummary,
});
} catch (error) {
logger.error("플로우 목록 조회 실패:", error);
@@ -122,20 +122,22 @@ export class BatchSchedulerService {
}
/**
* 배치 설정 실행
* 배치 설정 실행 - execution_type에 따라 매핑 또는 노드 플로우 실행
*/
static async executeBatchConfig(config: any) {
const startTime = new Date();
let executionLog: any = null;
try {
logger.info(`배치 실행 시작: ${config.batch_name} (ID: ${config.id})`);
logger.info(`배치 실행 시작: ${config.batch_name} (ID: ${config.id}, type: ${config.execution_type || "mapping"})`);
// 매핑 정보가 없으면 상세 조회로 다시 가져오기
if (!config.batch_mappings || config.batch_mappings.length === 0) {
const fullConfig = await BatchService.getBatchConfigById(config.id);
if (fullConfig.success && fullConfig.data) {
config = fullConfig.data;
// 상세 조회 (매핑 또는 노드플로우 정보가 없을 수 있음)
if (!config.execution_type || config.execution_type === "mapping") {
if (!config.batch_mappings || config.batch_mappings.length === 0) {
const fullConfig = await BatchService.getBatchConfigById(config.id);
if (fullConfig.success && fullConfig.data) {
config = fullConfig.data;
}
}
}
@@ -165,12 +167,17 @@ export class BatchSchedulerService {
executionLog = executionLogResponse.data;
// 실제 배치 실행 로직 (수동 실행과 동일한 로직 사용)
const result = await this.executeBatchMappings(config);
let result: { totalRecords: number; successRecords: number; failedRecords: number };
if (config.execution_type === "node_flow") {
result = await this.executeNodeFlow(config);
} else {
result = await this.executeBatchMappings(config);
}
// 실행 로그 업데이트 (성공)
await BatchExecutionLogService.updateExecutionLog(executionLog.id, {
execution_status: "SUCCESS",
execution_status: result.failedRecords > 0 ? "PARTIAL" : "SUCCESS",
end_time: new Date(),
duration_ms: Date.now() - startTime.getTime(),
total_records: result.totalRecords,
@@ -182,12 +189,10 @@ export class BatchSchedulerService {
`배치 실행 완료: ${config.batch_name} (처리된 레코드: ${result.totalRecords})`
);
// 성공 결과 반환
return result;
} catch (error) {
logger.error(`배치 실행 중 오류 발생: ${config.batch_name}`, error);
// 실행 로그 업데이트 (실패)
if (executionLog) {
await BatchExecutionLogService.updateExecutionLog(executionLog.id, {
execution_status: "FAILED",
@@ -198,7 +203,6 @@ export class BatchSchedulerService {
});
}
// 실패 결과 반환
return {
totalRecords: 0,
successRecords: 0,
@@ -207,6 +211,43 @@ export class BatchSchedulerService {
}
}
/**
* 노드 플로우 실행 - NodeFlowExecutionService에 위임
*/
private static async executeNodeFlow(config: any) {
if (!config.node_flow_id) {
throw new Error("노드 플로우 ID가 설정되지 않았습니다.");
}
const { NodeFlowExecutionService } = await import(
"./nodeFlowExecutionService"
);
const contextData: Record<string, any> = {
companyCode: config.company_code,
batchConfigId: config.id,
batchName: config.batch_name,
executionSource: "batch_scheduler",
...(config.node_flow_context || {}),
};
logger.info(
`노드 플로우 실행: flowId=${config.node_flow_id}, batch=${config.batch_name}`
);
const flowResult = await NodeFlowExecutionService.executeFlow(
config.node_flow_id,
contextData
);
// 노드 플로우 실행 결과를 배치 로그 형식으로 변환
return {
totalRecords: flowResult.summary.total,
successRecords: flowResult.summary.success,
failedRecords: flowResult.summary.failed,
};
}
/**
* 배치 매핑 실행 (수동 실행과 동일한 로직)
*/
+26 -7
View File
@@ -72,9 +72,12 @@ export class BatchService {
const total = parseInt(countResult[0].count);
const totalPages = Math.ceil(total / limit);
// 목록 조회
// 목록 조회 (최근 실행 정보 포함)
const configs = await query<any>(
`SELECT bc.*
`SELECT bc.*,
(SELECT bel.execution_status FROM batch_execution_logs bel WHERE bel.batch_config_id = bc.id ORDER BY bel.start_time DESC LIMIT 1) as last_status,
(SELECT bel.start_time FROM batch_execution_logs bel WHERE bel.batch_config_id = bc.id ORDER BY bel.start_time DESC LIMIT 1) as last_executed_at,
(SELECT bel.total_records FROM batch_execution_logs bel WHERE bel.batch_config_id = bc.id ORDER BY bel.start_time DESC LIMIT 1) as last_total_records
FROM batch_configs bc
${whereClause}
ORDER BY bc.created_date DESC
@@ -82,9 +85,6 @@ export class BatchService {
[...values, limit, offset]
);
// 매핑 정보 조회 (N+1 문제 해결을 위해 별도 쿼리 대신 여기서는 생략하고 상세 조회에서 처리)
// 하지만 목록에서도 간단한 정보는 필요할 수 있음
return {
success: true,
data: configs as BatchConfig[],
@@ -176,8 +176,8 @@ export class BatchService {
// 배치 설정 생성
const batchConfigResult = await client.query(
`INSERT INTO batch_configs
(batch_name, description, cron_schedule, is_active, company_code, save_mode, conflict_key, auth_service_name, data_array_path, created_by, created_date, updated_date)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, NOW(), NOW())
(batch_name, description, cron_schedule, is_active, company_code, save_mode, conflict_key, auth_service_name, data_array_path, execution_type, node_flow_id, node_flow_context, created_by, created_date, updated_date)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, NOW(), NOW())
RETURNING *`,
[
data.batchName,
@@ -189,6 +189,9 @@ export class BatchService {
data.conflictKey || null,
data.authServiceName || null,
data.dataArrayPath || null,
data.executionType || "mapping",
data.nodeFlowId || null,
data.nodeFlowContext ? JSON.stringify(data.nodeFlowContext) : null,
userId,
]
);
@@ -332,6 +335,22 @@ export class BatchService {
updateFields.push(`data_array_path = $${paramIndex++}`);
updateValues.push(data.dataArrayPath || null);
}
if (data.executionType !== undefined) {
updateFields.push(`execution_type = $${paramIndex++}`);
updateValues.push(data.executionType);
}
if (data.nodeFlowId !== undefined) {
updateFields.push(`node_flow_id = $${paramIndex++}`);
updateValues.push(data.nodeFlowId || null);
}
if (data.nodeFlowContext !== undefined) {
updateFields.push(`node_flow_context = $${paramIndex++}`);
updateValues.push(
data.nodeFlowContext
? JSON.stringify(data.nodeFlowContext)
: null
);
}
// 배치 설정 업데이트
const batchConfigResult = await client.query(
+21 -6
View File
@@ -79,6 +79,9 @@ export interface BatchMapping {
created_date?: Date;
}
// 배치 실행 타입: 기존 매핑 방식 또는 노드 플로우 실행
export type BatchExecutionType = "mapping" | "node_flow";
// 배치 설정 타입
export interface BatchConfig {
id?: number;
@@ -87,15 +90,21 @@ export interface BatchConfig {
cron_schedule: string;
is_active: "Y" | "N";
company_code?: string;
save_mode?: "INSERT" | "UPSERT"; // 저장 모드 (기본: INSERT)
conflict_key?: string; // UPSERT 시 충돌 기준 컬럼명
auth_service_name?: string; // REST API 인증에 사용할 토큰 서비스명
data_array_path?: string; // REST API 응답에서 데이터 배열 경로 (예: response, data.items)
save_mode?: "INSERT" | "UPSERT";
conflict_key?: string;
auth_service_name?: string;
data_array_path?: string;
execution_type?: BatchExecutionType;
node_flow_id?: number;
node_flow_context?: Record<string, any>;
created_by?: string;
created_date?: Date;
updated_by?: string;
updated_date?: Date;
batch_mappings?: BatchMapping[];
last_status?: string;
last_executed_at?: string;
last_total_records?: number;
}
export interface BatchConnectionInfo {
@@ -149,7 +158,10 @@ export interface CreateBatchConfigRequest {
saveMode?: "INSERT" | "UPSERT";
conflictKey?: string;
authServiceName?: string;
dataArrayPath?: string; // REST API 응답에서 데이터 배열 경로
dataArrayPath?: string;
executionType?: BatchExecutionType;
nodeFlowId?: number;
nodeFlowContext?: Record<string, any>;
mappings: BatchMappingRequest[];
}
@@ -161,7 +173,10 @@ export interface UpdateBatchConfigRequest {
saveMode?: "INSERT" | "UPSERT";
conflictKey?: string;
authServiceName?: string;
dataArrayPath?: string; // REST API 응답에서 데이터 배열 경로
dataArrayPath?: string;
executionType?: BatchExecutionType;
nodeFlowId?: number;
nodeFlowContext?: Record<string, any>;
mappings?: BatchMappingRequest[];
}