diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/jsr/launch/JsrJobOperator.java b/spring-batch-core/src/main/java/org/springframework/batch/core/jsr/launch/JsrJobOperator.java index 51c11a9741..e8f43dc161 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/jsr/launch/JsrJobOperator.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/jsr/launch/JsrJobOperator.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2019 the original author or authors. + * Copyright 2013-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,6 +26,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Semaphore; +import java.util.stream.Collectors; import javax.batch.operations.BatchRuntimeException; import javax.batch.operations.JobExecutionAlreadyCompleteException; import javax.batch.operations.JobExecutionIsRunningException; @@ -47,6 +48,7 @@ import org.apache.commons.logging.LogFactory; import org.springframework.batch.core.BatchStatus; +import org.springframework.batch.core.Entity; import org.springframework.batch.core.ExitStatus; import org.springframework.batch.core.Job; import org.springframework.batch.core.JobParameters; @@ -412,9 +414,11 @@ public List getStepExecutions(long executionId) List batchExecutions = new ArrayList<>(); if(executions != null) { - for (org.springframework.batch.core.StepExecution stepExecution : executions) { - if(!stepExecution.getStepName().contains(":partition")) { - batchExecutions.add(new JsrStepExecution(jobExplorer.getStepExecution(executionId, stepExecution.getId()))); + Set stepExecutionIds = executions.stream().map(Entity::getId).collect(Collectors.toSet()); + org.springframework.batch.core.JobExecution jobExecution = jobExplorer.getJobExecution(executionId); + for (org.springframework.batch.core.StepExecution stepExecution : jobExecution.getStepExecutions()) { + if(!stepExecution.getStepName().contains(":partition") && stepExecutionIds.contains(stepExecution.getId())) { + batchExecutions.add(new JsrStepExecution(stepExecution)); } } } diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/partition/support/RemoteStepExecutionAggregator.java b/spring-batch-core/src/main/java/org/springframework/batch/core/partition/support/RemoteStepExecutionAggregator.java index dd8b60d732..53914ad468 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/partition/support/RemoteStepExecutionAggregator.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/partition/support/RemoteStepExecutionAggregator.java @@ -1,5 +1,5 @@ /* - * Copyright 2006-2013 the original author or authors. + * Copyright 2006-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,9 +16,12 @@ package org.springframework.batch.core.partition.support; -import java.util.ArrayList; import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.StepExecution; import org.springframework.batch.core.explore.JobExplorer; import org.springframework.beans.factory.InitializingBean; @@ -90,14 +93,16 @@ public void aggregate(StepExecution result, Collection executions if (executions == null) { return; } - Collection updates = new ArrayList<>(); - for (StepExecution stepExecution : executions) { + Set stepExecutionIds = executions.stream().map(stepExecution -> { Long id = stepExecution.getId(); Assert.state(id != null, "StepExecution has null id. It must be saved first: " + stepExecution); - StepExecution update = jobExplorer.getStepExecution(stepExecution.getJobExecutionId(), id); - Assert.state(update != null, "Could not reload StepExecution from JobRepository: " + stepExecution); - updates.add(update); - } + return id; + }).collect(Collectors.toSet()); + JobExecution jobExecution = jobExplorer.getJobExecution(result.getJobExecutionId()); + Assert.state(jobExecution != null, + "Could not load JobExecution from JobRepository for id " + result.getJobExecutionId()); + List updates = jobExecution.getStepExecutions().stream() + .filter(stepExecution -> stepExecutionIds.contains(stepExecution.getId())).collect(Collectors.toList()); delegate.aggregate(result, updates); } diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/jsr/launch/JsrJobOperatorTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/jsr/launch/JsrJobOperatorTests.java index 31fd8341b5..0de680737c 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/jsr/launch/JsrJobOperatorTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/jsr/launch/JsrJobOperatorTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2018 the original author or authors. + * Copyright 2013-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -403,8 +403,6 @@ public void testGetStepExecutionsRoseyScenario() { jobExecution.addStepExecutions(stepExecutions); when(jobExplorer.getJobExecution(5L)).thenReturn(jobExecution); - when(jobExplorer.getStepExecution(5L, 1L)).thenReturn(new StepExecution("step1", jobExecution, 1L)); - when(jobExplorer.getStepExecution(5L, 2L)).thenReturn(new StepExecution("step2", jobExecution, 2L)); List results = jsrJobOperator.getStepExecutions(5L); @@ -429,8 +427,6 @@ public void testGetStepExecutionsPartitionedStepScenario() { jobExecution.addStepExecutions(stepExecutions); when(jobExplorer.getJobExecution(5L)).thenReturn(jobExecution); - when(jobExplorer.getStepExecution(5L, 1L)).thenReturn(new StepExecution("step1", jobExecution, 1L)); - when(jobExplorer.getStepExecution(5L, 2L)).thenReturn(new StepExecution("step2", jobExecution, 2L)); List results = jsrJobOperator.getStepExecutions(5L); diff --git a/spring-batch-integration/src/main/java/org/springframework/batch/integration/partition/MessageChannelPartitionHandler.java b/spring-batch-integration/src/main/java/org/springframework/batch/integration/partition/MessageChannelPartitionHandler.java index bac0462b61..09257c52b5 100644 --- a/spring-batch-integration/src/main/java/org/springframework/batch/integration/partition/MessageChannelPartitionHandler.java +++ b/spring-batch-integration/src/main/java/org/springframework/batch/integration/partition/MessageChannelPartitionHandler.java @@ -1,19 +1,35 @@ +/* + * Copyright 2009-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.springframework.batch.integration.partition; import java.util.ArrayList; import java.util.Collection; -import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import javax.sql.DataSource; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.Step; import org.springframework.batch.core.StepExecution; import org.springframework.batch.core.explore.JobExplorer; @@ -242,19 +258,12 @@ private Collection pollReplies(final StepExecution masterStepExec Callable> callback = new Callable>() { @Override public Collection call() throws Exception { - - for(Iterator stepExecutionIterator = split.iterator(); stepExecutionIterator.hasNext(); ) { - StepExecution curStepExecution = stepExecutionIterator.next(); - - if(!result.contains(curStepExecution)) { - StepExecution partitionStepExecution = - jobExplorer.getStepExecution(masterStepExecution.getJobExecutionId(), curStepExecution.getId()); - - if(!partitionStepExecution.getStatus().isRunning()) { - result.add(partitionStepExecution); - } - } - } + Set currentStepExecutionIds = split.stream().map(StepExecution::getId).collect(Collectors.toSet()); + JobExecution jobExecution = jobExplorer.getJobExecution(masterStepExecution.getJobExecutionId()); + jobExecution.getStepExecutions().stream() + .filter(stepExecution -> currentStepExecutionIds.contains(stepExecution.getId())) + .filter(stepExecution -> !result.contains(stepExecution)) + .filter(stepExecution -> !stepExecution.getStatus().isRunning()).forEach(result::add); if(logger.isDebugEnabled()) { logger.debug(String.format("Currently waiting on %s partitions to finish", split.size())); diff --git a/spring-batch-integration/src/test/java/org/springframework/batch/integration/partition/MessageChannelPartitionHandlerTests.java b/spring-batch-integration/src/test/java/org/springframework/batch/integration/partition/MessageChannelPartitionHandlerTests.java index fd9170412f..f863555545 100644 --- a/spring-batch-integration/src/test/java/org/springframework/batch/integration/partition/MessageChannelPartitionHandlerTests.java +++ b/spring-batch-integration/src/test/java/org/springframework/batch/integration/partition/MessageChannelPartitionHandlerTests.java @@ -1,5 +1,22 @@ +/* + * Copyright 2020-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.springframework.batch.integration.partition; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashSet; @@ -154,7 +171,12 @@ public void testHandleWithJobRepositoryPolling() throws Exception { stepExecutions.add(partition2); stepExecutions.add(partition3); when(stepExecutionSplitter.split(any(StepExecution.class), eq(1))).thenReturn(stepExecutions); - when(jobExplorer.getStepExecution(eq(5L), any(Long.class))).thenReturn(partition2, partition1, partition3, partition3, partition3, partition3, partition4); + JobExecution runningJobExecution = new JobExecution(5L, new JobParameters()); + runningJobExecution.addStepExecutions(Arrays.asList(partition2, partition1, partition3)); + JobExecution completedJobExecution = new JobExecution(5L, new JobParameters()); + completedJobExecution.addStepExecutions(Arrays.asList(partition2, partition1, partition4)); + when(jobExplorer.getJobExecution(5L)).thenReturn(runningJobExecution, runningJobExecution, runningJobExecution, + completedJobExecution); //set messageChannelPartitionHandler.setMessagingOperations(operations); @@ -198,7 +220,9 @@ public void testHandleWithJobRepositoryPollingTimeout() throws Exception { stepExecutions.add(partition2); stepExecutions.add(partition3); when(stepExecutionSplitter.split(any(StepExecution.class), eq(1))).thenReturn(stepExecutions); - when(jobExplorer.getStepExecution(eq(5L), any(Long.class))).thenReturn(partition2, partition1, partition3); + JobExecution runningJobExecution = new JobExecution(5L, new JobParameters()); + runningJobExecution.addStepExecutions(Arrays.asList(partition2, partition1, partition3)); + when(jobExplorer.getJobExecution(5L)).thenReturn(runningJobExecution); //set messageChannelPartitionHandler.setMessagingOperations(operations);