Skip to content

Commit bd938f6

Browse files
committed
Support for delayClose.
1 parent 44c957d commit bd938f6

File tree

2 files changed

+92
-18
lines changed

2 files changed

+92
-18
lines changed

vpro-shared-util/src/main/java/nl/vpro/util/locker/ObjectLocker.java

Lines changed: 78 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
@Slf4j
2929
public class ObjectLocker {
3030

31+
32+
3133
public static BiPredicate<StackTraceElement, AtomicInteger> summaryBiPredicate =
3234
(e, count) -> {
3335
boolean match = e.getClassName().startsWith("nl.vpro") &&
@@ -205,7 +207,7 @@ public static <K extends Serializable> LockHolderCloser<Serializable> acquireLoc
205207

206208

207209
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
208-
public static <K extends Serializable> LockHolderCloser<K> acquireLock(
210+
private static <K extends Serializable> LockHolderCloser<K> acquireLock(
209211
final K key,
210212
final @NonNull String reason,
211213
final @NonNull Map<K, LockHolder<K>> locks,
@@ -227,7 +229,7 @@ public static <K extends Serializable> LockHolderCloser<K> acquireLock(
227229
locks.remove(key);
228230
continue;
229231
}
230-
closer = new LockHolderCloser<>(nanoStart, locks, holder, true);
232+
closer = new LockHolderCloser<>(nanoStart, locks, holder, comparable);
231233
if (holder.lock.isLocked() && !holder.lock.isHeldByCurrentThread()) {
232234
log.debug("There are already threads ({}) for {}, waiting", holder.lock.getQueueLength(), key);
233235
alreadyWaiting = true;
@@ -324,31 +326,29 @@ private static <K extends Serializable> LockHolder<K> computeLock(
324326
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
325327
private static <K extends Serializable> void releaseLock(
326328
final long nanoStart,
327-
final @NonNull K key,
328-
final @NonNull String reason,
329329
final @NonNull Map<K, LockHolder<K>> locks,
330-
final @NonNull LockHolder<K> lock,
331-
final boolean checkIfCurrentThread) {
330+
final @NonNull LockHolder<K> lock) {
332331
synchronized (locks) {
333332
if (lock.lock.getHoldCount() == 1) {
334333
if (!lock.lock.hasQueuedThreads()) {
335-
log.trace("Removed {}", key);
336-
locks.remove(key);
334+
log.trace("Removed {}", lock.key);
335+
locks.remove(lock.key);
337336
}
338337
final Duration duration = Duration.ofNanos(System.nanoTime() - nanoStart);
339338
for (Listener listener : LISTENERS) {
340339
listener.unlock(lock, duration);
341340
}
342341

343342
Slf4jHelper.log(log, duration.compareTo(lock.warnTime)> 0 ? Level.WARN : Level.DEBUG,
344-
"Released lock for {} ({}) in {}", key, reason, Duration.ofNanos(System.nanoTime() - nanoStart));
343+
"Released lock for {} ({}) in {}", lock.key, lock.reason, Duration.ofNanos(System.nanoTime() - nanoStart));
345344
}
346-
if (! checkIfCurrentThread || lock.lock.isHeldByCurrentThread()) { // MSE-4946
345+
if (lock.lock.isHeldByCurrentThread()) { // MSE-4946
347346
HOLDS.get().remove(lock);
348347
lock.lock.unlock();
349348
} else {
350349
// can happen if 'continuing without lock'
351-
log.warn("Current lock {} not hold by current thread {} but by {}", lock, Thread.currentThread().getName(), Optional.ofNullable(lock.thread.get()).map(Thread::getName).orElse(null));
350+
Thread currentThread = Thread.currentThread();
351+
log.warn("Current lock {} not hold by current thread {} ({}) but by {} ({})", lock, currentThread.getName(), currentThread, Optional.ofNullable(lock.thread.get()).map(Thread::getName).orElse(null), lock.thread.get(), new Exception());
352352
}
353353

354354
locks.notifyAll();
@@ -383,6 +383,7 @@ public static class LockHolder<K> {
383383
@Setter
384384
private Duration warnTime = ObjectLocker.defaultWarnTime;
385385

386+
386387
LockHolder(K k, String reason, ReentrantLock lock) {
387388
this.key = k;
388389
this.lock = lock;
@@ -391,6 +392,8 @@ public static class LockHolder<K> {
391392
this.reason = reason;
392393
}
393394

395+
396+
394397
@SuppressWarnings("rawtypes")
395398
@Override
396399
public boolean equals(Object o) {
@@ -457,25 +460,83 @@ public static class LockHolderCloser<K extends Serializable> implements AutoClos
457460

458461
final long nanoStart;
459462

460-
@With
461-
final boolean checkIfCurrentThread;
462-
463463
final @NonNull Map<K, LockHolder<K>> locks;
464464

465+
final BiPredicate<Serializable, K> comparable;
466+
boolean closed = false;
467+
465468
private LockHolderCloser(
466469
final long nanoStart,
467470
@NonNull Map<K, LockHolder<K>> locks,
468471
@NonNull LockHolder<K> lockHolder,
469-
final boolean checkIfCurrentThread) {
472+
BiPredicate<Serializable, K> comparable
473+
) {
470474
this.nanoStart = nanoStart;
471475
this.locks = locks;
472476
this.lockHolder = lockHolder;
473-
this.checkIfCurrentThread = checkIfCurrentThread;
477+
this.comparable = comparable;
478+
}
479+
480+
public CompletableFuture<LockHolderCloser<K>> forTransfer() {
481+
close();
482+
CompletableFuture<LockHolderCloser<K>> future = new CompletableFuture<>();
483+
484+
485+
future.thenAccept((o) -> {
486+
if (log.isDebugEnabled()) {
487+
log.debug("Completed transfer {} ({}) -> {} ({})", LockHolderCloser.this, LockHolderCloser.this.lockHolder.thread.get(), o, o.lockHolder.thread.get());
488+
}
489+
});
490+
log.debug("Future to use {}", future);
491+
return future;
492+
}
493+
494+
public LockHolderCloser<K> transfer(CompletableFuture<LockHolderCloser<K>> future) {
495+
synchronized (locks) {
496+
try {
497+
498+
LockHolderCloser<K> closer = acquireLock(lockHolder.key, lockHolder.reason, locks, comparable);
499+
future.complete(closer);
500+
return closer;
501+
} catch (Exception e) {
502+
return null;
503+
}
504+
}
505+
}
506+
public CompletableFuture<Void> delayedClose(Duration duration) {
507+
CompletableFuture<Void> ready = new CompletableFuture<>();
508+
final CompletableFuture<LockHolderCloser<K>> future = forTransfer();
509+
ForkJoinPool.commonPool().execute(() -> {
510+
511+
try (var closer = transfer(future)) {
512+
log.debug("{} Starting delay", lockHolder.key);
513+
Thread.sleep(duration.toMillis());
514+
log.info("{} Ready after delay {}", lockHolder.key, duration);
515+
ready.complete(null);
516+
} catch (InterruptedException e) {
517+
log.warn(e.getMessage(), e);
518+
} finally {
519+
520+
}
521+
});
522+
return ready;
474523
}
475524

476525
@Override
477526
public void close() {
478-
releaseLock(nanoStart, lockHolder.key, lockHolder.reason, locks, lockHolder, checkIfCurrentThread);
527+
synchronized (locks) {
528+
if (!closed) {
529+
releaseLock(nanoStart, locks, lockHolder);
530+
} else {
531+
log.debug("Closed already");
532+
}
533+
closed = true;
534+
}
535+
}
536+
537+
@Override
538+
public String toString() {
539+
return lockHolder + (closed ? " (closed)" : "");
479540
}
480541
}
481542

vpro-shared-util/src/test/java/nl/vpro/util/locker/ObjectLockerTest.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77
import java.io.Serial;
88
import java.io.Serializable;
99
import java.time.Duration;
10-
import java.util.*;
1110
import java.util.AbstractMap.SimpleEntry;
11+
import java.util.List;
1212
import java.util.concurrent.*;
1313

1414
import org.junit.jupiter.api.*;
@@ -222,6 +222,19 @@ public void twoDifferentLocksStrictly() {
222222
}).isInstanceOf(IllegalStateException.class);
223223
}
224224

225+
@Test
226+
public void delayedClose() throws InterruptedException, ExecutionException {
227+
CompletableFuture<Void> voidCompletableFuture;
228+
try (var closer = ObjectLocker.acquireLock("bla", "test", ObjectLocker.LOCKED_OBJECTS)) {
229+
Thread.sleep(100);
230+
log.info("DelayClosing");
231+
voidCompletableFuture = closer.delayedClose(Duration.ofSeconds(10));
232+
}
233+
log.info("going ahhead..");
234+
voidCompletableFuture.get();
235+
236+
}
237+
225238

226239

227240
@SneakyThrows

0 commit comments

Comments
 (0)