Skip to content

Commit

Permalink
add lock for concurrent job
Browse files Browse the repository at this point in the history
  • Loading branch information
Sotatek-QuanLeA committed Nov 21, 2024
1 parent e880539 commit 4ecf8ca
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.cardanofoundation.ledgersync.scheduler.config;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

Expand All @@ -13,15 +15,22 @@
public class ExecutorConfig {

private final ExecutorService executor;
private final Lock lock;

public ExecutorConfig() {
this.executor = Executors.newVirtualThreadPerTaskExecutor();
this.lock = new ReentrantLock();
}

@Bean(name = "offChainExecutor")
public ExecutorService virtualThreadExecutor() {
return executor;
}

@Bean(name = "virtualThreadLock")
public Lock executorLock() {
return lock;
}

@PreDestroy
public void shutdown() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

import java.util.concurrent.locks.Lock;
import org.cardanofoundation.ledgersync.scheduler.service.OffChainPersistService;
import org.cardanofoundation.ledgersync.scheduler.service.offchain.OffChainProcessPersistDataService;
import org.springframework.beans.factory.annotation.Qualifier;
Expand All @@ -20,6 +21,7 @@
public class OffChainPersistServiceImpl implements OffChainPersistService {

final ExecutorService executor;
final Lock lock;
final OffChainProcessPersistDataService govActionPersistServiceImpl;
final OffChainProcessPersistDataService votingDataPersistServiceImpl;
final OffChainProcessPersistDataService constitutionPersistServiceImpl;
Expand All @@ -28,13 +30,15 @@ public class OffChainPersistServiceImpl implements OffChainPersistService {

public OffChainPersistServiceImpl(
@Qualifier("offChainExecutor") ExecutorService executor,
@Qualifier("virtualThreadLock") Lock lock,
@Qualifier("govActionPersistServiceImpl") OffChainProcessPersistDataService govActionPersistServiceImpl,
@Qualifier("votingDataPersistServiceImpl") OffChainProcessPersistDataService votingDataPersistServiceImpl,
@Qualifier("constitutionPersistServiceImpl") OffChainProcessPersistDataService constitutionPersistServiceImpl,
@Qualifier("committeeDeregPersistServiceImpl") OffChainProcessPersistDataService committeeDeregPersistServiceImpl,
@Qualifier("dRepRegistrationPersistServiceImpl") OffChainProcessPersistDataService dRepRegistrationPersistServiceImpl) {

this.executor = executor;
this.lock = lock;
this.govActionPersistServiceImpl = govActionPersistServiceImpl;
this.votingDataPersistServiceImpl = votingDataPersistServiceImpl;
this.constitutionPersistServiceImpl = constitutionPersistServiceImpl;
Expand All @@ -44,6 +48,7 @@ public OffChainPersistServiceImpl(

@Override
public void validateAndPersistData() {
lock.lock();
long startTime = System.currentTimeMillis();
log.info("Start validating off-chain data");

Expand All @@ -62,6 +67,8 @@ public void validateAndPersistData() {
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
} catch (Exception e) {
log.error("Error processing validating off-chain data tasks", e.getCause());
} finally {
lock.unlock();
}
log.info("End validating off-chain data, taken time: {} ms", System.currentTimeMillis() - startTime);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;

import java.util.concurrent.locks.Lock;
import org.cardanofoundation.ledgersync.scheduler.service.OffChainRetryDataErrorService;
import org.cardanofoundation.ledgersync.scheduler.service.offchain.OffChainProcessRetryDataService;
import org.springframework.beans.factory.annotation.Qualifier;
Expand All @@ -21,6 +22,7 @@
public class OffChainRetryDataErrorServiceImpl implements OffChainRetryDataErrorService {

final ExecutorService executor;
final Lock lock;
final OffChainProcessRetryDataService govActionRetryServiceImpl;
final OffChainProcessRetryDataService votingDataRetryServiceImpl;
final OffChainProcessRetryDataService constitutionRetryServiceImpl;
Expand All @@ -29,13 +31,15 @@ public class OffChainRetryDataErrorServiceImpl implements OffChainRetryDataError

public OffChainRetryDataErrorServiceImpl(
@Qualifier("offChainExecutor") ExecutorService executor,
@Qualifier("virtualThreadLock") Lock lock,
@Qualifier("govActionRetryServiceImpl") OffChainProcessRetryDataService govActionRetryServiceImpl,
@Qualifier("votingDataRetryServiceImpl") OffChainProcessRetryDataService votingDataRetryServiceImpl,
@Qualifier("constitutionRetryServiceImpl") OffChainProcessRetryDataService constitutionRetryServiceImpl,
@Qualifier("committeeDeregRetryServiceImpl") OffChainProcessRetryDataService committeeDeregRetryServiceImpl,
@Qualifier("dRepRegistrationRetryServiceImpl") OffChainProcessRetryDataService dRepRegistrationRetryServiceImpl) {

this.executor = executor;
this.lock = lock;
this.govActionRetryServiceImpl = govActionRetryServiceImpl;
this.votingDataRetryServiceImpl = votingDataRetryServiceImpl;
this.constitutionRetryServiceImpl = constitutionRetryServiceImpl;
Expand All @@ -45,6 +49,8 @@ public OffChainRetryDataErrorServiceImpl(

@Override
public void retryOffChainErrorData() {
lock.lock();
lock.tryLock();
long startTime = System.currentTimeMillis();
log.info("Start retry error offchain data");

Expand All @@ -63,6 +69,8 @@ public void retryOffChainErrorData() {
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
} catch (CompletionException e) {
log.error("Error processing retry tasks", e.getCause());
} finally {
lock.unlock();
}
log.info("End retry error offchain data time taken: {} ms", System.currentTimeMillis() - startTime);
}
Expand Down

0 comments on commit 4ecf8ca

Please sign in to comment.