package com.batch.service;

import java.math.BigDecimal;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.zeroturnaround.exec.ProcessExecutor;
import org.zeroturnaround.exec.stream.LogOutputStream;
import com.batch.config.MatchingAiSubProcessorAuto;
import com.batch.config.MatchingExtraProcessorAuto;
import com.batch.config.MatchingSetup;
import com.batch.config.MatchingSetup.Matching;
import com.batch.mapper.primary.MatchingInnerDelingMapper;
import com.batch.mapper.secondary.OracleMapper;
import com.batch.util.FileUtil;
import com.batch.util.JsonUtil;
import com.batch.util.StringUtil;
import com.batch.service.JobService;
import lombok.extern.slf4j.Slf4j;

/**
 * Entry points for the matching batch jobs. Each public {@code @Async} method registers a
 * "user job" row through {@link MatchingInnerDelingMapper#createUserJob}, performs its work,
 * and (except {@link #callAsyncJobSub}) closes the row through {@code finishUserJob}.
 *
 * <p>Parameter and row maps are kept as raw {@code Map}/{@code List} where they cross into the
 * mappers, because the mapper signatures are not visible from this file.
 *
 * <p>NOTE(review): when a job method throws, {@code finishUserJob} is never called, so the user
 * job row is left open. {@link #aiJobSub} polls those rows, so a failed job can keep it waiting
 * until the configured total wait elapses. Confirm whether failures should be recorded.
 */
@Slf4j
@Service
public class JobService {

    /**
     * Timestamp part of a generated job id. Uses {@code HH} (24-hour clock); the previous
     * {@code hh} pattern produced ambiguous 12-hour values without an AM/PM marker.
     */
    private static final String JOB_ID_TIMESTAMP_PATTERN = "yyyy-MM-dd-HH-mm-ss";

    /** Number of rows sent to the mapper per insert call in {@link #createData}. */
    private static final int INSERT_BATCH_SIZE = 1000;

    // NOTE(review): "pytyon" looks like a typo for "python", but the key must match the
    // deployed property files, so it is left untouched here.
    @Value("${pytyon.path}")
    String sPythonPrg;

    @Value("${python.ai.target}")
    String sPythonAiTarget;

    /** Comma separated job types that {@link #matchingJob} must skip. */
    @Value("${matching.auto.exceptListByComma}")
    String sExceptMatchType;

    /** Accumulated {@code cnt_all} threshold after which {@link #aiJobSub} pauses. */
    @Value("${thread.ai.count.rowmax}")
    BigDecimal bdAiAsyncMaxRowCount;

    /** Poll interval used by {@link #aiJobSub}; multiplied by 1000 to obtain milliseconds. */
    @Value("${thread.ai.processing.wait}")
    BigDecimal bdAiAsyncWaitTime;

    /** Upper bound for the accumulated wait in {@link #aiJobSub}. */
    @Value("${thread.ai.processing.wait.total}")
    BigDecimal bdAiAsyncTotalWaitTime;

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private ApplicationContext context;

    @Autowired
    private MatchingInnerDelingMapper matchingInnerDelingMapper;

    @Autowired
    private OracleMapper oracleMapper;

    /**
     * One call into a matching processor. Lets the two sub jobs share a single sweep schedule
     * even though their processor classes expose no common type in this file.
     */
    @FunctionalInterface
    private interface MatchStep {
        @SuppressWarnings("rawtypes")
        void process(Map params, int first, int second, int third, int fourth) throws Exception;
    }

