Skip to content
This repository was archived by the owner on Jan 24, 2025. It is now read-only.

Commit 4499ef0

Browse files
authored
Merge pull request #857 from iotaledger/fix/retained-tx-first
Hook to OnTransactionAttached in submitBlock
2 parents dc5b73c + c307b3c commit 4499ef0

File tree

12 files changed

+114
-48
lines changed

12 files changed

+114
-48
lines changed

Diff for: components/debugapi/component.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ func configure() error {
8686

8787
debugAPIWorkerPool := workerpool.NewGroup("DebugAPI").CreatePool("DebugAPI", workerpool.WithWorkerCount(1))
8888

89-
deps.Protocol.Events.Engine.Retainer.BlockRetained.Hook(func(block *blocks.Block) {
89+
deps.Protocol.Events.Engine.BlockRetainer.BlockRetained.Hook(func(block *blocks.Block) {
9090
blocksPerSlot.Set(block.ID().Slot(), append(lo.Return1(blocksPerSlot.GetOrCreate(block.ID().Slot(), func() []*blocks.Block {
9191
return make([]*blocks.Block, 0)
9292
})), block))

Diff for: components/inx/server_blocks.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ func (s *Server) attachBlock(ctx context.Context, block *iotago.Block) (*inx.Blo
186186
mergedCtx, mergedCtxCancel := contextutils.MergeContexts(ctx, Component.Daemon().ContextStopped())
187187
defer mergedCtxCancel()
188188

189-
blockID, err := deps.RequestHandler.SubmitBlockAndAwaitBooking(mergedCtx, block)
189+
blockID, err := deps.RequestHandler.SubmitBlockAndAwaitRetainer(mergedCtx, block)
190190
if err != nil {
191191
return nil, status.Errorf(codes.Internal, "failed to attach block: %s", err.Error())
192192
}

Diff for: components/restapi/core/blocks.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ func sendBlock(c echo.Context) (*api.BlockCreatedResponse, error) {
4242
return nil, err
4343
}
4444

45-
blockID, err := deps.RequestHandler.SubmitBlockAndAwaitBooking(c.Request().Context(), iotaBlock)
45+
blockID, err := deps.RequestHandler.SubmitBlockAndAwaitRetainer(c.Request().Context(), iotaBlock)
4646
if err != nil {
4747
return nil, ierrors.WithMessagef(echo.ErrInternalServerError, "failed to attach block: %w", err)
4848
}

Diff for: pkg/protocol/chains.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -176,10 +176,14 @@ func attachEngineLogs(instance *engine.Engine) func() {
176176
instance.LogTrace("BlockProcessed", "block", blockID)
177177
}).Unhook,
178178

179-
events.Retainer.BlockRetained.Hook(func(block *blocks.Block) {
179+
events.BlockRetainer.BlockRetained.Hook(func(block *blocks.Block) {
180180
instance.LogTrace("Retainer.BlockRetained", "block", block.ID())
181181
}).Unhook,
182182

183+
events.TransactionRetainer.TransactionRetained.Hook(func(txID iotago.TransactionID) {
184+
instance.LogTrace("Retainer.TransactionRetained", "transaction", txID)
185+
}).Unhook,
186+
183187
events.Notarization.SlotCommitted.Hook(func(details *notarization.SlotCommittedDetails) {
184188
instance.LogTrace("NotarizationManager.SlotCommitted", "commitment", details.Commitment.ID(), "acceptedBlocks count", details.AcceptedBlocks.Size(), "accepted transactions", len(details.Mutations))
185189
}).Unhook,

Diff for: pkg/protocol/engine/events.go

+20-18
Original file line numberDiff line numberDiff line change
@@ -29,23 +29,24 @@ type Events struct {
2929
AcceptedBlockProcessed *event.Event1[*blocks.Block]
3030
Evict *event.Event1[iotago.SlotIndex]
3131

32-
PreSolidFilter *presolidfilter.Events
33-
PostSolidFilter *postsolidfilter.Events
34-
BlockRequester *eventticker.Events[iotago.SlotIndex, iotago.BlockID]
35-
TipManager *tipmanager.Events
36-
BlockDAG *blockdag.Events
37-
Booker *booker.Events
38-
Clock *clock.Events
39-
BlockGadget *blockgadget.Events
40-
SlotGadget *slotgadget.Events
41-
SybilProtection *sybilprotection.Events
42-
Ledger *ledger.Events
43-
Notarization *notarization.Events
44-
SpendDAG *spenddag.Events[iotago.TransactionID, mempool.StateID]
45-
Scheduler *scheduler.Events
46-
SeatManager *seatmanager.Events
47-
SyncManager *syncmanager.Events
48-
Retainer *retainer.Events
32+
PreSolidFilter *presolidfilter.Events
33+
PostSolidFilter *postsolidfilter.Events
34+
BlockRequester *eventticker.Events[iotago.SlotIndex, iotago.BlockID]
35+
TipManager *tipmanager.Events
36+
BlockDAG *blockdag.Events
37+
Booker *booker.Events
38+
Clock *clock.Events
39+
BlockGadget *blockgadget.Events
40+
SlotGadget *slotgadget.Events
41+
SybilProtection *sybilprotection.Events
42+
Ledger *ledger.Events
43+
Notarization *notarization.Events
44+
SpendDAG *spenddag.Events[iotago.TransactionID, mempool.StateID]
45+
Scheduler *scheduler.Events
46+
SeatManager *seatmanager.Events
47+
SyncManager *syncmanager.Events
48+
BlockRetainer *retainer.BlockRetainerEvents
49+
TransactionRetainer *retainer.TransactionRetainerEvents
4950
event.Group[Events, *Events]
5051
}
5152

@@ -71,6 +72,7 @@ var NewEvents = event.CreateGroupConstructor(func() (newEvents *Events) {
7172
Scheduler: scheduler.NewEvents(),
7273
SeatManager: seatmanager.NewEvents(),
7374
SyncManager: syncmanager.NewEvents(),
74-
Retainer: retainer.NewEvents(),
75+
BlockRetainer: retainer.NewBlockRetainerEvents(),
76+
TransactionRetainer: retainer.NewTransactionRetainerEvents(),
7577
}
7678
})

Diff for: pkg/requesthandler/blockissuance.go

+41-14
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/iotaledger/iota-core/pkg/protocol/engine/blocks"
1212
"github.com/iotaledger/iota-core/pkg/protocol/engine/filter/postsolidfilter"
1313
"github.com/iotaledger/iota-core/pkg/protocol/engine/filter/presolidfilter"
14+
"github.com/iotaledger/iota-core/pkg/retainer/txretainer"
1415
iotago "github.com/iotaledger/iota.go/v4"
1516
)
1617

@@ -24,27 +25,53 @@ func (r *RequestHandler) SubmitBlockWithoutAwaitingBooking(block *model.Block) e
2425
return r.submitBlock(block)
2526
}
2627

27-
// submitBlockAndAwaitEvent submits a block to be processed and waits for the event to be triggered.
28-
func (r *RequestHandler) submitBlockAndAwaitEvent(ctx context.Context, block *model.Block, evt *event.Event1[*blocks.Block]) error {
28+
// submitBlockAndAwaitRetainer submits a block to be processed and waits for the block gets retained.
29+
func (r *RequestHandler) submitBlockAndAwaitRetainer(ctx context.Context, block *model.Block) error {
2930
filtered := make(chan error, 1)
3031
exit := make(chan struct{})
3132
defer close(exit)
3233

3334
// Make sure we don't wait forever here. If the block is not dispatched to the main engine,
3435
// it will never trigger one of the below events.
35-
processingCtx, processingCtxCancel := context.WithTimeout(ctx, 5*time.Second)
36+
processingCtx, processingCtxCancel := context.WithTimeout(ctx, 10*time.Second)
3637
defer processingCtxCancel()
3738
// Calculate the blockID so that we don't capture the block pointer in the event handlers.
3839
blockID := block.ID()
39-
evtUnhook := evt.Hook(func(eventBlock *blocks.Block) {
40-
if blockID != eventBlock.ID() {
41-
return
42-
}
43-
select {
44-
case filtered <- nil:
45-
case <-exit:
40+
41+
var successUnhook func()
42+
// Hook to TransactionAttached event if the block contains a transaction.
43+
signedTx, isTx := block.SignedTransaction()
44+
if isTx {
45+
txID := signedTx.Transaction.MustID()
46+
// Check if the transaction is already retained. The onTransactionAttached event is only triggered if it's a new transaction.
47+
// If the transaction is already retained, we hook to the BlockRetained event.
48+
_, err := r.protocol.Engines.Main.Get().TxRetainer.TransactionMetadata(txID)
49+
if ierrors.Is(err, txretainer.ErrEntryNotFound) {
50+
successUnhook = r.protocol.Events.Engine.TransactionRetainer.TransactionRetained.Hook(func(transactionID iotago.TransactionID) {
51+
if transactionID != txID {
52+
return
53+
}
54+
select {
55+
case filtered <- nil:
56+
case <-exit:
57+
}
58+
}, event.WithWorkerPool(r.workerPool)).Unhook
4659
}
47-
}, event.WithWorkerPool(r.workerPool)).Unhook
60+
}
61+
62+
// if no hook was set, hook to the block retained event.
63+
if successUnhook == nil {
64+
successUnhook = r.protocol.Events.Engine.BlockRetainer.BlockRetained.Hook(func(eventBlock *blocks.Block) {
65+
if blockID != eventBlock.ID() {
66+
return
67+
}
68+
select {
69+
case filtered <- nil:
70+
case <-exit:
71+
}
72+
}, event.WithWorkerPool(r.workerPool)).Unhook
73+
}
74+
4875
prefilteredUnhook := r.protocol.Events.Engine.PreSolidFilter.BlockPreFiltered.Hook(func(event *presolidfilter.BlockPreFilteredEvent) {
4976
if blockID != event.Block.ID() {
5077
return
@@ -65,7 +92,7 @@ func (r *RequestHandler) submitBlockAndAwaitEvent(ctx context.Context, block *mo
6592
}
6693
}, event.WithWorkerPool(r.workerPool)).Unhook
6794

68-
defer lo.BatchReverse(evtUnhook, prefilteredUnhook, postfilteredUnhook)()
95+
defer lo.BatchReverse(successUnhook, prefilteredUnhook, postfilteredUnhook)()
6996

7097
if err := r.submitBlock(block); err != nil {
7198
return ierrors.Wrapf(err, "failed to issue block %s", blockID)
@@ -82,13 +109,13 @@ func (r *RequestHandler) submitBlockAndAwaitEvent(ctx context.Context, block *mo
82109
}
83110
}
84111

85-
func (r *RequestHandler) SubmitBlockAndAwaitBooking(ctx context.Context, iotaBlock *iotago.Block) (iotago.BlockID, error) {
112+
func (r *RequestHandler) SubmitBlockAndAwaitRetainer(ctx context.Context, iotaBlock *iotago.Block) (iotago.BlockID, error) {
86113
modelBlock, err := model.BlockFromBlock(iotaBlock)
87114
if err != nil {
88115
return iotago.EmptyBlockID, ierrors.Wrap(err, "error serializing block to model block")
89116
}
90117

91-
if err = r.submitBlockAndAwaitEvent(ctx, modelBlock, r.protocol.Events.Engine.Retainer.BlockRetained); err != nil {
118+
if err = r.submitBlockAndAwaitRetainer(ctx, modelBlock); err != nil {
92119
return iotago.EmptyBlockID, ierrors.Wrap(err, "error issuing model block")
93120
}
94121

Diff for: pkg/retainer/blockretainer/block_retainer.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ type (
2323
)
2424

2525
type BlockRetainer struct {
26-
events *retainer.Events
26+
events *retainer.BlockRetainerEvents
2727
store StoreFunc
2828
cache *cache
2929

@@ -39,7 +39,7 @@ type BlockRetainer struct {
3939
func New(module module.Module, workersGroup *workerpool.Group, retainerStoreFunc StoreFunc, finalizedSlotFunc FinalizedSlotFunc, errorHandler func(error)) *BlockRetainer {
4040
b := &BlockRetainer{
4141
Module: module,
42-
events: retainer.NewEvents(),
42+
events: retainer.NewBlockRetainerEvents(),
4343
workerPool: workersGroup.CreatePool("Retainer", workerpool.WithWorkerCount(1)),
4444
store: retainerStoreFunc,
4545
cache: newCache(),
@@ -99,7 +99,7 @@ func NewProvider() module.Provider[*engine.Engine, retainer.BlockRetainer] {
9999
}
100100
}, asyncOpt)
101101

102-
e.Events.Retainer.BlockRetained.LinkTo(r.events.BlockRetained)
102+
e.Events.BlockRetainer.BlockRetained.LinkTo(r.events.BlockRetained)
103103

104104
r.InitializedEvent().Trigger()
105105

Diff for: pkg/retainer/events.go

+22-6
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,35 @@ package retainer
33
import (
44
"github.com/iotaledger/hive.go/runtime/event"
55
"github.com/iotaledger/iota-core/pkg/protocol/engine/blocks"
6+
iotago "github.com/iotaledger/iota.go/v4"
67
)
78

8-
// Events is a collection of Retainer related Events.
9-
type Events struct {
9+
// BlockRetainerEvents is a collection of Retainer related BlockRetainerEvents.
10+
type BlockRetainerEvents struct {
1011
// BlockRetained is triggered when a block is stored in the retainer.
1112
BlockRetained *event.Event1[*blocks.Block]
1213

13-
event.Group[Events, *Events]
14+
event.Group[BlockRetainerEvents, *BlockRetainerEvents]
1415
}
1516

16-
// NewEvents contains the constructor of the Events object (it is generated by a generic factory).
17-
var NewEvents = event.CreateGroupConstructor(func() (newEvents *Events) {
18-
return &Events{
17+
// NewBlockRetainerEvents contains the constructor of the Events object (it is generated by a generic factory).
18+
var NewBlockRetainerEvents = event.CreateGroupConstructor(func() (newEvents *BlockRetainerEvents) {
19+
return &BlockRetainerEvents{
1920
BlockRetained: event.New1[*blocks.Block](),
2021
}
2122
})
23+
24+
// TransactionRetainerEvents is a collection of Retainer related TransactionRetainerEvents.
25+
type TransactionRetainerEvents struct {
26+
// TransactionRetained is triggered when a transaction is stored in the retainer.
27+
TransactionRetained *event.Event1[iotago.TransactionID]
28+
29+
event.Group[TransactionRetainerEvents, *TransactionRetainerEvents]
30+
}
31+
32+
// NewTransactionRetainerEvents contains the constructor of the Events object (it is generated by a generic factory).
33+
var NewTransactionRetainerEvents = event.CreateGroupConstructor(func() (newEvents *TransactionRetainerEvents) {
34+
return &TransactionRetainerEvents{
35+
TransactionRetained: event.New1[iotago.TransactionID](),
36+
}
37+
})

Diff for: pkg/retainer/txretainer/tx_retainer.go

+8
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ type (
9090

9191
// TransactionRetainer keeps and resolves all the transaction-related metadata needed in the API and INX.
9292
type TransactionRetainer struct {
93+
events *retainer.TransactionRetainerEvents
9394
workerPool *workerpool.WorkerPool
9495
txRetainerDatabase *transactionRetainerDatabase
9596
latestCommittedSlotFunc SlotFunc
@@ -113,6 +114,7 @@ func WithDebugStoreErrorMessages(store bool) options.Option[TransactionRetainer]
113114
func New(parentModule module.Module, workersGroup *workerpool.Group, dbExecFunc storage.SQLDatabaseExecFunc, latestCommittedSlotFunc SlotFunc, finalizedSlotFunc SlotFunc, errorHandler func(error), opts ...options.Option[TransactionRetainer]) *TransactionRetainer {
114115
return module.InitSimpleLifecycle(options.Apply(&TransactionRetainer{
115116
Module: parentModule.NewSubModule("TransactionRetainer"),
117+
events: retainer.NewTransactionRetainerEvents(),
116118
workerPool: workersGroup.CreatePool("TxRetainer", workerpool.WithWorkerCount(1)),
117119
txRetainerCache: NewTransactionRetainerCache(),
118120
txRetainerDatabase: NewTransactionRetainerDB(dbExecFunc),
@@ -158,6 +160,8 @@ func NewProvider(opts ...options.Option[TransactionRetainer]) module.Provider[*e
158160
// therefore we use false for the "validSignature" argument.
159161
r.UpdateTransactionMetadata(transactionMetadata.ID(), false, transactionMetadata.EarliestIncludedAttachment().Slot(), api.TransactionStatePending, nil)
160162

163+
r.events.TransactionRetained.Trigger(transactionMetadata.ID())
164+
161165
// the transaction was accepted
162166
transactionMetadata.OnAccepted(func() {
163167
e.LogTrace("TxRetainer.TransactionAccepted", "tx", transactionMetadata.ID())
@@ -242,6 +246,10 @@ func NewProvider(opts ...options.Option[TransactionRetainer]) module.Provider[*e
242246
}, asyncOpt)
243247
})
244248

249+
e.Events.TransactionRetainer.TransactionRetained.LinkTo(r.events.TransactionRetained)
250+
251+
r.InitializedEvent().Trigger()
252+
245253
return r
246254
})
247255
}

Diff for: pkg/tests/reward_test.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -373,10 +373,13 @@ func Test_Account_StakeAmountCalculation(t *testing.T) {
373373
)
374374
block7 := lo.PanicOnErr(ts.IssueBasicBlockWithOptions("block7", ts.DefaultWallet(), tx7, mock.WithStrongParents(latestParents...)))
375375

376+
latestParents = ts.CommitUntilSlot(block7_8Slot, block7.ID())
377+
376378
tx8 := ts.DefaultWallet().ClaimDelegatorRewards("TX8", "TX7:0")
377-
block8 := lo.PanicOnErr(ts.IssueBasicBlockWithOptions("block8", ts.DefaultWallet(), tx8, mock.WithStrongParents(block7.ID())))
379+
block8 := lo.PanicOnErr(ts.IssueBasicBlockWithOptions("block8", ts.DefaultWallet(), tx8, mock.WithStrongParents(latestParents...)))
378380

379-
latestParents = ts.CommitUntilSlot(block7_8Slot, block8.ID())
381+
block8Slot := ts.CurrentSlot()
382+
latestParents = ts.CommitUntilSlot(block8Slot, block8.ID())
380383

381384
// Delegated Stake should be unaffected since no new delegation was effectively added in that slot.
382385
ts.AssertAccountStake(accountID, stakedAmount, deleg1+deleg2, ts.Nodes()...)

Diff for: pkg/testsuite/mock/client.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ func (c *TestSuiteClient) Routes(_ context.Context) (*api.RoutesResponse, error)
241241
}
242242

243243
func (c *TestSuiteClient) SubmitBlock(ctx context.Context, block *iotago.Block) (iotago.BlockID, error) {
244-
return c.Node.RequestHandler.SubmitBlockAndAwaitBooking(ctx, block)
244+
return c.Node.RequestHandler.SubmitBlockAndAwaitRetainer(ctx, block)
245245
}
246246

247247
func (c *TestSuiteClient) TransactionIncludedBlock(_ context.Context, txID iotago.TransactionID) (*iotago.Block, error) {

Diff for: tools/docker-network/tests/eventapiframework.go

+6
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,13 @@ func (e *EventAPIDockerTestFramework) AssertTransactionMetadataByTransactionID(c
284284
select {
285285
case metadata := <-acceptedChan:
286286
if txID.Compare(metadata.TransactionID) == 0 {
287+
// make sure the transaction state is available from the core API
288+
resp, err := eventClt.Client.TransactionMetadata(ctx, txID)
289+
require.NoError(e.Testing, err)
290+
require.NotNil(e.Testing, resp)
291+
287292
e.finishChan <- struct{}{}
293+
288294
return
289295
}
290296

0 commit comments

Comments
 (0)