package com.batch.service; import java.math.BigDecimal; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; import org.springframework.batch.core.BatchStatus; import org.springframework.batch.core.Job; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobParameter; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.JobParametersBuilder; import org.springframework.batch.core.JobParametersInvalidException; import org.springframework.batch.core.launch.JobLauncher; import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException; import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; import org.springframework.batch.core.repository.JobRestartException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationContext; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.zeroturnaround.exec.ProcessExecutor; import org.zeroturnaround.exec.stream.LogOutputStream; import com.batch.config.MatchingAiSubProcessorAuto; import com.batch.config.MatchingExtraProcessorAuto; import com.batch.config.MatchingSetup; import com.batch.config.MatchingSetup.Matching; import com.batch.mapper.primary.MatchingInnerDelingMapper; import com.batch.mapper.secondary.OracleMapper; import com.batch.util.FileUtil; import com.batch.service.JobService; import lombok.extern.slf4j.Slf4j; @Slf4j @Service public class JobService { @Value("${pytyon.path}") String sPythonPrg; @Value("${python.ai.target}") String sPythonAiTarget; @Autowired private JobLauncher jobLauncher; @Autowired private ApplicationContext context; @Autowired private MatchingInnerDelingMapper matchingInnerDelingMapper; @Autowired private OracleMapper oracleMapper; @SuppressWarnings("rawtypes") @Async("commAsync") public void matchingJob(String jobGroupId, Map params) throws Exception { //Job Create Log UUID uuid = UUID.randomUUID(); HashMap mt = new HashMap(); SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-hh-mm-ss"); String sDate = dateFormat.format(new Date()) + ":" + uuid.toString(); Map paramLog = new HashMap(); paramLog.put("user_job_group", jobGroupId); paramLog.put("user_job_id", sDate); paramLog.put("user_job_name", "자동매칭(" + params.toString() + ")"); matchingInnerDelingMapper.createUserJob(paramLog); String sThreadName = Thread.currentThread().getName(); long startTime = System.currentTimeMillis(); log.info("[" + sThreadName + "]Job Started : " + startTime); log.debug("[" + sThreadName + "]params=" + params.toString()); StringBuffer sb = FileUtil.readFileToString("matchingSetup.json"); MatchingSetup matchingSetup = (MatchingSetup) FileUtil.strToObj(sb.toString(), MatchingSetup.class); String sJobTypeList = params.get("jobType").toUpperCase(); List lJobType = Arrays.asList(sJobTypeList != null?sJobTypeList.split(","):new String[0]); //파라미터가 ALL일 경우 ALL 대신 등록된 모든 Type를 넣어준다. if (lJobType.size() > 0 && lJobType.indexOf("ALL") != -1) { lJobType = new ArrayList(); for (Matching matching : matchingSetup.getMatchingSetup()) { if (matching.getActive()) { lJobType.add(matching.getType()); } else { log.info("[" + sThreadName + "]JobType(" + matching.getType() + ") is Disabled"); } } } for (String sJobType : lJobType) { log.info("[" + sThreadName + "]Current running job type: " + sJobType); JobExecution jobExe = invokeJob("matchingInnerDelng", sJobType, params); if (!jobExe.getStatus().equals(BatchStatus.COMPLETED)) { throw new Exception("Job execution error : Batchstatus(" + jobExe.getStatus() + "), ExitStatus(" + jobExe.getExitStatus() + ")" ); } } long endTime = System.currentTimeMillis(); log.info("[" + sThreadName + "]Job Type : " + lJobType.toString()); log.info("[" + sThreadName + "]Job Ended: " + endTime); log.info("[" + sThreadName + "]Running Time : " + (endTime - startTime) + "ms"); //작업종료에 대한 로그 업데이트 paramLog.put("exit_code", "0"); paramLog.put("exit_message", ""); matchingInnerDelingMapper.finishUserJob(paramLog); } @SuppressWarnings("rawtypes") @Async("extAsync") public void extraJobSub(String jobGroupId, Map paramRec) throws Exception { //Job Create Log UUID uuid = UUID.randomUUID(); HashMap mt = new HashMap(); SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-hh-mm-ss"); String sDate = dateFormat.format(new Date()) + ":" + uuid.toString(); Map paramLog = new HashMap(); paramLog.put("user_job_group", jobGroupId); paramLog.put("user_job_id", sDate); paramLog.put("user_job_name", "Extra매칭(" + paramRec.toString() + ")"); matchingInnerDelingMapper.createUserJob(paramLog); String sThreadName = Thread.currentThread().getName(); long startTime = System.currentTimeMillis(); log.info("extra [" + sThreadName + "]Job Started : " + startTime); log.debug("extra [" + sThreadName + "]params=" + paramRec.toString()); MatchingExtraProcessorAuto matchingExtraProcessorAuto = new MatchingExtraProcessorAuto(matchingInnerDelingMapper); //2건씩 합산 매칭일 경우 최대 20000건 까지 for (int i=0; i<20000;i=i+1000) { matchingExtraProcessorAuto.process(paramRec, 1, 2, 0, i); } for (int i=0; i<20000;i=i+1000) { matchingExtraProcessorAuto.process(paramRec, 2, 1, i, 0); } for (int i=0; i<20000;i=i+1000) { matchingExtraProcessorAuto.process(paramRec, 2, 2, i, i); } //3건씩 매칭일 경우 최대 2000건 까지 for (int i=0; i<2000;i=i+50) { matchingExtraProcessorAuto.process(paramRec, 1, 3, 0, i); } for (int i=0; i<2000;i=i+50) { matchingExtraProcessorAuto.process(paramRec, 2, 3, 0, i); } for (int i=0; i<2000;i=i+50) { matchingExtraProcessorAuto.process(paramRec, 3, 3, 0, i); } for (int i=0; i<2000;i=i+50) { matchingExtraProcessorAuto.process(paramRec, 3, 1, i, 0); } for (int i=0; i<2000;i=i+50) { matchingExtraProcessorAuto.process(paramRec, 3, 2, i, 0); } for (int i=0; i<2000;i=i+50) { matchingExtraProcessorAuto.process(paramRec, 3, 3, i, 0); } //4건씩 매칭일 경우 최대 2000건 까지 for (int i=0; i<2000;i=i+25) { matchingExtraProcessorAuto.process(paramRec, 1, 4, 0, i); } for (int i=0; i<2000;i=i+25) { matchingExtraProcessorAuto.process(paramRec, 4, 1, i, 0); } long endTime = System.currentTimeMillis(); log.info("[" + sThreadName + "]Job Ended: " + endTime); log.info("[" + sThreadName + "]Running Time : " + (endTime - startTime) + "ms"); //작업종료에 대한 로그 업데이트 paramLog.put("exit_code", "0"); paramLog.put("exit_message", ""); matchingInnerDelingMapper.finishUserJob(paramLog); } @SuppressWarnings("rawtypes") @Async("extAsync") public void aiSubJobSub(String jobGroupId, Map paramRec) throws Exception { //Job Create Log UUID uuid = UUID.randomUUID(); HashMap mt = new HashMap(); SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-hh-mm-ss"); String sDate = dateFormat.format(new Date()) + ":" + uuid.toString(); Map paramLog = new HashMap(); paramLog.put("user_job_group", jobGroupId); paramLog.put("user_job_id", sDate); paramLog.put("user_job_name", "AI Sub 매칭(" + paramRec.toString() + ")"); matchingInnerDelingMapper.createUserJob(paramLog); String sThreadName = Thread.currentThread().getName(); long startTime = System.currentTimeMillis(); log.info("AI Sub [" + sThreadName + "]Job Started : " + startTime); log.debug("AI Sub [" + sThreadName + "]params=" + paramRec.toString()); MatchingAiSubProcessorAuto matchingAiSubProcessorAuto = new MatchingAiSubProcessorAuto(matchingInnerDelingMapper); //2건씩 합산 매칭일 경우 최대 20000건 까지 for (int i=0; i<20000;i=i+1000) { matchingAiSubProcessorAuto.process(paramRec, 1, 2, 0, i); } for (int i=0; i<20000;i=i+1000) { matchingAiSubProcessorAuto.process(paramRec, 2, 1, i, 0); } for (int i=0; i<20000;i=i+1000) { matchingAiSubProcessorAuto.process(paramRec, 2, 2, i, i); } //3건씩 매칭일 경우 최대 2000건 까지 for (int i=0; i<2000;i=i+50) { matchingAiSubProcessorAuto.process(paramRec, 1, 3, 0, i); } for (int i=0; i<2000;i=i+50) { matchingAiSubProcessorAuto.process(paramRec, 2, 3, 0, i); } for (int i=0; i<2000;i=i+50) { matchingAiSubProcessorAuto.process(paramRec, 3, 3, 0, i); } for (int i=0; i<2000;i=i+50) { matchingAiSubProcessorAuto.process(paramRec, 3, 1, i, 0); } for (int i=0; i<2000;i=i+50) { matchingAiSubProcessorAuto.process(paramRec, 3, 2, i, 0); } for (int i=0; i<2000;i=i+50) { matchingAiSubProcessorAuto.process(paramRec, 3, 3, i, 0); } //4건씩 매칭일 경우 최대 2000건 까지 for (int i=0; i<2000;i=i+25) { matchingAiSubProcessorAuto.process(paramRec, 1, 4, 0, i); } for (int i=0; i<2000;i=i+25) { matchingAiSubProcessorAuto.process(paramRec, 4, 1, i, 0); } long endTime = System.currentTimeMillis(); log.info("AI Sub [" + sThreadName + "]Job Ended: " + endTime); log.info("AI Sub [" + sThreadName + "]Running Time : " + (endTime - startTime) + "ms"); //작업종료에 대한 로그 업데이트 paramLog.put("exit_code", "0"); paramLog.put("exit_message", ""); matchingInnerDelingMapper.finishUserJob(paramLog); } @SuppressWarnings("rawtypes") @Async("aiAsync") public void aiJobSub(String jobGroupId, Map paramRec) throws Exception { //Job Create Log UUID uuid = UUID.randomUUID(); HashMap mt = new HashMap(); SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-hh-mm-ss"); String sDate = dateFormat.format(new Date()) + ":" + uuid.toString(); Map paramLog = new HashMap(); paramLog.put("user_job_group", jobGroupId); paramLog.put("user_job_id", sDate); paramLog.put("user_job_name", "AI매칭(" + paramRec.toString() + ")"); matchingInnerDelingMapper.createUserJob(paramLog); long startTime = System.currentTimeMillis(); log.info("ai Job Started : " + startTime); log.debug("ai Job params=" + paramRec.toString()); String sSysSe = (String) paramRec.get("sys_se"); String sAccnutYm = (String) paramRec.get("accnut_ym"); String sCprCode = (String) paramRec.get("cpr_code"); String sPartCpr = (String) paramRec.get("partn_cpr"); String sDelngCrncy = (String) paramRec.get("delng_crncy"); String sTbTy = (String) paramRec.get("tb_ty"); String sErrorRange = (String) paramRec.get("error_range"); String sThreadName = Thread.currentThread().getName(); log.debug("call python"); new ProcessExecutor() .command(sPythonPrg, sPythonAiTarget, sDate, sSysSe, sAccnutYm, sCprCode, sPartCpr, sDelngCrncy, sTbTy) .redirectOutput(new LogOutputStream() { @Override protected void processLine(String line) { log.info(line); } }) .execute(); long endTime = System.currentTimeMillis(); log.info("ai Job Ended: " + endTime); log.info("ai Job Running Time : " + (endTime - startTime) + "ms"); // //작업종료에 대한 로그 업데이트 // paramLog.put("exit_code", "0"); // paramLog.put("exit_message", ""); // matchingInnerDelingMapper.finishUserJob(paramLog); } public JobExecution invokeJob(String jobName, String jobType, Map params) throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException { UUID uuid = UUID.randomUUID(); HashMap mt = new HashMap(); SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-hh-mm-ss"); final String date = dateFormat.format(new Date()) + ":" + uuid.toString(); JobParametersBuilder jobParametersBuilder = new JobParametersBuilder(); jobParametersBuilder .addString("syncDate", date) .addString("jobType", jobType) .addString("sysSe", params.get("sysSe")) .addString("accnutYm", params.get("accnutYm")); if (params.get("cpr_code") != null) jobParametersBuilder.addString("cpr_code", params.get("cpr_code")); if (params.get("partn_cpr") != null) jobParametersBuilder.addString("partn_cpr", params.get("partn_cpr")); JobParameters jobParameters = jobParametersBuilder.toJobParameters(); var jobToStart = context.getBean(jobName, Job.class); JobExecution jobExe = jobLauncher.run(jobToStart, jobParameters); return jobExe; } @SuppressWarnings("rawtypes") @Async("commAsync") public void createData(String jobGroupId, Map params) throws Exception { //Job Create Log UUID uuid = UUID.randomUUID(); HashMap mt = new HashMap(); SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-hh-mm-ss"); String sDate = dateFormat.format(new Date()) + ":" + uuid.toString(); Map paramLog = new HashMap(); paramLog.put("user_job_group", jobGroupId); paramLog.put("user_job_id", sDate); paramLog.put("user_job_name", "작업데이타생성(" + params.toString() + ")"); matchingInnerDelingMapper.createUserJob(paramLog); long startTime = System.currentTimeMillis(); log.info("Create Data Started : " + startTime); log.debug("params=" + params.toString()); //기존데이타 삭제 int iDeleted = matchingInnerDelingMapper.deleteOriginalData(params); log.debug("Deleted OrgData : " + iDeleted + "건"); //신규데이타 생성 //매칭키에 대한 정보 (sql로 조인하여 조회하기에는 너무 느리고 데이타 중복도 발생함) List lMatchingInfo = oracleMapper.getMatchingInfo(params); Map mMatchingInfo = new HashMap(); for (Map curMap : lMatchingInfo) { String sKey = String.valueOf(curMap.get("SEQ")); mMatchingInfo.put(sKey, curMap); } List lOrgData = oracleMapper.getOriginalData(params); int iInserted = 0; int limit = 1000; //1000건씩 batch List lInserted = new ArrayList(); for (Map curRec : lOrgData) { String sKey = String.valueOf(curRec.get("SEQ")); Map curMatchingInfo = mMatchingInfo.get(sKey); if (curMatchingInfo != null) { curRec.put("MATCHING_CAUSE", curMatchingInfo.get("MATCHING_CAUSE")); curRec.put("MATCH_KEY", curMatchingInfo.get("MATCH_KEY")); } lInserted.add(curRec); if (lInserted.size() == limit) { matchingInnerDelingMapper.insertOriginalData(Map.of("itemList", lInserted)); iInserted = iInserted + lInserted.size(); lInserted.clear(); } } if (lInserted.size() > 0) { matchingInnerDelingMapper.insertOriginalData(Map.of("itemList", lInserted)); iInserted = iInserted + lInserted.size(); } log.info("Inserted OrgData : " + iInserted + "건"); iDeleted = matchingInnerDelingMapper.deleteData(params); log.debug("Deleted Work Data : " + iDeleted + "건"); iInserted = matchingInnerDelingMapper.insertDataFromOriginal(params); log.info("Inserted Work Data : " + iInserted + "건"); iDeleted = matchingInnerDelingMapper.deleteDataAi(params); log.debug("Deleted Work AI Data : " + iDeleted + "건"); iInserted = matchingInnerDelingMapper.insertDataAiFromOriginal(params); log.info("Inserted Work AI Data : " + iInserted + "건"); long endTime = System.currentTimeMillis(); log.info("Create Data Ended : " + endTime); log.info("Running Time : " + (endTime - startTime) + "ms"); //작업종료에 대한 로그 업데이트 paramLog.put("exit_code", "0"); paramLog.put("exit_message", ""); matchingInnerDelingMapper.finishUserJob(paramLog); } @SuppressWarnings("rawtypes") @Async("commAsync") public void returnRwsultData(String jobGroupId, Map params) throws Exception { //Job Create Log UUID uuid = UUID.randomUUID(); HashMap mt = new HashMap(); SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-hh-mm-ss"); String sDate = dateFormat.format(new Date()) + ":" + uuid.toString(); Map paramLog = new HashMap(); paramLog.put("user_job_group", jobGroupId); paramLog.put("user_job_id", sDate); paramLog.put("user_job_name", "결과데이타리턴(" + params.toString() + ")"); matchingInnerDelingMapper.createUserJob(paramLog); long startTime = System.currentTimeMillis(); log.info("Update Data Started : " + startTime); log.debug("params=" + params.toString()); //기존데이타 초기화 int iUpdated = matchingInnerDelingMapper.updateClearNewMatchKey(params); //새로운 매칭키 생성 iUpdated = matchingInnerDelingMapper.updateNewMatchKey(params); log.debug("Updated OrgData : " + iUpdated + "건"); long endTime = System.currentTimeMillis(); log.info("Update Data Ended : " + endTime); log.info("Running Time : " + (endTime - startTime) + "ms"); //작업종료에 대한 로그 업데이트 paramLog.put("exit_code", "0"); paramLog.put("exit_message", ""); matchingInnerDelingMapper.finishUserJob(paramLog); } }