    /**
     * Runs the {@code matchingInnerDelng} batch job once per requested job type.
     *
     * @param jobGroupId group id stored on the user job row
     * @param params request parameters; {@code jobType} is a comma separated list of types, where
     *     {@code ALL} expands to every active type from {@code matchingSetup.json}
     * @throws IllegalArgumentException if a requested type is not present in the setup
     * @throws Exception if a batch execution does not end in {@link BatchStatus#COMPLETED}, or on
     *     any mapper / file error
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    @Async("commAsync")
    public void matchingJob(String jobGroupId, Map params) throws Exception {
        Map paramLog = createJobLog(jobGroupId, newJobId(), "자동매칭(" + params.toString() + ")");

        String sThreadName = Thread.currentThread().getName();
        long startTime = System.currentTimeMillis();
        log.info("[{}]Job Started : {}", sThreadName, startTime);
        log.debug("[{}]params={}", sThreadName, params);

        String setupJson = FileUtil.readFileToString("matchingSetup.json").toString();
        MatchingSetup matchingSetup = (MatchingSetup) FileUtil.strToObj(setupJson, MatchingSetup.class);

        // The null check now happens before the value is dereferenced.
        List<String> lJobType = parseJobTypes((String) params.get("jobType"));

        // When ALL is requested, replace it with every registered, active type.
        if (lJobType.contains("ALL")) {
            lJobType = new ArrayList<>();
            for (Matching matching : matchingSetup.getMatchingSetup()) {
                if (matching.getActive()) {
                    lJobType.add(matching.getType());
                } else {
                    log.info("[{}]JobType({}) is Disabled", sThreadName, matching.getType());
                }
            }
        }

        // Loop-invariant, so parsed once instead of once per job type.
        List<String> lExceptMatchType = StringUtil.StringToArrayList(sExceptMatchType);

        for (String sJobType : lJobType) {
            var setupEntry = matchingSetup.getMatching(sJobType);
            if (setupEntry == null) {
                // Previously surfaced as a bare NullPointerException on getActive().
                throw new IllegalArgumentException("Unknown jobType: " + sJobType);
            }
            if (!setupEntry.getActive()) {
                log.info("[{}]JobType({} is Disabled", sThreadName, sJobType);
                continue;
            }
            if (lExceptMatchType.contains(sJobType)) {
                log.info("[{}]JobType({} is Excepted", sThreadName, sJobType);
                continue;
            }

            log.info("[{}]Current running job type: {}", sThreadName, sJobType);
            JobExecution jobExe = invokeJob("matchingInnerDelng", sJobType, params);
            if (BatchStatus.COMPLETED != jobExe.getStatus()) {
                throw new Exception("Job execution error : Batchstatus(" + jobExe.getStatus()
                        + "), ExitStatus(" + jobExe.getExitStatus() + ")");
            }
        }

        long endTime = System.currentTimeMillis();
        log.info("[{}]Job Type : {}", sThreadName, lJobType);
        log.info("[{}]Job Ended: {}", sThreadName, endTime);
        log.info("[{}]Running Time : {}ms", sThreadName, endTime - startTime);

        finishJobLog(paramLog);
    }

    /**
     * Runs the fixed sweep schedule against a {@link MatchingExtraProcessorAuto}.
     *
     * @param jobGroupId group id stored on the user job row
     * @param paramRec parameters handed unchanged to every processor call
     * @throws Exception on any processor or mapper error
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    @Async("extAsync")
    public void extraJobSub(String jobGroupId, Map paramRec) throws Exception {
        Map paramLog = createJobLog(jobGroupId, newJobId(), "Extra매칭(" + paramRec.toString() + ")");

        String sThreadName = Thread.currentThread().getName();
        long startTime = System.currentTimeMillis();
        log.info("extra [{}]Job Started : {}", sThreadName, startTime);
        log.info("extra [{}]params={}", sThreadName, paramRec);

        MatchingExtraProcessorAuto matchingExtraProcessorAuto =
                new MatchingExtraProcessorAuto(matchingInnerDelingMapper);
        runSweepSchedule(
                (p, a, b, c, d) -> matchingExtraProcessorAuto.process(p, a, b, c, d), paramRec);

        long endTime = System.currentTimeMillis();
        log.info("[{}]Job Ended: {}", sThreadName, endTime);
        log.info("[{}]Running Time : {}ms", sThreadName, endTime - startTime);

        finishJobLog(paramLog);
    }

    /**
     * Runs the fixed sweep schedule against a {@link MatchingAiSubProcessorAuto}.
     *
     * @param jobGroupId group id stored on the user job row
     * @param paramRec parameters handed unchanged to every processor call
     * @throws Exception on any processor or mapper error
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    @Async("extAsync")
    public void aiSubJobSub(String jobGroupId, Map paramRec) throws Exception {
        Map paramLog = createJobLog(jobGroupId, newJobId(), "AI Sub 매칭(" + paramRec.toString() + ")");

        String sThreadName = Thread.currentThread().getName();
        long startTime = System.currentTimeMillis();
        log.info("AI Sub [{}]Job Started : {}", sThreadName, startTime);
        log.debug("AI Sub [{}]params={}", sThreadName, paramRec);

        MatchingAiSubProcessorAuto matchingAiSubProcessorAuto =
                new MatchingAiSubProcessorAuto(matchingInnerDelingMapper);
        runSweepSchedule(
                (p, a, b, c, d) -> matchingAiSubProcessorAuto.process(p, a, b, c, d), paramRec);

        long endTime = System.currentTimeMillis();
        log.info("AI Sub [{}]Job Ended: {}", sThreadName, endTime);
        log.info("AI Sub [{}]Running Time : {}ms", sThreadName, endTime - startTime);

        finishJobLog(paramLog);
    }

    /**
     * Starts one {@code ThreadAiMatching} per row returned by {@code getAiReadData}. Whenever
     * the accumulated {@code cnt_all} reaches {@code bdAiAsyncMaxRowCount}, the counter is reset
     * and the method waits for the job group to finish before starting further threads.
     *
     * @param jobGroupId group id passed to each thread and used for status polling
     * @param params query parameters; {@code error_range} is copied into every thread's parameters
     * @throws Exception on mapper errors or if the wait is interrupted
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    @Async("aiAsync")
    public void aiJobSub(String jobGroupId, Map params) throws Exception {
        List<? extends Map> retData = matchingInnerDelingMapper.getAiReadData(params);
        BigDecimal bdCurrentRowCount = BigDecimal.ZERO;

        for (Map curMap : retData) {
            Map curMParams = new HashMap();
            curMParams.putAll(curMap);
            curMParams.put("error_range", params.get("error_range"));

            BigDecimal bdCntAll = new BigDecimal(String.valueOf(curMap.get("cnt_all")));
            bdCurrentRowCount = bdCurrentRowCount.add(bdCntAll);

            log.info("Call Matching Job ({})", JsonUtil.objectToString(curMap));
            new ThreadAiMatching(
                    jobGroupId,
                    curMParams,
                    matchingInnerDelingMapper,
                    sPythonPrg,
                    sPythonAiTarget
            ).start();

            // Accumulated row count reached the configured processing capacity.
            if (bdCurrentRowCount.compareTo(bdAiAsyncMaxRowCount) >= 0) {
                bdCurrentRowCount = BigDecimal.ZERO;
                waitForJobGroup(jobGroupId);
                log.info("Next Thread Group Processing");
            }
        }
        log.info("Current Group Process End");
    }

    /**
     * Launches the named Spring Batch job with parameters derived from {@code params}.
     *
     * @param jobName bean name of the {@link Job} to start
     * @param jobType value of the {@code jobType} job parameter
     * @param params source of {@code sysSe}, {@code accnutYm} and the optional {@code cpr_code}
     *     and {@code partn_cpr} values
     * @return the execution returned by the {@link JobLauncher}
     */
    @SuppressWarnings("rawtypes")
    public JobExecution invokeJob(String jobName, String jobType, Map params)
            throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException,
                    JobParametersInvalidException, JobRestartException {
        // A fresh id per launch keeps the job parameters distinct between runs.
        final String date = newJobId();

        JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
        jobParametersBuilder
                .addString("syncDate", date)
                .addString("jobType", jobType)
                .addString("sysSe", (String) params.get("sysSe"))
                .addString("accnutYm", (String) params.get("accnutYm"));

        String cprCode = (String) params.get("cpr_code");
        if (cprCode != null) {
            jobParametersBuilder.addString("cpr_code", cprCode);
        }
        String partnCpr = (String) params.get("partn_cpr");
        if (partnCpr != null) {
            jobParametersBuilder.addString("partn_cpr", partnCpr);
        }

        JobParameters jobParameters = jobParametersBuilder.toJobParameters();
        var jobToStart = context.getBean(jobName, Job.class);
        return jobLauncher.run(jobToStart, jobParameters);
    }

