diff --git a/src/main/java/com/batch/controller/JobController.java b/src/main/java/com/batch/controller/JobController.java index 7849330..2954d48 100644 --- a/src/main/java/com/batch/controller/JobController.java +++ b/src/main/java/com/batch/controller/JobController.java @@ -34,16 +34,7 @@ public class JobController { @Autowired private JobService jobService; @Autowired - private MatchingInnerDelingMapper matchingInnerDelingMapper; - @Value("${thread.ai.count.rowmax}") - BigDecimal bdAiAsyncMaxRowCount; - @Value("${thread.ai.processing.wait}") - BigDecimal bdAiAsyncWaitTime; - @Value("${thread.ai.processing.wait.total}") - BigDecimal bdAiAsyncTotalWaitTime; - - - + private MatchingInnerDelingMapper matchingInnerDelingMapper; /** * TODO : AI 서버에 특정 경로에 생성되는 파일을 읽는(끝에 50줄 정도) API @@ -214,43 +205,7 @@ public class JobController { String sJobGroup = uuid.toString(); log.info("Start AI Matching Job"); - List retData = matchingInnerDelingMapper.getAiReadData(params); - BigDecimal bdCurrentRowCount = BigDecimal.ZERO; - for(Map curMap : retData) { - Map curMParams = new HashMap(); - curMParams.putAll(curMap); - curMParams.put("error_range", params.get("error_range")); - - BigDecimal bdCntAll = new BigDecimal(String.valueOf(curMap.get("cnt_all"))); - bdCurrentRowCount = bdCurrentRowCount.add(bdCntAll); - log.info("Call Matching Job (" + JsonUtil.objectToString(curMap) + ")"); - jobService.aiJobSub(sJobGroup, curMParams); - - //전체카운트가 처리가능카운트를 넘어설때 - if (bdCurrentRowCount.compareTo(bdAiAsyncMaxRowCount) > -1) { - bdCurrentRowCount = BigDecimal.ZERO; - - //현재 작업그룹이 종료가 되었는지 체크해서 종료시 카운트 초기화 하고 다음작업을 한다. - boolean blnStatus = true; - Integer iTotalWait = 0; - Integer iWaitTime = bdAiAsyncWaitTime.intValue() * 1000; //밀리초 - do { - Map mParam = new HashMap(); - mParam.put("userJobGroup", sJobGroup); - List lmJob = matchingInnerDelingMapper.getUserJobStatus(mParam); - for (Map curJob : lmJob) { - String ScurStatus = (String) curJob.get("status"); - if (!"Finished".equalsIgnoreCase(ScurStatus)) { - blnStatus = false; - Thread.sleep(iWaitTime); - iTotalWait = iTotalWait + iWaitTime; - break; - } - } - } while (!blnStatus && iTotalWait < bdAiAsyncTotalWaitTime.intValue()); //전체 기다리는 시간까지 체크 - log.info("Next Thread Group Processing"); - } - } + jobService.aiJobSub(sJobGroup, params); log.info("End AI Matching Job"); Map rtnMap = new HashMap(); diff --git a/src/main/java/com/batch/service/JobService.java b/src/main/java/com/batch/service/JobService.java index 72f0cbf..043b6cb 100644 --- a/src/main/java/com/batch/service/JobService.java +++ b/src/main/java/com/batch/service/JobService.java @@ -36,6 +36,7 @@ import com.batch.config.MatchingSetup.Matching; import com.batch.mapper.primary.MatchingInnerDelingMapper; import com.batch.mapper.secondary.OracleMapper; import com.batch.util.FileUtil; +import com.batch.util.JsonUtil; import com.batch.util.StringUtil; import com.batch.service.JobService; import lombok.extern.slf4j.Slf4j; @@ -53,6 +54,15 @@ public class JobService { @Value("${matching.auto.exceptListByComma}") String sExceptMatchType; + @Value("${thread.ai.count.rowmax}") + BigDecimal bdAiAsyncMaxRowCount; + + @Value("${thread.ai.processing.wait}") + BigDecimal bdAiAsyncWaitTime; + + @Value("${thread.ai.processing.wait.total}") + BigDecimal bdAiAsyncTotalWaitTime; + @Autowired private JobLauncher jobLauncher; @@ -282,63 +292,60 @@ public class JobService { matchingInnerDelingMapper.finishUserJob(paramLog); } - + @SuppressWarnings("rawtypes") @Async("aiAsync") - public void aiJobSub(String jobGroupId, Map paramRec) throws Exception { - - - //Job Create Log - UUID uuid = UUID.randomUUID(); - HashMap mt = new HashMap(); - SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-hh-mm-ss"); - String sDate = dateFormat.format(new Date()) + ":" + uuid.toString(); - - Map paramLog = new HashMap(); - paramLog.put("user_job_group", jobGroupId); - paramLog.put("user_job_id", sDate); - paramLog.put("user_job_name", "AI매칭(" + paramRec.toString() + ")"); - matchingInnerDelingMapper.createUserJob(paramLog); - - - long startTime = System.currentTimeMillis(); - log.info("ai Job Started : " + startTime); - log.debug("ai Job params=" + paramRec.toString()); - - String sSysSe = (String) paramRec.get("sys_se"); - String sAccnutYm = (String) paramRec.get("accnut_ym"); - String sCprCode = (String) paramRec.get("cpr_code"); - String sPartCpr = (String) paramRec.get("partn_cpr"); - String sDelngCrncy = (String) paramRec.get("delng_crncy"); - String sTbTy = (String) paramRec.get("tb_ty"); - String sErrorRange = (String) paramRec.get("error_range"); - - String sThreadName = Thread.currentThread().getName(); - - log.debug("call python"); - new ProcessExecutor() - .command(sPythonPrg, sPythonAiTarget, sDate, sSysSe, sAccnutYm, sCprCode, sPartCpr, sDelngCrncy, sErrorRange, sTbTy) - .redirectOutput(new LogOutputStream() { - @Override - protected void processLine(String line) { - log.info(line); - } - }) - .execute(); - - long endTime = System.currentTimeMillis(); - log.info("ai Job Ended: " + endTime); - log.info("ai Job Running Time : " + (endTime - startTime) + "ms"); + public void aiJobSub(String jobGroupId, Map params) throws Exception { + List retData = matchingInnerDelingMapper.getAiReadData(params); + BigDecimal bdCurrentRowCount = BigDecimal.ZERO; -// //작업종료에 대한 로그 업데이트 -// paramLog.put("exit_code", "0"); -// paramLog.put("exit_message", ""); -// matchingInnerDelingMapper.finishUserJob(paramLog); - + List lThread = new ArrayList(); + for(Map curMap : retData) { + Map curMParams = new HashMap(); + curMParams.putAll(curMap); + curMParams.put("error_range", params.get("error_range")); + + BigDecimal bdCntAll = new BigDecimal(String.valueOf(curMap.get("cnt_all"))); + bdCurrentRowCount = bdCurrentRowCount.add(bdCntAll); + log.info("Call Matching Job (" + JsonUtil.objectToString(curMap) + ")"); + ThreadAiMatching threadAiMatching = new ThreadAiMatching( + jobGroupId, + curMParams, + matchingInnerDelingMapper, + sPythonPrg, + sPythonAiTarget + ); + threadAiMatching.start(); + + //전체카운트가 처리가능카운트를 넘어설때 + if (bdCurrentRowCount.compareTo(bdAiAsyncMaxRowCount) > -1) { + bdCurrentRowCount = BigDecimal.ZERO; + + //현재 작업그룹이 종료가 되었는지 체크해서 종료시 카운트 초기화 하고 다음작업을 한다. + boolean blnStatus = true; + Integer iTotalWait = 0; + Integer iWaitTime = bdAiAsyncWaitTime.intValue() * 1000; //밀리초 + do { + Map mParam = new HashMap(); + mParam.put("userJobGroup", jobGroupId); + List lmJob = matchingInnerDelingMapper.getUserJobStatus(mParam); + for (Map curJob : lmJob) { + String ScurStatus = (String) curJob.get("status"); + if (!"Finished".equalsIgnoreCase(ScurStatus)) { + blnStatus = false; + Thread.sleep(iWaitTime); + iTotalWait = iTotalWait + iWaitTime; + break; + } + } + } while (!blnStatus && iTotalWait < bdAiAsyncTotalWaitTime.intValue()); //전체 기다리는 시간까지 체크 + log.info("Next Thread Group Processing"); + } + } + log.info("Current Group Process End"); } - public JobExecution invokeJob(String jobName, String jobType, Map params) throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException { UUID uuid = UUID.randomUUID(); diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 21d5a99..7d11aa3 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -46,7 +46,7 @@ decorator.datasource.p6spy.enable-logging=true logging.level.root=info logging.level.com.batch=info -logging.level.p6spy=off +logging.level.p6spy=debug #Thread Count 설정 thread.comm.count=10 @@ -58,7 +58,7 @@ thread.ai.processing.wait.total=3600 #Python 프로퍼티 설정 pytyon.path=D:\\Programs\\devp\\python-3.12.2\\python.exe -python.ai.target=D:\\Working\\Python\\Test1.py +python.ai.target=D:\\Working\\Vue\\matching_ai\\src\\test\\resources\\TEST_ALL_V0.3.py #Auto Matching 제외 matching.auto.exceptListByComma=C-A-ROUND(-2) \ No newline at end of file -- libgit2 0.21.4