Skip to content

Commit

Permalink
Changed idea of delaylocking a bit. Just made the next holder thread …
Browse files Browse the repository at this point in the history
…do the waiting.
  • Loading branch information
mihxil committed Dec 5, 2024
1 parent afd76fd commit fe9dcf2
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 70 deletions.
115 changes: 54 additions & 61 deletions vpro-shared-util/src/main/java/nl/vpro/util/locker/ObjectLocker.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@

import java.io.Serializable;
import java.lang.ref.WeakReference;
import java.time.Duration;
import java.time.Instant;
import java.time.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -18,6 +17,7 @@
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.meeuw.functional.Predicates;
import org.meeuw.functional.ThrowingConsumer;
import org.slf4j.event.Level;

import nl.vpro.logging.Slf4jHelper;
Expand All @@ -27,7 +27,10 @@
*/
@Slf4j
public class ObjectLocker {

public static Clock clock = Clock.systemUTC();
public static ThrowingConsumer<Duration, InterruptedException> sleeper = (d) -> {
Thread.sleep(d.toMillis());
};


public static BiPredicate<StackTraceElement, AtomicInteger> summaryBiPredicate =
Expand Down Expand Up @@ -150,7 +153,7 @@ public static <T> T withKeyLock(
}

public static <T> T withKeyLock(
Serializable id,
@NonNull Serializable id,
@NonNull String reason,
@NonNull Runnable runnable) {
return withKeyLock(id, reason, () -> {
Expand All @@ -159,9 +162,6 @@ public static <T> T withKeyLock(
});
}




/**
* @param key The key to lock on
* @param reason A description for the reason of locking, which can be used in logging or exceptions
Expand Down Expand Up @@ -192,7 +192,7 @@ public static <T, K extends Serializable> T withObjectLock(
log.warn("Calling with null key: {}", reason);
return callable.call();
}
try (final LockHolderCloser<K> lock = acquireLock(key, reason, locks, comparable)) {
try (final LockHolderCloser<K> lock = acquireLock(key, reason, locks, comparable, Duration.ZERO)) {
consumer.accept(lock.lockHolder);
return callable.call();
}
Expand All @@ -201,24 +201,27 @@ public static <T, K extends Serializable> T withObjectLock(
public static <K extends Serializable> LockHolderCloser<Serializable> acquireLock(
final Serializable key,
final @NonNull String reason,
final @NonNull Map<Serializable, LockHolder<Serializable>> locks) throws InterruptedException {
return acquireLock(key, reason, locks, CLASS_EQUALS);
final @NonNull Map<Serializable, LockHolder<Serializable>> locks,
Duration delayAfterClose) throws InterruptedException {
return acquireLock(key, reason, locks, CLASS_EQUALS, delayAfterClose);
}


@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
public static <K extends Serializable> LockHolderCloser<K> acquireLock(
private static <K extends Serializable> LockHolderCloser<K> acquireLock(
final K key,
final @NonNull String reason,
final @NonNull Map<K, LockHolder<K>> locks,
final @NonNull BiPredicate<Serializable, K> comparable) throws InterruptedException {
final @NonNull BiPredicate<Serializable, K> comparable,
final Duration delayAfterRelease) throws InterruptedException {
final long nanoStart = System.nanoTime();
LockHolderCloser<K> closer = null;
try {

LockHolder<K> holder;
boolean alreadyWaiting = false;
while (true) {
log.debug("Acquiring lock {} ({})", key, reason);
synchronized (locks) {
holder = locks.computeIfAbsent(key, (m) ->
computeLock(m, reason, comparable))
Expand All @@ -229,19 +232,33 @@ public static <K extends Serializable> LockHolderCloser<K> acquireLock(
locks.remove(key);
continue;
}
closer = new LockHolderCloser<>(nanoStart, locks, holder, comparable);
closer = new LockHolderCloser<>(nanoStart, locks, holder, comparable, delayAfterRelease);
if (holder.lock.isLocked() && !holder.lock.isHeldByCurrentThread()) {
log.debug("There are already threads ({}) for {}, waiting", holder.lock.getQueueLength(), key);
alreadyWaiting = true;
}
break;
}
}


if (Optional.ofNullable(threadLocalMonitorTime.get()).map(c -> true).orElse(monitor)) {
monitoredLock(holder, key);
} else {
log.debug("Locking for {}", holder);
holder.lock.lock();
}

if (holder.availableAfter != null) {
Instant now = clock.instant();
if (holder.availableAfter.isAfter(now)) {
sleeper.accept(Duration.between(now, holder.availableAfter));
}
log.debug("Acquired and waited until {}", holder.availableAfter);
holder.availableAfter = null;

}

if (alreadyWaiting) {
log.debug("Released and continuing {}", key);
}
Expand Down Expand Up @@ -332,7 +349,9 @@ private static <K extends Serializable> void releaseLock(
if (lock.lock.getHoldCount() == 1) {
if (!lock.lock.hasQueuedThreads()) {
log.trace("Removed {}", lock.key);
locks.remove(lock.key);
if (lock.availableAfter == null || lock.availableAfter.isBefore(clock.instant())) {
locks.remove(lock.key);
}
}
final Duration duration = Duration.ofNanos(System.nanoTime() - nanoStart);
for (Listener listener : LISTENERS) {
Expand All @@ -343,15 +362,20 @@ private static <K extends Serializable> void releaseLock(
"Released lock for {} ({}) in {}", lock.key, lock.reason, Duration.ofNanos(System.nanoTime() - nanoStart));
}
if (lock.lock.isHeldByCurrentThread()) { // MSE-4946
if (lock.availableAfter == null || lock.availableAfter.isBefore(clock.instant())) {

}
HOLDS.get().remove(lock);
lock.lock.unlock();

} else {
// can happen if 'continuing without lock'
Thread currentThread = Thread.currentThread();
log.warn("Current lock {} not hold by current thread {} ({}) but by {} ({})", lock, currentThread.getName(), currentThread, Optional.ofNullable(lock.thread.get()).map(Thread::getName).orElse(null), lock.thread.get(), new Exception());
}

locks.notifyAll();

}
}

Expand All @@ -373,7 +397,7 @@ public static class LockHolder<K> {
public final ReentrantLock lock;
final StackTraceElement[] initiator;
final WeakReference<Thread> thread;
final Instant createdAt = Instant.now();
final Instant createdAt = clock.instant();
final String reason;

@Getter
Expand All @@ -383,6 +407,10 @@ public static class LockHolder<K> {
@Setter
private Duration warnTime = ObjectLocker.defaultWarnTime;

@Getter
@Setter
private Instant availableAfter;


LockHolder(K k, String reason, ReentrantLock lock) {
this.key = k;
Expand Down Expand Up @@ -414,7 +442,7 @@ public int hashCode() {
* @since 5.13
*/
public Duration getAge() {
return Duration.between(createdAt, Instant.now());
return Duration.between(createdAt, clock.instant());
}

public String summarize() {
Expand All @@ -438,7 +466,7 @@ public String summarize(boolean showThreadBusy) {

@Override
public String toString() {
return "holder:" + key + ":" + createdAt;
return "holder:" + key + ":" + createdAt + ":" + reason;
}

public void disable(boolean interrupt) {
Expand All @@ -465,65 +493,30 @@ public static class LockHolderCloser<K extends Serializable> implements AutoClos
final BiPredicate<Serializable, K> comparable;
boolean closed = false;

final Duration delayAfterClose;

private LockHolderCloser(
final long nanoStart,
@NonNull Map<K, LockHolder<K>> locks,
@NonNull LockHolder<K> lockHolder,
BiPredicate<Serializable, K> comparable
BiPredicate<Serializable, K> comparable,
Duration delayAfterClose
) {
this.nanoStart = nanoStart;
this.locks = locks;
this.lockHolder = lockHolder;
this.comparable = comparable;
this.delayAfterClose = delayAfterClose;
}

public CompletableFuture<LockHolderCloser<K>> forTransfer() {
close();
CompletableFuture<LockHolderCloser<K>> future = new CompletableFuture<>();


future.thenAccept((o) -> {
if (log.isDebugEnabled()) {
log.debug("Completed transfer {} ({}) -> {} ({})", LockHolderCloser.this, LockHolderCloser.this.lockHolder.thread.get(), o, o.lockHolder.thread.get());
}
});
log.debug("Future to use {}", future);
return future;
}

public LockHolderCloser<K> transfer(CompletableFuture<LockHolderCloser<K>> future) {
synchronized (locks) {
try {

LockHolderCloser<K> closer = acquireLock(lockHolder.key, lockHolder.reason, locks, comparable);
future.complete(closer);
return closer;
} catch (Exception e) {
return null;
}
}
}
public CompletableFuture<Void> delayedClose(Duration duration) {
CompletableFuture<Void> ready = new CompletableFuture<>();
final CompletableFuture<LockHolderCloser<K>> future = forTransfer();
ForkJoinPool.commonPool().execute(() -> {

try (var closer = transfer(future)) {
log.debug("{} Starting delay", lockHolder.key);
Thread.sleep(duration.toMillis());
log.info("{} Ready after delay {}", lockHolder.key, duration);
ready.complete(null);
} catch (InterruptedException e) {
log.warn(e.getMessage(), e);
} finally {

}
});
return ready;
}

@Override
public void close() {
if (delayAfterClose.compareTo(Duration.ZERO) > 0) {
lockHolder.setAvailableAfter(clock.instant().plus(delayAfterClose));
}

synchronized (locks) {
if (!closed) {
releaseLock(nanoStart, locks, lockHolder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,23 @@

import lombok.EqualsAndHashCode;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import lombok.extern.log4j.Log4j2;

import java.io.Serial;
import java.io.Serializable;
import java.time.Duration;
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.*;
import java.util.concurrent.*;

import org.junit.jupiter.api.*;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;

import nl.vpro.logging.simple.*;
import nl.vpro.util.ThreadPools;

import static nl.vpro.util.locker.ObjectLocker.LOCKED_OBJECTS;
import static nl.vpro.util.locker.ObjectLocker.withKeyLock;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
Expand All @@ -23,7 +27,7 @@
/**
* @author Michiel Meeuwissen
*/
@Slf4j
@Log4j2
@Execution(ExecutionMode.SAME_THREAD)
public class ObjectLockerTest {

Expand Down Expand Up @@ -224,22 +228,84 @@ public void twoDifferentLocksStrictly() {

@Test
public void delayedClose() throws InterruptedException, ExecutionException {
CompletableFuture<Void> voidCompletableFuture;
try (var closer = ObjectLocker.acquireLock("bla", "test", ObjectLocker.LOCKED_OBJECTS)) {
//TestClock testClock = TestClock.twentyTwenty();
//ObjectLocker.clock = testClock;
//ObjectLocker.sleeper = d -> testClock.sleep(d.toMillis());

List<Event> events = Collections.synchronizedList(new ArrayList<>());
SimpleLogger logger = Log4j2SimpleLogger.of(log).chain(QueueSimpleLogger.of(events::add));

CompletableFuture<Void> voidCompletableFuture = new CompletableFuture<>();
try (var closer = ObjectLocker.acquireLock("bla", "initiallock", ObjectLocker.LOCKED_OBJECTS, Duration.ofSeconds(5))) {
ThreadPools.backgroundExecutor.execute(() -> {
logger.info("Concurrent: waiting for lock");
ObjectLocker.withKeyLock("bla", "concurrent lock", () -> {
logger.info("Concurrent: acquired lock");
sleep(100);
logger.info("Concurrent: about to release lock");
});
logger.info("Concurrent released lock. Completing proceess now.");
voidCompletableFuture.complete(null);

});
sleep(100);
logger.info("DelayClosing");
}

logger.info("going ahead..");
voidCompletableFuture.get();
logger.info("ready");
assertThat(LOCKED_OBJECTS).isEmpty();
assertThat(events.stream().map(Event::getMessage)).containsExactly(
"Concurrent: waiting for lock",
"DelayClosing",
"going ahead..",
"Concurrent: acquired lock",
"Concurrent: about to release lock",
"Concurrent released lock. Completing proceess now.",
"ready"
);
}


@Test
public void delayedClose2() throws InterruptedException, ExecutionException {
List<Event> events = Collections.synchronizedList(new ArrayList<>());
SimpleLogger logger = Log4j2SimpleLogger.of(log).chain(QueueSimpleLogger.of(events::add));
CompletableFuture<Void> voidCompletableFuture = new CompletableFuture<>();
try (var closer = ObjectLocker.acquireLock("bla", "initiallock", ObjectLocker.LOCKED_OBJECTS, Duration.ofSeconds(10))) {
Thread.sleep(100);
log.info("DelayClosing");
voidCompletableFuture = closer.delayedClose(Duration.ofSeconds(10));
logger.info("DelayClosing");
}
log.info("going ahhead..");

logger.info("going ahead..");
ObjectLocker.withKeyLock("bla", "concurrent lock", () -> {
logger.info("Concurrent: acquired lock");
sleep(100);
logger.info("Concurrent: about to release lock");
});
logger.info("Concurrent released lock. Completing proceess now.");
voidCompletableFuture.complete(null);

voidCompletableFuture.get();
logger.info("ready");
assertThat(LOCKED_OBJECTS).isEmpty();
assertThat(events.stream().map(Event::getMessage)).containsExactly(
"DelayClosing",
"going ahead..",
"Concurrent: acquired lock",
"Concurrent: about to release lock",
"Concurrent released lock. Completing proceess now.",
"ready"
);

}



@SneakyThrows
private static void sleep(long duration) {
Thread.sleep(duration);
ObjectLocker.sleeper.accept(Duration.ofMillis(duration));
}

static ForkJoinTask<?> submit(Runnable runnable) {
Expand Down

0 comments on commit fe9dcf2

Please sign in to comment.