Skip to content

Commit 93800c6

Browse files
hpoettkerfmbenhassine
authored andcommitted
Improve step execution polling and retrieval
Resolves #3790 --- Ported from c68da18 without the JSR related bits
1 parent c707284 commit 93800c6

File tree

3 files changed

+33
-28
lines changed

3 files changed

+33
-28
lines changed

spring-batch-core/src/main/java/org/springframework/batch/core/partition/support/RemoteStepExecutionAggregator.java

+13-8
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2006-2013 the original author or authors.
2+
* Copyright 2006-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,9 +16,12 @@
1616

1717
package org.springframework.batch.core.partition.support;
1818

19-
import java.util.ArrayList;
2019
import java.util.Collection;
20+
import java.util.List;
21+
import java.util.Set;
22+
import java.util.stream.Collectors;
2123

24+
import org.springframework.batch.core.JobExecution;
2225
import org.springframework.batch.core.StepExecution;
2326
import org.springframework.batch.core.explore.JobExplorer;
2427
import org.springframework.beans.factory.InitializingBean;
@@ -88,14 +91,16 @@ public void aggregate(StepExecution result, Collection<StepExecution> executions
8891
if (executions == null) {
8992
return;
9093
}
91-
Collection<StepExecution> updates = new ArrayList<>();
92-
for (StepExecution stepExecution : executions) {
94+
Set<Long> stepExecutionIds = executions.stream().map(stepExecution -> {
9395
Long id = stepExecution.getId();
9496
Assert.state(id != null, "StepExecution has null id. It must be saved first: " + stepExecution);
95-
StepExecution update = jobExplorer.getStepExecution(stepExecution.getJobExecutionId(), id);
96-
Assert.state(update != null, "Could not reload StepExecution from JobRepository: " + stepExecution);
97-
updates.add(update);
98-
}
97+
return id;
98+
}).collect(Collectors.toSet());
99+
JobExecution jobExecution = jobExplorer.getJobExecution(result.getJobExecutionId());
100+
Assert.state(jobExecution != null,
101+
"Could not load JobExecution from JobRepository for id " + result.getJobExecutionId());
102+
List<StepExecution> updates = jobExecution.getStepExecutions().stream()
103+
.filter(stepExecution -> stepExecutionIds.contains(stepExecution.getId())).collect(Collectors.toList());
99104
delegate.aggregate(result, updates);
100105
}
101106

spring-batch-integration/src/main/java/org/springframework/batch/integration/partition/MessageChannelPartitionHandler.java

+9-16
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2009-2022 the original author or authors.
2+
* Copyright 2009-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,7 +16,6 @@
1616
package org.springframework.batch.integration.partition;
1717

1818
import java.util.HashSet;
19-
import java.util.Iterator;
2019
import java.util.List;
2120
import java.util.Set;
2221
import java.util.concurrent.Callable;
@@ -29,6 +28,7 @@
2928
import org.apache.commons.logging.Log;
3029
import org.apache.commons.logging.LogFactory;
3130

31+
import org.springframework.batch.core.JobExecution;
3232
import org.springframework.batch.core.Step;
3333
import org.springframework.batch.core.StepExecution;
3434
import org.springframework.batch.core.explore.JobExplorer;
@@ -256,20 +256,13 @@ private Set<StepExecution> pollReplies(final StepExecution managerStepExecution,
256256
Callable<Set<StepExecution>> callback = new Callable<Set<StepExecution>>() {
257257
@Override
258258
public Set<StepExecution> call() throws Exception {
259-
260-
for (Iterator<StepExecution> stepExecutionIterator = split.iterator(); stepExecutionIterator
261-
.hasNext();) {
262-
StepExecution curStepExecution = stepExecutionIterator.next();
263-
264-
if (!result.contains(curStepExecution)) {
265-
StepExecution partitionStepExecution = jobExplorer
266-
.getStepExecution(managerStepExecution.getJobExecutionId(), curStepExecution.getId());
267-
268-
if (!partitionStepExecution.getStatus().isRunning()) {
269-
result.add(partitionStepExecution);
270-
}
271-
}
272-
}
259+
Set<Long> currentStepExecutionIds = split.stream().map(StepExecution::getId)
260+
.collect(Collectors.toSet());
261+
JobExecution jobExecution = jobExplorer.getJobExecution(managerStepExecution.getJobExecutionId());
262+
jobExecution.getStepExecutions().stream()
263+
.filter(stepExecution -> currentStepExecutionIds.contains(stepExecution.getId()))
264+
.filter(stepExecution -> !result.contains(stepExecution))
265+
.filter(stepExecution -> !stepExecution.getStatus().isRunning()).forEach(result::add);
273266

274267
if (logger.isDebugEnabled()) {
275268
logger.debug(String.format("Currently waiting on %s partitions to finish", split.size()));

spring-batch-integration/src/test/java/org/springframework/batch/integration/partition/MessageChannelPartitionHandlerTests.java

+11-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2022 the original author or authors.
2+
* Copyright 2020-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.batch.integration.partition;
1818

19+
import java.util.Arrays;
1920
import java.util.Collection;
2021
import java.util.Collections;
2122
import java.util.HashSet;
@@ -174,8 +175,12 @@ void testHandleWithJobRepositoryPolling() throws Exception {
174175
stepExecutions.add(partition2);
175176
stepExecutions.add(partition3);
176177
when(stepExecutionSplitter.split(any(StepExecution.class), eq(1))).thenReturn(stepExecutions);
177-
when(jobExplorer.getStepExecution(eq(5L), any(Long.class))).thenReturn(partition2, partition1, partition3,
178-
partition3, partition3, partition3, partition4);
178+
JobExecution runningJobExecution = new JobExecution(5L, new JobParameters());
179+
runningJobExecution.addStepExecutions(Arrays.asList(partition2, partition1, partition3));
180+
JobExecution completedJobExecution = new JobExecution(5L, new JobParameters());
181+
completedJobExecution.addStepExecutions(Arrays.asList(partition2, partition1, partition4));
182+
when(jobExplorer.getJobExecution(5L)).thenReturn(runningJobExecution, runningJobExecution, runningJobExecution,
183+
completedJobExecution);
179184

180185
// set
181186
messageChannelPartitionHandler.setMessagingOperations(operations);
@@ -220,7 +225,9 @@ void testHandleWithJobRepositoryPollingTimeout() throws Exception {
220225
stepExecutions.add(partition2);
221226
stepExecutions.add(partition3);
222227
when(stepExecutionSplitter.split(any(StepExecution.class), eq(1))).thenReturn(stepExecutions);
223-
when(jobExplorer.getStepExecution(eq(5L), any(Long.class))).thenReturn(partition2, partition1, partition3);
228+
JobExecution runningJobExecution = new JobExecution(5L, new JobParameters());
229+
runningJobExecution.addStepExecutions(Arrays.asList(partition2, partition1, partition3));
230+
when(jobExplorer.getJobExecution(5L)).thenReturn(runningJobExecution);
224231

225232
// set
226233
messageChannelPartitionHandler.setMessagingOperations(operations);

0 commit comments

Comments
 (0)