From 1609ec2e7eb4c5488826ac1ef95a6470072433c8 Mon Sep 17 00:00:00 2001 From: Satya Date: Fri, 8 Mar 2024 09:58:57 +0800 Subject: [PATCH] chore: Virtual thread --- .../processor/AddressTxAmountProcessor.java | 28 +++++++++++++++---- .../impl/AddressTxAmountStorageImpl.java | 2 +- 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java index f0da54dc..3bf6f619 100644 --- a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java @@ -4,6 +4,7 @@ import com.bloxbean.cardano.yaci.store.common.domain.AddressUtxo; import com.bloxbean.cardano.yaci.store.common.domain.Amt; import com.bloxbean.cardano.yaci.store.common.domain.UtxoKey; +import com.bloxbean.cardano.yaci.store.common.executor.ParallelExecutor; import com.bloxbean.cardano.yaci.store.events.EventMetadata; import com.bloxbean.cardano.yaci.store.events.RollbackEvent; import com.bloxbean.cardano.yaci.store.events.internal.ReadyForBalanceAggregationEvent; @@ -24,6 +25,8 @@ import java.math.BigInteger; import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import static org.cardanofoundation.ledgersync.account.util.AddressUtil.getAddress; @@ -38,6 +41,7 @@ public class AddressTxAmountProcessor { private List addressTxAmountListCache = Collections.synchronizedList(new ArrayList<>()); private final PlatformTransactionManager transactionManager; + private final ParallelExecutor parallelExecutor; private TransactionTemplate transactionTemplate; @PostConstruct @@ -193,13 +197,25 @@ public void handleRemainingTxInputOuputs(ReadyForBalanceAggregationEvent readyFo addressTxAmountListCache.addAll(addressTxAmountList); } - long t1 = System.currentTimeMillis(); - if (addressTxAmountListCache.size() > 0) { - addressTxAmountStorage.save(addressTxAmountListCache); - } + var future = CompletableFuture.supplyAsync(() -> { + long t1 = System.currentTimeMillis(); + if (addressTxAmountListCache.size() > 0) { + addressTxAmountStorage.save(addressTxAmountListCache); + } + + long t2 = System.currentTimeMillis(); + log.info("Time taken to save address_tx_amounts records : {}, time: {} ms", addressTxAmountListCache.size(), (t2 - t1)); - long t2 = System.currentTimeMillis(); - log.info("Time taken to save address_tx_amounts records : {}, time: {} ms", addressTxAmountListCache.size(), (t2 - t1)); + return null; + }, parallelExecutor.getVirtualThreadExecutor()); + + try { + future.get(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } } finally { txInputOutputListCache.clear(); diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/AddressTxAmountStorageImpl.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/AddressTxAmountStorageImpl.java index bcae8840..4756c349 100644 --- a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/AddressTxAmountStorageImpl.java +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/AddressTxAmountStorageImpl.java @@ -53,7 +53,7 @@ public void save(List addressTxAmount) { if (accountStoreProperties.isParallelWrite()) { // transactionTemplate.execute(status -> { - ListUtil.partitionAndApplyInParallel(addressTxAmtEntities, accountStoreProperties.getPerThreadBatchSize(), this::doSave, parallelExecutor.getVirtualThreadExecutor()); + ListUtil.partitionAndApplyInParallel(addressTxAmtEntities, accountStoreProperties.getPerThreadBatchSize(), this::doSave); // return null; // }); } else {