8dc487b1
함상기
Init Version - 20...
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
package com.batch.service;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.zeroturnaround.exec.ProcessExecutor;
import org.zeroturnaround.exec.stream.LogOutputStream;
import com.batch.config.MatchingExtraProcessorAuto;
import com.batch.config.MatchingSetup;
import com.batch.config.MatchingSetup.Matching;
import com.batch.mapper.primary.MatchingInnerDelingMapper;
import com.batch.mapper.secondary.OracleMapper;
import com.batch.util.FileUtil;
|
78928007
함상기
20240305
|
37
|
import com.batch.service.JobService;
|
8dc487b1
함상기
Init Version - 20...
|
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Service
public class JobService {
@Value("${pytyon.path}")
String sPythonPrg;
@Value("${python.ai.target}")
String sPythonAiTarget;
@Autowired
private JobLauncher jobLauncher;
@Autowired
private ApplicationContext context;
@Autowired
private MatchingInnerDelingMapper matchingInnerDelingMapper;
@Autowired
private OracleMapper oracleMapper;
@SuppressWarnings("rawtypes")
@Async("commAsync")
|
78928007
함상기
20240305
|
65
|
public void matchingJob(String jobGroupId, Map<String, String> params) throws Exception {
|
8dc487b1
함상기
Init Version - 20...
|
66
|
|
78928007
함상기
20240305
|
67
68
69
70
71
72
73
74
75
76
77
78
79
|
//Job Create Log
UUID uuid = UUID.randomUUID();
HashMap<String, String> mt = new HashMap<String, String>();
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-hh-mm-ss");
String sDate = dateFormat.format(new Date()) + ":" + uuid.toString();
Map<String, Object> paramLog = new HashMap<String, Object>();
paramLog.put("user_job_group", jobGroupId);
paramLog.put("user_job_id", sDate);
paramLog.put("user_job_name", "자동매칭(" + params.toString() + ")");
matchingInnerDelingMapper.createUserJob(paramLog);
String sThreadName = Thread.currentThread().getName();
|
8dc487b1
함상기
Init Version - 20...
|
80
|
long startTime = System.currentTimeMillis();
|
78928007
함상기
20240305
|
81
82
|
log.info("[" + sThreadName + "]Job Started : " + startTime);
log.debug("[" + sThreadName + "]params=" + params.toString());
|
8dc487b1
함상기
Init Version - 20...
|
83
84
85
86
87
88
89
90
91
92
93
94
95
96
|
StringBuffer sb = FileUtil.readFileToString("matchingSetup.json");
MatchingSetup matchingSetup = (MatchingSetup) FileUtil.strToObj(sb.toString(), MatchingSetup.class);
String sJobTypeList = params.get("jobType").toUpperCase();
List<String> lJobType = Arrays.asList(sJobTypeList != null?sJobTypeList.split(","):new String[0]);
//파라미터가 ALL일 경우 ALL 대신 등록된 모든 Type를 넣어준다.
if (lJobType.size() > 0 && lJobType.indexOf("ALL") != -1) {
lJobType = new ArrayList();
for (Matching matching : matchingSetup.getMatchingSetup()) {
if (matching.getActive()) {
lJobType.add(matching.getType());
} else {
|
78928007
함상기
20240305
|
97
|
log.info("[" + sThreadName + "]JobType(" + matching.getType() + ") is Disabled");
|
8dc487b1
함상기
Init Version - 20...
|
98
99
100
101
102
103
|
}
}
}
for (String sJobType : lJobType) {
|
78928007
함상기
20240305
|
104
|
log.info("[" + sThreadName + "]Current running job type: " + sJobType);
|
8dc487b1
함상기
Init Version - 20...
|
105
106
107
108
109
110
111
|
JobExecution jobExe = invokeJob("matchingInnerDelng", sJobType, params);
if (!jobExe.getStatus().equals(BatchStatus.COMPLETED)) {
throw new Exception("Job execution error : Batchstatus(" + jobExe.getStatus() + "), ExitStatus(" + jobExe.getExitStatus() + ")" );
}
}
long endTime = System.currentTimeMillis();
|
78928007
함상기
20240305
|
112
113
114
|
log.info("[" + sThreadName + "]Job Type : " + lJobType.toString());
log.info("[" + sThreadName + "]Job Ended: " + endTime);
log.info("[" + sThreadName + "]Running Time : " + (endTime - startTime) + "ms");
|
8dc487b1
함상기
Init Version - 20...
|
115
|
|
78928007
함상기
20240305
|
116
117
118
119
120
|
//작업종료에 대한 로그 업데이트
paramLog.put("exit_code", "0");
paramLog.put("exit_message", "");
matchingInnerDelingMapper.finishUserJob(paramLog);
|
8dc487b1
함상기
Init Version - 20...
|
121
122
123
124
|
}
@SuppressWarnings("rawtypes")
|
78928007
함상기
20240305
|
125
126
127
128
129
130
131
132
133
134
135
136
137
138
|
@Async("extAsync")
public void extraJobSub(String jobGroupId, Map paramRec) throws Exception {
//Job Create Log
UUID uuid = UUID.randomUUID();
HashMap<String, String> mt = new HashMap<String, String>();
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-hh-mm-ss");
String sDate = dateFormat.format(new Date()) + ":" + uuid.toString();
Map<String, Object> paramLog = new HashMap<String, Object>();
paramLog.put("user_job_group", jobGroupId);
paramLog.put("user_job_id", sDate);
paramLog.put("user_job_name", "Extra매칭(" + paramRec.toString() + ")");
matchingInnerDelingMapper.createUserJob(paramLog);
|
8dc487b1
함상기
Init Version - 20...
|
139
|
|
78928007
함상기
20240305
|
140
|
String sThreadName = Thread.currentThread().getName();
|
8dc487b1
함상기
Init Version - 20...
|
141
|
long startTime = System.currentTimeMillis();
|
78928007
함상기
20240305
|
142
143
|
log.info("[" + sThreadName + "]Job Started : " + startTime);
log.debug("[" + sThreadName + "]params=" + paramRec.toString());
|
8dc487b1
함상기
Init Version - 20...
|
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
|
MatchingExtraProcessorAuto matchingExtraProcessorAuto = new MatchingExtraProcessorAuto(matchingInnerDelingMapper);
//2건씩 합산 매칭일 경우 최대 20000건 까지
for (int i=0; i<20000;i=i+1000) {
matchingExtraProcessorAuto.process(paramRec, 1, 2, 0, i);
}
for (int i=0; i<20000;i=i+1000) {
matchingExtraProcessorAuto.process(paramRec, 2, 1, i, 0);
}
for (int i=0; i<20000;i=i+1000) {
matchingExtraProcessorAuto.process(paramRec, 2, 2, i, i);
}
//3건씩 매칭일 경우 최대 5000건 까지
|
78928007
함상기
20240305
|
159
|
for (int i=0; i<2000;i=i+100) {
|
8dc487b1
함상기
Init Version - 20...
|
160
161
|
matchingExtraProcessorAuto.process(paramRec, 1, 3, 0, i);
}
|
78928007
함상기
20240305
|
162
|
for (int i=0; i<2000;i=i+100) {
|
8dc487b1
함상기
Init Version - 20...
|
163
164
|
matchingExtraProcessorAuto.process(paramRec, 3, 1, i, 0);
}
|
8dc487b1
함상기
Init Version - 20...
|
165
166
|
long endTime = System.currentTimeMillis();
|
78928007
함상기
20240305
|
167
168
|
log.info("[" + sThreadName + "]Job Ended: " + endTime);
log.info("[" + sThreadName + "]Running Time : " + (endTime - startTime) + "ms");
|
8dc487b1
함상기
Init Version - 20...
|
169
|
|
78928007
함상기
20240305
|
170
171
172
173
174
|
//작업종료에 대한 로그 업데이트
paramLog.put("exit_code", "0");
paramLog.put("exit_message", "");
matchingInnerDelingMapper.finishUserJob(paramLog);
|
8dc487b1
함상기
Init Version - 20...
|
175
176
177
178
|
}
@SuppressWarnings("rawtypes")
@Async("aiAsync")
|
78928007
함상기
20240305
|
179
|
public void aiJobSub(String jobGroupId, Map paramRec) throws Exception {
|
8dc487b1
함상기
Init Version - 20...
|
180
181
182
183
184
185
186
187
188
|
//Job Create Log
UUID uuid = UUID.randomUUID();
HashMap<String, String> mt = new HashMap<String, String>();
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-hh-mm-ss");
String sDate = dateFormat.format(new Date()) + ":" + uuid.toString();
Map<String, Object> paramLog = new HashMap<String, Object>();
|
78928007
함상기
20240305
|
189
|
paramLog.put("user_job_group", jobGroupId);
|
8dc487b1
함상기
Init Version - 20...
|
190
191
192
193
194
195
196
197
198
199
200
201
202
203
|
paramLog.put("user_job_id", sDate);
paramLog.put("user_job_name", "AI매칭(" + paramRec.toString() + ")");
matchingInnerDelingMapper.createUserJob(paramLog);
long startTime = System.currentTimeMillis();
log.info("Job Started : " + startTime);
log.debug("params=" + paramRec.toString());
String sSysSe = (String) paramRec.get("sys_se");
String sAccnutYm = (String) paramRec.get("accnut_ym");
String sCprCode = (String) paramRec.get("cpr_code");
String sPartCpr = (String) paramRec.get("partn_cpr");
String sDelngCrncy = (String) paramRec.get("delng_crncy");
|
78928007
함상기
20240305
|
204
205
206
|
String sThreadName = Thread.currentThread().getName();
|
8dc487b1
함상기
Init Version - 20...
|
207
208
|
log.debug("call python");
new ProcessExecutor()
|
78928007
함상기
20240305
|
209
|
.command(sPythonPrg, sPythonAiTarget, sThreadName, sSysSe, sAccnutYm, sCprCode, sPartCpr, sDelngCrncy)
|
8dc487b1
함상기
Init Version - 20...
|
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
|
.redirectOutput(new LogOutputStream() {
@Override
protected void processLine(String line) {
log.info(line);
}
})
.execute();
long endTime = System.currentTimeMillis();
log.info("Job Ended: " + endTime);
log.info("Running Time : " + (endTime - startTime) + "ms");
//작업종료에 대한 로그 업데이트
paramLog.put("exit_code", "0");
paramLog.put("exit_message", "");
matchingInnerDelingMapper.finishUserJob(paramLog);
}
|
8dc487b1
함상기
Init Version - 20...
|
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
|
/**
 * Looks up a Spring Batch {@link Job} bean by name and launches it through the injected
 * {@link JobLauncher}.
 *
 * <p>The job parameters are {@code syncDate} (current timestamp plus a random UUID, so the
 * parameter set differs on every call), {@code jobType}, and the {@code sysSe},
 * {@code accnutYm} and {@code searchCond} entries of {@code params}.
 *
 * @param jobName bean name of the {@link Job} to run
 * @param jobType value passed as the {@code jobType} job parameter
 * @param params source of the {@code sysSe}, {@code accnutYm} and {@code searchCond} values
 * @return the {@link JobExecution} returned by the launcher
 * @throws JobInstanceAlreadyCompleteException propagated from {@link JobLauncher#run}
 * @throws JobExecutionAlreadyRunningException propagated from {@link JobLauncher#run}
 * @throws JobParametersInvalidException propagated from {@link JobLauncher#run}
 * @throws JobRestartException propagated from {@link JobLauncher#run}
 */
public JobExecution invokeJob(String jobName, String jobType, Map<String, String> params) throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
    // "HH" is the 24-hour clock; the previous "hh" printed the same value for 1 AM and 1 PM.
    SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss");
    final String date = dateFormat.format(new Date()) + ":" + UUID.randomUUID();

