diff --git a/vpro-shared-util/src/main/java/nl/vpro/util/locker/ObjectLocker.java b/vpro-shared-util/src/main/java/nl/vpro/util/locker/ObjectLocker.java index e924d82e2..ee2722540 100644 --- a/vpro-shared-util/src/main/java/nl/vpro/util/locker/ObjectLocker.java +++ b/vpro-shared-util/src/main/java/nl/vpro/util/locker/ObjectLocker.java @@ -5,8 +5,7 @@ import java.io.Serializable; import java.lang.ref.WeakReference; -import java.time.Duration; -import java.time.Instant; +import java.time.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; @@ -18,6 +17,7 @@ import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; import org.meeuw.functional.Predicates; +import org.meeuw.functional.ThrowingConsumer; import org.slf4j.event.Level; import nl.vpro.logging.Slf4jHelper; @@ -27,7 +27,10 @@ */ @Slf4j public class ObjectLocker { - + public static Clock clock = Clock.systemUTC(); + public static ThrowingConsumer sleeper = (d) -> { + Thread.sleep(d.toMillis()); + }; public static BiPredicate summaryBiPredicate = @@ -150,7 +153,7 @@ public static T withKeyLock( } public static T withKeyLock( - Serializable id, + @NonNull Serializable id, @NonNull String reason, @NonNull Runnable runnable) { return withKeyLock(id, reason, () -> { @@ -159,9 +162,6 @@ public static T withKeyLock( }); } - - - /** * @param key The key to lock on * @param reason A description for the reason of locking, which can be used in logging or exceptions @@ -192,7 +192,7 @@ public static T withObjectLock( log.warn("Calling with null key: {}", reason); return callable.call(); } - try (final LockHolderCloser lock = acquireLock(key, reason, locks, comparable)) { + try (final LockHolderCloser lock = acquireLock(key, reason, locks, comparable, Duration.ZERO)) { consumer.accept(lock.lockHolder); return callable.call(); } @@ -201,17 +201,19 @@ public static T withObjectLock( public static LockHolderCloser acquireLock( final Serializable key, final @NonNull String reason, - final @NonNull Map> locks) throws InterruptedException { - return acquireLock(key, reason, locks, CLASS_EQUALS); + final @NonNull Map> locks, + Duration delayAfterClose) throws InterruptedException { + return acquireLock(key, reason, locks, CLASS_EQUALS, delayAfterClose); } @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") - public static LockHolderCloser acquireLock( + private static LockHolderCloser acquireLock( final K key, final @NonNull String reason, final @NonNull Map> locks, - final @NonNull BiPredicate comparable) throws InterruptedException { + final @NonNull BiPredicate comparable, + final Duration delayAfterRelease) throws InterruptedException { final long nanoStart = System.nanoTime(); LockHolderCloser closer = null; try { @@ -219,6 +221,7 @@ public static LockHolderCloser acquireLock( LockHolder holder; boolean alreadyWaiting = false; while (true) { + log.debug("Acquiring lock {} ({})", key, reason); synchronized (locks) { holder = locks.computeIfAbsent(key, (m) -> computeLock(m, reason, comparable)) @@ -229,7 +232,7 @@ public static LockHolderCloser acquireLock( locks.remove(key); continue; } - closer = new LockHolderCloser<>(nanoStart, locks, holder, comparable); + closer = new LockHolderCloser<>(nanoStart, locks, holder, comparable, delayAfterRelease); if (holder.lock.isLocked() && !holder.lock.isHeldByCurrentThread()) { log.debug("There are already threads ({}) for {}, waiting", holder.lock.getQueueLength(), key); alreadyWaiting = true; @@ -237,11 +240,25 @@ public static LockHolderCloser acquireLock( break; } } + + if (Optional.ofNullable(threadLocalMonitorTime.get()).map(c -> true).orElse(monitor)) { monitoredLock(holder, key); } else { + log.debug("Locking for {}", holder); holder.lock.lock(); } + + if (holder.availableAfter != null) { + Instant now = clock.instant(); + if (holder.availableAfter.isAfter(now)) { + sleeper.accept(Duration.between(now, holder.availableAfter)); + } + log.debug("Acquired and waited until {}", holder.availableAfter); + holder.availableAfter = null; + + } + if (alreadyWaiting) { log.debug("Released and continuing {}", key); } @@ -332,7 +349,9 @@ private static void releaseLock( if (lock.lock.getHoldCount() == 1) { if (!lock.lock.hasQueuedThreads()) { log.trace("Removed {}", lock.key); - locks.remove(lock.key); + if (lock.availableAfter == null || lock.availableAfter.isBefore(clock.instant())) { + locks.remove(lock.key); + } } final Duration duration = Duration.ofNanos(System.nanoTime() - nanoStart); for (Listener listener : LISTENERS) { @@ -343,8 +362,12 @@ private static void releaseLock( "Released lock for {} ({}) in {}", lock.key, lock.reason, Duration.ofNanos(System.nanoTime() - nanoStart)); } if (lock.lock.isHeldByCurrentThread()) { // MSE-4946 + if (lock.availableAfter == null || lock.availableAfter.isBefore(clock.instant())) { + + } HOLDS.get().remove(lock); lock.lock.unlock(); + } else { // can happen if 'continuing without lock' Thread currentThread = Thread.currentThread(); @@ -352,6 +375,7 @@ private static void releaseLock( } locks.notifyAll(); + } } @@ -373,7 +397,7 @@ public static class LockHolder { public final ReentrantLock lock; final StackTraceElement[] initiator; final WeakReference thread; - final Instant createdAt = Instant.now(); + final Instant createdAt = clock.instant(); final String reason; @Getter @@ -383,6 +407,10 @@ public static class LockHolder { @Setter private Duration warnTime = ObjectLocker.defaultWarnTime; + @Getter + @Setter + private Instant availableAfter; + LockHolder(K k, String reason, ReentrantLock lock) { this.key = k; @@ -414,7 +442,7 @@ public int hashCode() { * @since 5.13 */ public Duration getAge() { - return Duration.between(createdAt, Instant.now()); + return Duration.between(createdAt, clock.instant()); } public String summarize() { @@ -438,7 +466,7 @@ public String summarize(boolean showThreadBusy) { @Override public String toString() { - return "holder:" + key + ":" + createdAt; + return "holder:" + key + ":" + createdAt + ":" + reason; } public void disable(boolean interrupt) { @@ -465,65 +493,30 @@ public static class LockHolderCloser implements AutoClos final BiPredicate comparable; boolean closed = false; + final Duration delayAfterClose; + private LockHolderCloser( final long nanoStart, @NonNull Map> locks, @NonNull LockHolder lockHolder, - BiPredicate comparable + BiPredicate comparable, + Duration delayAfterClose ) { this.nanoStart = nanoStart; this.locks = locks; this.lockHolder = lockHolder; this.comparable = comparable; + this.delayAfterClose = delayAfterClose; } - public CompletableFuture> forTransfer() { - close(); - CompletableFuture> future = new CompletableFuture<>(); - - - future.thenAccept((o) -> { - if (log.isDebugEnabled()) { - log.debug("Completed transfer {} ({}) -> {} ({})", LockHolderCloser.this, LockHolderCloser.this.lockHolder.thread.get(), o, o.lockHolder.thread.get()); - } - }); - log.debug("Future to use {}", future); - return future; - } - - public LockHolderCloser transfer(CompletableFuture> future) { - synchronized (locks) { - try { - - LockHolderCloser closer = acquireLock(lockHolder.key, lockHolder.reason, locks, comparable); - future.complete(closer); - return closer; - } catch (Exception e) { - return null; - } - } - } - public CompletableFuture delayedClose(Duration duration) { - CompletableFuture ready = new CompletableFuture<>(); - final CompletableFuture> future = forTransfer(); - ForkJoinPool.commonPool().execute(() -> { - - try (var closer = transfer(future)) { - log.debug("{} Starting delay", lockHolder.key); - Thread.sleep(duration.toMillis()); - log.info("{} Ready after delay {}", lockHolder.key, duration); - ready.complete(null); - } catch (InterruptedException e) { - log.warn(e.getMessage(), e); - } finally { - } - }); - return ready; - } @Override public void close() { + if (delayAfterClose.compareTo(Duration.ZERO) > 0) { + lockHolder.setAvailableAfter(clock.instant().plus(delayAfterClose)); + } + synchronized (locks) { if (!closed) { releaseLock(nanoStart, locks, lockHolder); diff --git a/vpro-shared-util/src/test/java/nl/vpro/util/locker/ObjectLockerTest.java b/vpro-shared-util/src/test/java/nl/vpro/util/locker/ObjectLockerTest.java index 79cd384fd..18124bb4e 100644 --- a/vpro-shared-util/src/test/java/nl/vpro/util/locker/ObjectLockerTest.java +++ b/vpro-shared-util/src/test/java/nl/vpro/util/locker/ObjectLockerTest.java @@ -2,19 +2,23 @@ import lombok.EqualsAndHashCode; import lombok.SneakyThrows; -import lombok.extern.slf4j.Slf4j; +import lombok.extern.log4j.Log4j2; import java.io.Serial; import java.io.Serializable; import java.time.Duration; import java.util.AbstractMap.SimpleEntry; -import java.util.List; +import java.util.*; import java.util.concurrent.*; import org.junit.jupiter.api.*; import org.junit.jupiter.api.parallel.Execution; import org.junit.jupiter.api.parallel.ExecutionMode; +import nl.vpro.logging.simple.*; +import nl.vpro.util.ThreadPools; + +import static nl.vpro.util.locker.ObjectLocker.LOCKED_OBJECTS; import static nl.vpro.util.locker.ObjectLocker.withKeyLock; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -23,7 +27,7 @@ /** * @author Michiel Meeuwissen */ -@Slf4j +@Log4j2 @Execution(ExecutionMode.SAME_THREAD) public class ObjectLockerTest { @@ -224,14 +228,76 @@ public void twoDifferentLocksStrictly() { @Test public void delayedClose() throws InterruptedException, ExecutionException { - CompletableFuture voidCompletableFuture; - try (var closer = ObjectLocker.acquireLock("bla", "test", ObjectLocker.LOCKED_OBJECTS)) { + //TestClock testClock = TestClock.twentyTwenty(); + //ObjectLocker.clock = testClock; + //ObjectLocker.sleeper = d -> testClock.sleep(d.toMillis()); + + List events = Collections.synchronizedList(new ArrayList<>()); + SimpleLogger logger = Log4j2SimpleLogger.of(log).chain(QueueSimpleLogger.of(events::add)); + + CompletableFuture voidCompletableFuture = new CompletableFuture<>(); + try (var closer = ObjectLocker.acquireLock("bla", "initiallock", ObjectLocker.LOCKED_OBJECTS, Duration.ofSeconds(5))) { + ThreadPools.backgroundExecutor.execute(() -> { + logger.info("Concurrent: waiting for lock"); + ObjectLocker.withKeyLock("bla", "concurrent lock", () -> { + logger.info("Concurrent: acquired lock"); + sleep(100); + logger.info("Concurrent: about to release lock"); + }); + logger.info("Concurrent released lock. Completing proceess now."); + voidCompletableFuture.complete(null); + + }); + sleep(100); + logger.info("DelayClosing"); + } + + logger.info("going ahead.."); + voidCompletableFuture.get(); + logger.info("ready"); + assertThat(LOCKED_OBJECTS).isEmpty(); + assertThat(events.stream().map(Event::getMessage)).containsExactly( + "Concurrent: waiting for lock", + "DelayClosing", + "going ahead..", + "Concurrent: acquired lock", + "Concurrent: about to release lock", + "Concurrent released lock. Completing proceess now.", + "ready" + ); + } + + + @Test + public void delayedClose2() throws InterruptedException, ExecutionException { + List events = Collections.synchronizedList(new ArrayList<>()); + SimpleLogger logger = Log4j2SimpleLogger.of(log).chain(QueueSimpleLogger.of(events::add)); + CompletableFuture voidCompletableFuture = new CompletableFuture<>(); + try (var closer = ObjectLocker.acquireLock("bla", "initiallock", ObjectLocker.LOCKED_OBJECTS, Duration.ofSeconds(10))) { Thread.sleep(100); - log.info("DelayClosing"); - voidCompletableFuture = closer.delayedClose(Duration.ofSeconds(10)); + logger.info("DelayClosing"); } - log.info("going ahhead.."); + + logger.info("going ahead.."); + ObjectLocker.withKeyLock("bla", "concurrent lock", () -> { + logger.info("Concurrent: acquired lock"); + sleep(100); + logger.info("Concurrent: about to release lock"); + }); + logger.info("Concurrent released lock. Completing proceess now."); + voidCompletableFuture.complete(null); + voidCompletableFuture.get(); + logger.info("ready"); + assertThat(LOCKED_OBJECTS).isEmpty(); + assertThat(events.stream().map(Event::getMessage)).containsExactly( + "DelayClosing", + "going ahead..", + "Concurrent: acquired lock", + "Concurrent: about to release lock", + "Concurrent released lock. Completing proceess now.", + "ready" + ); } @@ -239,7 +305,7 @@ public void delayedClose() throws InterruptedException, ExecutionException { @SneakyThrows private static void sleep(long duration) { - Thread.sleep(duration); + ObjectLocker.sleeper.accept(Duration.ofMillis(duration)); } static ForkJoinTask submit(Runnable runnable) {