    /**
     * Rebuilds the working data: deletes the stored original rows, copies the rows returned by
     * the Oracle mapper in batches of {@value #INSERT_BATCH_SIZE}, then rebuilds the work and
     * AI work data through the corresponding mapper statements.
     *
     * @param jobGroupId group id stored on the user job row
     * @param params parameters handed unchanged to every mapper call
     * @throws Exception on any mapper error
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    @Async("commAsync")
    public void createData(String jobGroupId, Map params) throws Exception {
        Map paramLog = createJobLog(jobGroupId, newJobId(), "작업데이타생성(" + params.toString() + ")");

        long startTime = System.currentTimeMillis();
        log.info("Create Data Started : {}", startTime);
        log.debug("params={}", params);

        // Remove the existing original data.
        int iDeleted = matchingInnerDelingMapper.deleteOriginalData(params);
        log.debug("Deleted OrgData : {}건", iDeleted);

        // Matching-key info is loaded separately and merged in memory; per the original author,
        // joining it in SQL was too slow and produced duplicate rows.
        List<? extends Map> lMatchingInfo = oracleMapper.getMatchingInfo(params);
        Map<String, Map> matchingBySeq = new HashMap<>();
        for (Map curMap : lMatchingInfo) {
            matchingBySeq.put(String.valueOf(curMap.get("SEQ")), curMap);
        }

        List<? extends Map> lOrgData = oracleMapper.getOriginalData(params);
        int iInserted = 0;
        List lInserted = new ArrayList();
        for (Map curRec : lOrgData) {
            Map curMatchingInfo = matchingBySeq.get(String.valueOf(curRec.get("SEQ")));
            if (curMatchingInfo != null) {
                curRec.put("MATCHING_CAUSE", curMatchingInfo.get("MATCHING_CAUSE"));
                curRec.put("MATCH_KEY", curMatchingInfo.get("MATCH_KEY"));
            }
            lInserted.add(curRec);

            if (lInserted.size() == INSERT_BATCH_SIZE) {
                matchingInnerDelingMapper.insertOriginalData(Map.of("itemList", lInserted));
                iInserted += lInserted.size();
                lInserted.clear();
            }
        }
        // Flush the last, partially filled batch.
        if (!lInserted.isEmpty()) {
            matchingInnerDelingMapper.insertOriginalData(Map.of("itemList", lInserted));
            iInserted += lInserted.size();
        }
        log.info("Inserted OrgData : {}건", iInserted);

        iDeleted = matchingInnerDelingMapper.deleteData(params);
        log.debug("Deleted Work Data : {}건", iDeleted);
        iInserted = matchingInnerDelingMapper.insertDataFromOriginal(params);
        log.info("Inserted Work Data : {}건", iInserted);

        iDeleted = matchingInnerDelingMapper.deleteDataAi(params);
        log.debug("Deleted Work AI Data : {}건", iDeleted);
        iInserted = matchingInnerDelingMapper.insertDataAiFromOriginal(params);
        log.info("Inserted Work AI Data : {}건", iInserted);

        long endTime = System.currentTimeMillis();
        log.info("Create Data Ended : {}", endTime);
        log.info("Running Time : {}ms", endTime - startTime);

        finishJobLog(paramLog);
    }

    /**
     * Clears the previously generated match keys and then generates new ones.
     *
     * @param jobGroupId group id stored on the user job row
     * @param params parameters handed unchanged to both mapper calls
     * @throws Exception on any mapper error
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    @Async("commAsync")
    public void returnRwsultData(String jobGroupId, Map params) throws Exception {
        Map paramLog = createJobLog(jobGroupId, newJobId(), "결과데이타리턴(" + params.toString() + ")");

        long startTime = System.currentTimeMillis();
        log.info("Update Data Started : {}", startTime);
        log.debug("params={}", params);

        // Reset the existing data; only the count of the second statement is reported.
        matchingInnerDelingMapper.updateClearNewMatchKey(params);
        // Generate the new match keys.
        int iUpdated = matchingInnerDelingMapper.updateNewMatchKey(params);
        log.debug("Updated OrgData : {}건", iUpdated);

        long endTime = System.currentTimeMillis();
        log.info("Update Data Ended : {}", endTime);
        log.info("Running Time : {}ms", endTime - startTime);

        finishJobLog(paramLog);
    }

    /**
     * Runs an external program and forwards each line of its output to the log. The command is
     * {@code PrgName}, followed by {@code PrgParams} split on spaces, followed by the generated
     * job id.
     *
     * <p>NOTE(review): the program name and arguments come straight from {@code paramRec}; make
     * sure callers never pass untrusted input here. The exit value of the process is not checked.
     *
     * @param jobGroupId group id stored on the user job row
     * @param paramRec must contain the {@code PrgName} and {@code PrgParams} string entries
     * @throws Exception if the process cannot be started or on any mapper error
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    @Async("commAsync")
    public void callAsyncJobSub(String jobGroupId, Map paramRec) throws Exception {
        String sDate = newJobId();
        createJobLog(jobGroupId, sDate, "OS Command Run(" + paramRec.toString() + ")");

        long startTime = System.currentTimeMillis();
        log.info("OS Command Run Started : {}", startTime);
        log.info("OS Command Run params={}", paramRec);

        String sPrgName = (String) paramRec.get("PrgName");
        String sPrgParam = (String) paramRec.get("PrgParams");

        List<String> lcmd = new ArrayList<>();
        lcmd.add(sPrgName);
        lcmd.addAll(StringUtil.StringToArrayList(sPrgParam, " "));
        lcmd.add(sDate);

        new ProcessExecutor()
                .command(lcmd)
                .redirectOutput(new LogOutputStream() {
                    @Override
                    protected void processLine(String line) {
                        log.info(line);
                    }
                })
                .execute();

        long endTime = System.currentTimeMillis();
        log.info("OS Command Run Ended: {}", endTime);
        log.info("OS Command Running Time : {}ms", endTime - startTime);

        // Unlike the other jobs, the user job row is not closed here; that call was already
        // disabled in the original code.
        // NOTE(review): presumably the launched program, which receives the job id as its last
        // argument, records completion itself - TODO confirm.
    }

    /** Builds a job id from the current time (24-hour clock) and a random UUID. */
    private static String newJobId() {
        // SimpleDateFormat is not thread-safe, so a new instance is created per call.
        SimpleDateFormat dateFormat = new SimpleDateFormat(JOB_ID_TIMESTAMP_PATTERN);
        return dateFormat.format(new Date()) + ":" + UUID.randomUUID().toString();
    }

