Blame view

src/main/java/com/batch/service/JobService.java 14.7 KB
8dc487b1   함상기   Init Version - 20...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
  package com.batch.service;
  
  import java.text.SimpleDateFormat;
  import java.util.ArrayList;
  import java.util.Arrays;
  import java.util.Date;
  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;
  import java.util.UUID;
  
  import org.springframework.batch.core.BatchStatus;
  import org.springframework.batch.core.Job;
  import org.springframework.batch.core.JobExecution;
  import org.springframework.batch.core.JobParameters;
  import org.springframework.batch.core.JobParametersBuilder;
  import org.springframework.batch.core.JobParametersInvalidException;
  import org.springframework.batch.core.launch.JobLauncher;
  import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
  import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
  import org.springframework.batch.core.repository.JobRestartException;
  import org.springframework.beans.factory.annotation.Autowired;
  import org.springframework.beans.factory.annotation.Value;
  import org.springframework.context.ApplicationContext;
  import org.springframework.scheduling.annotation.Async;
  import org.springframework.stereotype.Service;
  import org.zeroturnaround.exec.ProcessExecutor;
  import org.zeroturnaround.exec.stream.LogOutputStream;
  
  import com.batch.config.MatchingExtraProcessorAuto;
  import com.batch.config.MatchingSetup;
  import com.batch.config.MatchingSetup.Matching;
  import com.batch.mapper.primary.MatchingInnerDelingMapper;
  import com.batch.mapper.secondary.OracleMapper;
  import com.batch.util.FileUtil;
  
78928007   함상기   20240305
37
  import com.batch.service.JobService;
