From 8355988032f2aeb5d30855d5a4a29ad76eb1de97 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ha=CC=8Avard=20Ottestad?= Date: Wed, 14 Aug 2024 10:42:56 +0200 Subject: [PATCH 1/5] reproduce test failure --- .../testsuite/sail/SailConcurrencyTest.java | 24 +++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/testsuites/sail/src/main/java/org/eclipse/rdf4j/testsuite/sail/SailConcurrencyTest.java b/testsuites/sail/src/main/java/org/eclipse/rdf4j/testsuite/sail/SailConcurrencyTest.java index 4ee406b0e7..4a2be27f09 100644 --- a/testsuites/sail/src/main/java/org/eclipse/rdf4j/testsuite/sail/SailConcurrencyTest.java +++ b/testsuites/sail/src/main/java/org/eclipse/rdf4j/testsuite/sail/SailConcurrencyTest.java @@ -20,6 +20,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import org.eclipse.rdf4j.common.concurrent.locks.Properties; import org.eclipse.rdf4j.common.iteration.CloseableIteration; import org.eclipse.rdf4j.common.transaction.IsolationLevels; import org.eclipse.rdf4j.model.IRI; @@ -36,7 +37,7 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Timeout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,6 +78,7 @@ public abstract class SailConcurrencyTest { @BeforeEach public void setUp() { + Properties.setLockTrackingEnabled(true); store = createSail(); store.init(); vf = store.getValueFactory(); @@ -157,6 +159,7 @@ public int getSize() { */ @Test @Timeout(value = 30, unit = TimeUnit.MINUTES) +// @RepeatedTest(100) public void testConcurrentAddLargeTxn() throws Exception { logger.info("executing two large concurrent transactions"); final CountDownLatch runnersDone = new CountDownLatch(2); @@ -199,6 +202,7 @@ public void testConcurrentAddLargeTxn() throws Exception { */ @Test @Timeout(value = 30, unit = TimeUnit.MINUTES) +// @RepeatedTest(100) public void testConcurrentAddLargeTxnRollback() throws Exception { logger.info("executing two large concurrent transactions"); final CountDownLatch runnersDone = new CountDownLatch(2); @@ -317,6 +321,7 @@ public void testGetContextIDs() throws Exception { } } +// @RepeatedTest(100) @Test @Timeout(value = 30, unit = TimeUnit.MINUTES) public void testConcurrentConnectionsShutdown() throws InterruptedException { @@ -362,6 +367,7 @@ public void testConcurrentConnectionsShutdown() throws InterruptedException { } // @Disabled +// @RepeatedTest(100) @Test @Timeout(value = 30, unit = TimeUnit.MINUTES) public void testSerialThreads() throws InterruptedException { @@ -443,6 +449,7 @@ public void testSerialThreads() throws InterruptedException { } +// @RepeatedTest(100) @Test @Timeout(value = 30, unit = TimeUnit.MINUTES) public void testConcurrentConnectionsShutdownReadCommitted() throws InterruptedException { @@ -499,6 +506,7 @@ public void testConcurrentConnectionsShutdownReadCommitted() throws InterruptedE } +// @RepeatedTest(100) @Test @Timeout(value = 30, unit = TimeUnit.MINUTES) public void testConcurrentConnectionsShutdownAndClose() throws InterruptedException { @@ -545,13 +553,24 @@ public void testConcurrentConnectionsShutdownAndClose() throws InterruptedExcept try { if (thread2.isAlive()) { +// try { connection2.get().close(); + +// }finally { connection1.get().close(); + +// } } else { +// try { connection1.get().close(); + +// }finally { connection2.get().close(); + +// } } - } catch (SailException ignored) { + } catch (Throwable logged) { + logger.error("Error closing connection", logged); } try (SailConnection connection = store.getConnection()) { @@ -575,6 +594,7 @@ public void testConcurrentConnectionsShutdownAndClose() throws InterruptedExcept store.shutDown(); } +// @RepeatedTest(100) @Test @Timeout(value = 30, unit = TimeUnit.MINUTES) public void testConcurrentConnectionsShutdownAndCloseRollback() throws InterruptedException { From 2779ef33366150002a7d479dc30f9a3ac34ce850 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ha=CC=8Avard=20Ottestad?= Date: Wed, 14 Aug 2024 10:43:44 +0200 Subject: [PATCH 2/5] various attempts at fixing issue --- .../locks/ExclusiveReentrantLockManager.java | 26 ++++-- .../rdf4j/sail/helpers/AbstractSail.java | 2 + .../sail/helpers/AbstractSailConnection.java | 79 +++++++++++++------ .../rdf4j/sail/shacl/ShaclSailConnection.java | 49 ++++++------ 4 files changed, 105 insertions(+), 51 deletions(-) diff --git a/core/sail/api/src/main/java/org/eclipse/rdf4j/common/concurrent/locks/ExclusiveReentrantLockManager.java b/core/sail/api/src/main/java/org/eclipse/rdf4j/common/concurrent/locks/ExclusiveReentrantLockManager.java index 6ac7fa4df1..f9896126df 100644 --- a/core/sail/api/src/main/java/org/eclipse/rdf4j/common/concurrent/locks/ExclusiveReentrantLockManager.java +++ b/core/sail/api/src/main/java/org/eclipse/rdf4j/common/concurrent/locks/ExclusiveReentrantLockManager.java @@ -34,7 +34,7 @@ public class ExclusiveReentrantLockManager { private final int waitToCollect; - LockMonitoring lockMonitoring; + LockMonitoring lockMonitoring; public ExclusiveReentrantLockManager() { this(false); @@ -50,18 +50,18 @@ public ExclusiveReentrantLockManager(boolean trackLocks, int collectionFrequency if (trackLocks || Properties.lockTrackingEnabled()) { - lockMonitoring = new LockTracking( + lockMonitoring = new LockTracking<>( true, - "ExclusiveReentrantLockManager", + "ExclusiveReentrantLockManager-Tracking", LoggerFactory.getLogger(this.getClass()), waitToCollect, Lock.ExtendedSupplier.wrap(this::getExclusiveLockInner, this::tryExclusiveLockInner) ); } else { - lockMonitoring = new LockCleaner( + lockMonitoring = new LockCleaner<>( false, - "ExclusiveReentrantLockManager", + "ExclusiveReentrantLockManager-Cleaner", LoggerFactory.getLogger(this.getClass()), Lock.ExtendedSupplier.wrap(this::getExclusiveLockInner, this::tryExclusiveLockInner) ); @@ -87,6 +87,8 @@ private Lock tryExclusiveLockInner() { } + private final AtomicLong ownerIsDead = new AtomicLong(); + private Lock getExclusiveLockInner() throws InterruptedException { synchronized (owner) { @@ -100,6 +102,14 @@ private Lock getExclusiveLockInner() throws InterruptedException { if (lock != null) { return lock; } else { + if (!owner.get().isAlive()) { + long l = ownerIsDead.incrementAndGet(); + if (l > 10) { + ownerIsDead.set(0); + continue; + } + + } lockMonitoring.runCleanup(); owner.wait(waitToCollect); } @@ -113,6 +123,12 @@ private Lock getExclusiveLockInner() throws InterruptedException { if (lock != null) { return lock; } else { + if (!owner.get().isAlive()) { + System.out.println("Owner thread is dead"); +// activeLocks.set(0); +// owner.set(null); +// continue; + } owner.wait(waitToCollect); } } diff --git a/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSail.java b/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSail.java index 55e58fa3fa..c9b382e76e 100644 --- a/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSail.java +++ b/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSail.java @@ -18,6 +18,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.WeakHashMap; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.eclipse.rdf4j.common.transaction.IsolationLevel; @@ -119,6 +120,7 @@ protected static boolean debugEnabled() { * debugging was disable at the time the connection was acquired. */ private final Map activeConnections = new IdentityHashMap<>(); +// private final Map activeConnections = new WeakHashMap<>(); /* * constructors diff --git a/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSailConnection.java b/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSailConnection.java index ca9bb7b60f..34fac8cab8 100644 --- a/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSailConnection.java +++ b/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSailConnection.java @@ -22,9 +22,9 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.LockSupport; -import java.util.concurrent.locks.ReentrantLock; import org.eclipse.rdf4j.common.annotation.InternalUseOnly; +import org.eclipse.rdf4j.common.concurrent.locks.ExclusiveReentrantLockManager; import org.eclipse.rdf4j.common.concurrent.locks.Lock; import org.eclipse.rdf4j.common.concurrent.locks.diagnostics.ConcurrentCleaner; import org.eclipse.rdf4j.common.iteration.CloseableIteration; @@ -98,7 +98,7 @@ public abstract class AbstractSailConnection implements SailConnection { private final AtomicReference activeThread = new AtomicReference<>(); @SuppressWarnings("FieldMayBeFinal") - private boolean isOpen = true; + private volatile boolean isOpen = true; private static final VarHandle IS_OPEN; private Thread owner; @@ -106,9 +106,8 @@ public abstract class AbstractSailConnection implements SailConnection { /** * Lock used to prevent concurrent calls to update methods like addStatement, clear, commit, etc. within a * transaction. - * */ - private final ReentrantLock updateLock = new ReentrantLock(); + private final ExclusiveReentrantLockManager updateLock = new ExclusiveReentrantLockManager(); private final LongAdder iterationsOpened = new LongAdder(); private final LongAdder iterationsClosed = new LongAdder(); @@ -200,8 +199,7 @@ public void begin(IsolationLevel isolationLevel) throws SailException { activeThread.setRelease(Thread.currentThread()); verifyIsOpen(); - - updateLock.lock(); + Lock exclusiveLock = updateLock.getExclusiveLock(); try { if (isActive()) { throw new SailException("a transaction is already active on this connection."); @@ -210,8 +208,11 @@ public void begin(IsolationLevel isolationLevel) throws SailException { startTransactionInternal(); txnActive = true; } finally { - updateLock.unlock(); + exclusiveLock.release(); } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SailException(e); } finally { try { activeThread.setRelease(null); @@ -505,15 +506,19 @@ public final void prepare() throws SailException { activeThread.setRelease(Thread.currentThread()); verifyIsOpen(); - updateLock.lock(); + Lock exclusiveLock = updateLock.getExclusiveLock(); + try { if (txnActive) { prepareInternal(); txnPrepared = true; } } finally { - updateLock.unlock(); + exclusiveLock.release(); } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SailException(e); } finally { try { activeThread.setRelease(null); @@ -535,7 +540,8 @@ public final void commit() throws SailException { verifyIsOpen(); - updateLock.lock(); + Lock exclusiveLock = updateLock.getExclusiveLock(); + try { if (txnActive) { if (!txnPrepared) { @@ -546,8 +552,11 @@ public final void commit() throws SailException { txnPrepared = false; } } finally { - updateLock.unlock(); + exclusiveLock.release(); } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SailException(e); } finally { try { activeThread.setRelease(null); @@ -572,7 +581,8 @@ public final void rollback() throws SailException { verifyIsOpen(); - updateLock.lock(); + Lock exclusiveLock = updateLock.getExclusiveLock(); + try { if (txnActive) { try { @@ -586,8 +596,11 @@ public final void rollback() throws SailException { debugEnabled ? new Throwable() : null); } } finally { - updateLock.unlock(); + exclusiveLock.release(); } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SailException(e); } finally { try { activeThread.setRelease(null); @@ -694,13 +707,17 @@ public final void endUpdate(UpdateContext op) throws SailException { verifyIsOpen(); - updateLock.lock(); + Lock exclusiveLock = updateLock.getExclusiveLock(); + try { verifyIsActive(); endUpdateInternal(op); } finally { - updateLock.unlock(); + exclusiveLock.release(); } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SailException(e); } finally { try { activeThread.setRelease(null); @@ -750,14 +767,18 @@ public final void clear(Resource... contexts) throws SailException { verifyIsOpen(); - updateLock.lock(); + Lock exclusiveLock = updateLock.getExclusiveLock(); + try { verifyIsActive(); clearInternal(contexts); statementsRemoved = true; } finally { - updateLock.unlock(); + exclusiveLock.release(); } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SailException(e); } finally { try { activeThread.setRelease(null); @@ -820,13 +841,17 @@ public final void setNamespace(String prefix, String name) throws SailException verifyIsOpen(); - updateLock.lock(); + Lock exclusiveLock = updateLock.getExclusiveLock(); + try { verifyIsActive(); setNamespaceInternal(prefix, name); } finally { - updateLock.unlock(); + exclusiveLock.release(); } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SailException(e); } finally { try { activeThread.setRelease(null); @@ -848,13 +873,17 @@ public final void removeNamespace(String prefix) throws SailException { verifyIsOpen(); - updateLock.lock(); + Lock exclusiveLock = updateLock.getExclusiveLock(); + try { verifyIsActive(); removeNamespaceInternal(prefix); } finally { - updateLock.unlock(); + exclusiveLock.release(); } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SailException(e); } finally { try { activeThread.setRelease(null); @@ -873,13 +902,17 @@ public final void clearNamespaces() throws SailException { verifyIsOpen(); - updateLock.lock(); + Lock exclusiveLock = updateLock.getExclusiveLock(); + try { verifyIsActive(); clearNamespacesInternal(); } finally { - updateLock.unlock(); + exclusiveLock.release(); } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SailException(e); } finally { try { activeThread.setRelease(null); diff --git a/core/sail/shacl/src/main/java/org/eclipse/rdf4j/sail/shacl/ShaclSailConnection.java b/core/sail/shacl/src/main/java/org/eclipse/rdf4j/sail/shacl/ShaclSailConnection.java index 829fdd9176..cec1d83061 100644 --- a/core/sail/shacl/src/main/java/org/eclipse/rdf4j/sail/shacl/ShaclSailConnection.java +++ b/core/sail/shacl/src/main/java/org/eclipse/rdf4j/sail/shacl/ShaclSailConnection.java @@ -735,49 +735,51 @@ synchronized public void close() throws SailException { if (closed) { return; } - - if (getWrappedConnection() instanceof AbstractSailConnection) { - AbstractSailConnection abstractSailConnection = (AbstractSailConnection) getWrappedConnection(); - - abstractSailConnection.waitForOtherOperations(true); - } - try { - if (isActive()) { - rollback(); + if (getWrappedConnection() instanceof AbstractSailConnection) { + AbstractSailConnection abstractSailConnection = (AbstractSailConnection) getWrappedConnection(); + + abstractSailConnection.waitForOtherOperations(true); } } finally { - try { - shapesRepoConnection.close(); + try { + if (isActive()) { + rollback(); + } } finally { try { - if (previousStateConnection != null) { - previousStateConnection.close(); - } + shapesRepoConnection.close(); } finally { try { - if (serializableConnection != null) { - serializableConnection.close(); + if (previousStateConnection != null) { + previousStateConnection.close(); } - } finally { + } finally { try { - super.close(); + if (serializableConnection != null) { + serializableConnection.close(); + } } finally { + try { - sail.closeConnection(); + super.close(); } finally { try { - cleanupShapesReadWriteLock(); + sail.closeConnection(); } finally { try { - cleanupReadWriteLock(); + cleanupShapesReadWriteLock(); } finally { - closed = true; - } + try { + cleanupReadWriteLock(); + } finally { + closed = true; + } + } } } } @@ -785,6 +787,7 @@ synchronized public void close() throws SailException { } } } + } @Override From b8cf9687c51b1e8779c61a078a7edaf231c831d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ha=CC=8Avard=20Ottestad?= Date: Sun, 29 Dec 2024 14:21:48 +0100 Subject: [PATCH 3/5] wip --- .../testsuite/sail/SailConcurrencyTest.java | 29 ++++++++++--------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/testsuites/sail/src/main/java/org/eclipse/rdf4j/testsuite/sail/SailConcurrencyTest.java b/testsuites/sail/src/main/java/org/eclipse/rdf4j/testsuite/sail/SailConcurrencyTest.java index 4a2be27f09..56113d42e7 100644 --- a/testsuites/sail/src/main/java/org/eclipse/rdf4j/testsuite/sail/SailConcurrencyTest.java +++ b/testsuites/sail/src/main/java/org/eclipse/rdf4j/testsuite/sail/SailConcurrencyTest.java @@ -38,6 +38,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.RepeatedTest; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -157,9 +158,9 @@ public int getSize() { * * @see https://github.com/eclipse/rdf4j/issues/693 */ - @Test +// @Test @Timeout(value = 30, unit = TimeUnit.MINUTES) -// @RepeatedTest(100) + @RepeatedTest(100) public void testConcurrentAddLargeTxn() throws Exception { logger.info("executing two large concurrent transactions"); final CountDownLatch runnersDone = new CountDownLatch(2); @@ -200,9 +201,9 @@ public void testConcurrentAddLargeTxn() throws Exception { * Verifies that two large concurrent transactions in separate contexts do not cause inconsistencies or errors when * one of the transactions rolls back at the end. */ - @Test +// @Test @Timeout(value = 30, unit = TimeUnit.MINUTES) -// @RepeatedTest(100) + @RepeatedTest(100) public void testConcurrentAddLargeTxnRollback() throws Exception { logger.info("executing two large concurrent transactions"); final CountDownLatch runnersDone = new CountDownLatch(2); @@ -321,8 +322,8 @@ public void testGetContextIDs() throws Exception { } } -// @RepeatedTest(100) - @Test + @RepeatedTest(100) +// @Test @Timeout(value = 30, unit = TimeUnit.MINUTES) public void testConcurrentConnectionsShutdown() throws InterruptedException { if (store instanceof AbstractSail) { @@ -367,8 +368,8 @@ public void testConcurrentConnectionsShutdown() throws InterruptedException { } // @Disabled -// @RepeatedTest(100) - @Test + @RepeatedTest(100) +// @Test @Timeout(value = 30, unit = TimeUnit.MINUTES) public void testSerialThreads() throws InterruptedException { if (store instanceof AbstractSail) { @@ -449,8 +450,8 @@ public void testSerialThreads() throws InterruptedException { } -// @RepeatedTest(100) - @Test + @RepeatedTest(100) +// @Test @Timeout(value = 30, unit = TimeUnit.MINUTES) public void testConcurrentConnectionsShutdownReadCommitted() throws InterruptedException { if (store instanceof AbstractSail) { @@ -506,8 +507,8 @@ public void testConcurrentConnectionsShutdownReadCommitted() throws InterruptedE } -// @RepeatedTest(100) - @Test + @RepeatedTest(100) +// @Test @Timeout(value = 30, unit = TimeUnit.MINUTES) public void testConcurrentConnectionsShutdownAndClose() throws InterruptedException { if (store instanceof AbstractSail) { @@ -594,8 +595,8 @@ public void testConcurrentConnectionsShutdownAndClose() throws InterruptedExcept store.shutDown(); } -// @RepeatedTest(100) - @Test + @RepeatedTest(1000) +// @Test @Timeout(value = 30, unit = TimeUnit.MINUTES) public void testConcurrentConnectionsShutdownAndCloseRollback() throws InterruptedException { if (store instanceof AbstractSail) { From dbcdf263c5f87138e3c5cad347641672abf78ec8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ha=CC=8Avard=20Ottestad?= Date: Fri, 3 Jan 2025 07:43:24 +0100 Subject: [PATCH 4/5] wip --- .../concurrent/locks/ExclusiveReentrantLockManager.java | 4 ++-- .../locks/ExclusiveReentrantLockManagerTest.java | 3 ++- testsuites/benchmark/pom.xml | 7 +++++++ 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/core/sail/api/src/main/java/org/eclipse/rdf4j/common/concurrent/locks/ExclusiveReentrantLockManager.java b/core/sail/api/src/main/java/org/eclipse/rdf4j/common/concurrent/locks/ExclusiveReentrantLockManager.java index f9896126df..010ae1d0a6 100644 --- a/core/sail/api/src/main/java/org/eclipse/rdf4j/common/concurrent/locks/ExclusiveReentrantLockManager.java +++ b/core/sail/api/src/main/java/org/eclipse/rdf4j/common/concurrent/locks/ExclusiveReentrantLockManager.java @@ -52,7 +52,7 @@ public ExclusiveReentrantLockManager(boolean trackLocks, int collectionFrequency lockMonitoring = new LockTracking<>( true, - "ExclusiveReentrantLockManager-Tracking", + "ExclusiveReentrantLockManager(w/tracking)", LoggerFactory.getLogger(this.getClass()), waitToCollect, Lock.ExtendedSupplier.wrap(this::getExclusiveLockInner, this::tryExclusiveLockInner) @@ -61,7 +61,7 @@ public ExclusiveReentrantLockManager(boolean trackLocks, int collectionFrequency } else { lockMonitoring = new LockCleaner<>( false, - "ExclusiveReentrantLockManager-Cleaner", + "ExclusiveReentrantLockManager(w/cleaner)", LoggerFactory.getLogger(this.getClass()), Lock.ExtendedSupplier.wrap(this::getExclusiveLockInner, this::tryExclusiveLockInner) ); diff --git a/core/sail/api/src/test/java/org/eclipse/rdf4j/common/concurrent/locks/ExclusiveReentrantLockManagerTest.java b/core/sail/api/src/test/java/org/eclipse/rdf4j/common/concurrent/locks/ExclusiveReentrantLockManagerTest.java index 83fe59d93e..66660995d6 100644 --- a/core/sail/api/src/test/java/org/eclipse/rdf4j/common/concurrent/locks/ExclusiveReentrantLockManagerTest.java +++ b/core/sail/api/src/test/java/org/eclipse/rdf4j/common/concurrent/locks/ExclusiveReentrantLockManagerTest.java @@ -128,7 +128,8 @@ void stalledTest() throws InterruptedException { assertThat(memoryAppender.countEventsForLogger(ExclusiveReentrantLockManager.class.getName())) .isGreaterThanOrEqualTo(1); - memoryAppender.assertContains("is waiting on a possibly stalled lock \"ExclusiveReentrantLockManager\" with id", + memoryAppender.assertContains( + "is waiting on a possibly stalled lock \"ExclusiveReentrantLockManager(w/tracking)\" with id", Level.INFO); memoryAppender.assertContains( "at org.eclipse.rdf4j.common.concurrent.locks.ExclusiveReentrantLockManagerTest.lambda$stalledTest$0(ExclusiveReentrantLockManagerTest.java:", diff --git a/testsuites/benchmark/pom.xml b/testsuites/benchmark/pom.xml index e90aa3e94e..b71a8c3c45 100644 --- a/testsuites/benchmark/pom.xml +++ b/testsuites/benchmark/pom.xml @@ -115,6 +115,13 @@ + + com.github.siom79.japicmp + japicmp-maven-plugin + + true + + From 72f9f34d15f9f369b97ede66ed3ca98b02e635cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ha=CC=8Avard=20Ottestad?= Date: Thu, 23 Jan 2025 15:10:13 +0100 Subject: [PATCH 5/5] wip --- .../locks/ExclusiveReentrantLockManager.java | 1 + .../sail/helpers/AbstractSailConnection.java | 4 +- .../rdf4j/sail/base/SailSourceConnection.java | 2 +- .../ExtensibleStoreConnection.java | 2 +- .../rdf4j/sail/lmdb/LmdbStoreConnection.java | 2 +- .../sail/memory/MemoryStoreConnection.java | 2 +- .../sail/nativerdf/NativeStoreConnection.java | 2 +- .../rdf4j/sail/shacl/ShaclSailConnection.java | 75 ++++++++++--------- .../testsuite/sail/SailConcurrencyTest.java | 5 +- .../rdf4j/federated/FedXConnection.java | 2 +- 10 files changed, 51 insertions(+), 46 deletions(-) diff --git a/core/sail/api/src/main/java/org/eclipse/rdf4j/common/concurrent/locks/ExclusiveReentrantLockManager.java b/core/sail/api/src/main/java/org/eclipse/rdf4j/common/concurrent/locks/ExclusiveReentrantLockManager.java index 010ae1d0a6..698b086d46 100644 --- a/core/sail/api/src/main/java/org/eclipse/rdf4j/common/concurrent/locks/ExclusiveReentrantLockManager.java +++ b/core/sail/api/src/main/java/org/eclipse/rdf4j/common/concurrent/locks/ExclusiveReentrantLockManager.java @@ -115,6 +115,7 @@ private Lock getExclusiveLockInner() throws InterruptedException { } } while (true); } else { + int deadCount = 0; while (true) { if (Thread.interrupted()) { throw new InterruptedException(); diff --git a/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSailConnection.java b/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSailConnection.java index 34fac8cab8..976300af88 100644 --- a/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSailConnection.java +++ b/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSailConnection.java @@ -1001,7 +1001,7 @@ protected void prepareInternal() throws SailException { protected abstract void commitInternal() throws SailException; - protected abstract void rollbackInternal() throws SailException; + public abstract void rollbackInternal() throws SailException; protected abstract void addStatementInternal(Resource subj, IRI pred, Value obj, Resource... contexts) throws SailException; @@ -1032,7 +1032,7 @@ protected AbstractSail getSailBase() { return sailBase; } - private void forceCloseActiveOperations() throws SailException { + public void forceCloseActiveOperations() throws SailException { for (int i = 0; i < 10 && isActiveOperation() && !debugEnabled; i++) { System.gc(); try { diff --git a/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/SailSourceConnection.java b/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/SailSourceConnection.java index 7942984593..8f54ca839d 100644 --- a/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/SailSourceConnection.java +++ b/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/SailSourceConnection.java @@ -513,7 +513,7 @@ protected void commitInternal() throws SailException { } @Override - protected void rollbackInternal() throws SailException { + public void rollbackInternal() throws SailException { synchronized (datasets) { SailDataset toCloseDataset = null; SailSink toCloseExplicitSink = null; diff --git a/core/sail/extensible-store/src/main/java/org/eclipse/rdf4j/sail/extensiblestore/ExtensibleStoreConnection.java b/core/sail/extensible-store/src/main/java/org/eclipse/rdf4j/sail/extensiblestore/ExtensibleStoreConnection.java index b599d07a97..cddc2971a9 100755 --- a/core/sail/extensible-store/src/main/java/org/eclipse/rdf4j/sail/extensiblestore/ExtensibleStoreConnection.java +++ b/core/sail/extensible-store/src/main/java/org/eclipse/rdf4j/sail/extensiblestore/ExtensibleStoreConnection.java @@ -54,7 +54,7 @@ protected void commitInternal() throws SailException { } @Override - protected void rollbackInternal() throws SailException { + public void rollbackInternal() throws SailException { super.rollbackInternal(); // create a fresh event object. sailChangedEvent = new DefaultSailChangedEvent(sail); diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbStoreConnection.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbStoreConnection.java index 8dc3c7019e..1ec127f414 100644 --- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbStoreConnection.java +++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbStoreConnection.java @@ -102,7 +102,7 @@ protected void commitInternal() throws SailException { } @Override - protected void rollbackInternal() throws SailException { + public void rollbackInternal() throws SailException { try { super.rollbackInternal(); } finally { diff --git a/core/sail/memory/src/main/java/org/eclipse/rdf4j/sail/memory/MemoryStoreConnection.java b/core/sail/memory/src/main/java/org/eclipse/rdf4j/sail/memory/MemoryStoreConnection.java index c4123966a4..f51a92d29b 100644 --- a/core/sail/memory/src/main/java/org/eclipse/rdf4j/sail/memory/MemoryStoreConnection.java +++ b/core/sail/memory/src/main/java/org/eclipse/rdf4j/sail/memory/MemoryStoreConnection.java @@ -72,7 +72,7 @@ protected void commitInternal() throws SailException { } @Override - protected void rollbackInternal() throws SailException { + public void rollbackInternal() throws SailException { super.rollbackInternal(); // create a fresh event object. sailChangedEvent = new DefaultSailChangedEvent(sail); diff --git a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/NativeStoreConnection.java b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/NativeStoreConnection.java index eb2a39fd8b..64e6f9ec1e 100644 --- a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/NativeStoreConnection.java +++ b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/NativeStoreConnection.java @@ -87,7 +87,7 @@ protected void commitInternal() throws SailException { } @Override - protected void rollbackInternal() throws SailException { + public void rollbackInternal() throws SailException { try { super.rollbackInternal(); } finally { diff --git a/core/sail/shacl/src/main/java/org/eclipse/rdf4j/sail/shacl/ShaclSailConnection.java b/core/sail/shacl/src/main/java/org/eclipse/rdf4j/sail/shacl/ShaclSailConnection.java index cec1d83061..511f587e4a 100644 --- a/core/sail/shacl/src/main/java/org/eclipse/rdf4j/sail/shacl/ShaclSailConnection.java +++ b/core/sail/shacl/src/main/java/org/eclipse/rdf4j/sail/shacl/ShaclSailConnection.java @@ -11,20 +11,6 @@ package org.eclipse.rdf4j.sail.shacl; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.FutureTask; -import java.util.stream.Collectors; -import java.util.stream.Stream; - import org.eclipse.rdf4j.common.concurrent.locks.Lock; import org.eclipse.rdf4j.common.concurrent.locks.StampedLockManager; import org.eclipse.rdf4j.common.iteration.CloseableIteration; @@ -58,6 +44,20 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.stream.Collectors; +import java.util.stream.Stream; + /** * @author Heshan Jayasinghe * @author HÃ¥vard Ottestad @@ -107,7 +107,7 @@ public class ShaclSailConnection extends NotifyingSailConnectionWrapper implemen private volatile boolean closed; ShaclSailConnection(ShaclSail sail, NotifyingSailConnection connection, SailConnection previousStateConnection, - SailRepositoryConnection shapesRepoConnection, SailConnection serializableConnection) { + SailRepositoryConnection shapesRepoConnection, SailConnection serializableConnection) { super(connection); this.previousStateConnection = previousStateConnection; this.shapesRepoConnection = shapesRepoConnection; @@ -118,7 +118,7 @@ public class ShaclSailConnection extends NotifyingSailConnectionWrapper implemen } ShaclSailConnection(ShaclSail sail, NotifyingSailConnection connection, SailConnection previousStateConnection, - SailRepositoryConnection shapesRepoConnection) { + SailRepositoryConnection shapesRepoConnection) { super(connection); this.previousStateConnection = previousStateConnection; this.shapesRepoConnection = shapesRepoConnection; @@ -129,7 +129,7 @@ public class ShaclSailConnection extends NotifyingSailConnectionWrapper implemen } ShaclSailConnection(ShaclSail sail, NotifyingSailConnection connection, - SailRepositoryConnection shapesRepoConnection, SailConnection serializableConnection) { + SailRepositoryConnection shapesRepoConnection, SailConnection serializableConnection) { super(connection); this.previousStateConnection = null; this.shapesRepoConnection = shapesRepoConnection; @@ -140,7 +140,7 @@ public class ShaclSailConnection extends NotifyingSailConnectionWrapper implemen } ShaclSailConnection(ShaclSail sail, NotifyingSailConnection connection, - SailRepositoryConnection shapesRepoConnection) { + SailRepositoryConnection shapesRepoConnection) { super(connection); this.previousStateConnection = null; this.serializableConnection = null; @@ -230,7 +230,7 @@ public void begin(IsolationLevel level) throws SailException { /** * @return the transaction settings that are based purely on the settings based down through the begin(...) method - * without considering any sail level settings for things like caching or parallel validation. + * without considering any sail level settings for things like caching or parallel validation. */ private Settings getLocalTransactionSettings() { return new Settings(this); @@ -503,7 +503,7 @@ ConnectionsGroup getConnectionsGroup() { } private ValidationReport performValidation(List shapes, boolean validateEntireBaseSail, - ConnectionsGroup connectionsGroup) throws InterruptedException { + ConnectionsGroup connectionsGroup) throws InterruptedException { long beforeValidation = 0; if (sail.isPerformanceLogging()) { @@ -738,14 +738,17 @@ synchronized public void close() throws SailException { try { if (getWrappedConnection() instanceof AbstractSailConnection) { AbstractSailConnection abstractSailConnection = (AbstractSailConnection) getWrappedConnection(); - abstractSailConnection.waitForOtherOperations(true); + abstractSailConnection.forceCloseActiveOperations(); + if(abstractSailConnection.isActive()) { + abstractSailConnection.rollbackInternal(); + } } } finally { try { if (isActive()) { - rollback(); + super.close(); } } finally { try { @@ -1043,7 +1046,7 @@ public RdfsSubClassOfReasoner getRdfsSubClassOfReasoner() { @Override public CloseableIteration getStatements(Resource subj, IRI pred, Value obj, - boolean includeInferred, Resource... contexts) throws SailException { + boolean includeInferred, Resource... contexts) throws SailException { if (useDefaultShapesGraph && contexts.length == 1 && RDF4J.SHACL_SHAPE_GRAPH.equals(contexts[0])) { return ConnectionHelper .getCloseableIteration(shapesRepoConnection.getStatements(subj, pred, obj, includeInferred)); @@ -1097,7 +1100,7 @@ public static class Settings { transient private Settings previous = null; public Settings(boolean cacheSelectNodes, boolean validationEnabled, boolean parallelValidation, - IsolationLevel isolationLevel) { + IsolationLevel isolationLevel) { this.cacheSelectedNodes = cacheSelectNodes; if (!validationEnabled) { validationApproach = ValidationApproach.Disabled; @@ -1122,18 +1125,18 @@ public Settings(ShaclSailConnection connection) { validationApproach = (ValidationApproach) transactionSetting; } else if (transactionSetting instanceof ShaclSail.TransactionSettings.PerformanceHint) { switch (((ShaclSail.TransactionSettings.PerformanceHint) transactionSetting)) { - case ParallelValidation: - parallelValidation = true; - break; - case SerialValidation: - parallelValidation = false; - break; - case CacheDisabled: - cacheSelectedNodes = false; - break; - case CacheEnabled: - cacheSelectedNodes = true; - break; + case ParallelValidation: + parallelValidation = true; + break; + case SerialValidation: + parallelValidation = false; + break; + case CacheDisabled: + cacheSelectedNodes = false; + break; + case CacheEnabled: + cacheSelectedNodes = true; + break; } } diff --git a/testsuites/sail/src/main/java/org/eclipse/rdf4j/testsuite/sail/SailConcurrencyTest.java b/testsuites/sail/src/main/java/org/eclipse/rdf4j/testsuite/sail/SailConcurrencyTest.java index 56113d42e7..530ba87390 100644 --- a/testsuites/sail/src/main/java/org/eclipse/rdf4j/testsuite/sail/SailConcurrencyTest.java +++ b/testsuites/sail/src/main/java/org/eclipse/rdf4j/testsuite/sail/SailConcurrencyTest.java @@ -79,7 +79,7 @@ public abstract class SailConcurrencyTest { @BeforeEach public void setUp() { - Properties.setLockTrackingEnabled(true); +// Properties.setLockTrackingEnabled(true); store = createSail(); store.init(); vf = store.getValueFactory(); @@ -509,7 +509,7 @@ public void testConcurrentConnectionsShutdownReadCommitted() throws InterruptedE @RepeatedTest(100) // @Test - @Timeout(value = 30, unit = TimeUnit.MINUTES) +// @Timeout(value = 30, unit = TimeUnit.MINUTES) public void testConcurrentConnectionsShutdownAndClose() throws InterruptedException { if (store instanceof AbstractSail) { ((AbstractSail) store).setConnectionTimeOut(200); @@ -572,6 +572,7 @@ public void testConcurrentConnectionsShutdownAndClose() throws InterruptedExcept } } catch (Throwable logged) { logger.error("Error closing connection", logged); + throw logged; } try (SailConnection connection = store.getConnection()) { diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedXConnection.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedXConnection.java index e1df6b6ca8..e9c98f8c67 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedXConnection.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedXConnection.java @@ -362,7 +362,7 @@ protected void removeStatementsInternal(Resource subj, IRI pred, Value obj, Reso } @Override - protected void rollbackInternal() throws SailException { + public void rollbackInternal() throws SailException { try { getWriteStrategyInternal().rollback(); } catch (RepositoryException e) {