Skip to content

Commit

Permalink
chore: Remove extra columns
Browse files Browse the repository at this point in the history
  • Loading branch information
satran004 committed Mar 8, 2024
1 parent 1609ec2 commit 0ca3045
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 190 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,7 @@ public class AddressTxAmount extends BlockAwareDomain {
private String txHash;
private Long slot;
private BigInteger quantity;
private String policy;
private String assetName;
private String paymentCredential;
private String stakeAddress;
private String blockHash;
private Integer epoch;
}

Original file line number Diff line number Diff line change
Expand Up @@ -4,52 +4,37 @@
import com.bloxbean.cardano.yaci.store.common.domain.AddressUtxo;
import com.bloxbean.cardano.yaci.store.common.domain.Amt;
import com.bloxbean.cardano.yaci.store.common.domain.UtxoKey;
import com.bloxbean.cardano.yaci.store.common.executor.ParallelExecutor;
import com.bloxbean.cardano.yaci.store.events.EventMetadata;
import com.bloxbean.cardano.yaci.store.events.RollbackEvent;
import com.bloxbean.cardano.yaci.store.events.internal.ReadyForBalanceAggregationEvent;
import com.bloxbean.cardano.yaci.store.utxo.domain.AddressUtxoEvent;
import com.bloxbean.cardano.yaci.store.utxo.domain.TxInputOutput;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.cardanofoundation.ledgersync.account.domain.AddressTxAmount;
import org.cardanofoundation.ledgersync.account.storage.AddressTxAmountStorage;
import org.springframework.context.event.EventListener;
import org.springframework.data.util.Pair;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionTemplate;

import java.math.BigInteger;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import static org.cardanofoundation.ledgersync.account.util.AddressUtil.getAddress;

