Skip to content

Commit cd2225c

Browse files
committed
GH-3444: Add Custom TTL support for RedisLock, and JdbcLock
Fixes: #3444 * Modify `LockRegistry`, `DistributedLock` interfaces. * Modify implementation of `DefaultLockRepository, `JdbcLockRegistry`, `RedisLockRegistry` * Modify ddl of `INT_LOCK` table. * Maintain test cases and documents. Signed-off-by: Eddie Cho <[email protected]>
1 parent 0b1cf83 commit cd2225c

File tree

23 files changed

+182
-89
lines changed

23 files changed

+182
-89
lines changed

spring-integration-core/src/main/java/org/springframework/integration/support/locks/DistributedLock.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,12 @@
1616

1717
package org.springframework.integration.support.locks;
1818

19+
import java.time.Duration;
1920
import java.util.concurrent.TimeUnit;
2021
import java.util.concurrent.locks.Lock;
2122

2223
/**
23-
* A {@link Lock} implementing for spring distributed locks
24+
* A distributed {@link Lock} extension.
2425
*
2526
* @author Eddie Cho
2627
*
@@ -30,24 +31,22 @@ public interface DistributedLock extends Lock {
3031

3132
/**
3233
* Attempt to acquire a lock with a specific time-to-live
33-
* @param customTtl the specific time-to-live for the lock status data
34-
* @param customTtlTimeUnit the time unit of the {@code customTtl} argument
34+
* @param ttl the specific time-to-live for the lock status data
3535
*/
36-
void lock(long customTtl, TimeUnit customTtlTimeUnit);
36+
void lock(Duration ttl);
3737

3838
/**
3939
* Acquires the lock with a specific time-to-live if it is free within the
4040
* given waiting time and the current thread has not been {@linkplain Thread#interrupt interrupted}.
4141
* @param time the maximum time to wait for the lock
4242
* @param unit the time unit of the {@code time} argument
43-
* @param customTtl the specific time-to-live for the lock status data
44-
* @param customTtlTimeUnit the time unit of the customTtl argument
43+
* @param ttl the specific time-to-live for the lock status data
4544
* @return {@code true} if the lock was acquired and {@code false}
4645
* if the waiting time elapsed before the lock was acquired
4746
*
4847
* @throws InterruptedException if the current thread is interrupted
4948
* while acquiring the lock (and interruption of lock
5049
* acquisition is supported)
5150
*/
52-
boolean tryLock(long time, TimeUnit unit, long customTtl, TimeUnit customTtlTimeUnit) throws InterruptedException;
51+
boolean tryLock(long time, TimeUnit unit, Duration ttl) throws InterruptedException;
5352
}

spring-integration-core/src/main/java/org/springframework/integration/support/locks/RenewableLockRegistry.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
package org.springframework.integration.support.locks;
1818

19-
import java.util.concurrent.TimeUnit;
19+
import java.time.Duration;
2020
import java.util.concurrent.locks.Lock;
2121

