Files
wace_rps/backend-node/src/services/erpBatchSeedService.ts
T
chpark 97b333dd2e Amaranth(Wehago) ERP REST API 연계 + 배치 시스템 강화
부팅 시 자동 시드:
- 외부 REST API 연결 6종 (부서/사원/거래처/창고/계정과목/Wehago 사용자)
- 매칭 배치 6개 + Wehago HMAC-SHA256 서명 자동 부착 (erpApiClient/erpPresetSeedService/erpBatchSeedService)
- 동기화 대상 테이블/컬럼 보장 idempotent 마이그레이션 (erpTableMigration)

배치 기능 확장:
- 조건부 매핑 (mapping_type='conditional') — when/then/default 규칙으로 값 변환 (예: enrlFg=J01→active)
- 행 단위 제외 필터 (row_filter_config) — 특정 API 필드 값 가진 행 동기화 제외 (예: loginId=wace 통합 ERP 계정 제외)
- save_mode/conflict_key 기반 UPSERT, data_array_path 응답 배열 추출

UI 정비:
- 배치/플로우/메일/REST API 목록에 페이징 + FullHD 컴팩트 레이아웃
- 배치 편집 화면 한 화면 풀 활용 — TO 패널 가로 그리드, FROM 패널 등록 연결 한 줄 요약, 응답/JSON/파라미터 details 접힘
- ResponsiveDataView/AdminPageRenderer 쿼리 파라미터(?edit=N) 파싱

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 09:48:28 +09:00

430 lines
17 KiB
TypeScript

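/**
* Auto-seeds the six Amaranth (Wehago/RPS ERP) → internal DB synchronization batches.
*
* Runs once at boot. Looks up the six external REST API connections
* (departments / employees / Wehago users / customers / warehouses / account codes)
* and creates one bundle of batch_configs + batch_mappings for each connection found.
* - If a batch with the same batch_name already exists it is not re-created; its settings
*   and column mappings are reconciled with the preset instead (idempotent)
* - from_connection_type = "restapi", from_connection_id = REST API connection ID
* → at execution time batchSchedulerService automatically applies Wehago HMAC authentication.
*/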
import { getPool, transaction } from "../database/db";
import { logger } from "../utils/logger";
/**
 * Company code under which the seeded batches are created and looked up.
 * Taken from the DEFAULT_COMPANY_CODE environment variable; an unset or empty
 * value falls back to "COMPANY_16".
 */
const DEFAULT_COMPANY_CODE = process.env.DEFAULT_COMPANY_CODE
  ? process.env.DEFAULT_COMPANY_CODE
  : "COMPANY_16";
/**
 * Declarative description of one seeded sync batch: one batch_configs row plus
 * one batch_mappings row per `column_map` entry.
 */
interface BatchPreset {
  /** Batch name; used together with the company code to detect an already-seeded batch. */
  batch_name: string;
  /** Free-text description stored on the batch config. */
  description: string;
  /** Cron expression stored on the batch config. */
  cron_schedule: string;
  /** connection_name of the REST API connection this batch is matched to. */
  connection_name: string;
  /** Path inside the response from which the data array is extracted (dataArrayPath). */
  data_array_path: string;
  /** Target table in the internal DB. */
  to_table_name: string;
  /** Mapping: API response field (`from`) → DB column (`to`). */
  column_map: Array<{ from: string; to: string }>;
  /** Write mode stored on the batch config. */
  save_mode: "INSERT" | "UPSERT";
  /** UPSERT conflict column (when absent, NULL is stored as the conflict key). */
  conflict_key?: string;
  /**
   * Row-level exclusion filter — used to keep specific rows of the Wehago response
   * out of the sync. Example: the integrated ERP account with loginId="wace" collides
   * with the internal admin user_id and is therefore excluded.
   * `value` is `unknown` (not `any`) so consumers must narrow it before use; the seeder
   * only serializes the whole object with JSON.stringify.
   */
  row_filter_config?: { exclude: Array<{ column: string; op?: string; value: unknown }> };
}
/** Expands compact [apiField, dbColumn] tuples into the { from, to } objects used by column_map. */
function toColumnMap(
  pairs: ReadonlyArray<readonly [string, string]>
): Array<{ from: string; to: string }> {
  return pairs.map(([from, to]) => ({ from, to }));
}

/**
 * The six Amaranth (Wehago) → internal DB batch presets, in seeding order:
 * departments, employees, Wehago users (loginId back-fill), customers, warehouses, account codes.
 * Every preset reads its rows from the "resultData" array of the response and is stored as UPSERT.
 */
const PRESETS: BatchPreset[] = [
  // ── Departments ──
  {
    connection_name: "Amaranth - 부서",
    batch_name: "Amaranth → 부서 동기화",
    description:
      "Wehago api16S10 → dept_info. 응답 실제 필드: deptCd / deptNm / parentDeptCd / deptLevel / orderSq / regDt / toDt 등.",
    cron_schedule: "0 3 * * *",
    data_array_path: "resultData",
    to_table_name: "dept_info",
    save_mode: "UPSERT",
    conflict_key: "dept_code",
    column_map: toColumnMap([
      ["deptCd", "dept_code"],
      ["deptNm", "dept_name"],
      ["parentDeptCd", "parent_dept_code"],
      ["deptLevel", "dept_level"],
      ["orderSq", "sort_seq"],
    ]),
  },
  // ── Employees (HR info) ──
  {
    connection_name: "Amaranth - 사원",
    batch_name: "Amaranth → 사원(인사정보) 동기화",
    description:
      "Wehago api16S05 → user_info. 응답 실제 필드: empCd(사번), empSeq(시퀀스), loginId(로그인ID), korNm(한글명), hrsp/hcls(직책/직급) 등. sabun 기준 UPSERT.",
    cron_schedule: "10 3 * * *",
    data_array_path: "resultData",
    to_table_name: "user_info",
    save_mode: "UPSERT",
    conflict_key: "sabun",
    // The integrated ERP account (loginId='wace') collides with the internal admin user_id='wace', so exclude it
    row_filter_config: { exclude: [{ column: "loginId", op: "eq", value: "wace" }] },
    column_map: toColumnMap([
      ["empCd", "sabun"], // employee number
      ["loginId", "user_id"], // login ID — already present in the api16S05 response
      ["empSeq", "emp_seq"], // Wehago employee sequence
      ["korNm", "user_name"],
      ["enlsNm", "user_name_eng"],
      ["deptCd", "dept_code"],
      ["deptNm", "dept_name"],
      ["hrspCd", "position_code"],
      ["hrspNm", "position_name"],
      ["hclsCd", "rank_code"],
      ["hclsNm", "rank_name"],
      ["emalAdd", "email"],
      ["outemalAdd", "out_email"],
      ["tel", "tel"],
      ["emgcTel", "cell_phone"],
      ["joinDt", "join_date"],
      ["rtrDt", "retire_date"],
      ["enrlFg", "work_status"],
    ]),
  },
  // ── Wehago users (loginId back-fill) ──
  {
    connection_name: "Amaranth - Wehago 사용자",
    batch_name: "Amaranth → Wehago 사용자(loginId) 동기화",
    description:
      "Wehago api99u01A11 → user_info.user_id 보강. api16S05 에 loginId 가 없는 사용자가 있을 때만 보충 (백업용).",
    cron_schedule: "20 3 * * *",
    data_array_path: "resultData",
    to_table_name: "user_info",
    save_mode: "UPSERT",
    conflict_key: "emp_seq",
    column_map: toColumnMap([
      ["empSeq", "emp_seq"],
      ["loginId", "user_id"],
    ]),
  },
  // ── Customers ──
  {
    connection_name: "Amaranth - 거래처",
    batch_name: "Amaranth → 거래처 동기화",
    description:
      "Wehago api16S11 → customer_mng. 응답 실제 필드: trCd/trNm/trKrNm/trEnNm/ceoNm/tel/fax/email/zip/streceiveAddr1/2/useYn/trFg/nationCd/stemp* 등.",
    cron_schedule: "25 3 * * *",
    data_array_path: "resultData",
    to_table_name: "customer_mng",
    save_mode: "UPSERT",
    conflict_key: "customer_code",
    column_map: toColumnMap([
      ["trCd", "customer_code"],
      ["trNm", "customer_name"],
      ["trKrNm", "short_name"],
      ["ceoNm", "ceo_name"],
      ["tel", "tel"],
      ["fax", "fax_no"],
      ["email", "email"],
      ["zip", "zip_code"],
      ["streceiveAddr1", "address"],
      ["streceiveAddr2", "address_detail"],
      ["trFg", "customer_type"],
      ["nationCd", "nation_code"],
      ["stempEmpNm", "charge_name"],
      ["stempTel", "charge_tel"],
      ["stempEmail", "charge_email"],
      ["stempHp", "hp_no"],
      ["depositor", "account_owner"],
      ["useYn", "use_yn"],
    ]),
  },
  // ── Warehouses ──
  {
    connection_name: "Amaranth - 창고",
    batch_name: "Amaranth → 창고 동기화",
    description:
      "Wehago api20A00S00801 → warehouse_info. 응답 실제 필드: baselocCd/baselocNm/baselocFg/baselocDc/inlocCd/inlocNm/outlocCd/outlocNm/useYn 등.",
    cron_schedule: "30 3 * * *",
    data_array_path: "resultData",
    to_table_name: "warehouse_info",
    save_mode: "UPSERT",
    conflict_key: "warehouse_code",
    column_map: toColumnMap([
      ["baselocCd", "warehouse_code"],
      ["baselocNm", "warehouse_name"],
      ["baselocFg", "baseloc_fg"],
      ["useYn", "use_yn"],
    ]),
  },
  // ── Account codes ──
  {
    connection_name: "Amaranth - 계정과목",
    batch_name: "Amaranth → 계정과목 동기화",
    description:
      "Wehago api11A02 → account_code_info. 응답 실제 필드: acctCd/acctNm/acctNmk/groupCd/groupNm/drcrFg/subDisp/chFg/budFg/attrFg/racctCd 등.",
    cron_schedule: "40 3 * * *",
    data_array_path: "resultData",
    to_table_name: "account_code_info",
    save_mode: "UPSERT",
    conflict_key: "account_code",
    column_map: toColumnMap([
      ["acctCd", "account_code"],
      ["acctNm", "account_name"],
      ["acctNmk", "account_short"],
      ["groupCd", "group_code"],
      ["groupNm", "group_name"],
      ["drcrFg", "dr_cr_fg"],
      ["subDisp", "sub_disp"],
      ["subDispNm", "sub_disp_name"],
      ["chFg", "ch_fg"],
      ["chFgNm", "ch_fg_name"],
      ["budFg", "bud_fg"],
      ["budFgNm", "bud_fg_name"],
      ["attrFg", "attr_fg"],
      ["attrFgNm", "attr_fg_name"],
      ["racctCd", "racct_code"],
      ["racctNm", "racct_name"],
      ["fillYn", "fill_yn"],
      ["extInputCd", "ext_input_cd"],
    ]),
  },
];
/** Row shape read from external_rest_api_connections when matching presets to connections. */
interface SeedConnectionRow {
  id: number;
  connection_name: string;
  base_url: string;
  endpoint_path: string | null;
  default_method: string | null;
  default_request_body: string | null;
}

/**
 * INSERT for a single REST-API → internal-DB column mapping row.
 * Placeholders $1..$12 are filled by mappingInsertParams (same order).
 */
const INSERT_BATCH_MAPPING_SQL = `INSERT INTO batch_mappings
  (batch_config_id, company_code, from_connection_type, from_connection_id,
   from_table_name, from_column_name,
   from_api_url, from_api_key, from_api_method, from_api_body,
   to_connection_type, to_connection_id, to_table_name, to_column_name,
   mapping_order, mapping_type, created_by, created_date)
VALUES ($1, $2, 'restapi', $3, $4, $5, $6, $7, $8, $9,
        'internal', NULL, $10, $11, $12, 'direct', 'system-seed', NOW())`;

/** Builds the $1..$12 parameter list for INSERT_BATCH_MAPPING_SQL. */
function mappingInsertParams(
  batchConfigId: number,
  conn: SeedConnectionRow,
  toTableName: string,
  pair: { from: string; to: string },
  order: number
): Array<string | number> {
  return [
    batchConfigId,
    DEFAULT_COMPANY_CODE,
    conn.id, // from_connection_id = REST API connection ID
    conn.endpoint_path || "", // stored in from_table_name
    pair.from,
    conn.base_url,
    "", // from_api_key — intentionally left empty
    conn.default_method || "POST",
    conn.default_request_body || "",
    toTableName,
    pair.to,
    order,
  ];
}

/** Extracts a printable message from a caught value of unknown type. */
function seedErrorMessage(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}

/**
 * Seeds (or reconciles) the Amaranth sync batches described by PRESETS.
 *
 * For each preset:
 * - if no REST API connection with the preset's connection_name exists, the preset is skipped;
 * - if a batch with the same batch_name + company code already exists, its config
 *   (conflict_key / save_mode / data_array_path, and row_filter_config only when empty) is
 *   aligned with the preset, its REST mappings are re-pointed at the connection, mappings
 *   not present in the preset are deleted and missing ones are appended;
 * - otherwise the batch config and all its mappings are created in one transaction.
 *
 * Failures while creating a batch or syncing its mappings are logged per preset and do not
 * abort the remaining presets. Errors from the lookup queries and the config / connection
 * UPDATE statements still propagate to the caller.
 */
export async function seedAmaranthBatches(): Promise<void> {
  const pool = getPool();

  // Re-align the id sequences with MAX(id) (rows may have been inserted with explicit ids
  // in the past). Best effort: a failure here is logged and ignored.
  for (const tbl of ["batch_mappings", "batch_configs", "batch_execution_logs"]) {
    try {
      // tbl comes from the fixed list above, so interpolating it into the SQL is safe.
      await pool.query(
        `SELECT setval(pg_get_serial_sequence($1, 'id'),
                       COALESCE((SELECT MAX(id) FROM ${tbl}), 1),
                       true)`,
        [tbl]
      );
    } catch (e: unknown) {
      logger.warn(`[batchSeed] ${tbl} 시퀀스 보정 실패 (무시): ${seedErrorMessage(e)}`);
    }
  }

  // 1) Fetch all Amaranth REST API connections referenced by the presets in one query
  const connRows = await pool.query<SeedConnectionRow>(
    `SELECT id, connection_name, base_url, endpoint_path, default_method, default_request_body
       FROM external_rest_api_connections
      WHERE connection_name = ANY($1)`,
    [PRESETS.map((p) => p.connection_name)]
  );
  const connByName = new Map<string, SeedConnectionRow>(
    connRows.rows.map((c) => [c.connection_name, c])
  );

  let created = 0;
  let topupCount = 0;
  let skipped = 0;
  let missing = 0;

  const pairKey = (from: string, to: string): string => `${from}=>${to}`;

  for (const preset of PRESETS) {
    const conn = connByName.get(preset.connection_name);
    if (!conn) {
      logger.warn(`[batchSeed] REST API 연결 없음 (스킵): ${preset.connection_name}`);
      missing++;
      continue;
    }

    // A batch with the same name already exists → reconcile it instead of creating a new one
    const existing = await pool.query<{
      id: number;
      conflict_key: string | null;
      save_mode: string | null;
      data_array_path: string | null;
      // Only tested for truthiness here, so the concrete column representation does not matter.
      row_filter_config: unknown;
    }>(
      "SELECT id, conflict_key, save_mode, data_array_path, row_filter_config FROM batch_configs WHERE batch_name = $1 AND company_code = $2 LIMIT 1",
      [preset.batch_name, DEFAULT_COMPANY_CODE]
    );
    const existingRow = existing.rows[0];

    if (existingRow) {
      const existingId = existingRow.id;

      // ── 1) Update the batch config when conflict_key / save_mode / data_array_path differ ──
      const presetFilter = preset.row_filter_config
        ? JSON.stringify(preset.row_filter_config)
        : null;
      // Inject the preset filter only when the stored one is empty
      // (a value configured by a user in the UI is preserved).
      const shouldSeedFilter = !existingRow.row_filter_config && presetFilter !== null;
      const needsCfgUpdate =
        existingRow.conflict_key !== (preset.conflict_key || null) ||
        existingRow.save_mode !== preset.save_mode ||
        existingRow.data_array_path !== preset.data_array_path ||
        shouldSeedFilter;
      if (needsCfgUpdate) {
        await pool.query(
          `UPDATE batch_configs
              SET conflict_key = $1, save_mode = $2, data_array_path = $3,
                  row_filter_config = COALESCE(row_filter_config, $4),
                  updated_date = NOW()
            WHERE id = $5`,
          [
            preset.conflict_key || null,
            preset.save_mode,
            preset.data_array_path,
            presetFilter,
            existingId,
          ]
        );
        logger.info(
          `[batchSeed] 설정 갱신: ${preset.batch_name} (conflict_key=${preset.conflict_key}, save_mode=${preset.save_mode}${shouldSeedFilter ? ", row_filter 시드" : ""})`
        );
      }

      // ── 2) Mapping sync ──
      // 2-0) Point every REST mapping of this batch at the registered connection.
      //      (Old rows may carry from_connection_id = NULL; since mappings are grouped by
      //      from_connection_id, a NULL/id mix splits them into two groups and some columns
      //      would not be synchronized.) An existing non-empty request body is kept.
      await pool.query(
        `UPDATE batch_mappings
            SET from_connection_id = $1,
                from_api_url = $2,
                from_api_method = $3,
                from_api_body = COALESCE(NULLIF(from_api_body, ''), $4),
                from_table_name = $5
          WHERE batch_config_id = $6
            AND from_connection_type = 'restapi'`,
        [
          conn.id,
          conn.base_url,
          conn.default_method || "POST",
          conn.default_request_body || "",
          conn.endpoint_path || "",
          existingId,
        ]
      );

      const mapRes = await pool.query<{
        id: number;
        from_column_name: string;
        to_column_name: string;
        mapping_order: number | null;
      }>(
        "SELECT id, from_column_name, to_column_name, mapping_order FROM batch_mappings WHERE batch_config_id = $1",
        [existingId]
      );

      // (from, to) pairs defined by the preset
      const presetPairs = new Set(preset.column_map.map((m) => pairKey(m.from, m.to)));
      // Split stored rows into those the preset still defines and obsolete ones
      // (e.g. same to_column fed from a different from field) — everything not in the preset.
      const keptRows = mapRes.rows.filter((r) =>
        presetPairs.has(pairKey(r.from_column_name, r.to_column_name))
      );
      const obsoleteIds = mapRes.rows
        .filter((r) => !presetPairs.has(pairKey(r.from_column_name, r.to_column_name)))
        .map((r) => r.id);
      const keptPairs = new Set(
        keptRows.map((r) => pairKey(r.from_column_name, r.to_column_name))
      );
      const missingMaps = preset.column_map.filter((m) => !keptPairs.has(pairKey(m.from, m.to)));
      // New rows continue after the highest order among rows that survive the cleanup.
      const maxOrder = keptRows.reduce((acc, r) => Math.max(acc, r.mapping_order || 0), 0);

      if (obsoleteIds.length > 0 || missingMaps.length > 0) {
        try {
          // Delete + insert atomically so a failure cannot leave a half-synced mapping set.
          await transaction(async (client) => {
            if (obsoleteIds.length > 0) {
              await client.query("DELETE FROM batch_mappings WHERE id = ANY($1::int[])", [
                obsoleteIds,
              ]);
            }
            let order = maxOrder + 1;
            for (const m of missingMaps) {
              await client.query(
                INSERT_BATCH_MAPPING_SQL,
                mappingInsertParams(existingId, conn, preset.to_table_name, m, order++)
              );
            }
          });
          if (obsoleteIds.length > 0) {
            logger.info(`[batchSeed] 폐기 매핑 삭제: ${preset.batch_name} (-${obsoleteIds.length}개)`);
          }
          if (missingMaps.length > 0) {
            topupCount += missingMaps.length;
            logger.info(`[batchSeed] 매핑 보충: ${preset.batch_name} (+${missingMaps.length}개)`);
          }
        } catch (e: unknown) {
          logger.warn(`[batchSeed] 매핑 동기화 실패: ${preset.batch_name} — ${seedErrorMessage(e)}`);
        }
      }

      skipped++;
      continue;
    }

    // No batch yet → create config + mappings in a single transaction
    try {
      await transaction(async (client) => {
        // Create the batch config
        const cfgResult = await client.query(
          `INSERT INTO batch_configs
             (batch_name, description, cron_schedule, is_active, company_code,
              save_mode, conflict_key, data_array_path, execution_type, row_filter_config,
              created_by, created_date, updated_date)
           VALUES ($1, $2, $3, 'Y', $4, $5, $6, $7, 'mapping', $8, 'system-seed', NOW(), NOW())
           RETURNING id`,
          [
            preset.batch_name,
            preset.description,
            preset.cron_schedule,
            DEFAULT_COMPANY_CODE,
            preset.save_mode,
            preset.conflict_key || null,
            preset.data_array_path,
            preset.row_filter_config ? JSON.stringify(preset.row_filter_config) : null,
          ]
        );
        const batchId = cfgResult.rows[0].id;

        // Insert one mapping row per column_map entry, ordered from 1
        let order = 1;
        for (const m of preset.column_map) {
          await client.query(
            INSERT_BATCH_MAPPING_SQL,
            mappingInsertParams(batchId, conn, preset.to_table_name, m, order++)
          );
        }
      });
      created++;
      logger.info(`[batchSeed] 생성: ${preset.batch_name} (REST API 연결 #${conn.id})`);
    } catch (e: unknown) {
      logger.error(`[batchSeed] 실패: ${preset.batch_name} — ${seedErrorMessage(e)}`);
    }
  }

  if (created > 0 || skipped > 0 || missing > 0 || topupCount > 0) {
    logger.info(
      `🌱 Amaranth 배치 시드 완료: 신규 ${created}개 / 이미 존재 ${skipped}개 / 매핑 보충 ${topupCount}개 / 연결 없음 ${missing}개 (회사: ${DEFAULT_COMPANY_CODE})`
    );
  }
}