8dc487b1   함상기   Init Version - 20...
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
  import lombok.extern.slf4j.Slf4j;
  
  @Slf4j
  @Service
  public class JobService {
  
  	@Value("${pytyon.path}")
  	String sPythonPrg;
  
  	@Value("${python.ai.target}")
  	String sPythonAiTarget;
  	
      @Autowired
      private JobLauncher jobLauncher;
  
      @Autowired
      private ApplicationContext context;
  
      @Autowired
      private MatchingInnerDelingMapper matchingInnerDelingMapper;
  
      @Autowired
      private OracleMapper oracleMapper;
      
      
      @SuppressWarnings("rawtypes")
  	@Async("commAsync")
78928007   함상기   20240305
65
      public void matchingJob(String jobGroupId, Map<String, String> params) throws Exception {
8dc487b1   함상기   Init Version - 20...
66
      	
78928007   함상기   20240305
67
68
69
70
71
72
73
74
75
76
77
78
79
      	//Job Create Log
      	UUID uuid = UUID.randomUUID();
      	HashMap<String, String> mt = new HashMap<String, String>();
          SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-hh-mm-ss");
          String sDate = dateFormat.format(new Date()) + ":" + uuid.toString();
      	    	
      	Map<String, Object> paramLog = new HashMap<String, Object>();
      	paramLog.put("user_job_group", jobGroupId);
      	paramLog.put("user_job_id", sDate);
      	paramLog.put("user_job_name", "자동매칭(" + params.toString() + ")");
      	matchingInnerDelingMapper.createUserJob(paramLog);
      	
      	String sThreadName = Thread.currentThread().getName();
8dc487b1   함상기   Init Version - 20...
80
      	long startTime = System.currentTimeMillis();
78928007   함상기   20240305
81
82
      	log.info("[" + sThreadName + "]Job Started : " + startTime);
      	log.debug("[" + sThreadName + "]params=" + params.toString());
8dc487b1   함상기   Init Version - 20...
83
84
85
86
87
88
89
90
91
92
93
94
95
96
      	
      	StringBuffer sb = FileUtil.readFileToString("matchingSetup.json");
      	MatchingSetup matchingSetup = (MatchingSetup) FileUtil.strToObj(sb.toString(), MatchingSetup.class);
  
      	String sJobTypeList = params.get("jobType").toUpperCase();
      	List<String> lJobType = Arrays.asList(sJobTypeList != null?sJobTypeList.split(","):new String[0]);
  
      	//파라미터가 ALL일 경우 ALL 대신 등록된 모든 Type를 넣어준다.
      	if (lJobType.size() > 0 && lJobType.indexOf("ALL") != -1) {
      		lJobType = new ArrayList();
      		for (Matching matching : matchingSetup.getMatchingSetup()) {
      			if (matching.getActive()) {
      				lJobType.add(matching.getType());
      			} else {
78928007   함상기   20240305
97
      				log.info("[" + sThreadName + "]JobType(" + matching.getType() + ") is Disabled");
8dc487b1   함상기   Init Version - 20...
98
99
100
101
102
103
      			}
      		}
      	}
      	
      	
      	for (String sJobType : lJobType) {
78928007   함상기   20240305
104
          	log.info("[" + sThreadName + "]Current running job type: " + sJobType);
8dc487b1   함상기   Init Version - 20...
105
106
107
108
109
110
111
          	JobExecution jobExe = invokeJob("matchingInnerDelng", sJobType, params);
          	if (!jobExe.getStatus().equals(BatchStatus.COMPLETED)) {
          		throw new Exception("Job execution error : Batchstatus(" + jobExe.getStatus() + "), ExitStatus(" + jobExe.getExitStatus() + ")" );
          	}
      	}
      	
      	long endTime = System.currentTimeMillis();
78928007   함상기   20240305
112
113
114
      	log.info("[" + sThreadName + "]Job Type : " + lJobType.toString());
      	log.info("[" + sThreadName + "]Job Ended: " + endTime);
      	log.info("[" + sThreadName + "]Running Time : " + (endTime - startTime) + "ms");
8dc487b1   함상기   Init Version - 20...
115
  
78928007   함상기   20240305
116
117
118
119
120
      	//작업종료에 대한 로그 업데이트
      	paramLog.put("exit_code", "0");
      	paramLog.put("exit_message", "");    	
      	matchingInnerDelingMapper.finishUserJob(paramLog);
      	
8dc487b1   함상기   Init Version - 20...
121
122
123
124
      }
      
  
      @SuppressWarnings("rawtypes")
78928007   함상기   20240305
125
126
127
128
129
130
131
132
133
134
135
136
137
138
  	@Async("extAsync")
      public void extraJobSub(String jobGroupId, Map paramRec) throws Exception {
      	
      	//Job Create Log
      	UUID uuid = UUID.randomUUID();
      	HashMap<String, String> mt = new HashMap<String, String>();
          SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-hh-mm-ss");
          String sDate = dateFormat.format(new Date()) + ":" + uuid.toString();
      	    	
      	Map<String, Object> paramLog = new HashMap<String, Object>();
      	paramLog.put("user_job_group", jobGroupId);
      	paramLog.put("user_job_id", sDate);
      	paramLog.put("user_job_name", "Extra매칭(" + paramRec.toString() + ")");
      	matchingInnerDelingMapper.createUserJob(paramLog);
8dc487b1   함상기   Init Version - 20...
139
      	
78928007   함상기   20240305
140
      	String sThreadName = Thread.currentThread().getName();
8dc487b1   함상기   Init Version - 20...
141
      	long startTime = System.currentTimeMillis();
406c670d   ggun12   - 대상목록, 매칭목록 에 거래...
142
      	log.info("extra [" + sThreadName + "]Job Started : " + startTime + "]params=" + paramRec.toString());
9c28833f   ggun12   aiJobSub : 작업종료 처...
143
      	log.debug("extra [" + sThreadName + "]params=" + paramRec.toString());
8dc487b1   함상기   Init Version - 20...
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
  
      	MatchingExtraProcessorAuto matchingExtraProcessorAuto = new MatchingExtraProcessorAuto(matchingInnerDelingMapper);
      	
      	//2건씩 합산 매칭일 경우 최대 20000건 까지
      	for (int i=0; i<20000;i=i+1000) {
          	matchingExtraProcessorAuto.process(paramRec, 1, 2, 0, i);
      	}
      	for (int i=0; i<20000;i=i+1000) {
          	matchingExtraProcessorAuto.process(paramRec, 2, 1, i, 0);
      	}
      	for (int i=0; i<20000;i=i+1000) {
          	matchingExtraProcessorAuto.process(paramRec, 2, 2, i, i);
      	}
      	
      	//3건씩 매칭일 경우 최대 5000건 까지
78928007   함상기   20240305
159
      	for (int i=0; i<2000;i=i+100) {
8dc487b1   함상기   Init Version - 20...
160
161
          	matchingExtraProcessorAuto.process(paramRec, 1, 3, 0, i);
      	}
78928007   함상기   20240305
162
      	for (int i=0; i<2000;i=i+100) {
8dc487b1   함상기   Init Version - 20...
163
164
          	matchingExtraProcessorAuto.process(paramRec, 3, 1, i, 0);
      	}
8dc487b1   함상기   Init Version - 20...
165
166
      	
      	long endTime = System.currentTimeMillis();
9c28833f   ggun12   aiJobSub : 작업종료 처...
167
168
      	log.info("extra [" + sThreadName + "]Job Ended: " + endTime);
      	log.info("extra [" + sThreadName + "]Running Time : " + (endTime - startTime) + "ms");
8dc487b1   함상기   Init Version - 20...
169
  
78928007   함상기   20240305
170
171
172
173
174
      	//작업종료에 대한 로그 업데이트
      	paramLog.put("exit_code", "0");
      	paramLog.put("exit_message", "");    	
      	matchingInnerDelingMapper.finishUserJob(paramLog);
      	
8dc487b1   함상기   Init Version - 20...
175
176
177
178
      }    
  
      @SuppressWarnings("rawtypes")
  	@Async("aiAsync")
78928007   함상기   20240305
179
      public void aiJobSub(String jobGroupId, Map paramRec) throws Exception {
8dc487b1   함상기   Init Version - 20...
180
181
182
183
184
185
186
187
188
      	
  
      	//Job Create Log
      	UUID uuid = UUID.randomUUID();
      	HashMap<String, String> mt = new HashMap<String, String>();
          SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-hh-mm-ss");
          String sDate = dateFormat.format(new Date()) + ":" + uuid.toString();
      	
      	Map<String, Object> paramLog = new HashMap<String, Object>();
78928007   함상기   20240305
189
      	paramLog.put("user_job_group", jobGroupId);
8dc487b1   함상기   Init Version - 20...
190
191
192
193
194
195
      	paramLog.put("user_job_id", sDate);
      	paramLog.put("user_job_name", "AI매칭(" + paramRec.toString() + ")");
      	matchingInnerDelingMapper.createUserJob(paramLog);
  
      	
      	long startTime = System.currentTimeMillis();
9c28833f   ggun12   aiJobSub : 작업종료 처...
196
197
      	log.info("ai Job Started : " + startTime);
      	log.debug("ai Job params=" + paramRec.toString());
8dc487b1   함상기   Init Version - 20...
198
199
200
201
202
203
      	
      	String sSysSe = (String) paramRec.get("sys_se");
      	String sAccnutYm = (String) paramRec.get("accnut_ym");
      	String sCprCode = (String) paramRec.get("cpr_code");
      	String sPartCpr = (String) paramRec.get("partn_cpr");
      	String sDelngCrncy = (String) paramRec.get("delng_crncy");
78928007   함상기   20240305
204
205
206
      	
      	String sThreadName = Thread.currentThread().getName();
      	
8dc487b1   함상기   Init Version - 20...
207
208
      	log.debug("call python");
      	new ProcessExecutor()
9c28833f   ggun12   aiJobSub : 작업종료 처...
209
  	    	.command(sPythonPrg, sPythonAiTarget, sDate, sSysSe, sAccnutYm, sCprCode, sPartCpr, sDelngCrncy)
8dc487b1   함상기   Init Version - 20...
210
211
212
213
214
215
216
217
218
  	        .redirectOutput(new LogOutputStream() {
  	          @Override
  	          protected void processLine(String line) {
  	            log.info(line);
  	          }
  	        })
  	        .execute();   	
       	
      	long endTime = System.currentTimeMillis();
9c28833f   ggun12   aiJobSub : 작업종료 처...
219
220
      	log.info("ai Job Ended: " + endTime);
      	log.info("ai Job Running Time : " + (endTime - startTime) + "ms");
8dc487b1   함상기   Init Version - 20...
221
222
  
      	
9c28833f   ggun12   aiJobSub : 작업종료 처...
223
224
225
226
  //    	//작업종료에 대한 로그 업데이트
  //    	paramLog.put("exit_code", "0");
  //    	paramLog.put("exit_message", "");    	
  //    	matchingInnerDelingMapper.finishUserJob(paramLog);
8dc487b1   함상기   Init Version - 20...
227
228
      	
      }
8dc487b1   함상기   Init Version - 20...
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
      
      
      public JobExecution invokeJob(String jobName, String jobType, Map<String, String> params) throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
  
      	UUID uuid = UUID.randomUUID();
      	HashMap<String, String> mt = new HashMap<String, String>();
          SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-hh-mm-ss");
          final String date = dateFormat.format(new Date()) + ":" + uuid.toString();
          JobParameters jobParameters = new JobParametersBuilder()
                  .addString("syncDate", date)
                  .addString("jobType", jobType)
                  .addString("sysSe", params.get("sysSe"))
                  .addString("accnutYm", params.get("accnutYm"))
                  .addString("searchCond", params.get("searchCond"))
                  .toJobParameters(); 
  
          var jobToStart = context.getBean(jobName, Job.class);
          JobExecution jobExe = jobLauncher.run(jobToStart, jobParameters);
          
          return jobExe;
      }
      
      
      
      @SuppressWarnings("rawtypes")
  	@Async("commAsync")