@Component
@RequiredArgsConstructor
@Slf4j
public class AddressTxAmountProcessor {
public static final int BLOCK_ADDRESS_TX_AMT_THRESHOLD = 100; //Threshold to save address_tx_amounts records for block

private final AddressTxAmountStorage addressTxAmountStorage;
private final UtxoClient utxoClient;

private List<Pair<EventMetadata, TxInputOutput>> txInputOutputListCache = Collections.synchronizedList(new ArrayList<>());
private List<Pair<EventMetadata, TxInputOutput>> pendingTxInputOutputListCache = Collections.synchronizedList(new ArrayList<>());
private List<AddressTxAmount> addressTxAmountListCache = Collections.synchronizedList(new ArrayList<>());

private final PlatformTransactionManager transactionManager;
private final ParallelExecutor parallelExecutor;
private TransactionTemplate transactionTemplate;

@PostConstruct
void init() {
transactionTemplate = new TransactionTemplate(transactionManager);
transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
}

@EventListener
@Transactional
public void processAddressUtxoEvent(AddressUtxoEvent addressUtxoEvent) {
Expand All @@ -70,12 +55,11 @@ public void processAddressUtxoEvent(AddressUtxoEvent addressUtxoEvent) {
addressTxAmountList.addAll(txAddressTxAmountEntities);
}

if (addressTxAmountList.size() > 100) {
if (addressTxAmountList.size() > BLOCK_ADDRESS_TX_AMT_THRESHOLD) {
if (log.isDebugEnabled())
log.debug("Saving address_tx_amounts records : {} -- {}", addressTxAmountList.size(), addressUtxoEvent.getEventMetadata().getBlock());
addressTxAmountStorage.save(addressTxAmountList); //Save
return;
}

if (addressTxAmountList.size() > 0) {
} else if (addressTxAmountList.size() > 0) {
addressTxAmountListCache.addAll(addressTxAmountList);
}
}
Expand All @@ -102,7 +86,7 @@ private List<AddressTxAmount> processAddressAmountForTx(EventMetadata metadata,
if (throwExceptionOnFailure)
throw new IllegalStateException("Unable to get inputs for all input keys for account balance calculation : " + inputUtxoKeys);
else
txInputOutputListCache.add(Pair.of(metadata, txInputOutput));
pendingTxInputOutputListCache.add(Pair.of(metadata, txInputOutput));

return Collections.emptyList();
}
Expand Down Expand Up @@ -166,12 +150,8 @@ private List<AddressTxAmount> processTxAmount(String txHash, EventMetadata metad
.slot(metadata.getSlot())
.quantity(entry.getValue())
.stakeAddress(addressDetails.ownerStakeAddress)
.assetName(assetDetails.assetName)
.policy(assetDetails.policy)
.paymentCredential(addressDetails.ownerPaymentCredential)
.epoch(metadata.getEpochNumber())
.blockNumber(metadata.getBlock())
.blockHash(metadata.getBlockHash())
.blockTime(metadata.getBlockTime())
.build();
}).toList();
Expand All @@ -182,7 +162,7 @@ private List<AddressTxAmount> processTxAmount(String txHash, EventMetadata metad
public void handleRemainingTxInputOuputs(ReadyForBalanceAggregationEvent readyForBalanceAggregationEvent) {
try {
List<AddressTxAmount> addressTxAmountList = new ArrayList<>();
for (var pair : txInputOutputListCache) {
for (var pair : pendingTxInputOutputListCache) {
EventMetadata metadata = pair.getFirst();
TxInputOutput txInputOutput = pair.getSecond();

Expand All @@ -197,28 +177,16 @@ public void handleRemainingTxInputOuputs(ReadyForBalanceAggregationEvent readyFo
addressTxAmountListCache.addAll(addressTxAmountList);
}

var future = CompletableFuture.supplyAsync(() -> {
long t1 = System.currentTimeMillis();
if (addressTxAmountListCache.size() > 0) {
addressTxAmountStorage.save(addressTxAmountListCache);
}

long t2 = System.currentTimeMillis();
log.info("Time taken to save address_tx_amounts records : {}, time: {} ms", addressTxAmountListCache.size(), (t2 - t1));

return null;
}, parallelExecutor.getVirtualThreadExecutor());

try {
future.get();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
long t1 = System.currentTimeMillis();
if (addressTxAmountListCache.size() > 0) {
addressTxAmountStorage.save(addressTxAmountListCache);
}

long t2 = System.currentTimeMillis();
log.info("Time taken to save additional address_tx_amounts records : {}, time: {} ms", addressTxAmountListCache.size(), (t2 - t1));

} finally {
txInputOutputListCache.clear();
pendingTxInputOutputListCache.clear();
addressTxAmountListCache.clear();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,8 @@ public void handleAddressTxAmtForGenesisBlock(GenesisBlockEvent genesisBlockEven
.slot(genesisBlockEvent.getSlot())
.quantity(balance)
.stakeAddress(stakeAddress)
.assetName(LOVELACE)
.policy(null)
.paymentCredential(ownerPaymentCredential)
.epoch(0)
.blockNumber(genesisBlockEvent.getBlock())
.blockHash(genesisBlockEvent.getBlockHash())
.blockTime(genesisBlockEvent.getBlockTime())
.build();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.cardanofoundation.ledgersync.account.storage.impl;

import com.bloxbean.cardano.yaci.store.account.AccountStoreProperties;
import com.bloxbean.cardano.yaci.store.common.executor.ParallelExecutor;
import com.bloxbean.cardano.yaci.store.common.util.ListUtil;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
Expand All @@ -13,12 +12,8 @@
import org.cardanofoundation.ledgersync.account.storage.impl.repository.AddressTxAmountRepository;
import org.jooq.DSLContext;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionTemplate;

import java.time.LocalDateTime;
import java.util.List;

import static org.cardanofoundation.ledgersync.account.jooq.Tables.ADDRESS_TX_AMOUNT;
Expand All @@ -30,18 +25,12 @@ public class AddressTxAmountStorageImpl implements AddressTxAmountStorage {
private final AddressTxAmountRepository addressTxAmountRepository;
private final DSLContext dsl;
private final AccountStoreProperties accountStoreProperties;
private final PlatformTransactionManager transactionManager;
private final ParallelExecutor parallelExecutor;

private final AggrMapper aggrMapper = AggrMapper.INSTANCE;
private TransactionTemplate transactionTemplate;

@PostConstruct
public void postConstruct() {
this.dsl.settings().setBatchSize(accountStoreProperties.getJooqWriteBatchSize());

transactionTemplate = new TransactionTemplate(transactionManager);
transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
}

@Override
Expand All @@ -51,93 +40,63 @@ public void save(List<AddressTxAmount> addressTxAmount) {
.map(addressTxAmount1 -> aggrMapper.toAddressTxAmountEntity(addressTxAmount1))
.toList();

if (accountStoreProperties.isParallelWrite()) {
// transactionTemplate.execute(status -> {
ListUtil.partitionAndApplyInParallel(addressTxAmtEntities, accountStoreProperties.getPerThreadBatchSize(), this::doSave);
// return null;
// });
if (accountStoreProperties.isParallelWrite()
&& addressTxAmtEntities.size() > accountStoreProperties.getPerThreadBatchSize()) {
ListUtil.partitionAndApplyInParallel(addressTxAmtEntities, accountStoreProperties.getPerThreadBatchSize(), this::saveBatch);
} else {
doSave(addressTxAmtEntities);
saveBatch(addressTxAmtEntities);
}
}

private void doSave(List<AddressTxAmountEntity> addressTxAmountEntities) {
LocalDateTime localDateTime = LocalDateTime.now();

/**
private void saveBatch(List<AddressTxAmountEntity> addressTxAmountEntities) {
var inserts = addressTxAmountEntities.stream()
.map(addressTxAmount -> {
return dsl.insertInto(ADDRESS_TX_AMOUNT)
.set(ADDRESS_TX_AMOUNT.ADDRESS, addressTxAmount.getAddress())
.set(ADDRESS_TX_AMOUNT.UNIT, addressTxAmount.getUnit())
.set(ADDRESS_TX_AMOUNT.TX_HASH, addressTxAmount.getTxHash())
.set(ADDRESS_TX_AMOUNT.SLOT, addressTxAmount.getSlot())
.set(ADDRESS_TX_AMOUNT.QUANTITY, addressTxAmount.getQuantity())
.set(ADDRESS_TX_AMOUNT.ADDR_FULL, addressTxAmount.getAddrFull())
.set(ADDRESS_TX_AMOUNT.POLICY, addressTxAmount.getPolicy())
.set(ADDRESS_TX_AMOUNT.ASSET_NAME, addressTxAmount.getAssetName())
.set(ADDRESS_TX_AMOUNT.PAYMENT_CREDENTIAL, addressTxAmount.getPaymentCredential())
.set(ADDRESS_TX_AMOUNT.STAKE_ADDRESS, addressTxAmount.getStakeAddress())
.set(ADDRESS_TX_AMOUNT.BLOCK_HASH, addressTxAmount.getBlockHash())
.set(ADDRESS_TX_AMOUNT.BLOCK, addressTxAmount.getBlockNumber())
.set(ADDRESS_TX_AMOUNT.BLOCK_TIME, addressTxAmount.getBlockTime())
.set(ADDRESS_TX_AMOUNT.EPOCH, addressTxAmount.getEpoch())
.set(ADDRESS_TX_AMOUNT.UPDATE_DATETIME, localDateTime)
.onDuplicateKeyUpdate()
.set(ADDRESS_TX_AMOUNT.SLOT, addressTxAmount.getSlot())
.set(ADDRESS_TX_AMOUNT.QUANTITY, addressTxAmount.getQuantity())
.set(ADDRESS_TX_AMOUNT.ADDR_FULL, addressTxAmount.getAddrFull())
.set(ADDRESS_TX_AMOUNT.POLICY, addressTxAmount.getPolicy())
.set(ADDRESS_TX_AMOUNT.ASSET_NAME, addressTxAmount.getAssetName())
.set(ADDRESS_TX_AMOUNT.PAYMENT_CREDENTIAL, addressTxAmount.getPaymentCredential())
.set(ADDRESS_TX_AMOUNT.STAKE_ADDRESS, addressTxAmount.getStakeAddress())
.set(ADDRESS_TX_AMOUNT.BLOCK_HASH, addressTxAmount.getBlockHash())
.set(ADDRESS_TX_AMOUNT.BLOCK, addressTxAmount.getBlockNumber())
.set(ADDRESS_TX_AMOUNT.BLOCK_TIME, addressTxAmount.getBlockTime())
.set(ADDRESS_TX_AMOUNT.EPOCH, addressTxAmount.getEpoch())
.set(ADDRESS_TX_AMOUNT.UPDATE_DATETIME, localDateTime);
}).toList();
.map(addressTxAmount -> dsl.insertInto(ADDRESS_TX_AMOUNT)
.set(ADDRESS_TX_AMOUNT.ADDRESS, addressTxAmount.getAddress())
.set(ADDRESS_TX_AMOUNT.UNIT, addressTxAmount.getUnit())
.set(ADDRESS_TX_AMOUNT.TX_HASH, addressTxAmount.getTxHash())
.set(ADDRESS_TX_AMOUNT.SLOT, addressTxAmount.getSlot())
.set(ADDRESS_TX_AMOUNT.QUANTITY, addressTxAmount.getQuantity())
.set(ADDRESS_TX_AMOUNT.ADDR_FULL, addressTxAmount.getAddrFull())
.set(ADDRESS_TX_AMOUNT.STAKE_ADDRESS, addressTxAmount.getStakeAddress())
.set(ADDRESS_TX_AMOUNT.BLOCK, addressTxAmount.getBlockNumber())
.set(ADDRESS_TX_AMOUNT.BLOCK_TIME, addressTxAmount.getBlockTime())
.set(ADDRESS_TX_AMOUNT.EPOCH, addressTxAmount.getEpoch())
.onDuplicateKeyUpdate()
.set(ADDRESS_TX_AMOUNT.SLOT, addressTxAmount.getSlot())
.set(ADDRESS_TX_AMOUNT.QUANTITY, addressTxAmount.getQuantity())
.set(ADDRESS_TX_AMOUNT.ADDR_FULL, addressTxAmount.getAddrFull())
.set(ADDRESS_TX_AMOUNT.STAKE_ADDRESS, addressTxAmount.getStakeAddress())
.set(ADDRESS_TX_AMOUNT.BLOCK, addressTxAmount.getBlockNumber())
.set(ADDRESS_TX_AMOUNT.BLOCK_TIME, addressTxAmount.getBlockTime())
.set(ADDRESS_TX_AMOUNT.EPOCH, addressTxAmount.getEpoch())).toList();
dsl.batch(inserts).execute();
**/

transactionTemplate.execute(status -> {
dsl.batched(c -> {
for (var addressTxAmount : addressTxAmountEntities) {
c.dsl().insertInto(ADDRESS_TX_AMOUNT)
.set(ADDRESS_TX_AMOUNT.ADDRESS, addressTxAmount.getAddress())
.set(ADDRESS_TX_AMOUNT.UNIT, addressTxAmount.getUnit())
.set(ADDRESS_TX_AMOUNT.TX_HASH, addressTxAmount.getTxHash())
.set(ADDRESS_TX_AMOUNT.SLOT, addressTxAmount.getSlot())
.set(ADDRESS_TX_AMOUNT.QUANTITY, addressTxAmount.getQuantity())
.set(ADDRESS_TX_AMOUNT.ADDR_FULL, addressTxAmount.getAddrFull())
.set(ADDRESS_TX_AMOUNT.POLICY, addressTxAmount.getPolicy())
.set(ADDRESS_TX_AMOUNT.ASSET_NAME, addressTxAmount.getAssetName())
.set(ADDRESS_TX_AMOUNT.PAYMENT_CREDENTIAL, addressTxAmount.getPaymentCredential())
.set(ADDRESS_TX_AMOUNT.STAKE_ADDRESS, addressTxAmount.getStakeAddress())
.set(ADDRESS_TX_AMOUNT.BLOCK_HASH, addressTxAmount.getBlockHash())
.set(ADDRESS_TX_AMOUNT.BLOCK, addressTxAmount.getBlockNumber())
.set(ADDRESS_TX_AMOUNT.BLOCK_TIME, addressTxAmount.getBlockTime())
.set(ADDRESS_TX_AMOUNT.EPOCH, addressTxAmount.getEpoch())
.set(ADDRESS_TX_AMOUNT.UPDATE_DATETIME, localDateTime)
.onDuplicateKeyUpdate()
.set(ADDRESS_TX_AMOUNT.SLOT, addressTxAmount.getSlot())
.set(ADDRESS_TX_AMOUNT.QUANTITY, addressTxAmount.getQuantity())
.set(ADDRESS_TX_AMOUNT.ADDR_FULL, addressTxAmount.getAddrFull())
.set(ADDRESS_TX_AMOUNT.POLICY, addressTxAmount.getPolicy())
.set(ADDRESS_TX_AMOUNT.ASSET_NAME, addressTxAmount.getAssetName())
.set(ADDRESS_TX_AMOUNT.PAYMENT_CREDENTIAL, addressTxAmount.getPaymentCredential())
.set(ADDRESS_TX_AMOUNT.STAKE_ADDRESS, addressTxAmount.getStakeAddress())
.set(ADDRESS_TX_AMOUNT.BLOCK_HASH, addressTxAmount.getBlockHash())
.set(ADDRESS_TX_AMOUNT.BLOCK, addressTxAmount.getBlockNumber())
.set(ADDRESS_TX_AMOUNT.BLOCK_TIME, addressTxAmount.getBlockTime())
.set(ADDRESS_TX_AMOUNT.EPOCH, addressTxAmount.getEpoch())
.set(ADDRESS_TX_AMOUNT.UPDATE_DATETIME, localDateTime)
.execute();
}
});
return null;
/**
dsl.batched(c -> {
for (var addressTxAmount : addressTxAmountEntities) {
c.dsl().insertInto(ADDRESS_TX_AMOUNT)
.set(ADDRESS_TX_AMOUNT.ADDRESS, addressTxAmount.getAddress())
.set(ADDRESS_TX_AMOUNT.UNIT, addressTxAmount.getUnit())
.set(ADDRESS_TX_AMOUNT.TX_HASH, addressTxAmount.getTxHash())
.set(ADDRESS_TX_AMOUNT.SLOT, addressTxAmount.getSlot())
.set(ADDRESS_TX_AMOUNT.QUANTITY, addressTxAmount.getQuantity())
.set(ADDRESS_TX_AMOUNT.ADDR_FULL, addressTxAmount.getAddrFull())
.set(ADDRESS_TX_AMOUNT.STAKE_ADDRESS, addressTxAmount.getStakeAddress())
.set(ADDRESS_TX_AMOUNT.BLOCK, addressTxAmount.getBlockNumber())
.set(ADDRESS_TX_AMOUNT.BLOCK_TIME, addressTxAmount.getBlockTime())
.set(ADDRESS_TX_AMOUNT.EPOCH, addressTxAmount.getEpoch())
.onDuplicateKeyUpdate()
.set(ADDRESS_TX_AMOUNT.SLOT, addressTxAmount.getSlot())
.set(ADDRESS_TX_AMOUNT.QUANTITY, addressTxAmount.getQuantity())
.set(ADDRESS_TX_AMOUNT.ADDR_FULL, addressTxAmount.getAddrFull())
.set(ADDRESS_TX_AMOUNT.STAKE_ADDRESS, addressTxAmount.getStakeAddress())
.set(ADDRESS_TX_AMOUNT.BLOCK, addressTxAmount.getBlockNumber())
.set(ADDRESS_TX_AMOUNT.BLOCK_TIME, addressTxAmount.getBlockTime())
.set(ADDRESS_TX_AMOUNT.EPOCH, addressTxAmount.getEpoch())
.execute();
}
});

**/
}

@Override
Expand Down
Loading

0 comments on commit 0ca3045

Please sign in to comment.