From 0ca3045a540e9778bcdd6f10d78ec7d160615818 Mon Sep 17 00:00:00 2001 From: Satya Date: Fri, 8 Mar 2024 17:43:45 +0800 Subject: [PATCH] chore: Remove extra columns --- .../account/domain/AddressTxAmount.java | 4 - .../processor/AddressTxAmountProcessor.java | 64 ++------ .../GensisBlockAddressTxAmtProcessor.java | 4 - .../impl/AddressTxAmountStorageImpl.java | 139 ++++++------------ .../impl/model/AddressTxAmountEntity.java | 25 ++-- .../resources/db/account/V2_1_1__init.sql | 28 +--- .../src/main/resources/application.yml | 18 ++- 7 files changed, 92 insertions(+), 190 deletions(-) diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/domain/AddressTxAmount.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/domain/AddressTxAmount.java index a5c1ac22..00d89a0c 100644 --- a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/domain/AddressTxAmount.java +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/domain/AddressTxAmount.java @@ -23,11 +23,7 @@ public class AddressTxAmount extends BlockAwareDomain { private String txHash; private Long slot; private BigInteger quantity; - private String policy; - private String assetName; - private String paymentCredential; private String stakeAddress; - private String blockHash; private Integer epoch; } diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java index 3bf6f619..2d7420ca 100644 --- a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java @@ -4,13 +4,11 @@ import com.bloxbean.cardano.yaci.store.common.domain.AddressUtxo; import com.bloxbean.cardano.yaci.store.common.domain.Amt; import com.bloxbean.cardano.yaci.store.common.domain.UtxoKey; -import com.bloxbean.cardano.yaci.store.common.executor.ParallelExecutor; import com.bloxbean.cardano.yaci.store.events.EventMetadata; import com.bloxbean.cardano.yaci.store.events.RollbackEvent; import com.bloxbean.cardano.yaci.store.events.internal.ReadyForBalanceAggregationEvent; import com.bloxbean.cardano.yaci.store.utxo.domain.AddressUtxoEvent; import com.bloxbean.cardano.yaci.store.utxo.domain.TxInputOutput; -import jakarta.annotation.PostConstruct; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.cardanofoundation.ledgersync.account.domain.AddressTxAmount; @@ -18,15 +16,10 @@ import org.springframework.context.event.EventListener; import org.springframework.data.util.Pair; import org.springframework.stereotype.Component; -import org.springframework.transaction.PlatformTransactionManager; -import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.annotation.Transactional; -import org.springframework.transaction.support.TransactionTemplate; import java.math.BigInteger; import java.util.*; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import static org.cardanofoundation.ledgersync.account.util.AddressUtil.getAddress; @@ -34,22 +27,14 @@ @RequiredArgsConstructor @Slf4j public class AddressTxAmountProcessor { + public static final int BLOCK_ADDRESS_TX_AMT_THRESHOLD = 100; //Threshold to save address_tx_amounts records for block + private final AddressTxAmountStorage addressTxAmountStorage; private final UtxoClient utxoClient; - private List> txInputOutputListCache = Collections.synchronizedList(new ArrayList<>()); + private List> pendingTxInputOutputListCache = Collections.synchronizedList(new ArrayList<>()); private List addressTxAmountListCache = Collections.synchronizedList(new ArrayList<>()); - private final PlatformTransactionManager transactionManager; - private final ParallelExecutor parallelExecutor; - private TransactionTemplate transactionTemplate; - - @PostConstruct - void init() { - transactionTemplate = new TransactionTemplate(transactionManager); - transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); - } - @EventListener @Transactional public void processAddressUtxoEvent(AddressUtxoEvent addressUtxoEvent) { @@ -70,12 +55,11 @@ public void processAddressUtxoEvent(AddressUtxoEvent addressUtxoEvent) { addressTxAmountList.addAll(txAddressTxAmountEntities); } - if (addressTxAmountList.size() > 100) { + if (addressTxAmountList.size() > BLOCK_ADDRESS_TX_AMT_THRESHOLD) { + if (log.isDebugEnabled()) + log.debug("Saving address_tx_amounts records : {} -- {}", addressTxAmountList.size(), addressUtxoEvent.getEventMetadata().getBlock()); addressTxAmountStorage.save(addressTxAmountList); //Save - return; - } - - if (addressTxAmountList.size() > 0) { + } else if (addressTxAmountList.size() > 0) { addressTxAmountListCache.addAll(addressTxAmountList); } } @@ -102,7 +86,7 @@ private List processAddressAmountForTx(EventMetadata metadata, if (throwExceptionOnFailure) throw new IllegalStateException("Unable to get inputs for all input keys for account balance calculation : " + inputUtxoKeys); else - txInputOutputListCache.add(Pair.of(metadata, txInputOutput)); + pendingTxInputOutputListCache.add(Pair.of(metadata, txInputOutput)); return Collections.emptyList(); } @@ -166,12 +150,8 @@ private List processTxAmount(String txHash, EventMetadata metad .slot(metadata.getSlot()) .quantity(entry.getValue()) .stakeAddress(addressDetails.ownerStakeAddress) - .assetName(assetDetails.assetName) - .policy(assetDetails.policy) - .paymentCredential(addressDetails.ownerPaymentCredential) .epoch(metadata.getEpochNumber()) .blockNumber(metadata.getBlock()) - .blockHash(metadata.getBlockHash()) .blockTime(metadata.getBlockTime()) .build(); }).toList(); @@ -182,7 +162,7 @@ private List processTxAmount(String txHash, EventMetadata metad public void handleRemainingTxInputOuputs(ReadyForBalanceAggregationEvent readyForBalanceAggregationEvent) { try { List addressTxAmountList = new ArrayList<>(); - for (var pair : txInputOutputListCache) { + for (var pair : pendingTxInputOutputListCache) { EventMetadata metadata = pair.getFirst(); TxInputOutput txInputOutput = pair.getSecond(); @@ -197,28 +177,16 @@ public void handleRemainingTxInputOuputs(ReadyForBalanceAggregationEvent readyFo addressTxAmountListCache.addAll(addressTxAmountList); } - var future = CompletableFuture.supplyAsync(() -> { - long t1 = System.currentTimeMillis(); - if (addressTxAmountListCache.size() > 0) { - addressTxAmountStorage.save(addressTxAmountListCache); - } - - long t2 = System.currentTimeMillis(); - log.info("Time taken to save address_tx_amounts records : {}, time: {} ms", addressTxAmountListCache.size(), (t2 - t1)); - - return null; - }, parallelExecutor.getVirtualThreadExecutor()); - - try { - future.get(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } catch (ExecutionException e) { - throw new RuntimeException(e); + long t1 = System.currentTimeMillis(); + if (addressTxAmountListCache.size() > 0) { + addressTxAmountStorage.save(addressTxAmountListCache); } + long t2 = System.currentTimeMillis(); + log.info("Time taken to save additional address_tx_amounts records : {}, time: {} ms", addressTxAmountListCache.size(), (t2 - t1)); + } finally { - txInputOutputListCache.clear(); + pendingTxInputOutputListCache.clear(); addressTxAmountListCache.clear(); } } diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/GensisBlockAddressTxAmtProcessor.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/GensisBlockAddressTxAmtProcessor.java index 1ca05d9b..e9cf050d 100644 --- a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/GensisBlockAddressTxAmtProcessor.java +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/GensisBlockAddressTxAmtProcessor.java @@ -69,12 +69,8 @@ public void handleAddressTxAmtForGenesisBlock(GenesisBlockEvent genesisBlockEven .slot(genesisBlockEvent.getSlot()) .quantity(balance) .stakeAddress(stakeAddress) - .assetName(LOVELACE) - .policy(null) - .paymentCredential(ownerPaymentCredential) .epoch(0) .blockNumber(genesisBlockEvent.getBlock()) - .blockHash(genesisBlockEvent.getBlockHash()) .blockTime(genesisBlockEvent.getBlockTime()) .build(); diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/AddressTxAmountStorageImpl.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/AddressTxAmountStorageImpl.java index 4756c349..61fa6b2c 100644 --- a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/AddressTxAmountStorageImpl.java +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/AddressTxAmountStorageImpl.java @@ -1,7 +1,6 @@ package org.cardanofoundation.ledgersync.account.storage.impl; import com.bloxbean.cardano.yaci.store.account.AccountStoreProperties; -import com.bloxbean.cardano.yaci.store.common.executor.ParallelExecutor; import com.bloxbean.cardano.yaci.store.common.util.ListUtil; import jakarta.annotation.PostConstruct; import lombok.RequiredArgsConstructor; @@ -13,12 +12,8 @@ import org.cardanofoundation.ledgersync.account.storage.impl.repository.AddressTxAmountRepository; import org.jooq.DSLContext; import org.springframework.stereotype.Component; -import org.springframework.transaction.PlatformTransactionManager; -import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.annotation.Transactional; -import org.springframework.transaction.support.TransactionTemplate; -import java.time.LocalDateTime; import java.util.List; import static org.cardanofoundation.ledgersync.account.jooq.Tables.ADDRESS_TX_AMOUNT; @@ -30,18 +25,12 @@ public class AddressTxAmountStorageImpl implements AddressTxAmountStorage { private final AddressTxAmountRepository addressTxAmountRepository; private final DSLContext dsl; private final AccountStoreProperties accountStoreProperties; - private final PlatformTransactionManager transactionManager; - private final ParallelExecutor parallelExecutor; private final AggrMapper aggrMapper = AggrMapper.INSTANCE; - private TransactionTemplate transactionTemplate; @PostConstruct public void postConstruct() { this.dsl.settings().setBatchSize(accountStoreProperties.getJooqWriteBatchSize()); - - transactionTemplate = new TransactionTemplate(transactionManager); - transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); } @Override @@ -51,93 +40,63 @@ public void save(List addressTxAmount) { .map(addressTxAmount1 -> aggrMapper.toAddressTxAmountEntity(addressTxAmount1)) .toList(); - if (accountStoreProperties.isParallelWrite()) { -// transactionTemplate.execute(status -> { - ListUtil.partitionAndApplyInParallel(addressTxAmtEntities, accountStoreProperties.getPerThreadBatchSize(), this::doSave); -// return null; -// }); + if (accountStoreProperties.isParallelWrite() + && addressTxAmtEntities.size() > accountStoreProperties.getPerThreadBatchSize()) { + ListUtil.partitionAndApplyInParallel(addressTxAmtEntities, accountStoreProperties.getPerThreadBatchSize(), this::saveBatch); } else { - doSave(addressTxAmtEntities); + saveBatch(addressTxAmtEntities); } } - private void doSave(List addressTxAmountEntities) { - LocalDateTime localDateTime = LocalDateTime.now(); - - /** + private void saveBatch(List addressTxAmountEntities) { var inserts = addressTxAmountEntities.stream() - .map(addressTxAmount -> { - return dsl.insertInto(ADDRESS_TX_AMOUNT) - .set(ADDRESS_TX_AMOUNT.ADDRESS, addressTxAmount.getAddress()) - .set(ADDRESS_TX_AMOUNT.UNIT, addressTxAmount.getUnit()) - .set(ADDRESS_TX_AMOUNT.TX_HASH, addressTxAmount.getTxHash()) - .set(ADDRESS_TX_AMOUNT.SLOT, addressTxAmount.getSlot()) - .set(ADDRESS_TX_AMOUNT.QUANTITY, addressTxAmount.getQuantity()) - .set(ADDRESS_TX_AMOUNT.ADDR_FULL, addressTxAmount.getAddrFull()) - .set(ADDRESS_TX_AMOUNT.POLICY, addressTxAmount.getPolicy()) - .set(ADDRESS_TX_AMOUNT.ASSET_NAME, addressTxAmount.getAssetName()) - .set(ADDRESS_TX_AMOUNT.PAYMENT_CREDENTIAL, addressTxAmount.getPaymentCredential()) - .set(ADDRESS_TX_AMOUNT.STAKE_ADDRESS, addressTxAmount.getStakeAddress()) - .set(ADDRESS_TX_AMOUNT.BLOCK_HASH, addressTxAmount.getBlockHash()) - .set(ADDRESS_TX_AMOUNT.BLOCK, addressTxAmount.getBlockNumber()) - .set(ADDRESS_TX_AMOUNT.BLOCK_TIME, addressTxAmount.getBlockTime()) - .set(ADDRESS_TX_AMOUNT.EPOCH, addressTxAmount.getEpoch()) - .set(ADDRESS_TX_AMOUNT.UPDATE_DATETIME, localDateTime) - .onDuplicateKeyUpdate() - .set(ADDRESS_TX_AMOUNT.SLOT, addressTxAmount.getSlot()) - .set(ADDRESS_TX_AMOUNT.QUANTITY, addressTxAmount.getQuantity()) - .set(ADDRESS_TX_AMOUNT.ADDR_FULL, addressTxAmount.getAddrFull()) - .set(ADDRESS_TX_AMOUNT.POLICY, addressTxAmount.getPolicy()) - .set(ADDRESS_TX_AMOUNT.ASSET_NAME, addressTxAmount.getAssetName()) - .set(ADDRESS_TX_AMOUNT.PAYMENT_CREDENTIAL, addressTxAmount.getPaymentCredential()) - .set(ADDRESS_TX_AMOUNT.STAKE_ADDRESS, addressTxAmount.getStakeAddress()) - .set(ADDRESS_TX_AMOUNT.BLOCK_HASH, addressTxAmount.getBlockHash()) - .set(ADDRESS_TX_AMOUNT.BLOCK, addressTxAmount.getBlockNumber()) - .set(ADDRESS_TX_AMOUNT.BLOCK_TIME, addressTxAmount.getBlockTime()) - .set(ADDRESS_TX_AMOUNT.EPOCH, addressTxAmount.getEpoch()) - .set(ADDRESS_TX_AMOUNT.UPDATE_DATETIME, localDateTime); - }).toList(); + .map(addressTxAmount -> dsl.insertInto(ADDRESS_TX_AMOUNT) + .set(ADDRESS_TX_AMOUNT.ADDRESS, addressTxAmount.getAddress()) + .set(ADDRESS_TX_AMOUNT.UNIT, addressTxAmount.getUnit()) + .set(ADDRESS_TX_AMOUNT.TX_HASH, addressTxAmount.getTxHash()) + .set(ADDRESS_TX_AMOUNT.SLOT, addressTxAmount.getSlot()) + .set(ADDRESS_TX_AMOUNT.QUANTITY, addressTxAmount.getQuantity()) + .set(ADDRESS_TX_AMOUNT.ADDR_FULL, addressTxAmount.getAddrFull()) + .set(ADDRESS_TX_AMOUNT.STAKE_ADDRESS, addressTxAmount.getStakeAddress()) + .set(ADDRESS_TX_AMOUNT.BLOCK, addressTxAmount.getBlockNumber()) + .set(ADDRESS_TX_AMOUNT.BLOCK_TIME, addressTxAmount.getBlockTime()) + .set(ADDRESS_TX_AMOUNT.EPOCH, addressTxAmount.getEpoch()) + .onDuplicateKeyUpdate() + .set(ADDRESS_TX_AMOUNT.SLOT, addressTxAmount.getSlot()) + .set(ADDRESS_TX_AMOUNT.QUANTITY, addressTxAmount.getQuantity()) + .set(ADDRESS_TX_AMOUNT.ADDR_FULL, addressTxAmount.getAddrFull()) + .set(ADDRESS_TX_AMOUNT.STAKE_ADDRESS, addressTxAmount.getStakeAddress()) + .set(ADDRESS_TX_AMOUNT.BLOCK, addressTxAmount.getBlockNumber()) + .set(ADDRESS_TX_AMOUNT.BLOCK_TIME, addressTxAmount.getBlockTime()) + .set(ADDRESS_TX_AMOUNT.EPOCH, addressTxAmount.getEpoch())).toList(); dsl.batch(inserts).execute(); - **/ - transactionTemplate.execute(status -> { - dsl.batched(c -> { - for (var addressTxAmount : addressTxAmountEntities) { - c.dsl().insertInto(ADDRESS_TX_AMOUNT) - .set(ADDRESS_TX_AMOUNT.ADDRESS, addressTxAmount.getAddress()) - .set(ADDRESS_TX_AMOUNT.UNIT, addressTxAmount.getUnit()) - .set(ADDRESS_TX_AMOUNT.TX_HASH, addressTxAmount.getTxHash()) - .set(ADDRESS_TX_AMOUNT.SLOT, addressTxAmount.getSlot()) - .set(ADDRESS_TX_AMOUNT.QUANTITY, addressTxAmount.getQuantity()) - .set(ADDRESS_TX_AMOUNT.ADDR_FULL, addressTxAmount.getAddrFull()) - .set(ADDRESS_TX_AMOUNT.POLICY, addressTxAmount.getPolicy()) - .set(ADDRESS_TX_AMOUNT.ASSET_NAME, addressTxAmount.getAssetName()) - .set(ADDRESS_TX_AMOUNT.PAYMENT_CREDENTIAL, addressTxAmount.getPaymentCredential()) - .set(ADDRESS_TX_AMOUNT.STAKE_ADDRESS, addressTxAmount.getStakeAddress()) - .set(ADDRESS_TX_AMOUNT.BLOCK_HASH, addressTxAmount.getBlockHash()) - .set(ADDRESS_TX_AMOUNT.BLOCK, addressTxAmount.getBlockNumber()) - .set(ADDRESS_TX_AMOUNT.BLOCK_TIME, addressTxAmount.getBlockTime()) - .set(ADDRESS_TX_AMOUNT.EPOCH, addressTxAmount.getEpoch()) - .set(ADDRESS_TX_AMOUNT.UPDATE_DATETIME, localDateTime) - .onDuplicateKeyUpdate() - .set(ADDRESS_TX_AMOUNT.SLOT, addressTxAmount.getSlot()) - .set(ADDRESS_TX_AMOUNT.QUANTITY, addressTxAmount.getQuantity()) - .set(ADDRESS_TX_AMOUNT.ADDR_FULL, addressTxAmount.getAddrFull()) - .set(ADDRESS_TX_AMOUNT.POLICY, addressTxAmount.getPolicy()) - .set(ADDRESS_TX_AMOUNT.ASSET_NAME, addressTxAmount.getAssetName()) - .set(ADDRESS_TX_AMOUNT.PAYMENT_CREDENTIAL, addressTxAmount.getPaymentCredential()) - .set(ADDRESS_TX_AMOUNT.STAKE_ADDRESS, addressTxAmount.getStakeAddress()) - .set(ADDRESS_TX_AMOUNT.BLOCK_HASH, addressTxAmount.getBlockHash()) - .set(ADDRESS_TX_AMOUNT.BLOCK, addressTxAmount.getBlockNumber()) - .set(ADDRESS_TX_AMOUNT.BLOCK_TIME, addressTxAmount.getBlockTime()) - .set(ADDRESS_TX_AMOUNT.EPOCH, addressTxAmount.getEpoch()) - .set(ADDRESS_TX_AMOUNT.UPDATE_DATETIME, localDateTime) - .execute(); - } - }); - return null; + /** + dsl.batched(c -> { + for (var addressTxAmount : addressTxAmountEntities) { + c.dsl().insertInto(ADDRESS_TX_AMOUNT) + .set(ADDRESS_TX_AMOUNT.ADDRESS, addressTxAmount.getAddress()) + .set(ADDRESS_TX_AMOUNT.UNIT, addressTxAmount.getUnit()) + .set(ADDRESS_TX_AMOUNT.TX_HASH, addressTxAmount.getTxHash()) + .set(ADDRESS_TX_AMOUNT.SLOT, addressTxAmount.getSlot()) + .set(ADDRESS_TX_AMOUNT.QUANTITY, addressTxAmount.getQuantity()) + .set(ADDRESS_TX_AMOUNT.ADDR_FULL, addressTxAmount.getAddrFull()) + .set(ADDRESS_TX_AMOUNT.STAKE_ADDRESS, addressTxAmount.getStakeAddress()) + .set(ADDRESS_TX_AMOUNT.BLOCK, addressTxAmount.getBlockNumber()) + .set(ADDRESS_TX_AMOUNT.BLOCK_TIME, addressTxAmount.getBlockTime()) + .set(ADDRESS_TX_AMOUNT.EPOCH, addressTxAmount.getEpoch()) + .onDuplicateKeyUpdate() + .set(ADDRESS_TX_AMOUNT.SLOT, addressTxAmount.getSlot()) + .set(ADDRESS_TX_AMOUNT.QUANTITY, addressTxAmount.getQuantity()) + .set(ADDRESS_TX_AMOUNT.ADDR_FULL, addressTxAmount.getAddrFull()) + .set(ADDRESS_TX_AMOUNT.STAKE_ADDRESS, addressTxAmount.getStakeAddress()) + .set(ADDRESS_TX_AMOUNT.BLOCK, addressTxAmount.getBlockNumber()) + .set(ADDRESS_TX_AMOUNT.BLOCK_TIME, addressTxAmount.getBlockTime()) + .set(ADDRESS_TX_AMOUNT.EPOCH, addressTxAmount.getEpoch()) + .execute(); + } }); - + **/ } @Override diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/model/AddressTxAmountEntity.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/model/AddressTxAmountEntity.java index cdd83420..bff2d895 100644 --- a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/model/AddressTxAmountEntity.java +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/model/AddressTxAmountEntity.java @@ -1,11 +1,10 @@ package org.cardanofoundation.ledgersync.account.storage.impl.model; -import com.bloxbean.cardano.yaci.store.common.model.BlockAwareEntity; import jakarta.persistence.*; import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; -import lombok.experimental.SuperBuilder; import org.hibernate.annotations.DynamicUpdate; import java.math.BigInteger; @@ -13,12 +12,12 @@ @Data @NoArgsConstructor @AllArgsConstructor -@SuperBuilder +@Builder @Entity @Table(name = "address_tx_amount") @IdClass(AddressTxAmountId.class) @DynamicUpdate -public class AddressTxAmountEntity extends BlockAwareEntity { +public class AddressTxAmountEntity { @Id @Column(name = "address") private String address; @@ -41,21 +40,15 @@ public class AddressTxAmountEntity extends BlockAwareEntity { @Column(name = "addr_full") private String addrFull; - @Column(name = "policy") - private String policy; - - @Column(name = "asset_name") - private String assetName; - - @Column(name = "payment_credential") - private String paymentCredential; - @Column(name = "stake_address") private String stakeAddress; - @Column(name = "block_hash") - private String blockHash; - @Column(name = "epoch") private Integer epoch; + + @Column(name = "block") + private Long blockNumber; + + @Column(name = "block_time") + private Long blockTime; } diff --git a/aggregates/account/src/main/resources/db/account/V2_1_1__init.sql b/aggregates/account/src/main/resources/db/account/V2_1_1__init.sql index b5e4e039..bc42aea3 100644 --- a/aggregates/account/src/main/resources/db/account/V2_1_1__init.sql +++ b/aggregates/account/src/main/resources/db/account/V2_1_1__init.sql @@ -7,36 +7,12 @@ create table address_tx_amount slot bigint, quantity numeric(38) null, addr_full text, - policy varchar(56), - asset_name varchar(255), - payment_credential varchar(56), stake_address varchar(255), - block_hash varchar(64), block bigint, block_time bigint, epoch integer, - update_datetime timestamp, primary key (address, unit, tx_hash) ); --- CREATE INDEX idx_address_tx_amount_slot --- ON address_tx_amount(slot); --- --- -- address_balance_view --- drop view if exists address_balance_view; --- create view address_balance_view as --- select ab.* --- from address_balance ab --- inner join (select address, unit, max(slot) as max_slot --- from address_balance ab2 --- group by address, unit) max_ab --- on ab.address = max_ab.address and ab.unit = max_ab.unit and ab.slot = max_ab.max_slot; --- --- -- stake_address_balance_view --- drop view if exists stake_address_balance_view; --- create view stake_address_balance_view AS --- select sb.* --- from stake_address_balance sb --- inner join (select address, MAX(slot) as max_slot --- from stake_address_balance sb2 --- group by address) max_sb on sb.address = max_sb.address and sb.slot = max_sb.max_slot; +CREATE INDEX idx_address_tx_amount_slot + ON address_tx_amount(slot); diff --git a/aggregation-app/src/main/resources/application.yml b/aggregation-app/src/main/resources/application.yml index c8bff9bd..cad62292 100644 --- a/aggregation-app/src/main/resources/application.yml +++ b/aggregation-app/src/main/resources/application.yml @@ -9,6 +9,16 @@ spring: - classpath:db/store/{vendor} - classpath:db/account out-of-order: true + datasource: + hikari: + maximum-pool-size: 30 + minimum-idle: 5 + jpa: + properties: + hibernate: + jdbc: + batch_size: 100 + order_inserts: true apiPrefix: /api/v1 @@ -19,10 +29,14 @@ logging: store: event-publisher-id: 1000 auto-index-management: true + cardano: + keep-alive-interval: 3000 account: enabled: true balance-aggregation-enabled: true - history-cleanup-enabled: false + history-cleanup-enabled: true + # 3 months + balance-cleanup-slot-count: 7889238 api-enabled: true parallel-write: true per-thread-batch-size: 6000 @@ -34,4 +48,4 @@ store: blocks-batch-size: 100 blocks-partition-size: 10 use-virtual-thread-for-batch-processing: false - use-virtual-thread-for-event-processing: true + use-virtual-thread-for-event-processing: false