Skip to content

Commit 62d1ddc

Browse files
author
Corneil du Plessis
authored
Improve tasks executions (spring-cloud#5747)
* Reduce the number of sub queries for tasks/executions * Added CSVLoader and TaskExecutionQueryIT to test Performance improvement of thinexecutions. * Added regression check on thin execution performance. * Remove action that fails in CI-PR * Improved manifest query. Updated formatting and copyright dates.
1 parent 4a10749 commit 62d1ddc

32 files changed

+33152
-38
lines changed
+60
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
name: CI IT Performance
2+
3+
on:
4+
workflow_dispatch:
5+
schedule:
6+
- cron: '0 6 * * 1'
7+
8+
jobs:
9+
test:
10+
name: Performance IT
11+
runs-on: ubuntu-latest
12+
steps:
13+
- uses: actions/checkout@v4
14+
- uses: actions/setup-java@v3
15+
with:
16+
java-version: '8'
17+
distribution: 'liberica'
18+
- uses: jvalkeal/setup-maven@v1
19+
with:
20+
maven-version: 3.8.8
21+
maven-mirror: 'https://dlcdn.apache.org/maven/maven-3/'
22+
- name: Login dockerhub
23+
uses: docker/login-action@v3
24+
with:
25+
username: ${{ secrets.DOCKERHUB_USERNAME }}
26+
password: ${{ secrets.DOCKERHUB_TOKEN }}
27+
28+
- name: Run Performance IT
29+
run: |
30+
mvn \
31+
-s .settings.xml \
32+
-pl spring-cloud-dataflow-server \
33+
-Dgroups=performance \
34+
-Pfailsafe \
35+
--batch-mode \
36+
integration-test
37+
- name: Test Report
38+
uses: dorny/test-reporter@v1
39+
if: ${{ success() || failure() }}
40+
with:
41+
name: Integration Tests
42+
path: '**/failsafe-reports/*IT.xml'
43+
reporter: java-junit
44+
list-tests: failed
45+
- name: 'Action: Upload Unit Test Results'
46+
if: ${{ always() }}
47+
uses: actions/upload-artifact@v3
48+
with:
49+
name: ${{ matrix.group }}-test-results-surefire
50+
path: './**/target/surefire-reports/**/*.*'
51+
retention-days: 7
52+
if-no-files-found: ignore
53+
- name: 'Action: Upload Integration Test Results'
54+
if: ${{ always() }}
55+
uses: actions/upload-artifact@v3
56+
with:
57+
name: ${{ matrix.group }}-test-results-failsafe
58+
path: './**/target/failsafe-reports/**/*.*'
59+
retention-days: 7
60+
if-no-files-found: ignore

.github/workflows/ci-pr.yml

+1-16
Original file line numberDiff line numberDiff line change
@@ -8,37 +8,22 @@ jobs:
88
runs-on: ubuntu-latest
99
steps:
1010
- uses: actions/checkout@v4
11-
# cache maven repo
1211
- uses: jlumbroso/free-disk-space@main
1312
with:
1413
tool-cache: false
1514
dotnet: false
1615
docker-images: false
1716
swap-storage: false
1817
large-packages: false
19-
# jdk8
2018
- uses: actions/setup-java@v3
2119
with:
2220
java-version: '8'
2321
distribution: 'liberica'
24-
- uses: jvalkeal/setup-maven@v1
25-
with:
26-
maven-version: 3.8.8
27-
maven-mirror: 'https://dlcdn.apache.org/maven/maven-3/'
28-
# build
2922
- name: Build
3023
shell: bash
3124
timeout-minutes: 75
3225
run: |
33-
mvn -B -s .github/settings.xml clean install
34-
- name: Test Report
35-
uses: dorny/test-reporter@v1
36-
if: ${{ success() || failure() }}
37-
with:
38-
name: Unit Tests
39-
path: '**/surefire-reports/*.xml'
40-
reporter: java-junit
41-
list-tests: failed
26+
./mvnw -B -s .github/settings.xml clean install
4227
- name: Capture Test Results
4328
if: failure()
4429
uses: actions/upload-artifact@v3

spring-cloud-dataflow-docs/src/main/asciidoc/configuration.adoc

+5-2
Original file line numberDiff line numberDiff line change
@@ -431,9 +431,10 @@ access the REST API. To do so, retrieve an
431431
OAuth2 Access Token from your OAuth2 provider and pass that access token to
432432
the REST Api by using the *Authorization* HTTP header, as follows:
433433

434-
```
434+
[source, shell]
435+
----
435436
$ curl -H "Authorization: Bearer <ACCESS_TOKEN>" http://localhost:9393/ -H 'Accept: application/json'
436-
```
437+
----
437438

438439
[[configuration-security-customizing-authorization]]
439440
==== Customizing Authorization
@@ -659,6 +660,8 @@ spring:
659660
- POST /tasks/executions/* => hasRole('ROLE_DEPLOY')
660661
- DELETE /tasks/executions/* => hasRole('ROLE_DESTROY')
661662
663+
- GET /tasks/thinexecutions => hasRole('ROLE_VIEW')
664+
662665
# Task Schedules
663666
664667
- GET /tasks/schedules => hasRole('ROLE_VIEW')

spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/JdbcSearchableJobExecutionDao.java

+33-2
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,13 @@
4444
import org.springframework.core.convert.support.DefaultConversionService;
4545
import org.springframework.dao.EmptyResultDataAccessException;
4646
import org.springframework.dao.IncorrectResultSizeDataAccessException;
47+
import org.springframework.jdbc.core.JdbcOperations;
4748
import org.springframework.jdbc.core.JdbcTemplate;
4849
import org.springframework.jdbc.core.RowCallbackHandler;
4950
import org.springframework.jdbc.core.RowMapper;
5051
import org.springframework.jdbc.support.incrementer.AbstractDataFieldMaxValueIncrementer;
5152
import org.springframework.util.Assert;
53+
import org.springframework.util.StringUtils;
5254

5355
/**
5456
* @author Dave Syer
@@ -117,7 +119,9 @@ public class JdbcSearchableJobExecutionDao extends JdbcJobExecutionDao implement
117119
private static final String GET_EXECUTION_BY_ID_5 = "SELECT JOB_EXECUTION_ID, START_TIME, END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, CREATE_TIME, LAST_UPDATED, VERSION"
118120
+ " from %PREFIX%JOB_EXECUTION where JOB_EXECUTION_ID = ?";
119121

120-
private static final String FROM_CLAUSE_TASK_TASK_BATCH = "%PREFIX%TASK_BATCH B";
122+
private static final String FROM_CLAUSE_TASK_TASK_BATCH = "%TASK_PREFIX%TASK_BATCH B";
123+
124+
private static final String GET_JOB_EXECUTIONS_BY_TASK_IDS = "SELECT JOB_EXECUTION_ID, TASK_EXECUTION_ID from %TASK_PREFIX%TASK_BATCH WHERE TASK_EXECUTION_ID in (?)";
121125

122126
private PagingQueryProvider allExecutionsPagingQueryProvider;
123127

@@ -142,6 +146,7 @@ public class JdbcSearchableJobExecutionDao extends JdbcJobExecutionDao implement
142146
private DataSource dataSource;
143147

144148
private BatchVersion batchVersion;
149+
private String taskTablePrefix;
145150

146151
public JdbcSearchableJobExecutionDao() {
147152
this(BatchVersion.BATCH_4);
@@ -161,6 +166,10 @@ public void setDataSource(DataSource dataSource) {
161166
this.dataSource = dataSource;
162167
}
163168

169+
public void setTaskTablePrefix(String taskTablePrefix) {
170+
this.taskTablePrefix = taskTablePrefix;
171+
}
172+
164173
/**
165174
* @see JdbcJobExecutionDao#afterPropertiesSet()
166175
*/
@@ -190,12 +199,15 @@ protected long getNextKey() {
190199
byJobInstanceIdWithStepCountPagingQueryProvider = getPagingQueryProvider(FIELDS_WITH_STEP_COUNT, null,
191200
JOB_INSTANCE_ID_FILTER);
192201
byTaskExecutionIdWithStepCountPagingQueryProvider = getPagingQueryProvider(FIELDS_WITH_STEP_COUNT,
193-
FROM_CLAUSE_TASK_TASK_BATCH, TASK_EXECUTION_ID_FILTER);
202+
getTaskQuery(FROM_CLAUSE_TASK_TASK_BATCH), TASK_EXECUTION_ID_FILTER);
194203

195204
super.afterPropertiesSet();
196205

197206
}
198207