2222
import org.springframework.scheduling.TaskScheduler;
@@ -45,11 +45,10 @@ public interface RenewableLockRegistry<L extends Lock> extends LockRegistry<L> {
4545
* Renew the time to live of the lock is associated with the parameter object with a specific value.
4646
* The lock must be held by the current thread
4747
* @param lockKey The object with which the lock is associated.
48-
* @param customTtl the specific time-to-live for the lock status data
49-
* @param customTtlTimeUnit the time unit of the {@code customTtl} argument
48+
* @param ttl the specific time-to-live for the lock status data
5049
*
5150
*/
52-
void renewLock(Object lockKey, long customTtl, TimeUnit customTtlTimeUnit);
51+
void renewLock(Object lockKey, Duration ttl);
5352

5453
/**
5554
* Set the {@link TaskScheduler} to use for the renewal task.

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/DefaultLockRepository.java

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,12 +77,19 @@ public class DefaultLockRepository
7777
*/
7878
public static final String DEFAULT_TABLE_PREFIX = "INT_";
7979

80+
/**
81+
* Default value for the time-to-live property.
82+
*/
83+
public static final Duration DEFAULT_TTL = Duration.ofSeconds(10);
84+
8085
private final String id;
8186

8287
private final JdbcTemplate template;
8388

8489
private final AtomicBoolean started = new AtomicBoolean();
8590

91+
private Duration ttl = DEFAULT_TTL;
92+
8693
private String prefix = DEFAULT_TABLE_PREFIX;
8794

8895
private String region = "DEFAULT";
@@ -109,8 +116,8 @@ public class DefaultLockRepository
109116
""";
110117

111118
private String insertQuery = """
112-
INSERT INTO %sLOCK (REGION, LOCK_KEY, CLIENT_ID, EXPIRED_AFTER)
113-
VALUES (?, ?, ?, ?)
119+
INSERT INTO %sLOCK (REGION, LOCK_KEY, CLIENT_ID, CREATED_DATE, EXPIRED_AFTER)
120+
VALUES (?, ?, ?, ?, ?)
114121
""";
115122

116123
private String countQuery = """
@@ -182,6 +189,14 @@ public void setPrefix(String prefix) {
182189
this.prefix = prefix;
183190
}
184191

192+
/**
193+
* Specify the time (in milliseconds) to expire deadlocks.
194+
* @param timeToLive the time to expire deadlocks.
195+
*/
196+
public void setTimeToLive(int timeToLive) {
197+
this.ttl = Duration.ofMillis(timeToLive);
198+
}
199+
185200
/**
186201
* Set a {@link PlatformTransactionManager} for operations.
187202
* Otherwise, a primary {@link PlatformTransactionManager} bean is obtained
@@ -233,7 +248,7 @@ public String getUpdateQuery() {
233248
* Set a custom {@code INSERT} query for a lock record.
234249
* The {@link #getInsertQuery()} can be used as a template for customization.
235250
* The default query is
236-
* {@code INSERT INTO %sLOCK (REGION, LOCK_KEY, CLIENT_ID, EXPIRED_AFTER) VALUES (?, ?, ?, ?)}.
251+
* {@code INSERT INTO %sLOCK (REGION, LOCK_KEY, CLIENT_ID, CREATED_DATE, EXPIRED_AFTER) VALUES (?, ?, ?, ?, ?)}.
237252
* For example a PostgreSQL {@code ON CONFLICT DO NOTHING} hint can be provided like this:
238253
* <pre class="code">
239254
* {@code
@@ -380,6 +395,11 @@ public boolean delete(String lock) {
380395
transactionStatus -> this.template.update(this.deleteQuery, this.region, lock, this.id)) == 1;
381396
}
382397

398+
@Override
399+
public boolean acquire(String lock) {
400+
return this.acquire(lock, this.ttl);
401+
}
402+
383403
@Override
384404
public boolean acquire(String lock, Duration ttlDuration) {
385405
Boolean result =
@@ -391,7 +411,7 @@ this.region, lock, this.id, epochMillis()) > 0) {
391411
}
392412
try {
393413
return this.template.update(this.insertQuery, this.region, lock, this.id,
394-
ttlEpochMillis(ttlDuration)) > 0;
414+
epochMillis(), ttlEpochMillis(ttlDuration)) > 0;
395415
}
396416
catch (DataIntegrityViolationException ex) {
397417
return false;
@@ -417,6 +437,11 @@ public void deleteExpired() {
417437
this.template.update(this.deleteExpiredQuery, this.region, epochMillis()));
418438
}
419439

440+
@Override
441+
public boolean renew(String lock) {
442+
return this.renew(lock, this.ttl);
443+
}
444+
420445
@Override
421446
public boolean renew(String lock, Duration ttlDuration) {
422447
final Boolean result = this.defaultTransactionTemplate.execute(

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/JdbcLockRegistry.java

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -102,13 +102,13 @@ public JdbcLockRegistry(LockRepository client) {
102102
this.ttl = DEFAULT_TTL;
103103
}
104104

105-
public JdbcLockRegistry(LockRepository client, long expireAfter) {
105+
public JdbcLockRegistry(LockRepository client, Duration expireAfter) {
106106
this.client = client;
107-
this.ttl = convertToDuration(expireAfter, TimeUnit.MILLISECONDS);
107+
this.ttl = expireAfter;
108108
}
109109

110110
/**
111-
* Specify a @link Duration} to sleep between lock record insert/update attempts.
111+
* Specify a {@link Duration} to sleep between lock record insert/update attempts.
112112
* Defaults to 100 milliseconds.
113113
* @param idleBetweenTries the {@link Duration} to sleep between insert/update attempts.
114114
* @since 5.1.8
@@ -166,12 +166,7 @@ public void renewLock(Object lockKey) {
166166
}
167167

168168
@Override
169-
public void renewLock(Object lockKey, long customTtl, TimeUnit customTtlTimeUnit) {
170-
Duration customTtlDuration = convertToDuration(customTtl, customTtlTimeUnit);
171-
this.renewLock(lockKey, customTtlDuration);
172-
}
173-
174-
private void renewLock(Object lockKey, Duration customTtlDuration) {
169+
public void renewLock(Object lockKey, Duration customTtl) {
175170
Assert.isInstanceOf(String.class, lockKey);
176171
String path = pathFor((String) lockKey);
177172
JdbcLock jdbcLock;
@@ -186,7 +181,7 @@ private void renewLock(Object lockKey, Duration customTtlDuration) {
186181
if (jdbcLock == null) {
187182
throw new IllegalStateException("Could not found mutex at " + path);
188183
}
189-
if (!jdbcLock.renew(customTtlDuration)) {
184+
if (!jdbcLock.renew(customTtl)) {
190185
throw new IllegalStateException("Could not renew mutex at " + path);
191186
}
192187
}
@@ -224,12 +219,7 @@ public void lock() {
224219
}
225220

226221
@Override
227-
public void lock(long customTtl, TimeUnit customTtlUnit) {
228-
Duration customTtlDuration = convertToDuration(customTtl, customTtlUnit);
229-
lock(customTtlDuration);
230-
}
231-
232-
private void lock(Duration ttl) {
222+
public void lock(Duration ttl) {
233223
this.delegate.lock();
234224
while (true) {
235225
try {
@@ -304,12 +294,7 @@ public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
304294
}
305295

306296
@Override
307-
public boolean tryLock(long time, TimeUnit unit, long customTtl, TimeUnit customTtlUnit) throws InterruptedException {
308-
Duration customTtlDuration = convertToDuration(customTtl, customTtlUnit);
309-
return tryLock(time, unit, customTtlDuration);
310-
}
311-
312-
private boolean tryLock(long time, TimeUnit unit, Duration ttl) throws InterruptedException {
297+
public boolean tryLock(long time, TimeUnit unit, Duration ttl) throws InterruptedException {
313298
long now = System.currentTimeMillis();
314299
if (!this.delegate.tryLock(time, unit)) {
315300
return false;
@@ -389,10 +374,21 @@ public boolean isAcquiredInThisProcess() {
389374
return this.delegate.isLocked();
390375
}
391376

377+
/**
378+
* Renew the time-to-live of the distributed lock
379+
* @return {@code true} if the lock's time-to-live was successfully renewed;
380+
* {@code false} if the time-to-live could not be renewed
381+
*/
392382
public boolean renew() {
393383
return renew(JdbcLockRegistry.this.ttl);
394384
}
395385

386+
/**
387+
* Renew the time-to-live of the distributed lock
388+
* @param ttl the new time-to-live value for the lock status data
389+
* @return {@code true} if the lock's time-to-live was successfully renewed;
390+
* {@code false} if the time-to-live could not be renewed
391+
*/
396392
public boolean renew(Duration ttl) {
397393
if (!this.delegate.isHeldByCurrentThread()) {
398394
throw new IllegalMonitorStateException("The current thread doesn't own mutex at " + this.path);

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/LockRepository.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,13 @@ public interface LockRepository extends Closeable {
5252
*/
5353
void deleteExpired();
5454

55+
/**
56+
* Acquire a lock for a key.
57+
* @param lock the key for lock to acquire.
58+
* @return acquired or not.
59+
*/
60+
boolean acquire(String lock);
61+
5562
/**
5663
* Acquire a lock for a key with specific time-to-live value
5764
* @param lock the key for lock to acquire.
@@ -60,6 +67,13 @@ public interface LockRepository extends Closeable {
6067
*/
6168
boolean acquire(String lock, Duration ttl);
6269

70+
/**
71+
* Renew the lease for a lock.
72+
* @param lock the lock to renew.
73+
* @return renewed or not.
74+
*/
75+
boolean renew(String lock);
76+
6377
/**
6478
* Renew the lease for a lock with specific time-to-live value
6579
* @param lock the key for lock to acquire.

spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-db2.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ CREATE TABLE INT_LOCK (
3030
LOCK_KEY CHAR(36) NOT NULL,
3131
REGION VARCHAR(100) NOT NULL,
3232
CLIENT_ID CHAR(36),
33-
EXPIRED_AFTER TIMESTAMP NOT NULL,
33+
CREATED_DATE TIMESTAMP NOT NULL,
34+
EXPIRED_AFTER TIMESTAMP NOT NULL,
3435
constraint INT_LOCK_PK primary key (LOCK_KEY, REGION)
3536
);
3637

spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-derby.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ CREATE TABLE INT_LOCK (
3030
LOCK_KEY CHAR(36) NOT NULL,
3131
REGION VARCHAR(100) NOT NULL,
3232
CLIENT_ID CHAR(36),
33-
EXPIRED_AFTER TIMESTAMP NOT NULL,
33+
CREATED_DATE TIMESTAMP NOT NULL,
34+
EXPIRED_AFTER TIMESTAMP NOT NULL,
3435
constraint INT_LOCK_PK primary key (LOCK_KEY, REGION)
3536
);
3637

spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-h2.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ CREATE TABLE INT_LOCK (
3030
LOCK_KEY CHAR(36) NOT NULL,
3131
REGION VARCHAR(100) NOT NULL,
3232
CLIENT_ID CHAR(36),
33-
EXPIRED_AFTER TIMESTAMP NOT NULL,
33+
CREATED_DATE TIMESTAMP NOT NULL,
34+
EXPIRED_AFTER TIMESTAMP NOT NULL,
3435
constraint INT_LOCK_PK primary key (LOCK_KEY, REGION)
3536
);
3637

spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-hsqldb.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ CREATE TABLE INT_LOCK (
3030
LOCK_KEY CHAR(36) NOT NULL,
3131
REGION VARCHAR(100) NOT NULL,
3232
CLIENT_ID CHAR(36),
33-
EXPIRED_AFTER TIMESTAMP NOT NULL,
33+
CREATED_DATE TIMESTAMP NOT NULL,
34+
EXPIRED_AFTER TIMESTAMP NOT NULL,
3435
constraint INT_LOCK_PK primary key (LOCK_KEY, REGION)
3536
);
3637

spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-mysql.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ CREATE TABLE INT_LOCK (
3030
LOCK_KEY CHAR(36) NOT NULL,
3131
REGION VARCHAR(100) NOT NULL,
3232
CLIENT_ID CHAR(36),
33-
EXPIRED_AFTER DATETIME(6) NOT NULL,
33+
CREATED_DATE DATETIME(6) NOT NULL,
34+
EXPIRED_AFTER DATETIME(6) NOT NULL,
3435
constraint INT_LOCK_PK primary key (LOCK_KEY, REGION)
3536
) ENGINE=InnoDB;
3637

spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-oracle.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ CREATE TABLE INT_LOCK (
3030
LOCK_KEY VARCHAR2(36) NOT NULL,
3131
REGION VARCHAR2(100) NOT NULL,
3232
CLIENT_ID VARCHAR2(36),
33-
EXPIRED_AFTER TIMESTAMP NOT NULL,
33+
CREATED_DATE TIMESTAMP NOT NULL,
34+
EXPIRED_AFTER TIMESTAMP NOT NULL,
3435
constraint INT_LOCK_PK primary key (LOCK_KEY, REGION)
3536
);
3637

spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-postgresql.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ CREATE TABLE INT_LOCK (
3030
LOCK_KEY CHAR(36) NOT NULL,
3131
REGION VARCHAR(100) NOT NULL,
3232
CLIENT_ID CHAR(36),
33-
EXPIRED_AFTER TIMESTAMP NOT NULL,
33+
CREATED_DATE TIMESTAMP NOT NULL,
34+
EXPIRED_AFTER TIMESTAMP NOT NULL,
3435
constraint INT_LOCK_PK primary key (LOCK_KEY, REGION)
3536
);
3637

spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-sqlserver.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ CREATE TABLE INT_LOCK (
3030
LOCK_KEY CHAR(36) NOT NULL,
3131
REGION VARCHAR(100) NOT NULL,
3232
CLIENT_ID CHAR(36),
33-
EXPIRED_AFTER DATETIME NOT NULL,
33+
CREATED_DATE DATETIME NOT NULL,
34+
EXPIRED_AFTER DATETIME NOT NULL,
3435
constraint INT_LOCK_PK primary key (LOCK_KEY, REGION)
3536
);
3637

spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-sybase.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ CREATE TABLE INT_LOCK (
3030
LOCK_KEY CHAR(36) NOT NULL,
3131
REGION VARCHAR(100) NOT NULL,
3232
CLIENT_ID CHAR(36),
33-
EXPIRED_AFTER DATETIME NOT NULL,
33+
CREATED_DATE DATETIME NOT NULL,
34+
EXPIRED_AFTER DATETIME NOT NULL,
3435
constraint INT_LOCK_PK primary key (LOCK_KEY, REGION)
3536
) LOCK DATAROWS;
3637

0 commit comments

Comments
 (0)