package com.batch.service; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; import org.springframework.batch.core.BatchStatus; import org.springframework.batch.core.Job; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.JobParametersBuilder; import org.springframework.batch.core.JobParametersInvalidException; import org.springframework.batch.core.launch.JobLauncher; import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException; import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; import org.springframework.batch.core.repository.JobRestartException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationContext; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.zeroturnaround.exec.ProcessExecutor; import org.zeroturnaround.exec.stream.LogOutputStream; import com.batch.config.MatchingExtraProcessorAuto; import com.batch.config.MatchingSetup; import com.batch.config.MatchingSetup.Matching; import com.batch.mapper.primary.MatchingInnerDelingMapper; import com.batch.mapper.secondary.OracleMapper; import com.batch.util.FileUtil; import com.batch.util.StatisticsUtil; import com.batch.service.JobService; import lombok.extern.slf4j.Slf4j; @Slf4j @Service public class JobService { @Value("${pytyon.path}") String sPythonPrg; @Value("${python.ai.target}") String sPythonAiTarget; @Autowired private JobLauncher jobLauncher; @Autowired private ApplicationContext context; @Autowired private MatchingInnerDelingMapper matchingInnerDelingMapper; @Autowired private OracleMapper oracleMapper; @SuppressWarnings("rawtypes") @Async("commAsync") public void matchingJob(String jobGroupId, Map params) throws Exception { // Job Create Log UUID uuid = UUID.randomUUID(); HashMap mt = new HashMap(); SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-hh-mm-ss"); String sDate = dateFormat.format(new Date()) + ":" + uuid.toString(); Map paramLog = new HashMap(); paramLog.put("user_job_group", jobGroupId); paramLog.put("user_job_id", sDate); paramLog.put("user_job_name", "자동매칭(" + params.toString() + ")"); matchingInnerDelingMapper.createUserJob(paramLog); String sThreadName = Thread.currentThread().getName(); long startTime = System.currentTimeMillis(); log.info("[" + sThreadName + "]Job Started : " + startTime); log.debug("[" + sThreadName + "]params=" + params.toString()); StringBuffer sb = FileUtil.readFileToString("matchingSetup.json"); MatchingSetup matchingSetup = (MatchingSetup) FileUtil.strToObj(sb.toString(), MatchingSetup.class); String sJobTypeList = params.get("jobType").toUpperCase(); List lJobType = Arrays.asList(sJobTypeList != null ? sJobTypeList.split(",") : new String[0]); // 파라미터가 ALL일 경우 ALL 대신 등록된 모든 Type를 넣어준다. if (lJobType.size() > 0 && lJobType.indexOf("ALL") != -1) { lJobType = new ArrayList(); for (Matching matching : matchingSetup.getMatchingSetup()) { if (matching.getActive()) { lJobType.add(matching.getType()); } else { log.info("[" + sThreadName + "]JobType(" + matching.getType() + ") is Disabled"); } } } /** 일치key 초기화 */ StatisticsUtil.initMtchNum(); for (String sJobType : lJobType) { log.info("[" + sThreadName + "]Current running job type: " + sJobType); JobExecution jobExe = invokeJob("matchingInnerDelng", sJobType, params); if (!jobExe.getStatus().equals(BatchStatus.COMPLETED)) { throw new Exception("Job execution error : Batchstatus(" + jobExe.getStatus() + "), ExitStatus(" + jobExe.getExitStatus() + ")"); } } long endTime = System.currentTimeMillis(); log.info("[" + sThreadName + "]Job Type : " + lJobType.toString()); log.info("[" + sThreadName + "]Job Ended: " + endTime); log.info("[" + sThreadName + "]Running Time : " + (endTime - startTime) + "ms"); // 작업종료에 대한 로그 업데이트 paramLog.put("exit_code", "0"); paramLog.put("exit_message", ""); matchingInnerDelingMapper.finishUserJob(paramLog); } @SuppressWarnings("rawtypes") @Async("extAsync") public void extraJobSub(String jobGroupId, Map paramRec, int key) throws Exception { int mtchNumber = key; // Job Create Log UUID uuid = UUID.randomUUID(); HashMap mt = new HashMap(); SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-hh-mm-ss"); String sDate = dateFormat.format(new Date()) + ":" + uuid.toString(); Map paramLog = new HashMap(); paramLog.put("user_job_group", jobGroupId); paramLog.put("user_job_id", sDate); paramLog.put("user_job_name", "Extra매칭(" + paramRec.toString() + ")"); matchingInnerDelingMapper.createUserJob(paramLog); String sThreadName = Thread.currentThread().getName(); long startTime = System.currentTimeMillis(); log.info("extra [" + sThreadName + " : " + sDate + "]Job Started : " + startTime + "]params=" + paramRec.toString()); log.debug("extra [" + sThreadName + "]params=" + paramRec.toString()); MatchingExtraProcessorAuto matchingExtraProcessorAuto = new MatchingExtraProcessorAuto(matchingInnerDelingMapper); // 2건씩 합산 매칭일 경우 최대 20000건 까지 for (int i = 0; i < 20000; i = i + 1000) { matchingExtraProcessorAuto.process(paramRec, 1, 2, 0, i, mtchNumber); } for (int i = 0; i < 20000; i = i + 1000) { matchingExtraProcessorAuto.process(paramRec, 2, 1, i, 0, mtchNumber); } for (int i = 0; i < 20000; i = i + 1000) { matchingExtraProcessorAuto.process(paramRec, 2, 2, i, i, mtchNumber); } // 3건씩 매칭일 경우 최대 5000건 까지 for (int i = 0; i < 2000; i = i + 100) { matchingExtraProcessorAuto.process(paramRec, 1, 3, 0, i, mtchNumber); } for (int i = 0; i < 2000; i = i + 100) { matchingExtraProcessorAuto.process(paramRec, 3, 1, i, 0, mtchNumber); } long endTime = System.currentTimeMillis(); log.info("extra [" + sThreadName + " : " + sDate + "]Job Ended: " + endTime); log.info("extra [" + sThreadName + "]Running Time : " + (endTime - startTime) + "ms"); // 작업종료에 대한 로그 업데이트 paramLog.put("exit_code", "0"); paramLog.put("exit_message", ""); matchingInnerDelingMapper.finishUserJob(paramLog); } @SuppressWarnings("rawtypes") @Async("extAsync") public void extraJobSub2(String jobGroupId, Map paramRec) throws Exception { List retData = matchingInnerDelingMapper.getCustomItemReadData(paramRec); long mtchNumber = 1000000; /** Extra매칭 일치key 백만 부터 */ paramRec.put("conds", "T"); for (Map curMap : retData) { paramRec.putAll(curMap); // Job Create Log UUID uuid = UUID.randomUUID(); HashMap mt = new HashMap(); SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-hh-mm-ss"); String sDate = dateFormat.format(new Date()) + ":" + uuid.toString(); Map paramLog = new HashMap(); paramLog.put("user_job_group", jobGroupId); paramLog.put("user_job_id", sDate); paramLog.put("user_job_name", "Extra매칭(" + paramRec.toString() + ")"); try { matchingInnerDelingMapper.createUserJob(paramLog); } catch (Exception e) { log.info("createUserJob Exception : " + e.getMessage()); } String sThreadName = Thread.currentThread().getName(); long startTime = System.currentTimeMillis(); log.info("extra [" + sThreadName + " : " + sDate + "]Job Started : " + startTime + "]params=" + paramRec.toString()); log.debug("extra [" + sThreadName + "]params=" + paramRec.toString()); MatchingExtraProcessorAuto matchingExtraProcessorAuto = new MatchingExtraProcessorAuto(matchingInnerDelingMapper); // 2건씩 합산 매칭일 경우 최대 20000건 까지 for (int i = 0; i < 20000; i = i + 1000) { mtchNumber = matchingExtraProcessorAuto.process(paramRec, 1, 2, 0, i, mtchNumber); } for (int i = 0; i < 20000; i = i + 1000) { mtchNumber = matchingExtraProcessorAuto.process(paramRec, 2, 1, i, 0, mtchNumber); } for (int i = 0; i < 20000; i = i + 1000) { mtchNumber = matchingExtraProcessorAuto.process(paramRec, 2, 2, i, i, mtchNumber); } // 3건씩 매칭일 경우 최대 2000건 까지 for (int i = 0; i < 2000; i = i + 100) { mtchNumber = matchingExtraProcessorAuto.process(paramRec, 1, 3, 0, i, mtchNumber); } for (int i = 0; i < 2000; i = i + 100) { mtchNumber = matchingExtraProcessorAuto.process(paramRec, 3, 1, i, 0, mtchNumber); } for (int i = 0; i < 2000; i = i + 100) { mtchNumber = matchingExtraProcessorAuto.process(paramRec, 2, 3, i, 0, mtchNumber); } for (int i = 0; i < 2000; i = i + 100) { mtchNumber = matchingExtraProcessorAuto.process(paramRec, 3, 2, i, 0, mtchNumber); } for (int i = 0; i < 2000; i = i + 100) { mtchNumber = matchingExtraProcessorAuto.process(paramRec, 3, 3, i, 0, mtchNumber); } // 3건이상 매칭일 경우 최대 1000건 까지 for (int i = 0; i < 1000; i = i + 25) { mtchNumber = matchingExtraProcessorAuto.process(paramRec, 1, 4, 0, i, mtchNumber); } for (int i = 0; i < 1000; i = i + 25) { mtchNumber = matchingExtraProcessorAuto.process(paramRec, 4, 1, 0, i, mtchNumber); } long endTime = System.currentTimeMillis(); log.info( "extra [" + sThreadName + " : " + sDate + "]Job Ended: " + endTime + " mtchNumber = " + mtchNumber); log.info("extra [" + sThreadName + "]Running Time : " + (endTime - startTime) + "ms"); // 작업종료에 대한 로그 업데이트 paramLog.put("exit_code", "0"); paramLog.put("exit_message", ""); matchingInnerDelingMapper.finishUserJob(paramLog); } paramRec.put("conds", "B"); for (Map curMap : retData) { paramRec.putAll(curMap); // Job Create Log UUID uuid = UUID.randomUUID(); HashMap mt = new HashMap(); SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-hh-mm-ss"); String sDate = dateFormat.format(new Date()) + ":" + uuid.toString(); Map paramLog = new HashMap(); paramLog.put("user_job_group", jobGroupId); paramLog.put("user_job_id", sDate); paramLog.put("user_job_name", "Extra매칭(" + paramRec.toString() + ")"); try { matchingInnerDelingMapper.createUserJob(paramLog); } catch (Exception e) { log.info("createUserJob Exception : " + e.getMessage()); } String sThreadName = Thread.currentThread().getName(); long startTime = System.currentTimeMillis(); log.info("extra [" + sThreadName + " : " + sDate + "]Job Started : " + startTime + "]params=" + paramRec.toString()); log.debug("extra [" + sThreadName + "]params=" + paramRec.toString()); MatchingExtraProcessorAuto matchingExtraProcessorAuto = new MatchingExtraProcessorAuto( matchingInnerDelingMapper); // 2건씩 합산 매칭일 경우 최대 20000건 까지 for (int i = 0; i < 20000; i = i + 1000) { mtchNumber = matchingExtraProcessorAuto.process(paramRec, 1, 2, 0, i, mtchNumber); } for (int i = 0; i < 20000; i = i + 1000) { mtchNumber = matchingExtraProcessorAuto.process(paramRec, 2, 1, i, 0, mtchNumber); } for (int i = 0; i < 20000; i = i + 1000) { mtchNumber = matchingExtraProcessorAuto.process(paramRec, 2, 2, i, i, mtchNumber); } // 3건씩 매칭일 경우 최대 5000건 까지 for (int i = 0; i < 2000; i = i + 100) { mtchNumber = matchingExtraProcessorAuto.process(paramRec, 1, 3, 0, i, mtchNumber); } for (int i = 0; i < 2000; i = i + 100) { mtchNumber = matchingExtraProcessorAuto.process(paramRec, 3, 1, i, 0, mtchNumber); } long endTime = System.currentTimeMillis(); log.info("extra [" + sThreadName + " : " + sDate + "]Job Ended: " + endTime + " mtchNumber = " + mtchNumber); log.info("extra [" + sThreadName + "]Running Time : " + (endTime - startTime) + "ms"); // 작업종료에 대한 로그 업데이트 paramLog.put("exit_code", "0"); paramLog.put("exit_message", ""); matchingInnerDelingMapper.finishUserJob(paramLog); } long endTime = System.currentTimeMillis(); log.info("### End Extra Matching Job2 : " + endTime); } @SuppressWarnings("rawtypes") @Async("aiAsync") public void aiJobSub(String jobGroupId, Map paramRec) throws Exception { // Job Create Log UUID uuid = UUID.randomUUID(); HashMap mt = new HashMap(); SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-hh-mm-ss"); String sDate = dateFormat.format(new Date()) + ":" + uuid.toString(); Map paramLog = new HashMap(); paramLog.put("user_job_group", jobGroupId); paramLog.put("user_job_id", sDate); paramLog.put("user_job_name", "AI매칭(" + paramRec.toString() + ")"); matchingInnerDelingMapper.createUserJob(paramLog); long startTime = System.currentTimeMillis(); log.info("ai Job Started : " + startTime); log.debug("ai Job params=" + paramRec.toString()); String sSysSe = (String) paramRec.get("sys_se"); String sAccnutYm = (String) paramRec.get("accnut_ym"); String sCprCode = (String) paramRec.get("cpr_code"); String sPartCpr = (String) paramRec.get("partn_cpr"); String sDelngCrncy = (String) paramRec.get("delng_crncy"); String sThreadName = Thread.currentThread().getName(); log.debug("call python"); new ProcessExecutor() .command(sPythonPrg, sPythonAiTarget, sDate, sSysSe, sAccnutYm, sCprCode, sPartCpr, sDelngCrncy) .redirectOutput(new LogOutputStream() { @Override protected void processLine(String line) { log.info(line); } }).execute(); long endTime = System.currentTimeMillis(); log.info("ai Job Ended: " + endTime); log.info("ai Job Running Time : " + (endTime - startTime) + "ms"); // //작업종료에 대한 로그 업데이트 // paramLog.put("exit_code", "0"); // paramLog.put("exit_message", ""); // matchingInnerDelingMapper.finishUserJob(paramLog); } public JobExecution invokeJob(String jobName, String jobType, Map params) throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException { UUID uuid = UUID.randomUUID(); HashMap mt = new HashMap(); SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-hh-mm-ss"); final String date = dateFormat.format(new Date()) + ":" + uuid.toString(); JobParameters jobParameters = new JobParametersBuilder().addString("syncDate", date) .addString("jobType", jobType).addString("sysSe", params.get("sysSe")) .addString("accnutYm", params.get("accnutYm")).addString("searchCond", params.get("searchCond")) .toJobParameters(); var jobToStart = context.getBean(jobName, Job.class); JobExecution jobExe = jobLauncher.run(jobToStart, jobParameters); return jobExe; } @SuppressWarnings("rawtypes") @Async("commAsync") public void createData(String jobGroupId, Map params) throws Exception { // Job Create Log UUID uuid = UUID.randomUUID(); HashMap mt = new HashMap(); SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-hh-mm-ss"); String sDate = dateFormat.format(new Date()) + ":" + uuid.toString(); Map paramLog = new HashMap(); paramLog.put("user_job_group", jobGroupId); paramLog.put("user_job_id", sDate); paramLog.put("user_job_name", "작업데이타생성(" + params.toString() + ")"); matchingInnerDelingMapper.createUserJob(paramLog); long startTime = System.currentTimeMillis(); log.info("Create Data Started : " + startTime); log.debug("params=" + params.toString()); // 기존데이타 삭제 int iDeleted = matchingInnerDelingMapper.deleteOriginalData(params); log.debug("Deleted OrgData : " + iDeleted + "건"); // 신규데이타 생성 // 매칭키에 대한 정보 (sql로 조인하여 조회하기에는 너무 느리고 데이타 중복도 발생함) List lMatchingInfo = oracleMapper.getMatchingInfo(params); Map mMatchingInfo = new HashMap(); for (Map curMap : lMatchingInfo) { String sKey = String.valueOf(curMap.get("SEQ")); mMatchingInfo.put(sKey, curMap); } List lOrgData = oracleMapper.getOriginalData(params); int iInserted = 0; int iUpdated = 0; int limit = 1000; // 1000건씩 batch List lInserted = new ArrayList(); for (Map curRec : lOrgData) { String sKey = String.valueOf(curRec.get("SEQ")); Map curMatchingInfo = mMatchingInfo.get(sKey); if (curMatchingInfo != null) { curRec.put("MATCHING_CAUSE", curMatchingInfo.get("MATCHING_CAUSE")); curRec.put("MATCH_KEY", curMatchingInfo.get("MATCH_KEY")); } lInserted.add(curRec); if (lInserted.size() == limit) { matchingInnerDelingMapper.insertOriginalData(Map.of("itemList", lInserted)); iInserted = iInserted + lInserted.size(); lInserted.clear(); } } if (lInserted.size() > 0) { matchingInnerDelingMapper.insertOriginalData(Map.of("itemList", lInserted)); iInserted = iInserted + lInserted.size(); } log.info("Inserted OrgData : " + iInserted + "건"); iDeleted = matchingInnerDelingMapper.deleteData(params); log.debug("Deleted Work Data : " + iDeleted + "건"); iInserted = matchingInnerDelingMapper.insertDataFromOriginal(params); log.info("Inserted Work Data : " + iInserted + "건"); /**** 매칭제외 처리 start **************************************************************************************************/ /** 1. 거래금액 = 0 */ iUpdated = matchingInnerDelingMapper.updateInitExclusion1(params); log.info("매칭제외 : 1. 거래금액 = 0 : " + iUpdated + "건"); /** 2. 거래유형, 자기/상대 법인, 거래일자, 계정 이 같은 1:1 내역 금액이 +, - 로 합이 0 인 경우 */ iUpdated = matchingInnerDelingMapper.updateInitExclusion2(params); log.info("매칭제외 : 2. 1:1 내역 금액이 +, - 로 합이 0 : " + iUpdated + "건"); /**** 매칭제외 처리 end **************************************************************************************************/ iDeleted = matchingInnerDelingMapper.deleteDataAi(params); log.debug("Deleted Work AI Data : " + iDeleted + "건"); iInserted = matchingInnerDelingMapper.insertDataAiFromOriginal(params); log.info("Inserted Work AI Data : " + iInserted + "건"); long endTime = System.currentTimeMillis(); log.info("Create Data Ended : " + endTime); log.info("Running Time : " + (endTime - startTime) + "ms"); // 작업종료에 대한 로그 업데이트 paramLog.put("exit_code", "0"); paramLog.put("exit_message", ""); matchingInnerDelingMapper.finishUserJob(paramLog); } @SuppressWarnings("rawtypes") @Async("commAsync") public void returnRwsultData(String jobGroupId, Map params) throws Exception { // Job Create Log UUID uuid = UUID.randomUUID(); HashMap mt = new HashMap(); SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-hh-mm-ss"); String sDate = dateFormat.format(new Date()) + ":" + uuid.toString(); Map paramLog = new HashMap(); paramLog.put("user_job_group", jobGroupId); paramLog.put("user_job_id", sDate); paramLog.put("user_job_name", "결과데이타리턴(" + params.toString() + ")"); matchingInnerDelingMapper.createUserJob(paramLog); long startTime = System.currentTimeMillis(); log.info("Update Data Started : " + startTime); log.debug("params=" + params.toString()); // 기존데이타 초기화 int iUpdated = matchingInnerDelingMapper.updateClearNewMatchKey(params); // 새로운 매칭키 생성 iUpdated = matchingInnerDelingMapper.updateNewMatchKey(params); log.debug("Updated OrgData : " + iUpdated + "건"); long endTime = System.currentTimeMillis(); log.info("Update Data Ended : " + endTime); log.info("Running Time : " + (endTime - startTime) + "ms"); // 작업종료에 대한 로그 업데이트 paramLog.put("exit_code", "0"); paramLog.put("exit_message", ""); matchingInnerDelingMapper.finishUserJob(paramLog); } }