208+
protected String getTaskQuery(String base) {
209+
return StringUtils.replace(base, "%TASK_PREFIX%", taskTablePrefix);
210+
}
199211
@Override
200212
public List<JobExecution> findJobExecutions(JobInstance job) {
201213
Assert.notNull(job, "Job cannot be null.");
@@ -517,6 +529,25 @@ public List<JobExecutionWithStepCount> getJobExecutionsWithStepCount(int start,
517529
}
518530
}
519531

532+
@Override
533+
public Map<Long, Set<Long>> getJobExecutionsByTaskIds(Collection<Long> ids) {
534+
JdbcOperations jdbcTemplate = getJdbcTemplate();
535+
String strIds = StringUtils.collectionToCommaDelimitedString(ids);
536+
537+
String sql = getTaskQuery(GET_JOB_EXECUTIONS_BY_TASK_IDS).replace("?", strIds);
538+
return jdbcTemplate.query(sql,
539+
rs -> {
540+
final Map<Long, Set<Long>> results = new HashMap<>();
541+
while (rs.next()) {
542+
Long taskExecutionId = rs.getLong("TASK_EXECUTION_ID");
543+
Long jobExecutionId = rs.getLong("JOB_EXECUTION_ID");
544+
Set<Long> jobs = results.computeIfAbsent(taskExecutionId, aLong -> new HashSet<>());
545+
jobs.add(jobExecutionId);
546+
}
547+
return results;
548+
});
549+
}
550+
520551
@Override
521552
public void saveJobExecution(JobExecution jobExecution) {
522553
throw new UnsupportedOperationException("SearchableJobExecutionDao is read only");

spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/JobService.java

+9
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
import java.util.Collection;
1919
import java.util.Date;
20+
import java.util.Map;
21+
import java.util.Set;
2022

2123
import org.springframework.batch.core.BatchStatus;
2224
import org.springframework.batch.core.Job;
@@ -409,4 +411,11 @@ Collection<JobExecutionWithStepCount> listJobExecutionsForJobWithStepCount(Date
409411
*/
410412
@Deprecated
411413
Collection<JobExecutionWithStepCount> listJobExecutionsForJobWithStepCountFilteredByTaskExecutionId(int taskExecutionId, int start, int count);
414+
415+
/**
416+
* Returns the job execution ids for the given collection of task execution ids, mapped by task execution id.
417+
* @param taskExecutionId the collection of task execution ids to look up
418+
* @return a map from task execution id to the set of its job execution ids
419+
*/
420+
Map<Long, Set<Long>> getJobExecutionIdsByTaskExecutionIds(Collection<Long> taskExecutionId);
412421
}

spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/SearchableJobExecutionDao.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,16 @@
1818
import java.util.Collection;
1919
import java.util.Date;
2020
import java.util.List;
21+
import java.util.Map;
22+
import java.util.Set;
2123

2224
import org.springframework.batch.core.BatchStatus;
2325
import org.springframework.batch.core.JobExecution;
2426
import org.springframework.batch.core.repository.dao.JobExecutionDao;
2527

2628
/**
2729
* @author Dave Syer
28-
*
30+
* @author Corneil du Plessis
2931
*/
3032
public interface SearchableJobExecutionDao extends JobExecutionDao {
3133

@@ -99,6 +101,11 @@ public interface SearchableJobExecutionDao extends JobExecutionDao {
99101
*/
100102
List<JobExecutionWithStepCount> getJobExecutionsWithStepCount(int start, int count);
101103

104+
/**
105+
* @param ids the set of task execution ids.
106+
* @return Map with the TaskExecution id as the key and the set of job execution ids as values.
107+
*/
108+
Map<Long, Set<Long>> getJobExecutionsByTaskIds(Collection<Long> ids);
102109
/**
103110
* Gets count of job executions.
104111
*

spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/SimpleJobService.java

+6
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.Iterator;
2626
import java.util.LinkedHashSet;
2727
import java.util.List;
28+
import java.util.Map;
2829
import java.util.Objects;
2930
import java.util.Properties;
3031
import java.util.Set;
@@ -468,6 +469,11 @@ public Collection<JobExecutionWithStepCount> listJobExecutionsForJobWithStepCoun
468469
return jobExecutionDao.getJobExecutionsWithStepCountFilteredByTaskExecutionId(taskExecutionId, start, count);
469470
}
470471

472+
@Override
473+
public Map<Long, Set<Long>> getJobExecutionIdsByTaskExecutionIds(Collection<Long> taskExecutionIds) {
474+
return this.jobExecutionDao.getJobExecutionsByTaskIds(taskExecutionIds);
475+
}
476+
471477
private List<JobExecution> getJobExecutions(String jobName, BatchStatus status, int pageOffset, int pageSize) {
472478
if (StringUtils.isEmpty(jobName)) {
473479
if (status != null) {

spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/SimpleJobServiceFactoryBean.java

+6
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ public class SimpleJobServiceFactoryBean implements FactoryBean<JobService>, Ini
7373
private String databaseType;
7474

7575
private String tablePrefix = AbstractJdbcBatchMetadataDao.DEFAULT_TABLE_PREFIX;
76+
private String taskTablePrefix = "TASK_";
7677

7778
private DataFieldMaxValueIncrementerFactory incrementerFactory;
7879

@@ -165,6 +166,10 @@ public void setTablePrefix(String tablePrefix) {
165166
this.tablePrefix = tablePrefix;
166167
}
167168

169+
public void setTaskTablePrefix(String taskTablePrefix) {
170+
this.taskTablePrefix = taskTablePrefix;
171+
}
172+
168173
/**
169174
* Sets the {@link JobServiceContainer} for the service.
170175
* @param jobServiceContainer the JobServiceContainer for this service.
@@ -270,6 +275,7 @@ protected SearchableJobExecutionDao createJobExecutionDao() throws Exception {
270275
dao.setJobExecutionIncrementer(incrementerFactory.getIncrementer(databaseType, tablePrefix
271276
+ "JOB_EXECUTION_SEQ"));
272277
dao.setTablePrefix(tablePrefix);
278+
dao.setTaskTablePrefix(taskTablePrefix);
273279
dao.setClobTypeToUse(determineClobTypeToUse(this.databaseType));
274280
dao.setExitMessageLength(maxVarCharLength);
275281
dao.afterPropertiesSet();

spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/TaskExecutionController.java

+32-13
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,13 @@
1919
import java.util.ArrayList;
2020
import java.util.Arrays;
2121
import java.util.Collection;
22+
import java.util.HashMap;
2223
import java.util.HashSet;
2324
import java.util.List;
2425
import java.util.Map;
26+
import java.util.Objects;
2527
import java.util.Set;
28+
import java.util.function.Function;
2629
import java.util.stream.Collectors;
2730

2831
import org.slf4j.Logger;
@@ -376,18 +379,34 @@ public void stop(
376379
}
377380

378381
private Page<TaskJobExecutionRel> getPageableRelationships(Page<AggregateTaskExecution> taskExecutions, Pageable pageable) {
379-
List<TaskJobExecutionRel> taskJobExecutionRels = new ArrayList<>();
380-
for (AggregateTaskExecution taskExecution : taskExecutions.getContent()) {
381-
TaskManifest taskManifest = this.taskExecutionService.findTaskManifestById(taskExecution.getExecutionId(), taskExecution.getSchemaTarget());
382-
taskManifest = this.taskSanitizer.sanitizeTaskManifest(taskManifest);
383-
List<Long> jobExecutionIds = new ArrayList<>(
384-
this.explorer.getJobExecutionIdsByTaskExecutionId(taskExecution.getExecutionId(), taskExecution.getSchemaTarget()));
385-
taskJobExecutionRels
386-
.add(new TaskJobExecutionRel(sanitizeTaskExecutionArguments(taskExecution),
387-
jobExecutionIds,
388-
taskManifest, getCtrTaskJobExecution(taskExecution, jobExecutionIds)));
389-
}
390-
return new PageImpl<>(taskJobExecutionRels, pageable, taskExecutions.getTotalElements());
382+
final Map<String, TaskJobExecutionRel> taskJobExecutionRelMap = new HashMap<>();
383+
Map<String, List<AggregateTaskExecution>> schemaGroups = taskExecutions.getContent()
384+
.stream()
385+
.collect(Collectors.groupingBy(AggregateTaskExecution::getSchemaTarget));
386+
schemaGroups.forEach((schemaTarget,aggregateTaskExecutions) -> {
387+
Map<Long, AggregateTaskExecution> taskMap = aggregateTaskExecutions.stream().collect(Collectors.toMap(AggregateTaskExecution::getExecutionId, Function.identity()));
388+
Set<Long> ids = taskMap.keySet();
389+
Map<Long, TaskManifest> manifests = this.taskExecutionService.findTaskManifestByIds(ids, schemaTarget);
390+
Map<Long, Set<Long>> jobExecutionIdMap = this.taskJobService.getJobExecutionIdsByTaskExecutionIds(ids, schemaTarget);
391+
taskMap.values().forEach(taskExecution -> {
392+
long id = taskExecution.getExecutionId();
393+
TaskManifest taskManifest = manifests.get(id);
394+
if(taskManifest != null) {
395+
taskManifest = this.taskSanitizer.sanitizeTaskManifest(taskManifest);
396+
}
397+
Set<Long> jobIds = jobExecutionIdMap.computeIfAbsent(id, aLong -> new HashSet<>());
398+
List<Long> jobExecutionIds = new ArrayList<>(jobIds);
399+
TaskJobExecutionRel rel = new TaskJobExecutionRel(sanitizeTaskExecutionArguments(taskExecution),
400+
jobExecutionIds,
401+
taskManifest, getCtrTaskJobExecution(taskExecution, jobExecutionIds));
402+
taskJobExecutionRelMap.put(schemaTarget + ":" + id, rel);
403+
});
404+
});
405+
List<TaskJobExecutionRel> taskJobExecutionContent = taskExecutions.stream()
406+
.map(aggregateTaskExecution -> taskJobExecutionRelMap.get(aggregateTaskExecution.getSchemaTarget() + ":" + aggregateTaskExecution.getExecutionId()))
407+
.filter(Objects::nonNull)
408+
.collect(Collectors.toList());
409+
return new PageImpl<>(taskJobExecutionContent, pageable, taskExecutions.getTotalElements());
391410
}
392411

393412

@@ -405,7 +424,7 @@ private TaskJobExecution getCtrTaskJobExecution(AggregateTaskExecution taskExecu
405424
TaskDefinition taskDefinition = this.taskDefinitionRepository.findByTaskName(taskExecution.getTaskName());
406425
if (taskDefinition != null) {
407426
TaskParser parser = new TaskParser(taskExecution.getTaskName(), taskDefinition.getDslText(), true, false);
408-
if (jobExecutionIds.size() > 0 && parser.parse().isComposed()) {
427+
if (!jobExecutionIds.isEmpty() && parser.parse().isComposed()) {
409428
try {
410429
taskJobExecution = this.taskJobService.getJobExecution(jobExecutionIds.toArray(new Long[0])[0], taskExecution.getSchemaTarget());
411430
} catch (NoSuchJobExecutionException noSuchJobExecutionException) {

0 commit comments

Comments
 (0)