78928007   함상기   20240305
255
      public void createData(String jobGroupId, Map<String, String> params) throws Exception {
8dc487b1   함상기   Init Version - 20...
256
257
258
259
260
261
262
263
      	
      	//Job Create Log
      	UUID uuid = UUID.randomUUID();
      	HashMap<String, String> mt = new HashMap<String, String>();
          SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-hh-mm-ss");
          String sDate = dateFormat.format(new Date()) + ":" + uuid.toString();
      	
      	Map<String, Object> paramLog = new HashMap<String, Object>();
78928007   함상기   20240305
264
      	paramLog.put("user_job_group", jobGroupId);
8dc487b1   함상기   Init Version - 20...
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
      	paramLog.put("user_job_id", sDate);
      	paramLog.put("user_job_name", "작업데이타생성(" + params.toString() + ")");
      	matchingInnerDelingMapper.createUserJob(paramLog);
  
      	
      	long startTime = System.currentTimeMillis();
      	log.info("Create Data Started : " + startTime);
      	log.debug("params=" + params.toString());
  
      	//기존데이타 삭제
  		int iDeleted = matchingInnerDelingMapper.deleteOriginalData(params);
      	log.debug("Deleted OrgData : " + iDeleted + "건");
      	
  		//신규데이타 생성
      	//매칭키에 대한 정보 (sql로 조인하여 조회하기에는 너무 느리고 데이타 중복도 발생함)
      	List<Map> lMatchingInfo = oracleMapper.getMatchingInfo(params);
      	Map<String, Map> mMatchingInfo = new HashMap<String, Map>();
      	for (Map curMap : lMatchingInfo) {
      		String sKey = String.valueOf(curMap.get("SEQ"));
      		mMatchingInfo.put(sKey, curMap);
      	}
  
      	List<Map> lOrgData = oracleMapper.getOriginalData(params);    	
      	int iInserted = 0;
      	int limit = 1000; //1000건씩 batch
      	List<Map> lInserted = new ArrayList<Map>();
      	for (Map curRec : lOrgData) {
      		String sKey = String.valueOf(curRec.get("SEQ"));
      		Map curMatchingInfo = mMatchingInfo.get(sKey);
      		if (curMatchingInfo != null) {
          		curRec.put("MATCHING_CAUSE", curMatchingInfo.get("MATCHING_CAUSE"));
          		curRec.put("MATCH_KEY", curMatchingInfo.get("MATCH_KEY"));
      		}
      		lInserted.add(curRec);
      		if (lInserted.size() == limit) {
      			matchingInnerDelingMapper.insertOriginalData(Map.of("itemList", lInserted));
      			iInserted = iInserted + lInserted.size();
      			lInserted.clear();
      		}
      	}
      	if (lInserted.size() > 0) {
      		matchingInnerDelingMapper.insertOriginalData(Map.of("itemList", lInserted));
  			iInserted = iInserted + lInserted.size();    		
      	}
      	log.info("Inserted OrgData : " + iInserted + "건");
      	
      	iDeleted = matchingInnerDelingMapper.deleteData(params);
      	log.debug("Deleted Work Data : " + iDeleted + "건");
      	iInserted = matchingInnerDelingMapper.insertDataFromOriginal(params);
      	log.info("Inserted Work Data : " + iInserted + "건");
      	
      	iDeleted = matchingInnerDelingMapper.deleteDataAi(params);
      	log.debug("Deleted Work AI Data : " + iDeleted + "건");
      	iInserted = matchingInnerDelingMapper.insertDataAiFromOriginal(params);
      	log.info("Inserted Work AI Data : " + iInserted + "건");
      	
      	long endTime = System.currentTimeMillis();
      	log.info("Create Data Ended : " + endTime);
      	log.info("Running Time : " + (endTime - startTime) + "ms");    
      	
      	//작업종료에 대한 로그 업데이트
      	paramLog.put("exit_code", "0");
      	paramLog.put("exit_message", "");    	
      	matchingInnerDelingMapper.finishUserJob(paramLog);
8dc487b1   함상기   Init Version - 20...
329
330
331
332
333
334
      }    
  
      
      
      @SuppressWarnings("rawtypes")
  	@Async("commAsync")
