From 4ecf8ca3bedf5778557ee7c00a5708e9e99458c6 Mon Sep 17 00:00:00 2001 From: Sotatek-QuanLeA Date: Thu, 21 Nov 2024 15:13:41 +0700 Subject: [PATCH] add lock for concurrent job --- .../ledgersync/scheduler/config/ExecutorConfig.java | 9 +++++++++ .../service/impl/OffChainPersistServiceImpl.java | 7 +++++++ .../service/impl/OffChainRetryDataErrorServiceImpl.java | 8 ++++++++ 3 files changed, 24 insertions(+) diff --git a/components/scheduler/src/main/java/org/cardanofoundation/ledgersync/scheduler/config/ExecutorConfig.java b/components/scheduler/src/main/java/org/cardanofoundation/ledgersync/scheduler/config/ExecutorConfig.java index 149ee6a9..b6cbe77a 100644 --- a/components/scheduler/src/main/java/org/cardanofoundation/ledgersync/scheduler/config/ExecutorConfig.java +++ b/components/scheduler/src/main/java/org/cardanofoundation/ledgersync/scheduler/config/ExecutorConfig.java @@ -1,5 +1,7 @@ package org.cardanofoundation.ledgersync.scheduler.config; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -13,15 +15,22 @@ public class ExecutorConfig { private final ExecutorService executor; + private final Lock lock; public ExecutorConfig() { this.executor = Executors.newVirtualThreadPerTaskExecutor(); + this.lock = new ReentrantLock(); } @Bean(name = "offChainExecutor") public ExecutorService virtualThreadExecutor() { return executor; } + + @Bean(name = "virtualThreadLock") + public Lock executorLock() { + return lock; + } @PreDestroy public void shutdown() { diff --git a/components/scheduler/src/main/java/org/cardanofoundation/ledgersync/scheduler/service/impl/OffChainPersistServiceImpl.java b/components/scheduler/src/main/java/org/cardanofoundation/ledgersync/scheduler/service/impl/OffChainPersistServiceImpl.java index 51389a4f..64f9d01e 100644 --- a/components/scheduler/src/main/java/org/cardanofoundation/ledgersync/scheduler/service/impl/OffChainPersistServiceImpl.java +++ b/components/scheduler/src/main/java/org/cardanofoundation/ledgersync/scheduler/service/impl/OffChainPersistServiceImpl.java @@ -5,6 +5,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; +import java.util.concurrent.locks.Lock; import org.cardanofoundation.ledgersync.scheduler.service.OffChainPersistService; import org.cardanofoundation.ledgersync.scheduler.service.offchain.OffChainProcessPersistDataService; import org.springframework.beans.factory.annotation.Qualifier; @@ -20,6 +21,7 @@ public class OffChainPersistServiceImpl implements OffChainPersistService { final ExecutorService executor; + final Lock lock; final OffChainProcessPersistDataService govActionPersistServiceImpl; final OffChainProcessPersistDataService votingDataPersistServiceImpl; final OffChainProcessPersistDataService constitutionPersistServiceImpl; @@ -28,6 +30,7 @@ public class OffChainPersistServiceImpl implements OffChainPersistService { public OffChainPersistServiceImpl( @Qualifier("offChainExecutor") ExecutorService executor, + @Qualifier("virtualThreadLock") Lock lock, @Qualifier("govActionPersistServiceImpl") OffChainProcessPersistDataService govActionPersistServiceImpl, @Qualifier("votingDataPersistServiceImpl") OffChainProcessPersistDataService votingDataPersistServiceImpl, @Qualifier("constitutionPersistServiceImpl") OffChainProcessPersistDataService constitutionPersistServiceImpl, @@ -35,6 +38,7 @@ public OffChainPersistServiceImpl( @Qualifier("dRepRegistrationPersistServiceImpl") OffChainProcessPersistDataService dRepRegistrationPersistServiceImpl) { this.executor = executor; + this.lock = lock; this.govActionPersistServiceImpl = govActionPersistServiceImpl; this.votingDataPersistServiceImpl = votingDataPersistServiceImpl; this.constitutionPersistServiceImpl = constitutionPersistServiceImpl; @@ -44,6 +48,7 @@ public OffChainPersistServiceImpl( @Override public void validateAndPersistData() { + lock.lock(); long startTime = System.currentTimeMillis(); log.info("Start validating off-chain data"); @@ -62,6 +67,8 @@ public void validateAndPersistData() { CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); } catch (Exception e) { log.error("Error processing validating off-chain data tasks", e.getCause()); + } finally { + lock.unlock(); } log.info("End validating off-chain data, taken time: {} ms", System.currentTimeMillis() - startTime); } diff --git a/components/scheduler/src/main/java/org/cardanofoundation/ledgersync/scheduler/service/impl/OffChainRetryDataErrorServiceImpl.java b/components/scheduler/src/main/java/org/cardanofoundation/ledgersync/scheduler/service/impl/OffChainRetryDataErrorServiceImpl.java index 72c3e05e..fd665bab 100644 --- a/components/scheduler/src/main/java/org/cardanofoundation/ledgersync/scheduler/service/impl/OffChainRetryDataErrorServiceImpl.java +++ b/components/scheduler/src/main/java/org/cardanofoundation/ledgersync/scheduler/service/impl/OffChainRetryDataErrorServiceImpl.java @@ -6,6 +6,7 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.locks.Lock; import org.cardanofoundation.ledgersync.scheduler.service.OffChainRetryDataErrorService; import org.cardanofoundation.ledgersync.scheduler.service.offchain.OffChainProcessRetryDataService; import org.springframework.beans.factory.annotation.Qualifier; @@ -21,6 +22,7 @@ public class OffChainRetryDataErrorServiceImpl implements OffChainRetryDataErrorService { final ExecutorService executor; + final Lock lock; final OffChainProcessRetryDataService govActionRetryServiceImpl; final OffChainProcessRetryDataService votingDataRetryServiceImpl; final OffChainProcessRetryDataService constitutionRetryServiceImpl; @@ -29,6 +31,7 @@ public class OffChainRetryDataErrorServiceImpl implements OffChainRetryDataError public OffChainRetryDataErrorServiceImpl( @Qualifier("offChainExecutor") ExecutorService executor, + @Qualifier("virtualThreadLock") Lock lock, @Qualifier("govActionRetryServiceImpl") OffChainProcessRetryDataService govActionRetryServiceImpl, @Qualifier("votingDataRetryServiceImpl") OffChainProcessRetryDataService votingDataRetryServiceImpl, @Qualifier("constitutionRetryServiceImpl") OffChainProcessRetryDataService constitutionRetryServiceImpl, @@ -36,6 +39,7 @@ public OffChainRetryDataErrorServiceImpl( @Qualifier("dRepRegistrationRetryServiceImpl") OffChainProcessRetryDataService dRepRegistrationRetryServiceImpl) { this.executor = executor; + this.lock = lock; this.govActionRetryServiceImpl = govActionRetryServiceImpl; this.votingDataRetryServiceImpl = votingDataRetryServiceImpl; this.constitutionRetryServiceImpl = constitutionRetryServiceImpl; @@ -45,6 +49,8 @@ public OffChainRetryDataErrorServiceImpl( @Override public void retryOffChainErrorData() { + lock.lock(); + lock.tryLock(); long startTime = System.currentTimeMillis(); log.info("Start retry error offchain data"); @@ -63,6 +69,8 @@ public void retryOffChainErrorData() { CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); } catch (CompletionException e) { log.error("Error processing retry tasks", e.getCause()); + } finally { + lock.unlock(); } log.info("End retry error offchain data time taken: {} ms", System.currentTimeMillis() - startTime); }