    /** Splits the upper-cased, comma separated job type list; a null input yields an empty list. */
    private static List<String> parseJobTypes(String jobTypeCsv) {
        if (jobTypeCsv == null) {
            return new ArrayList<>();
        }
        return new ArrayList<>(Arrays.asList(jobTypeCsv.toUpperCase().split(",")));
    }

    /** Inserts the user job row and returns the parameter map needed to close it later. */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private Map createJobLog(String jobGroupId, String jobId, String jobName) {
        Map paramLog = new HashMap();
        paramLog.put("user_job_group", jobGroupId);
        paramLog.put("user_job_id", jobId);
        paramLog.put("user_job_name", jobName);
        matchingInnerDelingMapper.createUserJob(paramLog);
        return paramLog;
    }

    /** Closes the user job row with exit code "0" and an empty exit message. */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void finishJobLog(Map paramLog) {
        paramLog.put("exit_code", "0");
        paramLog.put("exit_message", "");
        matchingInnerDelingMapper.finishUserJob(paramLog);
    }

    /**
     * Runs the sweep schedule shared by {@link #extraJobSub} and {@link #aiSubJobSub}, in the
     * same order and with the same arguments as the original hand-written loops.
     */
    @SuppressWarnings("rawtypes")
    private static void runSweepSchedule(MatchStep step, Map params) throws Exception {
        // Two-record sum matching: up to 20000 records.
        sweep(step, params, 1, 2, 20000, 1000, false, true);
        sweep(step, params, 2, 1, 20000, 1000, true, false);
        sweep(step, params, 2, 2, 20000, 1000, true, true);

        // Three-record matching: up to 2000 records.
        sweep(step, params, 1, 3, 2000, 50, false, true);
        sweep(step, params, 2, 3, 2000, 50, false, true);
        sweep(step, params, 3, 3, 2000, 50, false, true);
        sweep(step, params, 3, 1, 2000, 50, true, false);
        sweep(step, params, 3, 2, 2000, 50, true, false);
        sweep(step, params, 3, 3, 2000, 50, true, false);

        // Four-record matching: up to 2000 records.
        sweep(step, params, 1, 4, 2000, 25, false, true);
        sweep(step, params, 4, 1, 2000, 25, true, false);
    }