78928007   함상기   20240305
335
      public void returnRwsultData(String jobGroupId, Map<String, String> params) throws Exception {
8dc487b1   함상기   Init Version - 20...
336
337
338
339
340
341
342
343
      	
      	//Job Create Log
      	UUID uuid = UUID.randomUUID();
      	HashMap<String, String> mt = new HashMap<String, String>();
          SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-hh-mm-ss");
          String sDate = dateFormat.format(new Date()) + ":" + uuid.toString();
      	
      	Map<String, Object> paramLog = new HashMap<String, Object>();
78928007   함상기   20240305
344
      	paramLog.put("user_job_group", jobGroupId);
8dc487b1   함상기   Init Version - 20...
345
346
347
348
349
350
351
352
      	paramLog.put("user_job_id", sDate);
      	paramLog.put("user_job_name", "결과데이타리턴(" + params.toString() + ")");
      	matchingInnerDelingMapper.createUserJob(paramLog);
      	
      	long startTime = System.currentTimeMillis();
      	log.info("Update Data Started : " + startTime);
      	log.debug("params=" + params.toString());
  
78928007   함상기   20240305
353
354
355
356
      	//기존데이타 초기화
  		int iUpdated = matchingInnerDelingMapper.updateClearNewMatchKey(params);
      	//새로운 매칭키 생성
  		iUpdated = matchingInnerDelingMapper.updateNewMatchKey(params);
8dc487b1   함상기   Init Version - 20...
357
358
359
360
361
362
363
364
365
366
367
368
369
370
      	log.debug("Updated OrgData : " + iUpdated + "건");
  
      	long endTime = System.currentTimeMillis();
      	log.info("Update Data Ended : " + endTime);
      	log.info("Running Time : " + (endTime - startTime) + "ms");    	
      	
      	
      	//작업종료에 대한 로그 업데이트
      	paramLog.put("exit_code", "0");
      	paramLog.put("exit_message", "");    	
      	matchingInnerDelingMapper.finishUserJob(paramLog);
      	
      }      
  }