feat(batch): Phase 4 — BatchExecutor ETL 본체 (FROM → Transform → TO)

vexplor_rps batchSchedulerService.executeBatchMappings 의 1:1 이식.

흐름:
  1. mappings 를 fixed/non-fixed 로 partition
  2. non-fixed 를 (from_connection_type, from_connection_id, from_table_name) 으로 그룹화
  3. 그룹별 FROM 읽기 → MappingTransformer.transformRow → TO 저장
  4. (totalRecords, successRecords, failedRecords) 집계

FROM 소스:
  - internal     : sqlSession.getConnection() 의 동적 SELECT (식별자 화이트리스트 escape)
  - external_db  : ExternalDbConnectionService.executeQuery (SELECT-only)
  - restapi      : ExternalRestApiConnectionService.fetchData (등록된 연결 + dataArrayPath)

TO 대상:
  - internal     : INSERT / UPSERT(save_mode + conflict_key, ON CONFLICT DO UPDATE/NOTHING)
                   updated_date 컬럼 있으면 자동으로 NOW() 갱신
  - restapi      : 행 단위 POST/PUT/DELETE — testConnection 으로 호출
  - external_db  : 미지원 (보안 정책: ExternalDbConnectionService 가 SELECT-only)

vexplor_rps 대비 단순화 항목 (필요 시 후속 Phase 로 분리):
  - to_api_body 템플릿 기반 일괄 전송
  - URL_PATH_PARAM 컬럼 처리
  - auth_tokens 자동 조회 (inline-mode REST API: from_connection_id 없이 from_api_url 직접 호출)
  - row_filter_config

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
hjjeong
2026-05-13 10:30:27 +09:00
parent f9a9c67891
commit 17172cf9b3
@@ -0,0 +1,391 @@
package com.erp.batch;
import com.erp.service.ExternalDbConnectionService;
import com.erp.service.ExternalRestApiConnectionService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.session.SqlSession;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.*;
import java.util.regex.Pattern;
/**
* 배치 ETL 실행기 — vexplor_rps batchSchedulerService.executeBatchMappings 의 1:1 이식.
*
* 흐름:
* 1. 매핑을 (fixed | non-fixed) 로 partition
* 2. non-fixed 매핑을 (from_connection_type, from_connection_id, from_table_name) 키로 그룹화
* 3. 그룹별로 FROM 데이터 읽기 → MappingTransformer 로 행 변환 → TO 저장
* 4. (totalRecords, successRecords, failedRecords) 집계
*
* FROM 소스 지원:
* - internal : 현 tenant DB 의 테이블 (JDBC 직접 SELECT, LIMIT 1000)
* - external_db : ExternalDbConnectionService.executeQuery (SELECT-only 보안 정책)
* - restapi : ExternalRestApiConnectionService.fetchData (등록된 연결 + dataArrayPath)
*
* TO 대상 지원:
* - internal : 현 tenant DB INSERT / UPSERT (save_mode + conflict_key)
* - restapi : 행 단위 POST/PUT/DELETE — testConnection 으로 호출
* - external_db : 미지원 (ExternalDbConnectionService 가 SELECT-only 라 의도적으로 차단)
*
* 미지원 (vexplor_rps 대비 단순화):
* - to_api_body 템플릿 기반 일괄 전송
* - URL_PATH_PARAM 컬럼 처리
* - auth_tokens 자동 조회 (inline-mode REST API)
* - row_filter_config
*/
@Service
@Slf4j
@RequiredArgsConstructor
public class BatchExecutor {
private final SqlSession sqlSession;
private final ExternalDbConnectionService externalDb;
private final ExternalRestApiConnectionService externalRest;
/** PostgreSQL 식별자 화이트리스트 (영문/숫자/언더스코어만). SQL injection 방어용. */
private static final Pattern SAFE_IDENT = Pattern.compile("[A-Za-z_][A-Za-z0-9_]*");
private static final int FROM_LIMIT = 1000;
public ExecutionResult execute(Map<String, Object> config) {
ExecutionResult r = new ExecutionResult();
Object mappingsRaw = config.get("batch_mappings");
if (!(mappingsRaw instanceof List)) {
log.warn("배치 매핑이 없습니다: {}", config.get("batch_name"));
return r;
}
@SuppressWarnings("unchecked")
List<Map<String, Object>> mappings = (List<Map<String, Object>>) mappingsRaw;
if (mappings.isEmpty()) {
log.warn("배치 매핑이 없습니다: {}", config.get("batch_name"));
return r;
}
// 1. fixed 분리
MappingTransformer.Partition partition = MappingTransformer.partitionFixed(mappings);
// 2. non-fixed 그룹화 (from_connection 기준)
Map<String, List<Map<String, Object>>> tableGroups = new LinkedHashMap<>();
for (Map<String, Object> m : partition.nonFixed) {
String key = str(m.get("from_connection_type")) + ":"
+ (m.get("from_connection_id") == null ? "internal" : m.get("from_connection_id"))
+ ":" + str(m.get("from_table_name"));
tableGroups.computeIfAbsent(key, k -> new ArrayList<>()).add(m);
}
if (tableGroups.isEmpty() && !partition.fixed.isEmpty()) {
log.warn("일반 매핑이 없고 고정값 매핑만 있어 실행 불가");
return r;
}
String companyCode = str(config.get("company_code"));
String saveMode = strOr(config.get("save_mode"), "INSERT");
String conflictKey = str(config.get("conflict_key"));
String dataArrayPath = str(config.get("data_array_path"));
// 3. 그룹별 처리
for (Map.Entry<String, List<Map<String, Object>>> e : tableGroups.entrySet()) {
String key = e.getKey();
List<Map<String, Object>> groupMappings = e.getValue();
Map<String, Object> first = groupMappings.get(0);
try {
log.info("테이블 처리 시작: {} → {} 컬럼 매핑", key, groupMappings.size());
// FROM 읽기
List<Map<String, Object>> fromData = readFrom(first, groupMappings, dataArrayPath, companyCode);
r.totalRecords += fromData.size();
// Transform
String toConnType = str(first.get("to_connection_type"));
List<Map<String, Object>> mappedRows = new ArrayList<>(fromData.size());
for (Map<String, Object> row : fromData) {
mappedRows.add(MappingTransformer.transformRow(
row, groupMappings, partition.fixed, toConnType, companyCode));
}
// TO 저장
WriteResult wr = writeTo(first, mappedRows, saveMode, conflictKey, companyCode);
r.successRecords += wr.success;
r.failedRecords += wr.failed;
} catch (Exception ex) {
log.error("테이블 처리 중 오류: {} — {}", key, ex.getMessage(), ex);
r.errorMessages.add(key + ": " + ex.getMessage());
}
}
return r;
}
// ── FROM 읽기 ───────────────────────────────────────────────────────────
private List<Map<String, Object>> readFrom(
Map<String, Object> firstMapping,
List<Map<String, Object>> groupMappings,
String dataArrayPath,
String companyCode
) {
String type = str(firstMapping.get("from_connection_type"));
String tableName = str(firstMapping.get("from_table_name"));
List<String> columns = new ArrayList<>();
for (Map<String, Object> m : groupMappings) {
String col = str(m.get("from_column_name"));
if (col != null && !col.isEmpty() && !columns.contains(col)) columns.add(col);
}
if ("restapi".equals(type)) {
return readFromRestApi(firstMapping, dataArrayPath, companyCode);
}
if ("external".equals(type) || "external_db".equals(type)) {
return readFromExternalDb(firstMapping, columns);
}
// internal (기본)
return readFromInternal(tableName, columns);
}
/** Internal DB 의 동적 SELECT. sqlSession 의 현 tenant connection 사용. */
private List<Map<String, Object>> readFromInternal(String tableName, List<String> columns) {
if (tableName == null) throw new IllegalArgumentException("from_table_name 누락");
if (columns.isEmpty()) throw new IllegalArgumentException("from_column_name 매핑 없음");
StringBuilder sql = new StringBuilder("SELECT ");
for (int i = 0; i < columns.size(); i++) {
if (i > 0) sql.append(", ");
sql.append(safeIdent(columns.get(i)));
}
sql.append(" FROM ").append(safeIdent(tableName));
sql.append(" LIMIT ").append(FROM_LIMIT);
try (Connection c = sqlSession.getConnection();
PreparedStatement ps = c.prepareStatement(sql.toString());
ResultSet rs = ps.executeQuery()) {
return materialize(rs);
} catch (SQLException e) {
throw new RuntimeException("internal SELECT 실패: " + e.getMessage(), e);
}
}
/** External DB SELECT — ExternalDbConnectionService.executeQuery 경유 (SELECT-only). */
@SuppressWarnings("unchecked")
private List<Map<String, Object>> readFromExternalDb(Map<String, Object> firstMapping, List<String> columns) {
Object connIdObj = firstMapping.get("from_connection_id");
if (connIdObj == null) throw new IllegalArgumentException("external_db 인데 from_connection_id 가 비어있음");
long connId = Long.parseLong(connIdObj.toString());
String tableName = str(firstMapping.get("from_table_name"));
StringBuilder sql = new StringBuilder("SELECT ");
for (int i = 0; i < columns.size(); i++) {
if (i > 0) sql.append(", ");
sql.append(safeIdent(columns.get(i)));
}
sql.append(" FROM ").append(safeIdent(tableName)).append(" LIMIT ").append(FROM_LIMIT);
Map<String, Object> result = externalDb.executeQuery(connId, sql.toString());
Object data = result.get("data");
return data instanceof List ? (List<Map<String, Object>>) data : List.of();
}
/** REST API → ExternalRestApiConnectionService.fetchData. dataArrayPath 로 배열 추출. */
@SuppressWarnings("unchecked")
private List<Map<String, Object>> readFromRestApi(
Map<String, Object> firstMapping, String dataArrayPath, String companyCode
) {
Object connIdObj = firstMapping.get("from_connection_id");
if (connIdObj == null) {
throw new UnsupportedOperationException(
"REST API 등록 연결 없는 inline-mode (from_api_url 직접 호출) 는 현재 미지원");
}
int connId = Integer.parseInt(connIdObj.toString());
String endpoint = str(firstMapping.get("from_table_name"));
Map<String, Object> params = new HashMap<>();
if (companyCode != null) params.put("company_code", companyCode);
Map<String, Object> result = externalRest.fetchData(connId, endpoint, dataArrayPath, params);
if (!Boolean.TRUE.equals(result.get("success"))) {
throw new RuntimeException("REST API 호출 실패: " + result.getOrDefault("message", ""));
}
Object data = result.get("data");
if (!(data instanceof Map)) return List.of();
Object rows = ((Map<String, Object>) data).get("rows");
if (!(rows instanceof List)) return List.of();
List<Object> raw = (List<Object>) rows;
List<Map<String, Object>> out = new ArrayList<>(raw.size());
for (Object o : raw) if (o instanceof Map) out.add((Map<String, Object>) o);
return out;
}
// ── TO 저장 ────────────────────────────────────────────────────────────
@Transactional
public WriteResult writeTo(
Map<String, Object> firstMapping,
List<Map<String, Object>> rows,
String saveMode,
String conflictKey,
String companyCode
) {
if (rows == null || rows.isEmpty()) return new WriteResult();
String type = str(firstMapping.get("to_connection_type"));
String tableName = str(firstMapping.get("to_table_name"));
if ("restapi".equals(type)) {
return writeToRestApi(firstMapping, rows, companyCode);
}
if ("external".equals(type) || "external_db".equals(type)) {
throw new UnsupportedOperationException(
"external_db TO 쓰기는 현재 미지원 (ExternalDbConnectionService 가 SELECT-only)");
}
return writeToInternal(tableName, rows, saveMode, conflictKey);
}
/** Internal DB INSERT / UPSERT — 행 단위 PreparedStatement. */
private WriteResult writeToInternal(String tableName, List<Map<String, Object>> rows,
String saveMode, String conflictKey) {
WriteResult r = new WriteResult();
if (tableName == null) throw new IllegalArgumentException("to_table_name 누락");
safeIdent(tableName);
try (Connection c = sqlSession.getConnection()) {
for (Map<String, Object> row : rows) {
try {
String sql = buildInsertSql(tableName, row, saveMode, conflictKey);
try (PreparedStatement ps = c.prepareStatement(sql)) {
int idx = 1;
for (Object v : row.values()) {
ps.setObject(idx++, v);
}
ps.executeUpdate();
r.success++;
}
} catch (SQLException e) {
log.error("INSERT 실패 row={} — {}", row, e.getMessage());
r.failed++;
}
}
} catch (SQLException e) {
throw new RuntimeException("internal write 실패: " + e.getMessage(), e);
}
return r;
}
/** INSERT (또는 UPSERT) SQL 생성. row 의 key 순서로 컬럼/플레이스홀더 배열. */
private String buildInsertSql(String tableName, Map<String, Object> row,
String saveMode, String conflictKey) {
List<String> cols = new ArrayList<>(row.keySet());
StringBuilder sql = new StringBuilder("INSERT INTO ").append(safeIdent(tableName)).append(" (");
for (int i = 0; i < cols.size(); i++) {
if (i > 0) sql.append(", ");
sql.append(safeIdent(cols.get(i)));
}
sql.append(") VALUES (");
for (int i = 0; i < cols.size(); i++) {
if (i > 0) sql.append(", ");
sql.append("?");
}
sql.append(")");
if ("UPSERT".equalsIgnoreCase(saveMode) && conflictKey != null && !conflictKey.isEmpty()) {
safeIdent(conflictKey);
List<String> updateCols = new ArrayList<>();
for (String col : cols) if (!col.equalsIgnoreCase(conflictKey)) updateCols.add(col);
sql.append(" ON CONFLICT (").append(conflictKey).append(") ");
if (updateCols.isEmpty()) {
sql.append("DO NOTHING");
} else {
sql.append("DO UPDATE SET ");
for (int i = 0; i < updateCols.size(); i++) {
if (i > 0) sql.append(", ");
String c = safeIdent(updateCols.get(i));
sql.append(c).append(" = EXCLUDED.").append(c);
}
if (cols.stream().anyMatch(c -> c.equalsIgnoreCase("updated_date"))) {
sql.append(", updated_date = NOW()");
}
}
}
return sql.toString();
}
/** REST API TO — 행 단위로 testConnection 호출 (POST/PUT/DELETE). */
private WriteResult writeToRestApi(Map<String, Object> firstMapping,
List<Map<String, Object>> rows, String companyCode) {
WriteResult r = new WriteResult();
String baseUrl = str(firstMapping.get("to_api_url"));
String endpoint = str(firstMapping.get("to_table_name"));
String method = strOr(firstMapping.get("to_api_method"), "POST");
for (Map<String, Object> row : rows) {
try {
Map<String, Object> testReq = new LinkedHashMap<>();
testReq.put("base_url", baseUrl);
testReq.put("endpoint", endpoint);
testReq.put("method", method);
testReq.put("body", row);
testReq.put("auth_type", "none");
testReq.put("timeout", 30000);
Map<String, Object> result = externalRest.testConnection(testReq, companyCode);
if (Boolean.TRUE.equals(result.get("success"))) r.success++; else r.failed++;
} catch (Exception e) {
log.error("REST API 전송 실패 row={} — {}", row, e.getMessage());
r.failed++;
}
}
return r;
}
// ── 유틸 ────────────────────────────────────────────────────────────────
private static List<Map<String, Object>> materialize(ResultSet rs) throws SQLException {
ResultSetMetaData md = rs.getMetaData();
int n = md.getColumnCount();
List<Map<String, Object>> rows = new ArrayList<>();
while (rs.next()) {
Map<String, Object> row = new LinkedHashMap<>();
for (int i = 1; i <= n; i++) row.put(md.getColumnLabel(i), rs.getObject(i));
rows.add(row);
}
return rows;
}
private static String safeIdent(String s) {
if (s == null || !SAFE_IDENT.matcher(s).matches()) {
throw new IllegalArgumentException("Unsafe identifier: " + s);
}
return s;
}
private static String str(Object v) { return v == null ? null : v.toString(); }
private static String strOr(Object v, String fallback) {
String s = str(v);
return (s == null || s.isEmpty()) ? fallback : s;
}
// ── 결과 클래스 ────────────────────────────────────────────────────────
public static final class ExecutionResult {
public int totalRecords = 0;
public int successRecords = 0;
public int failedRecords = 0;
public final List<String> errorMessages = new ArrayList<>();
public Map<String, Object> toMap() {
Map<String, Object> m = new LinkedHashMap<>();
m.put("total_records", totalRecords);
m.put("success_records", successRecords);
m.put("failed_records", failedRecords);
m.put("error_message", errorMessages.isEmpty() ? null : String.join("\n", errorMessages));
return m;
}
}
public static final class WriteResult {
public int success = 0;
public int failed = 0;
}
}