    /**
     * Calls {@code step} for {@code i = 0, stride, 2*stride, ...} while {@code i < limit}. The
     * loop value is passed as the fourth and/or fifth processor argument as selected by the two
     * flags; an unselected position receives 0.
     * NOTE(review): the second/third arguments look like per-side record counts and the last
     * two like row offsets - confirm against the processor classes.
     */
    @SuppressWarnings("rawtypes")
    private static void sweep(MatchStep step, Map params, int first, int second, int limit,
            int stride, boolean advanceThird, boolean advanceFourth) throws Exception {
        for (int i = 0; i < limit; i += stride) {
            step.process(params, first, second, advanceThird ? i : 0, advanceFourth ? i : 0);
        }
    }

    /**
     * Polls the job group until every job reports "Finished" or the accumulated wait reaches
     * {@code bdAiAsyncTotalWaitTime}.
     *
     * <p>Completion is re-evaluated on every poll. The original kept a flag that was never reset
     * after the first unfinished poll, so it always waited out the full total time.
     *
     * <p>NOTE(review): the accumulated wait is in milliseconds but is compared with the raw
     * {@code bdAiAsyncTotalWaitTime} value - confirm that property is configured in milliseconds.
     */
    private void waitForJobGroup(String jobGroupId) throws InterruptedException {
        final int pollIntervalMs = bdAiAsyncWaitTime.intValue() * 1000;
        final int maxTotalWait = bdAiAsyncTotalWaitTime.intValue();
        int waited = 0;
        while (!isJobGroupFinished(jobGroupId)) {
            Thread.sleep(pollIntervalMs);
            waited += pollIntervalMs;
            if (waited >= maxTotalWait) {
                return;
            }
        }
    }

    /** Returns true when no job of the group has a status other than "Finished". */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private boolean isJobGroupFinished(String jobGroupId) {
        Map mParam = new HashMap();
        mParam.put("userJobGroup", jobGroupId);
        List<? extends Map> lmJob = matchingInnerDelingMapper.getUserJobStatus(mParam);
        for (Map curJob : lmJob) {
            String status = (String) curJob.get("status");
            if (!"Finished".equalsIgnoreCase(status)) {
                return false;
            }
        }
        return true;
    }
}