1
1
package org .cardanofoundation .ledgersync .account .storage .impl ;
2
2
3
3
import com .bloxbean .cardano .yaci .store .account .AccountStoreProperties ;
4
+ import com .bloxbean .cardano .yaci .store .common .executor .ParallelExecutor ;
4
5
import com .bloxbean .cardano .yaci .store .common .util .ListUtil ;
5
6
import jakarta .annotation .PostConstruct ;
6
7
import lombok .RequiredArgsConstructor ;
@@ -30,6 +31,7 @@ public class AddressTxAmountStorageImpl implements AddressTxAmountStorage {
30
31
private final DSLContext dsl ;
31
32
private final AccountStoreProperties accountStoreProperties ;
32
33
private final PlatformTransactionManager transactionManager ;
34
+ private final ParallelExecutor parallelExecutor ;
33
35
34
36
private final AggrMapper aggrMapper = AggrMapper .INSTANCE ;
35
37
private TransactionTemplate transactionTemplate ;
@@ -51,7 +53,7 @@ public void save(List<AddressTxAmount> addressTxAmount) {
51
53
52
54
if (accountStoreProperties .isParallelWrite ()) {
53
55
// transactionTemplate.execute(status -> {
54
- ListUtil .partitionAndApplyInParallel (addressTxAmtEntities , accountStoreProperties .getPerThreadBatchSize (), this ::doSave );
56
+ ListUtil .partitionAndApplyInParallel (addressTxAmtEntities , accountStoreProperties .getPerThreadBatchSize (), this ::doSave , parallelExecutor . getVirtualThreadExecutor () );
55
57
// return null;
56
58
// });
57
59
} else {
@@ -98,6 +100,7 @@ private void doSave(List<AddressTxAmountEntity> addressTxAmountEntities) {
98
100
dsl.batch(inserts).execute();
99
101
**/
100
102
103
+ transactionTemplate .execute (status -> {
101
104
dsl .batched (c -> {
102
105
for (var addressTxAmount : addressTxAmountEntities ) {
103
106
c .dsl ().insertInto (ADDRESS_TX_AMOUNT )
@@ -132,6 +135,8 @@ private void doSave(List<AddressTxAmountEntity> addressTxAmountEntities) {
132
135
.execute ();
133
136
}
134
137
});
138
+ return null ;
139
+ });
135
140
136
141
}
137
142
0 commit comments