From 5592b6ed225062db22750a176616c162451e1f8e Mon Sep 17 00:00:00 2001 From: Yanming Zhou Date: Wed, 26 Mar 2025 16:34:12 +0800 Subject: [PATCH] Add version to delete sql for optimistic locking Signed-off-by: Yanming Zhou --- .../core/repository/dao/JdbcJobExecutionDao.java | 11 +++++++++-- .../core/repository/dao/JdbcJobInstanceDao.java | 12 ++++++++++-- .../core/repository/dao/JdbcStepExecutionDao.java | 11 +++++++++-- .../batch/test/JobRepositoryTestUtils.java | 11 +++++++++-- 4 files changed, 37 insertions(+), 8 deletions(-) diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/JdbcJobExecutionDao.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/JdbcJobExecutionDao.java index 9ec0a9e2d8..676806742c 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/JdbcJobExecutionDao.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/JdbcJobExecutionDao.java @@ -74,6 +74,7 @@ * @author Dimitrios Liapis * @author Philippe Marschall * @author Jinwoo Bae + * @author Yanming Zhou */ public class JdbcJobExecutionDao extends AbstractJdbcBatchMetadataDao implements JobExecutionDao, InitializingBean { @@ -146,7 +147,7 @@ SELECT COUNT(*) private static final String DELETE_JOB_EXECUTION = """ DELETE FROM %PREFIX%JOB_EXECUTION - WHERE JOB_EXECUTION_ID = ? + WHERE JOB_EXECUTION_ID = ? AND VERSION = ? """; private static final String DELETE_JOB_EXECUTION_PARAMETERS = """ @@ -395,7 +396,13 @@ public void synchronizeStatus(JobExecution jobExecution) { */ @Override public void deleteJobExecution(JobExecution jobExecution) { - getJdbcTemplate().update(getQuery(DELETE_JOB_EXECUTION), jobExecution.getId()); + int count = getJdbcTemplate().update(getQuery(DELETE_JOB_EXECUTION), jobExecution.getId(), + jobExecution.getVersion()); + + if (count == 0) { + throw new OptimisticLockingFailureException("Attempt to delete job execution id=" + jobExecution.getId() + + " with wrong version (" + jobExecution.getVersion() + ")"); + } } /** diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/JdbcJobInstanceDao.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/JdbcJobInstanceDao.java index 27dcc8b7a2..b231d16ac0 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/JdbcJobInstanceDao.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/JdbcJobInstanceDao.java @@ -31,6 +31,7 @@ import org.springframework.beans.factory.InitializingBean; import org.springframework.dao.DataAccessException; import org.springframework.dao.EmptyResultDataAccessException; +import org.springframework.dao.OptimisticLockingFailureException; import org.springframework.jdbc.core.ResultSetExtractor; import org.springframework.jdbc.core.RowMapper; import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer; @@ -53,6 +54,7 @@ * @author Will Schipp * @author Mahmoud Ben Hassine * @author Parikshit Dutta + * @author Yanming Zhou */ public class JdbcJobInstanceDao extends AbstractJdbcBatchMetadataDao implements JobInstanceDao, InitializingBean { @@ -124,7 +126,7 @@ SELECT COUNT(*) private static final String DELETE_JOB_INSTANCE = """ DELETE FROM %PREFIX%JOB_INSTANCE - WHERE JOB_INSTANCE_ID = ? + WHERE JOB_INSTANCE_ID = ? AND VERSION = ? """; private DataFieldMaxValueIncrementer jobInstanceIncrementer; @@ -281,7 +283,13 @@ public long getJobInstanceCount(@Nullable String jobName) throws NoSuchJobExcept */ @Override public void deleteJobInstance(JobInstance jobInstance) { - getJdbcTemplate().update(getQuery(DELETE_JOB_INSTANCE), jobInstance.getId()); + int count = getJdbcTemplate().update(getQuery(DELETE_JOB_INSTANCE), jobInstance.getId(), + jobInstance.getVersion()); + + if (count == 0) { + throw new OptimisticLockingFailureException("Attempt to delete job instance id=" + jobInstance.getId() + + " with wrong version (" + jobInstance.getVersion() + ")"); + } } /** diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/JdbcStepExecutionDao.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/JdbcStepExecutionDao.java index b1e46e0c23..393ab2ccd0 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/JdbcStepExecutionDao.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/JdbcStepExecutionDao.java @@ -66,6 +66,7 @@ * @author Mahmoud Ben Hassine * @author Baris Cubukcuoglu * @author Minsoo Kim + * @author Yanming Zhou * @see StepExecutionDao */ public class JdbcStepExecutionDao extends AbstractJdbcBatchMetadataDao implements StepExecutionDao, InitializingBean { @@ -114,7 +115,7 @@ SELECT COUNT(*) private static final String DELETE_STEP_EXECUTION = """ DELETE FROM %PREFIX%STEP_EXECUTION - WHERE STEP_EXECUTION_ID = ? + WHERE STEP_EXECUTION_ID = ? and VERSION = ? """; private static final Comparator BY_CREATE_TIME_DESC_ID_DESC = Comparator @@ -378,7 +379,13 @@ public long countStepExecutions(JobInstance jobInstance, String stepName) { */ @Override public void deleteStepExecution(StepExecution stepExecution) { - getJdbcTemplate().update(getQuery(DELETE_STEP_EXECUTION), stepExecution.getId()); + int count = getJdbcTemplate().update(getQuery(DELETE_STEP_EXECUTION), stepExecution.getId(), + stepExecution.getVersion()); + + if (count == 0) { + throw new OptimisticLockingFailureException("Attempt to delete step execution id=" + stepExecution.getId() + + " with wrong version (" + stepExecution.getVersion() + ")"); + } } private static class StepExecutionRowMapper implements RowMapper { diff --git a/spring-batch-test/src/main/java/org/springframework/batch/test/JobRepositoryTestUtils.java b/spring-batch-test/src/main/java/org/springframework/batch/test/JobRepositoryTestUtils.java index d6a7b07327..15e1d2dae5 100644 --- a/spring-batch-test/src/main/java/org/springframework/batch/test/JobRepositoryTestUtils.java +++ b/spring-batch-test/src/main/java/org/springframework/batch/test/JobRepositoryTestUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2006-2023 the original author or authors. + * Copyright 2006-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,6 +30,7 @@ import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.repository.JobRestartException; +import org.springframework.dao.OptimisticLockingFailureException; import org.springframework.lang.Nullable; /** @@ -39,6 +40,7 @@ * * @author Dave Syer * @author Mahmoud Ben Hassine + * @author Yanming Zhou */ public class JobRepositoryTestUtils { @@ -136,7 +138,12 @@ public void removeJobExecutions(Collection jobExecutions) { removeJobExecution(jobExecution); } for (JobExecution jobExecution : jobExecutions) { - this.jobRepository.deleteJobInstance(jobExecution.getJobInstance()); + try { + this.jobRepository.deleteJobInstance(jobExecution.getJobInstance()); + } + catch (OptimisticLockingFailureException ignore) { + // same job instance may be already deleted + } } }