    JobParameters jobParameters = new JobParametersBuilder()
            .addString("syncDate", date)
            .addString("jobType", jobType)
            .addString("sysSe", params.get("sysSe"))
            .addString("accnutYm", params.get("accnutYm"))
            .addString("searchCond", params.get("searchCond"))
            .toJobParameters();

    var jobToStart = context.getBean(jobName, Job.class);
    return jobLauncher.run(jobToStart, jobParameters);
}
@SuppressWarnings("rawtypes")
@Async("commAsync")
|
78928007
함상기
20240305
|
255
|
public void createData(String jobGroupId, Map<String, String> params) throws Exception {
|
8dc487b1
함상기
Init Version - 20...
|
256
257
258
259
260
261
262
263
|
//Job Create Log
UUID uuid = UUID.randomUUID();
HashMap<String, String> mt = new HashMap<String, String>();
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-hh-mm-ss");
String sDate = dateFormat.format(new Date()) + ":" + uuid.toString();
Map<String, Object> paramLog = new HashMap<String, Object>();
|
78928007
함상기
20240305
|
264
|
paramLog.put("user_job_group", jobGroupId);
|
8dc487b1
함상기
Init Version - 20...
|
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
|
paramLog.put("user_job_id", sDate);
paramLog.put("user_job_name", "작업데이타생성(" + params.toString() + ")");
matchingInnerDelingMapper.createUserJob(paramLog);
long startTime = System.currentTimeMillis();
log.info("Create Data Started : " + startTime);
log.debug("params=" + params.toString());
//기존데이타 삭제
int iDeleted = matchingInnerDelingMapper.deleteOriginalData(params);
log.debug("Deleted OrgData : " + iDeleted + "건");
//신규데이타 생성
//매칭키에 대한 정보 (sql로 조인하여 조회하기에는 너무 느리고 데이타 중복도 발생함)
List<Map> lMatchingInfo = oracleMapper.getMatchingInfo(params);
Map<String, Map> mMatchingInfo = new HashMap<String, Map>();
for (Map curMap : lMatchingInfo) {
String sKey = String.valueOf(curMap.get("SEQ"));
mMatchingInfo.put(sKey, curMap);
}
List<Map> lOrgData = oracleMapper.getOriginalData(params);
int iInserted = 0;
int limit = 1000; //1000건씩 batch
List<Map> lInserted = new ArrayList<Map>();
for (Map curRec : lOrgData) {
String sKey = String.valueOf(curRec.get("SEQ"));
Map curMatchingInfo = mMatchingInfo.get(sKey);
if (curMatchingInfo != null) {
curRec.put("MATCHING_CAUSE", curMatchingInfo.get("MATCHING_CAUSE"));
curRec.put("MATCH_KEY", curMatchingInfo.get("MATCH_KEY"));
}
lInserted.add(curRec);
if (lInserted.size() == limit) {
matchingInnerDelingMapper.insertOriginalData(Map.of("itemList", lInserted));
iInserted = iInserted + lInserted.size();
lInserted.clear();
}
}
if (lInserted.size() > 0) {
matchingInnerDelingMapper.insertOriginalData(Map.of("itemList", lInserted));
iInserted = iInserted + lInserted.size();
}
log.info("Inserted OrgData : " + iInserted + "건");
iDeleted = matchingInnerDelingMapper.deleteData(params);
log.debug("Deleted Work Data : " + iDeleted + "건");
iInserted = matchingInnerDelingMapper.insertDataFromOriginal(params);
log.info("Inserted Work Data : " + iInserted + "건");
iDeleted = matchingInnerDelingMapper.deleteDataAi(params);
log.debug("Deleted Work AI Data : " + iDeleted + "건");
iInserted = matchingInnerDelingMapper.insertDataAiFromOriginal(params);
log.info("Inserted Work AI Data : " + iInserted + "건");
long endTime = System.currentTimeMillis();
log.info("Create Data Ended : " + endTime);
log.info("Running Time : " + (endTime - startTime) + "ms");
//작업종료에 대한 로그 업데이트
paramLog.put("exit_code", "0");
paramLog.put("exit_message", "");
matchingInnerDelingMapper.finishUserJob(paramLog);
|
8dc487b1
함상기
Init Version - 20...
|
329
330
331
332
333
334
|
}
@SuppressWarnings("rawtypes")
@Async("commAsync")
|
78928007
함상기
20240305
|
335
|
public void returnRwsultData(String jobGroupId, Map<String, String> params) throws Exception {
|
8dc487b1
함상기
Init Version - 20...
|
336
337
338
339
340
341
342
343
|
//Job Create Log
UUID uuid = UUID.randomUUID();
HashMap<String, String> mt = new HashMap<String, String>();
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-hh-mm-ss");
String sDate = dateFormat.format(new Date()) + ":" + uuid.toString();
Map<String, Object> paramLog = new HashMap<String, Object>();
|
78928007
함상기
20240305
|
344
|
paramLog.put("user_job_group", jobGroupId);
|
8dc487b1
함상기
Init Version - 20...
|
345
346
347
348
349
350
351
352
|
paramLog.put("user_job_id", sDate);
paramLog.put("user_job_name", "결과데이타리턴(" + params.toString() + ")");
matchingInnerDelingMapper.createUserJob(paramLog);
long startTime = System.currentTimeMillis();
log.info("Update Data Started : " + startTime);
log.debug("params=" + params.toString());
|
78928007
함상기
20240305
|
353
354
355
356
|
//기존데이타 초기화
int iUpdated = matchingInnerDelingMapper.updateClearNewMatchKey(params);
//새로운 매칭키 생성
iUpdated = matchingInnerDelingMapper.updateNewMatchKey(params);
|
8dc487b1
함상기
Init Version - 20...
|
357
358
359
360
361
362
363
364
365
366
367
368
369
370
|
log.debug("Updated OrgData : " + iUpdated + "건");
long endTime = System.currentTimeMillis();
log.info("Update Data Ended : " + endTime);
log.info("Running Time : " + (endTime - startTime) + "ms");
//작업종료에 대한 로그 업데이트
paramLog.put("exit_code", "0");
paramLog.put("exit_message", "");
matchingInnerDelingMapper.finishUserJob(paramLog);
}
}
|