@@ -37,13 +37,49 @@ public class AddressTxAmountProcessor {
37
37
private List <Pair <EventMetadata , TxInputOutput >> txInputOutputListCache = Collections .synchronizedList (new ArrayList <>());
38
38
private List <AddressTxAmount > addressTxAmountListCache = Collections .synchronizedList (new ArrayList <>());
39
39
40
+ private Map <Integer , List <AddressTxAmount >> addressTxAmountListCacheMap = new HashMap <>();
41
+ private int bucketSize = 10 ;
42
+
40
43
private final PlatformTransactionManager transactionManager ;
41
44
private TransactionTemplate transactionTemplate ;
42
45
43
46
@ PostConstruct
44
47
void init () {
45
48
transactionTemplate = new TransactionTemplate (transactionManager );
46
49
transactionTemplate .setPropagationBehavior (TransactionDefinition .PROPAGATION_REQUIRES_NEW );
50
+
51
+ initAddressTxAmtCache ();
52
+ }
53
+
54
+ private void initAddressTxAmtCache () {
55
+ for (int i = 0 ; i < bucketSize ; i ++) {
56
+ addressTxAmountListCacheMap .put (i , Collections .synchronizedList (new ArrayList <>()));
57
+ }
58
+ }
59
+
60
+ private void save (long blockNo , List <AddressTxAmount > addressTxAmounts ) {
61
+ int index = (int ) (blockNo % bucketSize );
62
+ List <AddressTxAmount > addressTxAmountList = addressTxAmountListCacheMap .get (index );
63
+ addressTxAmountList .addAll (addressTxAmounts );
64
+
65
+ if (addressTxAmountList .size () > 1000 ) {
66
+ synchronized (addressTxAmountList ) {
67
+ addressTxAmountStorage .save (addressTxAmountList );
68
+ if (log .isDebugEnabled ())
69
+ log .debug ("-- Saved address_tx_amounts records : {}" , addressTxAmountList .size ());
70
+ addressTxAmountList .clear ();
71
+ }
72
+ }
73
+ }
74
+
75
+ private void clearCache () {
76
+ for (int i = 0 ; i < bucketSize ; i ++) {
77
+ List <AddressTxAmount > addressTxAmountList = addressTxAmountListCacheMap .get (i );
78
+ if (addressTxAmountList .size () > 0 ) {
79
+ //addressTxAmountStorage.save(addressTxAmountList);
80
+ addressTxAmountList .clear ();
81
+ }
82
+ }
47
83
}
48
84
49
85
@ EventListener
@@ -66,13 +102,14 @@ public void processAddressUtxoEvent(AddressUtxoEvent addressUtxoEvent) {
66
102
addressTxAmountList .addAll (txAddressTxAmountEntities );
67
103
}
68
104
69
- if (addressTxAmountList .size () > 100 ) {
70
- addressTxAmountStorage .save (addressTxAmountList ); //Save
71
- return ;
72
- }
105
+ // if (addressTxAmountList.size() > 100) {
106
+ // addressTxAmountStorage.save(addressTxAmountList); //Save
107
+ // return;
108
+ // }
73
109
74
110
if (addressTxAmountList .size () > 0 ) {
75
- addressTxAmountListCache .addAll (addressTxAmountList );
111
+ //TODO -- addressTxAmountListCache.addAll(addressTxAmountList);
112
+ save (addressUtxoEvent .getEventMetadata ().getBlock (), addressTxAmountList );
76
113
}
77
114
}
78
115
@@ -193,17 +230,25 @@ public void handleRemainingTxInputOuputs(ReadyForBalanceAggregationEvent readyFo
193
230
addressTxAmountListCache .addAll (addressTxAmountList );
194
231
}
195
232
233
+ var remainingAddressAmtList = addressTxAmountListCacheMap .values ().stream ()
234
+ .flatMap (List ::stream )
235
+ .toList ();
236
+ if (remainingAddressAmtList .size () > 0 ) {
237
+ addressTxAmountListCache .addAll (remainingAddressAmtList );
238
+ }
239
+
196
240
long t1 = System .currentTimeMillis ();
197
241
if (addressTxAmountListCache .size () > 0 ) {
198
242
addressTxAmountStorage .save (addressTxAmountListCache );
199
- log .info ("Total {} address_tx_amounts records saved" , addressTxAmountListCache .size ());
200
243
}
244
+
201
245
long t2 = System .currentTimeMillis ();
202
- log .info ("Time taken to save address_tx_amounts records : " + (t2 - t1 ) + " ms" );
246
+ log .info ("Time taken to save address_tx_amounts records : {}, time: {} ms" , addressTxAmountListCache . size (), (t2 - t1 ));
203
247
204
248
} finally {
205
249
txInputOutputListCache .clear ();
206
250
addressTxAmountListCache .clear ();
251
+ clearCache ();
207
252
}
208
253
}
209
254
0 commit comments